diff options
Diffstat (limited to 'internal/events')
-rw-r--r-- | internal/events/contract.go | 11 | ||||
-rw-r--r-- | internal/events/events.go | 34 | ||||
-rw-r--r-- | internal/events/node.go | 69 |
3 files changed, 114 insertions, 0 deletions
diff --git a/internal/events/contract.go b/internal/events/contract.go new file mode 100644 index 0000000..dc5003f --- /dev/null +++ b/internal/events/contract.go @@ -0,0 +1,11 @@ +package events + +import "go.neonxp.dev/djson/internal/model" + +type Dispatcher interface { + Subscribe(path []string, id string, ch chan model.Mutation) + + Unsubscribe(path []string, id string) + + Notify(path []string, event *model.Mutation) +} diff --git a/internal/events/events.go b/internal/events/events.go new file mode 100644 index 0000000..49731b8 --- /dev/null +++ b/internal/events/events.go @@ -0,0 +1,34 @@ +package events + +import ( + "sync" + + "go.neonxp.dev/djson/internal/model" +) + +type stdDispatcher struct { + tree subscriberNode +} + +func New() Dispatcher { + return &stdDispatcher{ + tree: subscriberNode{ + children: make(map[string]*subscriberNode), + channels: make(map[string]chan model.Mutation), + parent: nil, + mu: sync.RWMutex{}, + }, + } +} + +func (ed *stdDispatcher) Subscribe(path []string, id string, ch chan model.Mutation) { + ed.tree.subscribe(path, id, ch) +} + +func (ed *stdDispatcher) Unsubscribe(path []string, id string) { + ed.tree.unsubscribe(path, id) +} + +func (ed *stdDispatcher) Notify(path []string, event *model.Mutation) { + ed.tree.notify(path, event) +} diff --git a/internal/events/node.go b/internal/events/node.go new file mode 100644 index 0000000..a1d9c3e --- /dev/null +++ b/internal/events/node.go @@ -0,0 +1,69 @@ +package events + +import ( + "sync" + + "go.neonxp.dev/djson/internal/model" +) + +type subscriberNode struct { + parent *subscriberNode + children map[string]*subscriberNode + channels map[string]chan model.Mutation + mu sync.RWMutex +} + +func (sn *subscriberNode) subscribe(path []string, id string, ch chan model.Mutation) { + sn.mu.Lock() + defer sn.mu.Unlock() + if len(path) == 0 { + sn.channels[id] = ch + return + } + head, rest := path[0], path[1:] + child, ok := sn.children[head] + if !ok { + child = &subscriberNode{ + parent: sn, + children: make(map[string]*subscriberNode), + channels: make(map[string]chan model.Mutation), + } + sn.children[head] = child + } + if len(rest) == 0 { + child.channels[id] = ch + return + } + child.subscribe(rest, id, ch) +} + +func (sn *subscriberNode) unsubscribe(path []string, id string) { + sn.mu.Lock() + defer sn.mu.Unlock() + if len(path) == 0 { + close(sn.channels[id]) + delete(sn.channels, id) + return + } + head, rest := path[0], path[1:] + if child, ok := sn.children[head]; ok { + child.unsubscribe(rest, id) + } +} + +func (sn *subscriberNode) notify(path []string, event *model.Mutation) { + sn.mu.RLock() + defer sn.mu.RUnlock() + for _, ch := range sn.channels { + go func(ch chan model.Mutation) { + ch <- *event + }(ch) + } + if len(path) == 0 { + return + } + head, rest := path[0], path[1:] + if child, ok := sn.children[head]; ok { + child.notify(rest, event) + } +} |