aboutsummaryrefslogtreecommitdiff
path: root/bus.go
diff options
context:
space:
mode:
author2026-01-24 17:46:54 +0300
committer2026-01-24 17:46:54 +0300
commit95fc6de88975781abfe576977c960197a9743442 (patch)
tree00ed76b0ff22f71af4b450c604f16ada72ed5241 /bus.go
downloadeventbus-95fc6de88975781abfe576977c960197a9743442.tar.gz
eventbus-95fc6de88975781abfe576977c960197a9743442.tar.bz2
eventbus-95fc6de88975781abfe576977c960197a9743442.tar.xz
eventbus-95fc6de88975781abfe576977c960197a9743442.zip
v1.0.0v1.0.0
Diffstat (limited to '')
-rw-r--r--bus.go85
1 files changed, 85 insertions, 0 deletions
diff --git a/bus.go b/bus.go
new file mode 100644
index 0000000..7ce4860
--- /dev/null
+++ b/bus.go
@@ -0,0 +1,85 @@
+// Package eventbus представляет собой асинхронную шину событий
+package eventbus
+
+import (
+ "strings"
+ "sync"
+)
+
+const (
+ defaultCapacity = 32
+)
+
+type bus struct {
+ noCopy
+
+ listeners node[Listener]
+ nameSeparator string
+ wildcard string
+}
+
+func New(opts ...Opt) *bus {
+ b := &bus{
+ nameSeparator: ".",
+ wildcard: "*",
+ }
+
+ for _, o := range opts {
+ o.Apply(b)
+ }
+
+ if cap(b.listeners.values) == 0 {
+ b.listeners = node[Listener]{
+ children: make(map[string]*node[Listener], defaultCapacity),
+ values: make([]Listener, 0, defaultCapacity),
+ }
+ }
+
+ return b
+}
+
+type Listener chan Event
+
+type Event interface {
+ Event() string
+}
+
+func (b *bus) Subscribe(path string) Listener {
+ splitedPath := strings.Split(path, b.nameSeparator)
+ ch := make(Listener, 1)
+ b.listeners.Put(splitedPath, ch)
+ return ch
+}
+
+func (b *bus) Unsubscribe(l Listener) {
+ close(l)
+ b.listeners.Remove(l)
+}
+
+func (b *bus) Close() {
+ b.listeners.Clear(func(value Listener) {
+ close(value)
+ })
+}
+
+func (b *bus) Fire(ev Event) {
+ splitedEventName := strings.Split(ev.Event(), b.nameSeparator)
+ listeners := b.listeners.Get(splitedEventName, b.wildcard)
+
+ wg := sync.WaitGroup{}
+ for _, l := range listeners {
+ wg.Go(func() {
+ select {
+ case l <- ev:
+ default:
+ }
+ })
+ }
+ wg.Wait()
+}
+
+type noCopy struct{}
+
+func (*noCopy) Lock() {}
+
+func (*noCopy) Unlock() {}