From 95fc6de88975781abfe576977c960197a9743442 Mon Sep 17 00:00:00 2001 From: Alexander Neonxp Kiryukhin Date: Sat, 24 Jan 2026 17:46:54 +0300 Subject: v1.0.0 --- bus.go | 85 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 85 insertions(+) create mode 100644 bus.go (limited to 'bus.go') diff --git a/bus.go b/bus.go new file mode 100644 index 0000000..7ce4860 --- /dev/null +++ b/bus.go @@ -0,0 +1,85 @@ +// Package eventbus представляет собой асинхронную шину событий +package eventbus + +import ( + "strings" + "sync" +) + +const ( + defaultCapacity = 32 +) + +type bus struct { + noCopy + + listeners node[Listener] + nameSeparator string + wildcard string +} + +func New(opts ...Opt) *bus { + b := &bus{ + nameSeparator: ".", + wildcard: "*", + } + + for _, o := range opts { + o.Apply(b) + } + + if cap(b.listeners.values) == 0 { + b.listeners = node[Listener]{ + children: make(map[string]*node[Listener], defaultCapacity), + values: make([]Listener, 0, defaultCapacity), + } + } + + return b +} + +type Listener chan Event + +type Event interface { + Event() string +} + +func (b *bus) Subscribe(path string) Listener { + splitedPath := strings.Split(path, b.nameSeparator) + ch := make(Listener, 1) + b.listeners.Put(splitedPath, ch) + return ch +} + +func (b *bus) Unsubscribe(l Listener) { + close(l) + b.listeners.Remove(l) +} + +func (b *bus) Close() { + b.listeners.Clear(func(value Listener) { + close(value) + }) +} + +func (b *bus) Fire(ev Event) { + splitedEventName := strings.Split(ev.Event(), b.nameSeparator) + listeners := b.listeners.Get(splitedEventName, b.wildcard) + + wg := sync.WaitGroup{} + for _, l := range listeners { + wg.Go(func() { + select { + case l <- ev: + default: + } + }) + } + wg.Wait() +} + +type noCopy struct{} + +func (*noCopy) Lock() {} + +func (*noCopy) Unlock() {} -- cgit v1.2.3