diff options
author | NeonXP <i@neonxp.dev> | 2022-12-09 03:36:06 +0300 |
---|---|---|
committer | NeonXP <i@neonxp.dev> | 2022-12-09 03:36:06 +0300 |
commit | 2055e57de00865607223401a890125909080f4a3 (patch) | |
tree | 73a74ec3370f7211601302bd77bada10b75ff665 | |
parent | 9d46ca252151a2c48434f9ec201bcb3c9133ec78 (diff) |
events added
-rw-r--r-- | cmd/api/flags.go | 1 | ||||
-rw-r--r-- | cmd/api/main.go | 23 | ||||
-rw-r--r-- | go.mod | 2 | ||||
-rw-r--r-- | go.sum | 4 | ||||
-rw-r--r-- | internal/events/contract.go | 11 | ||||
-rw-r--r-- | internal/events/events.go | 34 | ||||
-rw-r--r-- | internal/events/node.go | 69 | ||||
-rw-r--r-- | internal/handler/contract.go | 3 | ||||
-rw-r--r-- | internal/handler/crud.go (renamed from internal/handler/tree.go) | 54 | ||||
-rw-r--r-- | internal/handler/events.go | 67 | ||||
-rw-r--r-- | internal/handler/handler.go | 39 | ||||
-rw-r--r-- | internal/handler/hearthbeat.go | 22 | ||||
-rw-r--r-- | internal/tree/contract.go | 2 | ||||
-rw-r--r-- | internal/tree/core.go | 21 | ||||
-rw-r--r-- | internal/tree/mutations.go | 13 |
15 files changed, 287 insertions, 78 deletions
diff --git a/cmd/api/flags.go b/cmd/api/flags.go index b4910fa..9cdf9ec 100644 --- a/cmd/api/flags.go +++ b/cmd/api/flags.go @@ -4,6 +4,7 @@ import "flag" var ( cluster = flag.String("cluster", "", "Master node address (ex: 192.168.1.10:5657)") + clusterID = flag.String("cluster_id", "", "Cluster id") clusterAddr = flag.String("cluster_addr", ":5657", "Self address for cluster") apiAddr = flag.String("api_addr", ":5656", "API address") dbPath = flag.String("db_path", "/var/djson.db", "DB file") diff --git a/cmd/api/main.go b/cmd/api/main.go index 796837b..51e16af 100644 --- a/cmd/api/main.go +++ b/cmd/api/main.go @@ -13,6 +13,7 @@ import ( "github.com/rs/zerolog" "golang.org/x/sync/errgroup" + "go.neonxp.dev/djson/internal/events" "go.neonxp.dev/djson/internal/handler" "go.neonxp.dev/djson/internal/storage" "go.neonxp.dev/djson/internal/tree" @@ -29,29 +30,30 @@ func main() { LogLevel: "debug", }) - // TBD raft cluster initialization - if cluster != nil { - logger.Info().Str("cluster connect", *cluster).Send() - } else { - logger.Info().Str("cluster create", *clusterAddr).Send() - } - // Tree storage storage, err := storage.New(*dbPath, logger) if err != nil { panic(err) } - + eventsDispatcher := events.New() // Tree engine core := tree.New( storage, + eventsDispatcher, ) if err := core.Init(); err != nil { panic(err) } + // TBD raft cluster initialization + if cluster != nil { + logger.Info().Str("cluster connect", *cluster).Send() + } else { + logger.Info().Str("cluster create", *clusterAddr).Send() + } + // Tree HTTP wrapper - treeHandler := handler.New(core) + treeHandler := handler.New(core, eventsDispatcher) // HTTP router r := chi.NewRouter() @@ -62,7 +64,8 @@ func main() { r.Use(middleware.Recoverer) r.Get("/health", treeHandler.Hearthbeat) - r.Route("/tree", treeHandler.Handle) + r.Route("/tree", treeHandler.HandleCRUD) + r.Route("/events", treeHandler.HandleEvents) server := &http.Server{ Addr: *apiAddr, @@ -12,6 +12,6 @@ require ( require ( github.com/mattn/go-colorable v0.1.12 // indirect github.com/mattn/go-isatty v0.0.14 // indirect - github.com/rs/zerolog v1.27.0 // indirect + github.com/rs/zerolog v1.27.0 golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 // indirect ) @@ -12,8 +12,8 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE github.com/rs/xid v1.3.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= github.com/rs/zerolog v1.27.0 h1:1T7qCieN22GVc8S4Q2yuexzBb1EqjbgjSH9RohbMjKs= github.com/rs/zerolog v1.27.0/go.mod h1:7frBqO0oezxmnO7GF86FY++uy8I0Tk/If5ni1G9Qc0U= -go.neonxp.dev/json v0.0.2 h1:wrq3Xm70qWrAVUQdfpCR5+cwzJ2QaVFMiGTzQn2dtAw= -go.neonxp.dev/json v0.0.2/go.mod h1:d+el/XRG46QCbCsly5PgUqyiwuU9GzBxFTGCuGHyd0w= +go.neonxp.dev/json v0.0.4 h1:PHLJANFNXv8vn79YWpWEW7l0dSxZ7XmuRQsHs9JC20w= +go.neonxp.dev/json v0.0.4/go.mod h1:d+el/XRG46QCbCsly5PgUqyiwuU9GzBxFTGCuGHyd0w= golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/internal/events/contract.go b/internal/events/contract.go new file mode 100644 index 0000000..dc5003f --- /dev/null +++ b/internal/events/contract.go @@ -0,0 +1,11 @@ +package events + +import "go.neonxp.dev/djson/internal/model" + +type Dispatcher interface { + Subscribe(path []string, id string, ch chan model.Mutation) + + Unsubscribe(path []string, id string) + + Notify(path []string, event *model.Mutation) +} diff --git a/internal/events/events.go b/internal/events/events.go new file mode 100644 index 0000000..49731b8 --- /dev/null +++ b/internal/events/events.go @@ -0,0 +1,34 @@ +package events + +import ( + "sync" + + "go.neonxp.dev/djson/internal/model" +) + +type stdDispatcher struct { + tree subscriberNode +} + +func New() Dispatcher { + return &stdDispatcher{ + tree: subscriberNode{ + children: make(map[string]*subscriberNode), + channels: make(map[string]chan model.Mutation), + parent: nil, + mu: sync.RWMutex{}, + }, + } +} + +func (ed *stdDispatcher) Subscribe(path []string, id string, ch chan model.Mutation) { + ed.tree.subscribe(path, id, ch) +} + +func (ed *stdDispatcher) Unsubscribe(path []string, id string) { + ed.tree.unsubscribe(path, id) +} + +func (ed *stdDispatcher) Notify(path []string, event *model.Mutation) { + ed.tree.notify(path, event) +} diff --git a/internal/events/node.go b/internal/events/node.go new file mode 100644 index 0000000..a1d9c3e --- /dev/null +++ b/internal/events/node.go @@ -0,0 +1,69 @@ +package events + +import ( + "sync" + + "go.neonxp.dev/djson/internal/model" +) + +type subscriberNode struct { + parent *subscriberNode + children map[string]*subscriberNode + channels map[string]chan model.Mutation + mu sync.RWMutex +} + +func (sn *subscriberNode) subscribe(path []string, id string, ch chan model.Mutation) { + sn.mu.Lock() + defer sn.mu.Unlock() + if len(path) == 0 { + sn.channels[id] = ch + return + } + head, rest := path[0], path[1:] + child, ok := sn.children[head] + if !ok { + child = &subscriberNode{ + parent: sn, + children: make(map[string]*subscriberNode), + channels: make(map[string]chan model.Mutation), + } + sn.children[head] = child + } + if len(rest) == 0 { + child.channels[id] = ch + return + } + child.subscribe(rest, id, ch) +} + +func (sn *subscriberNode) unsubscribe(path []string, id string) { + sn.mu.Lock() + defer sn.mu.Unlock() + if len(path) == 0 { + close(sn.channels[id]) + delete(sn.channels, id) + return + } + head, rest := path[0], path[1:] + if child, ok := sn.children[head]; ok { + child.unsubscribe(rest, id) + } +} + +func (sn *subscriberNode) notify(path []string, event *model.Mutation) { + sn.mu.RLock() + defer sn.mu.RUnlock() + for _, ch := range sn.channels { + go func(ch chan model.Mutation) { + ch <- *event + }(ch) + } + if len(path) == 0 { + return + } + head, rest := path[0], path[1:] + if child, ok := sn.children[head]; ok { + child.notify(rest, event) + } +} diff --git a/internal/handler/contract.go b/internal/handler/contract.go index ed79158..8887be8 100644 --- a/internal/handler/contract.go +++ b/internal/handler/contract.go @@ -7,6 +7,7 @@ import ( ) type Handler interface { - Handle(r chi.Router) + HandleCRUD(r chi.Router) + HandleEvents(r chi.Router) Hearthbeat(w http.ResponseWriter, r *http.Request) } diff --git a/internal/handler/tree.go b/internal/handler/crud.go index e574b2b..4ab0c7f 100644 --- a/internal/handler/tree.go +++ b/internal/handler/crud.go @@ -3,29 +3,15 @@ package handler import ( "io" "net/http" - "strings" "time" "github.com/go-chi/chi/v5" "github.com/go-chi/chi/v5/middleware" - "go.neonxp.dev/json" - jsonModel "go.neonxp.dev/json/model" - "go.neonxp.dev/djson/internal/model" - "go.neonxp.dev/djson/internal/tree" + "go.neonxp.dev/json" ) -func New(core tree.Core) Handler { - return &handler{ - core: core, - } -} - -type handler struct { - core tree.Core -} - -func (h *handler) Handle(r chi.Router) { +func (h *handler) HandleCRUD(r chi.Router) { r.Use(middleware.CleanPath) r.Use(middleware.StripSlashes) @@ -65,7 +51,7 @@ func (h *handler) Handle(r chi.Router) { Path: path, Body: node, } - if err := h.core.Mutation(r.Context(), mutation); err != nil { + if err := h.core.Mutate(r.Context(), mutation); err != nil { writeError(http.StatusInternalServerError, err, w) return } @@ -92,7 +78,7 @@ func (h *handler) Handle(r chi.Router) { Path: path, Body: node, } - if err := h.core.Mutation(r.Context(), mutation); err != nil { + if err := h.core.Mutate(r.Context(), mutation); err != nil { writeError(http.StatusInternalServerError, err, w) return } @@ -108,40 +94,10 @@ func (h *handler) Handle(r chi.Router) { Path: path, Body: nil, } - if err := h.core.Mutation(r.Context(), mutation); err != nil { + if err := h.core.Mutate(r.Context(), mutation); err != nil { writeError(http.StatusInternalServerError, err, w) return } w.WriteHeader(http.StatusNoContent) }) } - -func (h *handler) Hearthbeat(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", "text/plain") - switch h.core.State() { - case tree.Ready: - w.WriteHeader(http.StatusOK) - _, _ = w.Write([]byte(".")) - case tree.Failed: - w.WriteHeader(http.StatusInternalServerError) - _, _ = w.Write([]byte("start failed")) - case tree.Running: - w.WriteHeader(http.StatusServiceUnavailable) - _, _ = w.Write([]byte("starting...")) - } -} - -func writeError(code int, err error, w http.ResponseWriter) { - jsonErr, _ := json.Marshal(jsonModel.NewNode(err.Error())) - _, _ = w.Write(jsonErr) -} - -func parsePath(nodePath string) []string { - arr := []string{} - for _, v := range strings.Split(nodePath, "/") { - if v != "" { - arr = append(arr, v) - } - } - return arr -} diff --git a/internal/handler/events.go b/internal/handler/events.go new file mode 100644 index 0000000..299971d --- /dev/null +++ b/internal/handler/events.go @@ -0,0 +1,67 @@ +package handler + +import ( + "fmt" + "net/http" + "strings" + + "github.com/go-chi/chi/v5" + "github.com/go-chi/chi/v5/middleware" + dmodel "go.neonxp.dev/djson/internal/model" +) + +func (h *handler) HandleEvents(r chi.Router) { + r.Route("/sse", func(r chi.Router) { + r.Get("/*", func(w http.ResponseWriter, r *http.Request) { + h.receiveEvents(w, r, func(ev *dmodel.Mutation) { + message, _ := ev.Body.MarshalJSON() + fmt.Fprintf( + w, + `event: %s\ndata: {"path":"%s","data":%s}\n\n`, + ev.Type.String(), + strings.Join(ev.Path, "/"), + message, + ) + }) + }) + }) + r.Route("/json", func(r chi.Router) { + r.Get("/*", func(w http.ResponseWriter, r *http.Request) { + h.receiveEvents(w, r, func(ev *dmodel.Mutation) { + message, _ := ev.Body.MarshalJSON() + fmt.Fprintf( + w, + `{"event":"%s","path":"%s","data":"%s"}\n`, + ev.Type.String(), + strings.Join(ev.Path, "/"), + message, + ) + }) + }) + }) +} + +func (h *handler) receiveEvents( + w http.ResponseWriter, + r *http.Request, + render func(ev *dmodel.Mutation), +) { + flusher := w.(http.Flusher) + w.Header().Set("Content-Type", "application/x-ndjson") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + w.Header().Set("Access-Control-Allow-Origin", "*") + rctx := chi.RouteContext(r.Context()) + path := parsePath(rctx.RoutePath) + ch := make(chan dmodel.Mutation) + reqID := middleware.GetReqID(r.Context()) + h.events.Subscribe(path, reqID, ch) + defer h.events.Unsubscribe(path, reqID) + go func() { + for ev := range ch { + render(&ev) + flusher.Flush() + } + }() + <-r.Context().Done() +} diff --git a/internal/handler/handler.go b/internal/handler/handler.go new file mode 100644 index 0000000..11c1936 --- /dev/null +++ b/internal/handler/handler.go @@ -0,0 +1,39 @@ +package handler + +import ( + "net/http" + "strings" + + "go.neonxp.dev/json" + jsonModel "go.neonxp.dev/json/model" + + "go.neonxp.dev/djson/internal/events" + "go.neonxp.dev/djson/internal/tree" +) + +func New(core tree.Core, eventsDispatcher events.Dispatcher) Handler { + return &handler{ + core: core, + events: eventsDispatcher, + } +} + +type handler struct { + core tree.Core + events events.Dispatcher +} + +func writeError(code int, err error, w http.ResponseWriter) { + jsonErr, _ := json.Marshal(jsonModel.NewNode(err.Error())) + _, _ = w.Write(jsonErr) +} + +func parsePath(nodePath string) []string { + arr := []string{} + for _, v := range strings.Split(nodePath, "/") { + if v != "" { + arr = append(arr, v) + } + } + return arr +} diff --git a/internal/handler/hearthbeat.go b/internal/handler/hearthbeat.go new file mode 100644 index 0000000..014880e --- /dev/null +++ b/internal/handler/hearthbeat.go @@ -0,0 +1,22 @@ +package handler + +import ( + "net/http" + + "go.neonxp.dev/djson/internal/tree" +) + +func (h *handler) Hearthbeat(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "text/plain") + switch h.core.State() { + case tree.Ready: + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(".")) + case tree.Failed: + w.WriteHeader(http.StatusInternalServerError) + _, _ = w.Write([]byte("start failed")) + case tree.Running: + w.WriteHeader(http.StatusServiceUnavailable) + _, _ = w.Write([]byte("starting...")) + } +} diff --git a/internal/tree/contract.go b/internal/tree/contract.go index b949339..29748cb 100644 --- a/internal/tree/contract.go +++ b/internal/tree/contract.go @@ -10,7 +10,7 @@ import ( type Core interface { Init() error Get(nodes []string) (model.Node, error) - Mutation(ctx context.Context, mut *dmodel.Mutation) error + Mutate(ctx context.Context, mut *dmodel.Mutation) error State() CoreState } diff --git a/internal/tree/core.go b/internal/tree/core.go index aa47fb6..d2cace3 100644 --- a/internal/tree/core.go +++ b/internal/tree/core.go @@ -3,23 +3,26 @@ package tree import ( "sync" + "go.neonxp.dev/djson/internal/events" "go.neonxp.dev/djson/internal/storage" "go.neonxp.dev/json/model" ) type stdCore struct { - Root model.ObjectNode - state CoreState - mu sync.RWMutex - storage storage.Storage + Root model.ObjectNode + state CoreState + mu sync.RWMutex + storage storage.Storage + eventDispatcher events.Dispatcher } -func New(storage storage.Storage) Core { +func New(storage storage.Storage, eventsDispatcher events.Dispatcher) Core { return &stdCore{ - Root: model.ObjectNode{}, - state: Running, - mu: sync.RWMutex{}, - storage: storage, + Root: model.ObjectNode{}, + state: Running, + mu: sync.RWMutex{}, + storage: storage, + eventDispatcher: eventsDispatcher, } } diff --git a/internal/tree/mutations.go b/internal/tree/mutations.go index 9aa9056..173828d 100644 --- a/internal/tree/mutations.go +++ b/internal/tree/mutations.go @@ -9,7 +9,7 @@ import ( json "go.neonxp.dev/json/model" ) -func (t *stdCore) Mutation(ctx context.Context, mut *model.Mutation) error { +func (t *stdCore) Mutate(ctx context.Context, mut *model.Mutation) error { t.mu.Lock() defer t.mu.Unlock() if err := t.execute(mut); err != nil { @@ -40,7 +40,9 @@ func (t *stdCore) execute(mut *model.Mutation) error { if !ok { return fmt.Errorf("node %s is not object", strings.Join(path, "/")) } - return targetObject.Value.Set(key, mut.Body) + if err := targetObject.Value.Set(key, mut.Body); err != nil { + return err + } case model.Merge: inObject, ok := mut.Body.(*json.ObjectNode) if !ok { @@ -60,7 +62,6 @@ func (t *stdCore) execute(mut *model.Mutation) error { return fmt.Errorf("patch allowed only for objects") } targetObject.Merge(inObject) - return nil case model.Remove: if len(mut.Path) == 0 { return fmt.Errorf("can't remove root node. Only replace or create avaliable") @@ -80,7 +81,9 @@ func (t *stdCore) execute(mut *model.Mutation) error { return fmt.Errorf("remove allowed only from objects") } targetObject.Remove(key) - return nil + default: + return fmt.Errorf("invalid command type: %d", mut.Type) } - return fmt.Errorf("invalid command type: %d", mut.Type) + t.eventDispatcher.Notify(mut.Path, mut) + return nil } |