diff options
| author | 2026-01-24 17:46:54 +0300 | |
|---|---|---|
| committer | 2026-01-24 17:46:54 +0300 | |
| commit | 95fc6de88975781abfe576977c960197a9743442 (patch) | |
| tree | 00ed76b0ff22f71af4b450c604f16ada72ed5241 /bus.go | |
| download | eventbus-95fc6de88975781abfe576977c960197a9743442.tar.gz eventbus-95fc6de88975781abfe576977c960197a9743442.tar.bz2 eventbus-95fc6de88975781abfe576977c960197a9743442.tar.xz eventbus-95fc6de88975781abfe576977c960197a9743442.zip | |
v1.0.0v1.0.0
Diffstat (limited to 'bus.go')
| -rw-r--r-- | bus.go | 85 |
1 files changed, 85 insertions, 0 deletions
@@ -0,0 +1,85 @@ +// Package eventbus представляет собой асинхронную шину событий +package eventbus + +import ( + "strings" + "sync" +) + +const ( + defaultCapacity = 32 +) + +type bus struct { + noCopy + + listeners node[Listener] + nameSeparator string + wildcard string +} + +func New(opts ...Opt) *bus { + b := &bus{ + nameSeparator: ".", + wildcard: "*", + } + + for _, o := range opts { + o.Apply(b) + } + + if cap(b.listeners.values) == 0 { + b.listeners = node[Listener]{ + children: make(map[string]*node[Listener], defaultCapacity), + values: make([]Listener, 0, defaultCapacity), + } + } + + return b +} + +type Listener chan Event + +type Event interface { + Event() string +} + +func (b *bus) Subscribe(path string) Listener { + splitedPath := strings.Split(path, b.nameSeparator) + ch := make(Listener, 1) + b.listeners.Put(splitedPath, ch) + return ch +} + +func (b *bus) Unsubscribe(l Listener) { + close(l) + b.listeners.Remove(l) +} + +func (b *bus) Close() { + b.listeners.Clear(func(value Listener) { + close(value) + }) +} + +func (b *bus) Fire(ev Event) { + splitedEventName := strings.Split(ev.Event(), b.nameSeparator) + listeners := b.listeners.Get(splitedEventName, b.wildcard) + + wg := sync.WaitGroup{} + for _, l := range listeners { + wg.Go(func() { + select { + case l <- ev: + default: + } + }) + } + wg.Wait() +} + +type noCopy struct{} + +func (*noCopy) Lock() {} + +func (*noCopy) Unlock() {} |
