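// Package eventbus provides an asynchronous event bus.
package eventbus

import (
	"context"
	"strings"
	"sync"
)

const (
	// defaultCapacity is the initial capacity of the root listener node.
	defaultCapacity = 32
)

// Bus routes published events to the listeners subscribed to matching paths.
// A Bus must not be copied after first use.
type Bus struct {
	noCopy

	listeners     node[Listener]
	nameSeparator string
	wildcard      string
}

// New creates a Bus with "." as the path separator and "*" as the wildcard,
// then applies the given options.
func New(opts ...Opt) *Bus {
	b := &Bus{
		nameSeparator: ".",
		wildcard:      "*",
	}

	for _, o := range opts {
		o.Apply(b)
	}

	// Allocate the root node unless an option has already set it up.
	if cap(b.listeners.values) == 0 {
		b.listeners = node[Listener]{
			children: make(map[string]*node[Listener], defaultCapacity),
			values:   make([]Listener, 0, defaultCapacity),
		}
	}

	return b
}

// Listener is a channel on which a subscriber receives events.
type Listener chan Event

// Event is implemented by any value that can be published on the bus.
// Event returns the event path, for example "user.created".
type Event interface {
	Event() string
}

// Subscribe to an event at the specified path. Returns a listener channel for
// receiving events. The channel is removed from the bus and closed once ctx
// is done.
//
// Cancelling ctx after Close closes the channel a second time and panics.
func (b *Bus) Subscribe(ctx context.Context, path string) Listener {
	splitPath := strings.Split(path, b.nameSeparator)

	ch := make(Listener, 1)
	b.listeners.Put(splitPath, ch)

	go func() {
		<-ctx.Done()
		// Remove the listener before closing it, so that later Publish calls
		// no longer find a closed channel. A Publish already in flight may
		// still hold the channel.
		b.listeners.Remove(ch)
		close(ch)
	}()

	return ch
}

// Publish an event to all subscribers who are subscribed to the corresponding
// event path. Delivery is non-blocking: if a listener's buffer is full, the
// event is dropped for that listener.
func (b *Bus) Publish(ev Event) {
	splitEventName := strings.Split(ev.Event(), b.nameSeparator)
	listeners := b.listeners.Get(splitEventName, b.wildcard)

	wg := sync.WaitGroup{}
	for _, l := range listeners {
		wg.Go(func() {
			select {
			case l <- ev:
			default:
			}
		})
	}

	wg.Wait()
}

// Close the event bus and all listener channels.
func (b *Bus) Close() {
	b.listeners.Clear(func(value Listener) {
		close(value)
	})
}

// noCopy lets the go vet copylocks check report copies of the enclosing struct.
type noCopy struct{}

func (*noCopy) Lock()   {}
func (*noCopy) Unlock() {}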