diff options
| -rw-r--r-- | README.md | 50 | ||||
| -rw-r--r-- | bus.go | 32 | ||||
| -rw-r--r-- | bus_test.go | 13 |
3 files changed, 40 insertions, 55 deletions
@@ -59,16 +59,13 @@ func main() { ) // Подписываемся на конкретное событие - ch1 := bus.Subscribe("user.login") - defer bus.Unsubscribe(ch1) + ch1 := bus.Subscribe(context.Background(), "user.login") // Подписываемся на группу событий с wildcard - ch2 := bus.Subscribe("user.*") - defer bus.Unsubscribe(ch2) + ch2 := bus.Subscribe(context.Background(), "user.*") // Подписываемся на группу событий с кастомным wildcard - ch3 := customBus.Subscribe("/user/#") - defer customBus.Unsubscribe(ch3) + ch3 := customBus.Subscribe(context.Background(), "/user/#") // Запускаем горутины для обработки событий go func() { @@ -90,9 +87,9 @@ func main() { }() // Отправляем события - bus.Fire(MyEvent{name: "user.login", data: "user123"}) - bus.Fire(MyEvent{name: "user.logout", data: "user123"}) - customBus.Fire(MyEvent{name: "/user/login", data: "user123"}) + bus.Publish(MyEvent{name: "user.login", data: "user123"}) + bus.Publish(MyEvent{name: "user.logout", data: "user123"}) + customBus.Publish(MyEvent{name: "/user/login", data: "user123"}) // Ждем немного для обработки событий time.Sleep(100 * time.Millisecond) @@ -111,20 +108,16 @@ func main() { - `Capacity` - задает начальную емкость для узлов дерева подписчиков (по умолчанию 32) -### `func (b *bus) Subscribe(path string) Listener` +### `func (b *bus) Subscribe(ctx context.Context, path string) Listener` Подписывается на событие по указанному пути. Возвращает канал-подписчик для получения событий. -### `func (b *bus) Unsubscribe(l Listener)` - -Отписывается от события и закрывает канал подписчика. - ### `func (b *bus) Close()` Закрывает шину событий и все каналы подписчиков. -### `func (b *bus) Fire(ev Event)` +### `func (b *bus) Publish(ev Event)` Отправляет событие всем подписчикам, которые подписаны на соответствующий путь события. @@ -204,16 +197,13 @@ func main() { ) // Subscribe to specific event - ch1 := bus.Subscribe("user.login") - defer bus.Unsubscribe(ch1) + ch1 := bus.Subscribe(context.Background(), "user.login") // Subscribe to event group with wildcard - ch2 := bus.Subscribe("user.*") - defer bus.Unsubscribe(ch2) + ch2 := bus.Subscribe(context.Background(), "user.*") // Subscribe to event group with custom wildcard - ch3 := customBus.Subscribe("/user/#") - defer customBus.Unsubscribe(ch3) + ch3 := customBus.Subscribe(context.Background(), "/user/#") // Start goroutines for event handling go func() { @@ -234,10 +224,10 @@ func main() { } }() - // Fire events - bus.Fire(MyEvent{name: "user.login", data: "user123"}) - bus.Fire(MyEvent{name: "user.logout", data: "user123"}) - customBus.Fire(MyEvent{name: "/user/login", data: "user123"}) + // Publish events + bus.Publish(MyEvent{name: "user.login", data: "user123"}) + bus.Publish(MyEvent{name: "user.logout", data: "user123"}) + customBus.Publish(MyEvent{name: "/user/login", data: "user123"}) // Wait a bit for event processing time.Sleep(100 * time.Millisecond) @@ -254,22 +244,18 @@ Creates a new event bus with optional settings. Supports the following options: - `Wildcard` - sets the wildcard character for subscriptions (default "\*") - `Capacity` - sets the initial capacity for listeners trie (default 32) -### `func (b *bus) Subscribe(path string) Listener` +### `func (b *bus) Subscribe(ctx context.Context, path string) Listener` Subscribes to an event at the specified path. Returns a listener channel for receiving events. -### `func (b *bus) Unsubscribe(l Listener)` - -Unsubscribes from an event and closes the listener channel. - ### `func (b *bus) Close()` Closes the event bus and all listener channels. -### `func (b *bus) Fire(ev Event)` +### `func (b *bus) Publish(ev Event)` -Fires an event to all subscribers who are subscribed to the corresponding event +Publishs an event to all subscribers who are subscribed to the corresponding event path. ## License @@ -2,6 +2,7 @@ package eventbus import ( + "context" "strings" "sync" ) @@ -46,29 +47,21 @@ type Event interface { // Subscribe to an event at the specified path. Returns a listener channel for // receiving events. -func (b *Bus) Subscribe(path string) Listener { +func (b *Bus) Subscribe(ctx context.Context, path string) Listener { splitedPath := strings.Split(path, b.nameSeparator) ch := make(Listener, 1) b.listeners.Put(splitedPath, ch) + go func() { + <-ctx.Done() + close(ch) + b.listeners.Remove(ch) + }() return ch } -// Unsubscribe from an event and closes the listener channel. -func (b *Bus) Unsubscribe(l Listener) { - close(l) - b.listeners.Remove(l) -} - -// Close the event bus and all listener channels. -func (b *Bus) Close() { - b.listeners.Clear(func(value Listener) { - close(value) - }) -} - -// Fire an event to all subscribers who are subscribed to the corresponding +// Publish an event to all subscribers who are subscribed to the corresponding // event path. -func (b *Bus) Fire(ev Event) { +func (b *Bus) Publish(ev Event) { splitedEventName := strings.Split(ev.Event(), b.nameSeparator) listeners := b.listeners.Get(splitedEventName, b.wildcard) @@ -84,6 +77,13 @@ func (b *Bus) Fire(ev Event) { wg.Wait() } +// Close the event bus and all listener channels. +func (b *Bus) Close() { + b.listeners.Clear(func(value Listener) { + close(value) + }) +} + type noCopy struct{} func (*noCopy) Lock() {} diff --git a/bus_test.go b/bus_test.go index d7d5a08..bfead50 100644 --- a/bus_test.go +++ b/bus_test.go @@ -15,12 +15,11 @@ func (e testEvent) Event() string { return e.name } -func TestBusFire(t *testing.T) { +func TestBusPublish(t *testing.T) { bus := eventbus.New() // Подписываемся на событие - ch := bus.Subscribe("test.event") - defer bus.Unsubscribe(ch) + ch := bus.Subscribe(t.Context(), "test.event") // Создаем канал для проверки получения события received := make(chan bool, 1) @@ -38,7 +37,7 @@ func TestBusFire(t *testing.T) { }() // Отправляем событие - bus.Fire(testEvent{name: "test.event"}) + bus.Publish(testEvent{name: "test.event"}) // Проверяем получение события select { @@ -51,12 +50,12 @@ func TestBusFire(t *testing.T) { } } -func TestBusFireWithWildcard(t *testing.T) { +func TestBusPublishWithWildcard(t *testing.T) { bus := eventbus.New(eventbus.NameSeparator("/"), eventbus.Wildcard("#"), eventbus.Capacity(32)) defer bus.Close() // Подписываемся на wildcard событие - ch := bus.Subscribe("/test/#") + ch := bus.Subscribe(t.Context(), "/test/#") // Создаем канал для проверки получения события received := make(chan bool, 1) @@ -74,7 +73,7 @@ func TestBusFireWithWildcard(t *testing.T) { }() // Отправляем событие - bus.Fire(testEvent{name: "/test/event"}) + bus.Publish(testEvent{name: "/test/event"}) // Проверяем получение события select { |
