summaryrefslogtreecommitdiff
path: root/internal/events
diff options
context:
space:
mode:
Diffstat (limited to 'internal/events')
-rw-r--r--internal/events/contract.go11
-rw-r--r--internal/events/events.go34
-rw-r--r--internal/events/node.go69
3 files changed, 114 insertions, 0 deletions
diff --git a/internal/events/contract.go b/internal/events/contract.go
new file mode 100644
index 0000000..dc5003f
--- /dev/null
+++ b/internal/events/contract.go
@@ -0,0 +1,11 @@
+package events
+
+import "go.neonxp.dev/djson/internal/model"
+
+type Dispatcher interface {
+ Subscribe(path []string, id string, ch chan model.Mutation)
+
+ Unsubscribe(path []string, id string)
+
+ Notify(path []string, event *model.Mutation)
+}
diff --git a/internal/events/events.go b/internal/events/events.go
new file mode 100644
index 0000000..49731b8
--- /dev/null
+++ b/internal/events/events.go
@@ -0,0 +1,34 @@
+package events
+
+import (
+ "sync"
+
+ "go.neonxp.dev/djson/internal/model"
+)
+
+type stdDispatcher struct {
+ tree subscriberNode
+}
+
+func New() Dispatcher {
+ return &stdDispatcher{
+ tree: subscriberNode{
+ children: make(map[string]*subscriberNode),
+ channels: make(map[string]chan model.Mutation),
+ parent: nil,
+ mu: sync.RWMutex{},
+ },
+ }
+}
+
+func (ed *stdDispatcher) Subscribe(path []string, id string, ch chan model.Mutation) {
+ ed.tree.subscribe(path, id, ch)
+}
+
+func (ed *stdDispatcher) Unsubscribe(path []string, id string) {
+ ed.tree.unsubscribe(path, id)
+}
+
+func (ed *stdDispatcher) Notify(path []string, event *model.Mutation) {
+ ed.tree.notify(path, event)
+}
diff --git a/internal/events/node.go b/internal/events/node.go
new file mode 100644
index 0000000..a1d9c3e
--- /dev/null
+++ b/internal/events/node.go
@@ -0,0 +1,69 @@
+package events
+
+import (
+ "sync"
+
+ "go.neonxp.dev/djson/internal/model"
+)
+
+type subscriberNode struct {
+ parent *subscriberNode
+ children map[string]*subscriberNode
+ channels map[string]chan model.Mutation
+ mu sync.RWMutex
+}
+
+func (sn *subscriberNode) subscribe(path []string, id string, ch chan model.Mutation) {
+ sn.mu.Lock()
+ defer sn.mu.Unlock()
+ if len(path) == 0 {
+ sn.channels[id] = ch
+ return
+ }
+ head, rest := path[0], path[1:]
+ child, ok := sn.children[head]
+ if !ok {
+ child = &subscriberNode{
+ parent: sn,
+ children: make(map[string]*subscriberNode),
+ channels: make(map[string]chan model.Mutation),
+ }
+ sn.children[head] = child
+ }
+ if len(rest) == 0 {
+ child.channels[id] = ch
+ return
+ }
+ child.subscribe(rest, id, ch)
+}
+
+func (sn *subscriberNode) unsubscribe(path []string, id string) {
+ sn.mu.Lock()
+ defer sn.mu.Unlock()
+ if len(path) == 0 {
+ close(sn.channels[id])
+ delete(sn.channels, id)
+ return
+ }
+ head, rest := path[0], path[1:]
+ if child, ok := sn.children[head]; ok {
+ child.unsubscribe(rest, id)
+ }
+}
+
+func (sn *subscriberNode) notify(path []string, event *model.Mutation) {
+ sn.mu.RLock()
+ defer sn.mu.RUnlock()
+ for _, ch := range sn.channels {
+ go func(ch chan model.Mutation) {
+ ch <- *event
+ }(ch)
+ }
+ if len(path) == 0 {
+ return
+ }
+ head, rest := path[0], path[1:]
+ if child, ok := sn.children[head]; ok {
+ child.notify(rest, event)
+ }
+}