aboutsummaryrefslogtreecommitdiff
path: root/bus.go
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--bus.go32
1 files changed, 16 insertions, 16 deletions
diff --git a/bus.go b/bus.go
index 9ab1980..86e9cf7 100644
--- a/bus.go
+++ b/bus.go
@@ -2,6 +2,7 @@
package eventbus
import (
+ "context"
"strings"
"sync"
)
@@ -46,29 +47,21 @@ type Event interface {
// Subscribe to an event at the specified path. Returns a listener channel for
// receiving events.
-func (b *Bus) Subscribe(path string) Listener {
+func (b *Bus) Subscribe(ctx context.Context, path string) Listener {
splitedPath := strings.Split(path, b.nameSeparator)
ch := make(Listener, 1)
b.listeners.Put(splitedPath, ch)
+ go func() {
+ <-ctx.Done()
+ close(ch)
+ b.listeners.Remove(ch)
+ }()
return ch
}
-// Unsubscribe from an event and closes the listener channel.
-func (b *Bus) Unsubscribe(l Listener) {
- close(l)
- b.listeners.Remove(l)
-}
-
-// Close the event bus and all listener channels.
-func (b *Bus) Close() {
- b.listeners.Clear(func(value Listener) {
- close(value)
- })
-}
-
-// Fire an event to all subscribers who are subscribed to the corresponding
+// Publish an event to all subscribers who are subscribed to the corresponding
// event path.
-func (b *Bus) Fire(ev Event) {
+func (b *Bus) Publish(ev Event) {
splitedEventName := strings.Split(ev.Event(), b.nameSeparator)
listeners := b.listeners.Get(splitedEventName, b.wildcard)
@@ -84,6 +77,13 @@ func (b *Bus) Fire(ev Event) {
wg.Wait()
}
+// Close the event bus and all listener channels.
+func (b *Bus) Close() {
+ b.listeners.Clear(func(value Listener) {
+ close(value)
+ })
+}
+
type noCopy struct{}
func (*noCopy) Lock() {}