summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNeonXP <i@neonxp.dev>2022-12-09 03:36:06 +0300
committerNeonXP <i@neonxp.dev>2022-12-09 03:36:06 +0300
commit2055e57de00865607223401a890125909080f4a3 (patch)
tree73a74ec3370f7211601302bd77bada10b75ff665
parent9d46ca252151a2c48434f9ec201bcb3c9133ec78 (diff)
events added
-rw-r--r--cmd/api/flags.go1
-rw-r--r--cmd/api/main.go23
-rw-r--r--go.mod2
-rw-r--r--go.sum4
-rw-r--r--internal/events/contract.go11
-rw-r--r--internal/events/events.go34
-rw-r--r--internal/events/node.go69
-rw-r--r--internal/handler/contract.go3
-rw-r--r--internal/handler/crud.go (renamed from internal/handler/tree.go)54
-rw-r--r--internal/handler/events.go67
-rw-r--r--internal/handler/handler.go39
-rw-r--r--internal/handler/hearthbeat.go22
-rw-r--r--internal/tree/contract.go2
-rw-r--r--internal/tree/core.go21
-rw-r--r--internal/tree/mutations.go13
15 files changed, 287 insertions, 78 deletions
diff --git a/cmd/api/flags.go b/cmd/api/flags.go
index b4910fa..9cdf9ec 100644
--- a/cmd/api/flags.go
+++ b/cmd/api/flags.go
@@ -4,6 +4,7 @@ import "flag"
var (
cluster = flag.String("cluster", "", "Master node address (ex: 192.168.1.10:5657)")
+ clusterID = flag.String("cluster_id", "", "Cluster id")
clusterAddr = flag.String("cluster_addr", ":5657", "Self address for cluster")
apiAddr = flag.String("api_addr", ":5656", "API address")
dbPath = flag.String("db_path", "/var/djson.db", "DB file")
diff --git a/cmd/api/main.go b/cmd/api/main.go
index 796837b..51e16af 100644
--- a/cmd/api/main.go
+++ b/cmd/api/main.go
@@ -13,6 +13,7 @@ import (
"github.com/rs/zerolog"
"golang.org/x/sync/errgroup"
+ "go.neonxp.dev/djson/internal/events"
"go.neonxp.dev/djson/internal/handler"
"go.neonxp.dev/djson/internal/storage"
"go.neonxp.dev/djson/internal/tree"
@@ -29,29 +30,30 @@ func main() {
LogLevel: "debug",
})
- // TBD raft cluster initialization
- if cluster != nil {
- logger.Info().Str("cluster connect", *cluster).Send()
- } else {
- logger.Info().Str("cluster create", *clusterAddr).Send()
- }
-
// Tree storage
storage, err := storage.New(*dbPath, logger)
if err != nil {
panic(err)
}
-
+ eventsDispatcher := events.New()
// Tree engine
core := tree.New(
storage,
+ eventsDispatcher,
)
if err := core.Init(); err != nil {
panic(err)
}
+ // TBD raft cluster initialization
+ if cluster != nil {
+ logger.Info().Str("cluster connect", *cluster).Send()
+ } else {
+ logger.Info().Str("cluster create", *clusterAddr).Send()
+ }
+
// Tree HTTP wrapper
- treeHandler := handler.New(core)
+ treeHandler := handler.New(core, eventsDispatcher)
// HTTP router
r := chi.NewRouter()
@@ -62,7 +64,8 @@ func main() {
r.Use(middleware.Recoverer)
r.Get("/health", treeHandler.Hearthbeat)
- r.Route("/tree", treeHandler.Handle)
+ r.Route("/tree", treeHandler.HandleCRUD)
+ r.Route("/events", treeHandler.HandleEvents)
server := &http.Server{
Addr: *apiAddr,
diff --git a/go.mod b/go.mod
index 9c9f7d6..2067f5e 100644
--- a/go.mod
+++ b/go.mod
@@ -12,6 +12,6 @@ require (
require (
github.com/mattn/go-colorable v0.1.12 // indirect
github.com/mattn/go-isatty v0.0.14 // indirect
- github.com/rs/zerolog v1.27.0 // indirect
+ github.com/rs/zerolog v1.27.0
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 // indirect
)
diff --git a/go.sum b/go.sum
index 214bd7d..7e3ae08 100644
--- a/go.sum
+++ b/go.sum
@@ -12,8 +12,8 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE
github.com/rs/xid v1.3.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
github.com/rs/zerolog v1.27.0 h1:1T7qCieN22GVc8S4Q2yuexzBb1EqjbgjSH9RohbMjKs=
github.com/rs/zerolog v1.27.0/go.mod h1:7frBqO0oezxmnO7GF86FY++uy8I0Tk/If5ni1G9Qc0U=
-go.neonxp.dev/json v0.0.2 h1:wrq3Xm70qWrAVUQdfpCR5+cwzJ2QaVFMiGTzQn2dtAw=
-go.neonxp.dev/json v0.0.2/go.mod h1:d+el/XRG46QCbCsly5PgUqyiwuU9GzBxFTGCuGHyd0w=
+go.neonxp.dev/json v0.0.4 h1:PHLJANFNXv8vn79YWpWEW7l0dSxZ7XmuRQsHs9JC20w=
+go.neonxp.dev/json v0.0.4/go.mod h1:d+el/XRG46QCbCsly5PgUqyiwuU9GzBxFTGCuGHyd0w=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
diff --git a/internal/events/contract.go b/internal/events/contract.go
new file mode 100644
index 0000000..dc5003f
--- /dev/null
+++ b/internal/events/contract.go
@@ -0,0 +1,11 @@
+package events
+
+import "go.neonxp.dev/djson/internal/model"
+
+type Dispatcher interface {
+ Subscribe(path []string, id string, ch chan model.Mutation)
+
+ Unsubscribe(path []string, id string)
+
+ Notify(path []string, event *model.Mutation)
+}
diff --git a/internal/events/events.go b/internal/events/events.go
new file mode 100644
index 0000000..49731b8
--- /dev/null
+++ b/internal/events/events.go
@@ -0,0 +1,34 @@
+package events
+
+import (
+ "sync"
+
+ "go.neonxp.dev/djson/internal/model"
+)
+
+type stdDispatcher struct {
+ tree subscriberNode
+}
+
+func New() Dispatcher {
+ return &stdDispatcher{
+ tree: subscriberNode{
+ children: make(map[string]*subscriberNode),
+ channels: make(map[string]chan model.Mutation),
+ parent: nil,
+ mu: sync.RWMutex{},
+ },
+ }
+}
+
+func (ed *stdDispatcher) Subscribe(path []string, id string, ch chan model.Mutation) {
+ ed.tree.subscribe(path, id, ch)
+}
+
+func (ed *stdDispatcher) Unsubscribe(path []string, id string) {
+ ed.tree.unsubscribe(path, id)
+}
+
+func (ed *stdDispatcher) Notify(path []string, event *model.Mutation) {
+ ed.tree.notify(path, event)
+}
diff --git a/internal/events/node.go b/internal/events/node.go
new file mode 100644
index 0000000..a1d9c3e
--- /dev/null
+++ b/internal/events/node.go
@@ -0,0 +1,69 @@
+package events
+
+import (
+ "sync"
+
+ "go.neonxp.dev/djson/internal/model"
+)
+
+type subscriberNode struct {
+ parent *subscriberNode
+ children map[string]*subscriberNode
+ channels map[string]chan model.Mutation
+ mu sync.RWMutex
+}
+
+func (sn *subscriberNode) subscribe(path []string, id string, ch chan model.Mutation) {
+ sn.mu.Lock()
+ defer sn.mu.Unlock()
+ if len(path) == 0 {
+ sn.channels[id] = ch
+ return
+ }
+ head, rest := path[0], path[1:]
+ child, ok := sn.children[head]
+ if !ok {
+ child = &subscriberNode{
+ parent: sn,
+ children: make(map[string]*subscriberNode),
+ channels: make(map[string]chan model.Mutation),
+ }
+ sn.children[head] = child
+ }
+ if len(rest) == 0 {
+ child.channels[id] = ch
+ return
+ }
+ child.subscribe(rest, id, ch)
+}
+
+func (sn *subscriberNode) unsubscribe(path []string, id string) {
+ sn.mu.Lock()
+ defer sn.mu.Unlock()
+ if len(path) == 0 {
+ close(sn.channels[id])
+ delete(sn.channels, id)
+ return
+ }
+ head, rest := path[0], path[1:]
+ if child, ok := sn.children[head]; ok {
+ child.unsubscribe(rest, id)
+ }
+}
+
+func (sn *subscriberNode) notify(path []string, event *model.Mutation) {
+ sn.mu.RLock()
+ defer sn.mu.RUnlock()
+ for _, ch := range sn.channels {
+ go func(ch chan model.Mutation) {
+ ch <- *event
+ }(ch)
+ }
+ if len(path) == 0 {
+ return
+ }
+ head, rest := path[0], path[1:]
+ if child, ok := sn.children[head]; ok {
+ child.notify(rest, event)
+ }
+}
diff --git a/internal/handler/contract.go b/internal/handler/contract.go
index ed79158..8887be8 100644
--- a/internal/handler/contract.go
+++ b/internal/handler/contract.go
@@ -7,6 +7,7 @@ import (
)
type Handler interface {
- Handle(r chi.Router)
+ HandleCRUD(r chi.Router)
+ HandleEvents(r chi.Router)
Hearthbeat(w http.ResponseWriter, r *http.Request)
}
diff --git a/internal/handler/tree.go b/internal/handler/crud.go
index e574b2b..4ab0c7f 100644
--- a/internal/handler/tree.go
+++ b/internal/handler/crud.go
@@ -3,29 +3,15 @@ package handler
import (
"io"
"net/http"
- "strings"
"time"
"github.com/go-chi/chi/v5"
"github.com/go-chi/chi/v5/middleware"
- "go.neonxp.dev/json"
- jsonModel "go.neonxp.dev/json/model"
-
"go.neonxp.dev/djson/internal/model"
- "go.neonxp.dev/djson/internal/tree"
+ "go.neonxp.dev/json"
)
-func New(core tree.Core) Handler {
- return &handler{
- core: core,
- }
-}
-
-type handler struct {
- core tree.Core
-}
-
-func (h *handler) Handle(r chi.Router) {
+func (h *handler) HandleCRUD(r chi.Router) {
r.Use(middleware.CleanPath)
r.Use(middleware.StripSlashes)
@@ -65,7 +51,7 @@ func (h *handler) Handle(r chi.Router) {
Path: path,
Body: node,
}
- if err := h.core.Mutation(r.Context(), mutation); err != nil {
+ if err := h.core.Mutate(r.Context(), mutation); err != nil {
writeError(http.StatusInternalServerError, err, w)
return
}
@@ -92,7 +78,7 @@ func (h *handler) Handle(r chi.Router) {
Path: path,
Body: node,
}
- if err := h.core.Mutation(r.Context(), mutation); err != nil {
+ if err := h.core.Mutate(r.Context(), mutation); err != nil {
writeError(http.StatusInternalServerError, err, w)
return
}
@@ -108,40 +94,10 @@ func (h *handler) Handle(r chi.Router) {
Path: path,
Body: nil,
}
- if err := h.core.Mutation(r.Context(), mutation); err != nil {
+ if err := h.core.Mutate(r.Context(), mutation); err != nil {
writeError(http.StatusInternalServerError, err, w)
return
}
w.WriteHeader(http.StatusNoContent)
})
}
-
-func (h *handler) Hearthbeat(w http.ResponseWriter, r *http.Request) {
- w.Header().Set("Content-Type", "text/plain")
- switch h.core.State() {
- case tree.Ready:
- w.WriteHeader(http.StatusOK)
- _, _ = w.Write([]byte("."))
- case tree.Failed:
- w.WriteHeader(http.StatusInternalServerError)
- _, _ = w.Write([]byte("start failed"))
- case tree.Running:
- w.WriteHeader(http.StatusServiceUnavailable)
- _, _ = w.Write([]byte("starting..."))
- }
-}
-
-func writeError(code int, err error, w http.ResponseWriter) {
- jsonErr, _ := json.Marshal(jsonModel.NewNode(err.Error()))
- _, _ = w.Write(jsonErr)
-}
-
-func parsePath(nodePath string) []string {
- arr := []string{}
- for _, v := range strings.Split(nodePath, "/") {
- if v != "" {
- arr = append(arr, v)
- }
- }
- return arr
-}
diff --git a/internal/handler/events.go b/internal/handler/events.go
new file mode 100644
index 0000000..299971d
--- /dev/null
+++ b/internal/handler/events.go
@@ -0,0 +1,67 @@
+package handler
+
+import (
+ "fmt"
+ "net/http"
+ "strings"
+
+ "github.com/go-chi/chi/v5"
+ "github.com/go-chi/chi/v5/middleware"
+ dmodel "go.neonxp.dev/djson/internal/model"
+)
+
+func (h *handler) HandleEvents(r chi.Router) {
+ r.Route("/sse", func(r chi.Router) {
+ r.Get("/*", func(w http.ResponseWriter, r *http.Request) {
+ h.receiveEvents(w, r, func(ev *dmodel.Mutation) {
+ message, _ := ev.Body.MarshalJSON()
+ fmt.Fprintf(
+ w,
+ `event: %s\ndata: {"path":"%s","data":%s}\n\n`,
+ ev.Type.String(),
+ strings.Join(ev.Path, "/"),
+ message,
+ )
+ })
+ })
+ })
+ r.Route("/json", func(r chi.Router) {
+ r.Get("/*", func(w http.ResponseWriter, r *http.Request) {
+ h.receiveEvents(w, r, func(ev *dmodel.Mutation) {
+ message, _ := ev.Body.MarshalJSON()
+ fmt.Fprintf(
+ w,
+ `{"event":"%s","path":"%s","data":"%s"}\n`,
+ ev.Type.String(),
+ strings.Join(ev.Path, "/"),
+ message,
+ )
+ })
+ })
+ })
+}
+
+func (h *handler) receiveEvents(
+ w http.ResponseWriter,
+ r *http.Request,
+ render func(ev *dmodel.Mutation),
+) {
+ flusher := w.(http.Flusher)
+ w.Header().Set("Content-Type", "application/x-ndjson")
+ w.Header().Set("Cache-Control", "no-cache")
+ w.Header().Set("Connection", "keep-alive")
+ w.Header().Set("Access-Control-Allow-Origin", "*")
+ rctx := chi.RouteContext(r.Context())
+ path := parsePath(rctx.RoutePath)
+ ch := make(chan dmodel.Mutation)
+ reqID := middleware.GetReqID(r.Context())
+ h.events.Subscribe(path, reqID, ch)
+ defer h.events.Unsubscribe(path, reqID)
+ go func() {
+ for ev := range ch {
+ render(&ev)
+ flusher.Flush()
+ }
+ }()
+ <-r.Context().Done()
+}
diff --git a/internal/handler/handler.go b/internal/handler/handler.go
new file mode 100644
index 0000000..11c1936
--- /dev/null
+++ b/internal/handler/handler.go
@@ -0,0 +1,39 @@
+package handler
+
+import (
+ "net/http"
+ "strings"
+
+ "go.neonxp.dev/json"
+ jsonModel "go.neonxp.dev/json/model"
+
+ "go.neonxp.dev/djson/internal/events"
+ "go.neonxp.dev/djson/internal/tree"
+)
+
+func New(core tree.Core, eventsDispatcher events.Dispatcher) Handler {
+ return &handler{
+ core: core,
+ events: eventsDispatcher,
+ }
+}
+
+type handler struct {
+ core tree.Core
+ events events.Dispatcher
+}
+
+func writeError(code int, err error, w http.ResponseWriter) {
+ jsonErr, _ := json.Marshal(jsonModel.NewNode(err.Error()))
+ _, _ = w.Write(jsonErr)
+}
+
+func parsePath(nodePath string) []string {
+ arr := []string{}
+ for _, v := range strings.Split(nodePath, "/") {
+ if v != "" {
+ arr = append(arr, v)
+ }
+ }
+ return arr
+}
diff --git a/internal/handler/hearthbeat.go b/internal/handler/hearthbeat.go
new file mode 100644
index 0000000..014880e
--- /dev/null
+++ b/internal/handler/hearthbeat.go
@@ -0,0 +1,22 @@
+package handler
+
+import (
+ "net/http"
+
+ "go.neonxp.dev/djson/internal/tree"
+)
+
+func (h *handler) Hearthbeat(w http.ResponseWriter, r *http.Request) {
+ w.Header().Set("Content-Type", "text/plain")
+ switch h.core.State() {
+ case tree.Ready:
+ w.WriteHeader(http.StatusOK)
+ _, _ = w.Write([]byte("."))
+ case tree.Failed:
+ w.WriteHeader(http.StatusInternalServerError)
+ _, _ = w.Write([]byte("start failed"))
+ case tree.Running:
+ w.WriteHeader(http.StatusServiceUnavailable)
+ _, _ = w.Write([]byte("starting..."))
+ }
+}
diff --git a/internal/tree/contract.go b/internal/tree/contract.go
index b949339..29748cb 100644
--- a/internal/tree/contract.go
+++ b/internal/tree/contract.go
@@ -10,7 +10,7 @@ import (
type Core interface {
Init() error
Get(nodes []string) (model.Node, error)
- Mutation(ctx context.Context, mut *dmodel.Mutation) error
+ Mutate(ctx context.Context, mut *dmodel.Mutation) error
State() CoreState
}
diff --git a/internal/tree/core.go b/internal/tree/core.go
index aa47fb6..d2cace3 100644
--- a/internal/tree/core.go
+++ b/internal/tree/core.go
@@ -3,23 +3,26 @@ package tree
import (
"sync"
+ "go.neonxp.dev/djson/internal/events"
"go.neonxp.dev/djson/internal/storage"
"go.neonxp.dev/json/model"
)
type stdCore struct {
- Root model.ObjectNode
- state CoreState
- mu sync.RWMutex
- storage storage.Storage
+ Root model.ObjectNode
+ state CoreState
+ mu sync.RWMutex
+ storage storage.Storage
+ eventDispatcher events.Dispatcher
}
-func New(storage storage.Storage) Core {
+func New(storage storage.Storage, eventsDispatcher events.Dispatcher) Core {
return &stdCore{
- Root: model.ObjectNode{},
- state: Running,
- mu: sync.RWMutex{},
- storage: storage,
+ Root: model.ObjectNode{},
+ state: Running,
+ mu: sync.RWMutex{},
+ storage: storage,
+ eventDispatcher: eventsDispatcher,
}
}
diff --git a/internal/tree/mutations.go b/internal/tree/mutations.go
index 9aa9056..173828d 100644
--- a/internal/tree/mutations.go
+++ b/internal/tree/mutations.go
@@ -9,7 +9,7 @@ import (
json "go.neonxp.dev/json/model"
)
-func (t *stdCore) Mutation(ctx context.Context, mut *model.Mutation) error {
+func (t *stdCore) Mutate(ctx context.Context, mut *model.Mutation) error {
t.mu.Lock()
defer t.mu.Unlock()
if err := t.execute(mut); err != nil {
@@ -40,7 +40,9 @@ func (t *stdCore) execute(mut *model.Mutation) error {
if !ok {
return fmt.Errorf("node %s is not object", strings.Join(path, "/"))
}
- return targetObject.Value.Set(key, mut.Body)
+ if err := targetObject.Value.Set(key, mut.Body); err != nil {
+ return err
+ }
case model.Merge:
inObject, ok := mut.Body.(*json.ObjectNode)
if !ok {
@@ -60,7 +62,6 @@ func (t *stdCore) execute(mut *model.Mutation) error {
return fmt.Errorf("patch allowed only for objects")
}
targetObject.Merge(inObject)
- return nil
case model.Remove:
if len(mut.Path) == 0 {
return fmt.Errorf("can't remove root node. Only replace or create avaliable")
@@ -80,7 +81,9 @@ func (t *stdCore) execute(mut *model.Mutation) error {
return fmt.Errorf("remove allowed only from objects")
}
targetObject.Remove(key)
- return nil
+ default:
+ return fmt.Errorf("invalid command type: %d", mut.Type)
}
- return fmt.Errorf("invalid command type: %d", mut.Type)
+ t.eventDispatcher.Notify(mut.Path, mut)
+ return nil
}