package events import ( "sync" "go.neonxp.dev/djson/internal/model" ) type subscriberNode struct { parent *subscriberNode children map[string]*subscriberNode channels map[string]chan model.Mutation mu sync.RWMutex } func (sn *subscriberNode) subscribe(path []string, id string, ch chan model.Mutation) { sn.mu.Lock() defer sn.mu.Unlock() if len(path) == 0 { sn.channels[id] = ch return } head, rest := path[0], path[1:] child, ok := sn.children[head] if !ok { child = &subscriberNode{ parent: sn, children: make(map[string]*subscriberNode), channels: make(map[string]chan model.Mutation), } sn.children[head] = child } if len(rest) == 0 { child.channels[id] = ch return } child.subscribe(rest, id, ch) } func (sn *subscriberNode) unsubscribe(path []string, id string) { sn.mu.Lock() defer sn.mu.Unlock() if len(path) == 0 { close(sn.channels[id]) delete(sn.channels, id) return } head, rest := path[0], path[1:] if child, ok := sn.children[head]; ok { child.unsubscribe(rest, id) } } func (sn *subscriberNode) notify(path []string, event *model.Mutation) { sn.mu.RLock() defer sn.mu.RUnlock() for _, ch := range sn.channels { go func(ch chan model.Mutation) { ch <- *event }(ch) } if len(path) == 0 { return } head, rest := path[0], path[1:] if child, ok := sn.children[head]; ok { child.notify(rest, event) } }