aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author2026-03-22 16:13:20 +0300
committer2026-03-22 16:13:20 +0300
commit485f49c2323a95297c8b5ae5f44320825d66ae82 (patch)
tree3db739918a0a19387f8e85fb30cc7d2fcf31c669
parentv1.0.1 (diff)
downloadeventbus-485f49c2323a95297c8b5ae5f44320825d66ae82.tar.gz
eventbus-485f49c2323a95297c8b5ae5f44320825d66ae82.tar.bz2
eventbus-485f49c2323a95297c8b5ae5f44320825d66ae82.tar.xz
eventbus-485f49c2323a95297c8b5ae5f44320825d66ae82.zip
new versionHEADv1.1.0master
-rw-r--r--README.md50
-rw-r--r--bus.go32
-rw-r--r--bus_test.go13
3 files changed, 40 insertions, 55 deletions
diff --git a/README.md b/README.md
index 6f777fe..fa45aaf 100644
--- a/README.md
+++ b/README.md
@@ -59,16 +59,13 @@ func main() {
)
// Подписываемся на конкретное событие
- ch1 := bus.Subscribe("user.login")
- defer bus.Unsubscribe(ch1)
+ ch1 := bus.Subscribe(context.Background(), "user.login")
// Подписываемся на группу событий с wildcard
- ch2 := bus.Subscribe("user.*")
- defer bus.Unsubscribe(ch2)
+ ch2 := bus.Subscribe(context.Background(), "user.*")
// Подписываемся на группу событий с кастомным wildcard
- ch3 := customBus.Subscribe("/user/#")
- defer customBus.Unsubscribe(ch3)
+ ch3 := customBus.Subscribe(context.Background(), "/user/#")
// Запускаем горутины для обработки событий
go func() {
@@ -90,9 +87,9 @@ func main() {
}()
// Отправляем события
- bus.Fire(MyEvent{name: "user.login", data: "user123"})
- bus.Fire(MyEvent{name: "user.logout", data: "user123"})
- customBus.Fire(MyEvent{name: "/user/login", data: "user123"})
+ bus.Publish(MyEvent{name: "user.login", data: "user123"})
+ bus.Publish(MyEvent{name: "user.logout", data: "user123"})
+ customBus.Publish(MyEvent{name: "/user/login", data: "user123"})
// Ждем немного для обработки событий
time.Sleep(100 * time.Millisecond)
@@ -111,20 +108,16 @@ func main() {
- `Capacity` - задает начальную емкость для узлов дерева подписчиков (по
умолчанию 32)
-### `func (b *bus) Subscribe(path string) Listener`
+### `func (b *bus) Subscribe(ctx context.Context, path string) Listener`
Подписывается на событие по указанному пути. Возвращает канал-подписчик для
получения событий.
-### `func (b *bus) Unsubscribe(l Listener)`
-
-Отписывается от события и закрывает канал подписчика.
-
### `func (b *bus) Close()`
Закрывает шину событий и все каналы подписчиков.
-### `func (b *bus) Fire(ev Event)`
+### `func (b *bus) Publish(ev Event)`
Отправляет событие всем подписчикам, которые подписаны на соответствующий путь
события.
@@ -204,16 +197,13 @@ func main() {
)
// Subscribe to specific event
- ch1 := bus.Subscribe("user.login")
- defer bus.Unsubscribe(ch1)
+ ch1 := bus.Subscribe(context.Background(), "user.login")
// Subscribe to event group with wildcard
- ch2 := bus.Subscribe("user.*")
- defer bus.Unsubscribe(ch2)
+ ch2 := bus.Subscribe(context.Background(), "user.*")
// Subscribe to event group with custom wildcard
- ch3 := customBus.Subscribe("/user/#")
- defer customBus.Unsubscribe(ch3)
+ ch3 := customBus.Subscribe(context.Background(), "/user/#")
// Start goroutines for event handling
go func() {
@@ -234,10 +224,10 @@ func main() {
}
}()
- // Fire events
- bus.Fire(MyEvent{name: "user.login", data: "user123"})
- bus.Fire(MyEvent{name: "user.logout", data: "user123"})
- customBus.Fire(MyEvent{name: "/user/login", data: "user123"})
+ // Publish events
+ bus.Publish(MyEvent{name: "user.login", data: "user123"})
+ bus.Publish(MyEvent{name: "user.logout", data: "user123"})
+ customBus.Publish(MyEvent{name: "/user/login", data: "user123"})
// Wait a bit for event processing
time.Sleep(100 * time.Millisecond)
@@ -254,22 +244,18 @@ Creates a new event bus with optional settings. Supports the following options:
- `Wildcard` - sets the wildcard character for subscriptions (default "\*")
- `Capacity` - sets the initial capacity for listeners trie (default 32)
-### `func (b *bus) Subscribe(path string) Listener`
+### `func (b *bus) Subscribe(ctx context.Context, path string) Listener`
Subscribes to an event at the specified path. Returns a listener channel for
receiving events.
-### `func (b *bus) Unsubscribe(l Listener)`
-
-Unsubscribes from an event and closes the listener channel.
-
### `func (b *bus) Close()`
Closes the event bus and all listener channels.
-### `func (b *bus) Fire(ev Event)`
+### `func (b *bus) Publish(ev Event)`
-Fires an event to all subscribers who are subscribed to the corresponding event
+Publishs an event to all subscribers who are subscribed to the corresponding event
path.
## License
diff --git a/bus.go b/bus.go
index 9ab1980..86e9cf7 100644
--- a/bus.go
+++ b/bus.go
@@ -2,6 +2,7 @@
package eventbus
import (
+ "context"
"strings"
"sync"
)
@@ -46,29 +47,21 @@ type Event interface {
// Subscribe to an event at the specified path. Returns a listener channel for
// receiving events.
-func (b *Bus) Subscribe(path string) Listener {
+func (b *Bus) Subscribe(ctx context.Context, path string) Listener {
splitedPath := strings.Split(path, b.nameSeparator)
ch := make(Listener, 1)
b.listeners.Put(splitedPath, ch)
+ go func() {
+ <-ctx.Done()
+ close(ch)
+ b.listeners.Remove(ch)
+ }()
return ch
}
-// Unsubscribe from an event and closes the listener channel.
-func (b *Bus) Unsubscribe(l Listener) {
- close(l)
- b.listeners.Remove(l)
-}
-
-// Close the event bus and all listener channels.
-func (b *Bus) Close() {
- b.listeners.Clear(func(value Listener) {
- close(value)
- })
-}
-
-// Fire an event to all subscribers who are subscribed to the corresponding
+// Publish an event to all subscribers who are subscribed to the corresponding
// event path.
-func (b *Bus) Fire(ev Event) {
+func (b *Bus) Publish(ev Event) {
splitedEventName := strings.Split(ev.Event(), b.nameSeparator)
listeners := b.listeners.Get(splitedEventName, b.wildcard)
@@ -84,6 +77,13 @@ func (b *Bus) Fire(ev Event) {
wg.Wait()
}
+// Close the event bus and all listener channels.
+func (b *Bus) Close() {
+ b.listeners.Clear(func(value Listener) {
+ close(value)
+ })
+}
+
type noCopy struct{}
func (*noCopy) Lock() {}
diff --git a/bus_test.go b/bus_test.go
index d7d5a08..bfead50 100644
--- a/bus_test.go
+++ b/bus_test.go
@@ -15,12 +15,11 @@ func (e testEvent) Event() string {
return e.name
}
-func TestBusFire(t *testing.T) {
+func TestBusPublish(t *testing.T) {
bus := eventbus.New()
// Подписываемся на событие
- ch := bus.Subscribe("test.event")
- defer bus.Unsubscribe(ch)
+ ch := bus.Subscribe(t.Context(), "test.event")
// Создаем канал для проверки получения события
received := make(chan bool, 1)
@@ -38,7 +37,7 @@ func TestBusFire(t *testing.T) {
}()
// Отправляем событие
- bus.Fire(testEvent{name: "test.event"})
+ bus.Publish(testEvent{name: "test.event"})
// Проверяем получение события
select {
@@ -51,12 +50,12 @@ func TestBusFire(t *testing.T) {
}
}
-func TestBusFireWithWildcard(t *testing.T) {
+func TestBusPublishWithWildcard(t *testing.T) {
bus := eventbus.New(eventbus.NameSeparator("/"), eventbus.Wildcard("#"), eventbus.Capacity(32))
defer bus.Close()
// Подписываемся на wildcard событие
- ch := bus.Subscribe("/test/#")
+ ch := bus.Subscribe(t.Context(), "/test/#")
// Создаем канал для проверки получения события
received := make(chan bool, 1)
@@ -74,7 +73,7 @@ func TestBusFireWithWildcard(t *testing.T) {
}()
// Отправляем событие
- bus.Fire(testEvent{name: "/test/event"})
+ bus.Publish(testEvent{name: "/test/event"})
// Проверяем получение события
select {