diff options
Diffstat (limited to '')
| -rw-r--r-- | bus.go | 32 |
1 files changed, 16 insertions, 16 deletions
@@ -2,6 +2,7 @@ package eventbus import ( + "context" "strings" "sync" ) @@ -46,29 +47,21 @@ type Event interface { // Subscribe to an event at the specified path. Returns a listener channel for // receiving events. -func (b *Bus) Subscribe(path string) Listener { +func (b *Bus) Subscribe(ctx context.Context, path string) Listener { splitedPath := strings.Split(path, b.nameSeparator) ch := make(Listener, 1) b.listeners.Put(splitedPath, ch) + go func() { + <-ctx.Done() + close(ch) + b.listeners.Remove(ch) + }() return ch } -// Unsubscribe from an event and closes the listener channel. -func (b *Bus) Unsubscribe(l Listener) { - close(l) - b.listeners.Remove(l) -} - -// Close the event bus and all listener channels. -func (b *Bus) Close() { - b.listeners.Clear(func(value Listener) { - close(value) - }) -} - -// Fire an event to all subscribers who are subscribed to the corresponding +// Publish an event to all subscribers who are subscribed to the corresponding // event path. -func (b *Bus) Fire(ev Event) { +func (b *Bus) Publish(ev Event) { splitedEventName := strings.Split(ev.Event(), b.nameSeparator) listeners := b.listeners.Get(splitedEventName, b.wildcard) @@ -84,6 +77,13 @@ func (b *Bus) Fire(ev Event) { wg.Wait() } +// Close the event bus and all listener channels. +func (b *Bus) Close() { + b.listeners.Clear(func(value Listener) { + close(value) + }) +} + type noCopy struct{} func (*noCopy) Lock() {} |
