summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNeonXP <i@neonxp.dev>2022-11-21 03:46:30 +0300
committerNeonXP <i@neonxp.dev>2022-11-21 03:46:30 +0300
commit340a623e1a35efe0182cadd780a5a3385b526705 (patch)
tree7baad78e20b922b5487fa243c68b5b6137c7e1c4
initial
-rw-r--r--cmd/api/main.go68
-rw-r--r--go.mod14
-rw-r--r--go.sum16
-rw-r--r--internal/api/handler.go155
-rw-r--r--internal/model/commandtype_string.go25
-rw-r--r--internal/model/mutations.go36
-rw-r--r--internal/storage/file.go66
-rw-r--r--internal/storage/storage.go14
-rw-r--r--internal/tree/engine.go42
-rw-r--r--internal/tree/mutations.go60
10 files changed, 496 insertions, 0 deletions
diff --git a/cmd/api/main.go b/cmd/api/main.go
new file mode 100644
index 0000000..f64b43c
--- /dev/null
+++ b/cmd/api/main.go
@@ -0,0 +1,68 @@
+package main
+
+import (
+ "context"
+ "flag"
+ "log"
+ "net/http"
+ "os"
+ "os/signal"
+
+ "go.neonxp.dev/djson/internal/api"
+ "go.neonxp.dev/djson/internal/storage"
+ "go.neonxp.dev/djson/internal/tree"
+ "golang.org/x/sync/errgroup"
+)
+
+var (
+ cluster = flag.String("cluster", "", "Master node address (ex: 192.168.1.10:5657)")
+ clusterAddr = flag.String("cluster_addr", ":5657", "Self address for cluster")
+ apiAddr = flag.String("api_addr", ":5656", "API address")
+)
+
+func main() {
+ flag.Parse()
+ ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill)
+ defer cancel()
+
+ if cluster != nil {
+ log.Printf("Connecting to cluster %s", *cluster)
+ } else {
+ log.Printf("Creating cluster at %s", *clusterAddr)
+ }
+ storage, err := storage.NewFileStorage("test.wal")
+ if err != nil {
+ log.Fatal(err)
+ }
+ service := tree.New(
+ storage,
+ )
+
+ server := http.Server{
+ Addr: *apiAddr,
+ Handler: api.NewHandler(service),
+ }
+
+ eg, ctx := errgroup.WithContext(ctx)
+
+ eg.Go(func() error {
+ return service.Run(ctx)
+ })
+
+ eg.Go(func() error {
+ log.Printf("Server started at %s", *apiAddr)
+ if err := server.ListenAndServe(); err != http.ErrServerClosed {
+ return err
+ }
+ return nil
+ })
+
+ eg.Go(func() error {
+ <-ctx.Done()
+ return server.Close()
+ })
+
+ if err := eg.Wait(); err != nil {
+ log.Fatal(err)
+ }
+}
diff --git a/go.mod b/go.mod
new file mode 100644
index 0000000..93e2040
--- /dev/null
+++ b/go.mod
@@ -0,0 +1,14 @@
+module go.neonxp.dev/djson
+
+go 1.19
+
+require (
+ github.com/vmihailenco/msgpack/v5 v5.3.5
+ go.neonxp.dev/json v0.0.2
+ golang.org/x/sync v0.1.0
+)
+
+require (
+ github.com/gohobby/deepcopy v1.0.1
+ github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
+)
diff --git a/go.sum b/go.sum
new file mode 100644
index 0000000..b02d2f1
--- /dev/null
+++ b/go.sum
@@ -0,0 +1,16 @@
+github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/gohobby/deepcopy v1.0.1 h1:PTUkRUVlg+Mph0f0RIKEmJJRcOClBrNJlk0weRtMcIs=
+github.com/gohobby/deepcopy v1.0.1/go.mod h1:J/RNFAQlLyrU0ZfJ86LUh9o7vdvn9C6HRvHfIsV1YFk=
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+github.com/vmihailenco/msgpack/v5 v5.3.5 h1:5gO0H1iULLWGhs2H5tbAHIZTV8/cYafcFOr9znI5mJU=
+github.com/vmihailenco/msgpack/v5 v5.3.5/go.mod h1:7xyJ9e+0+9SaZT0Wt1RGleJXzli6Q/V5KbhBonMG9jc=
+github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g=
+github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds=
+go.neonxp.dev/json v0.0.2 h1:wrq3Xm70qWrAVUQdfpCR5+cwzJ2QaVFMiGTzQn2dtAw=
+go.neonxp.dev/json v0.0.2/go.mod h1:d+el/XRG46QCbCsly5PgUqyiwuU9GzBxFTGCuGHyd0w=
+golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
+golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
diff --git a/internal/api/handler.go b/internal/api/handler.go
new file mode 100644
index 0000000..0e51807
--- /dev/null
+++ b/internal/api/handler.go
@@ -0,0 +1,155 @@
+package api
+
+import (
+ "context"
+ "fmt"
+ "io"
+ "log"
+ "net/http"
+ "path/filepath"
+ "strings"
+ "time"
+
+ "go.neonxp.dev/djson/internal/model"
+ "go.neonxp.dev/djson/internal/tree"
+ "go.neonxp.dev/json"
+)
+
+func NewHandler(tree *tree.Engine) *Handler {
+ return &Handler{tree: tree}
+}
+
+type Handler struct {
+ tree *tree.Engine
+}
+
+func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+ switch r.Method {
+ case http.MethodGet:
+ res, err := h.get(r.Context(), r.URL.Path)
+ if err != nil {
+ w.WriteHeader(http.StatusInternalServerError)
+ log.Println(err.Error())
+ return
+ }
+ if err != nil {
+ w.WriteHeader(http.StatusInternalServerError)
+ log.Println(err.Error())
+ return
+ }
+ result, err := res.MarshalJSON()
+ if err != nil {
+ w.WriteHeader(http.StatusInternalServerError)
+ log.Println(err.Error())
+ return
+ }
+ w.WriteHeader(http.StatusOK)
+ _, _ = w.Write(result)
+ case http.MethodPost:
+ jsonBody, err := io.ReadAll(r.Body)
+ r.Body.Close()
+
+ if err := h.post(r.Context(), r.URL.Path, jsonBody); err != nil {
+ w.WriteHeader(http.StatusInternalServerError)
+ log.Println(err.Error())
+ return
+ }
+ if err != nil {
+ w.WriteHeader(http.StatusInternalServerError)
+ log.Println(err.Error())
+ return
+ }
+ w.WriteHeader(http.StatusCreated)
+ case http.MethodPatch:
+ jsonBody, err := io.ReadAll(r.Body)
+ r.Body.Close()
+
+ if err := h.patch(r.Context(), r.URL.Path, jsonBody); err != nil {
+ w.WriteHeader(http.StatusInternalServerError)
+ log.Println(err.Error())
+ return
+ }
+ if err != nil {
+ w.WriteHeader(http.StatusInternalServerError)
+ log.Println(err.Error())
+ return
+ }
+ w.WriteHeader(http.StatusOK)
+ case http.MethodDelete:
+ if err := h.remove(r.Context(), r.URL.Path); err != nil {
+ w.WriteHeader(http.StatusInternalServerError)
+ log.Println(err.Error())
+ return
+ }
+ w.WriteHeader(http.StatusNoContent)
+ default:
+ w.WriteHeader(http.StatusMethodNotAllowed)
+ _, _ = w.Write([]byte(fmt.Sprintf("Unknown method: %s", r.Method)))
+ }
+}
+
+func (h *Handler) get(ctx context.Context, p string) (Marshaller, error) {
+ return h.tree.Get(parsePath(p))
+}
+
+func (h *Handler) post(ctx context.Context, p string, jsonBody []byte) error {
+ node, err := json.Unmarshal(jsonBody)
+ if err != nil {
+ return err
+ }
+
+ return h.tree.Mutation(ctx, &model.Mutation{
+ Date: time.Now(),
+ Type: model.Create,
+ Path: parsePath(p),
+ Body: *node,
+ })
+}
+
+func (h *Handler) patch(ctx context.Context, p string, jsonBody []byte) error {
+ node, err := json.Unmarshal(jsonBody)
+ if err != nil {
+ return err
+ }
+
+ return h.tree.Mutation(ctx, &model.Mutation{
+ Date: time.Now(),
+ Type: model.Merge,
+ Path: parsePath(p),
+ Body: *node,
+ })
+}
+
+func (h *Handler) remove(ctx context.Context, p string) error {
+ return h.tree.Mutation(ctx, &model.Mutation{
+ Date: time.Now(),
+ Type: model.Remove,
+ Path: parsePath(p),
+ })
+}
+
+type ErrorStruct struct {
+ Error string `json:"error"`
+}
+
+func newError(err error) *ErrorStruct {
+ return &ErrorStruct{
+ Error: err.Error(),
+ }
+}
+
+type Marshaller interface {
+ MarshalJSON() ([]byte, error)
+}
+
+func parsePath(nodePath string) []string {
+ nodePath = filepath.Clean(nodePath)
+ nodePath = strings.Trim(nodePath, "/")
+ arr := []string{}
+ for _, v := range strings.Split(nodePath, "/") {
+ if v != "" {
+ arr = append(arr, v)
+ }
+ }
+ return arr
+}
diff --git a/internal/model/commandtype_string.go b/internal/model/commandtype_string.go
new file mode 100644
index 0000000..1ad4635
--- /dev/null
+++ b/internal/model/commandtype_string.go
@@ -0,0 +1,25 @@
+// Code generated by "stringer -type=CommandType"; DO NOT EDIT.
+
+package model
+
+import "strconv"
+
+func _() {
+ // An "invalid array index" compiler error signifies that the constant values have changed.
+ // Re-run the stringer command to generate them again.
+ var x [1]struct{}
+ _ = x[Create-0]
+ _ = x[Merge-1]
+ _ = x[Remove-2]
+}
+
+const _CommandType_name = "CreateMergeRemove"
+
+var _CommandType_index = [...]uint8{0, 6, 11, 17}
+
+func (i CommandType) String() string {
+ if i < 0 || i >= CommandType(len(_CommandType_index)-1) {
+ return "CommandType(" + strconv.FormatInt(int64(i), 10) + ")"
+ }
+ return _CommandType_name[_CommandType_index[i]:_CommandType_index[i+1]]
+}
diff --git a/internal/model/mutations.go b/internal/model/mutations.go
new file mode 100644
index 0000000..6463823
--- /dev/null
+++ b/internal/model/mutations.go
@@ -0,0 +1,36 @@
+package model
+
+import (
+ "fmt"
+ "strings"
+ "time"
+
+ "go.neonxp.dev/json/model"
+)
+
+type Mutation struct {
+ Date time.Time
+ Type CommandType
+ Path []string
+ Body model.Node
+}
+
+func (m *Mutation) String() string {
+ body, _ := m.Body.MarshalJSON()
+ return fmt.Sprintf(
+ "Date=%s Type=%s Path='%s' Body=%s",
+ m.Date.Format(time.RFC3339),
+ m.Type,
+ strings.Join(m.Path, "/"),
+ string(body),
+ )
+}
+
+//go:generate stringer -type=CommandType
+type CommandType int
+
+const (
+ Create CommandType = iota
+ Merge
+ Remove
+)
diff --git a/internal/storage/file.go b/internal/storage/file.go
new file mode 100644
index 0000000..7619e32
--- /dev/null
+++ b/internal/storage/file.go
@@ -0,0 +1,66 @@
+package storage
+
+import (
+ "context"
+ "encoding/gob"
+ "fmt"
+ "io"
+ "log"
+ "os"
+
+ "go.neonxp.dev/djson/internal/model"
+)
+
+type FileStorage struct {
+ enc *gob.Encoder
+ dec *gob.Decoder
+ fh *os.File
+ fileName string
+ mutationsLog []model.Mutation
+}
+
+func NewFileStorage(fileName string) (*FileStorage, error) {
+ fh, err := os.OpenFile(fileName, os.O_CREATE|os.O_APPEND|os.O_RDWR, 0o666)
+ if err != nil {
+ return nil, err
+ }
+
+ return &FileStorage{
+ fileName: fileName,
+ fh: fh,
+ enc: gob.NewEncoder(fh),
+ dec: gob.NewDecoder(fh),
+ mutationsLog: []model.Mutation{},
+ }, nil
+}
+
+func (fs *FileStorage) Commit(ctx context.Context, mut model.Mutation) error {
+ if fs.enc == nil {
+ return fmt.Errorf("file storage not initiated")
+ }
+ return fs.enc.Encode(mut)
+}
+
+func (fs *FileStorage) Load() chan model.Mutation {
+ ch := make(chan model.Mutation)
+ go func() {
+ for {
+ m := model.Mutation{}
+ if err := fs.dec.Decode(&m); err != nil {
+ if err != io.EOF {
+ log.Println(err.Error())
+ }
+ close(ch)
+ return
+ }
+ log.Println("Loaded from fs", m.String())
+ fs.mutationsLog = append(fs.mutationsLog, m)
+ ch <- m
+ }
+ }()
+ return ch
+}
+
+func (fs *FileStorage) Close() error {
+ return fs.fh.Close()
+}
diff --git a/internal/storage/storage.go b/internal/storage/storage.go
new file mode 100644
index 0000000..5847bcc
--- /dev/null
+++ b/internal/storage/storage.go
@@ -0,0 +1,14 @@
+package storage
+
+import (
+ "context"
+ "io"
+
+ "go.neonxp.dev/djson/internal/model"
+)
+
+type Storage interface {
+ io.Closer
+ Commit(ctx context.Context, mut model.Mutation) error
+ Load() chan model.Mutation
+}
diff --git a/internal/tree/engine.go b/internal/tree/engine.go
new file mode 100644
index 0000000..0f28eb7
--- /dev/null
+++ b/internal/tree/engine.go
@@ -0,0 +1,42 @@
+package tree
+
+import (
+ "context"
+ "sync"
+
+ "go.neonxp.dev/djson/internal/storage"
+ "go.neonxp.dev/json/model"
+)
+
+type Engine struct {
+ Root model.Node
+ mu sync.RWMutex
+ storage storage.Storage
+}
+
+func New(storage storage.Storage) *Engine {
+ return &Engine{
+ Root: model.Node{},
+ mu: sync.RWMutex{},
+ storage: storage,
+ }
+}
+
+func (t *Engine) Run(ctx context.Context) error {
+ // Load initial mutations
+ for m := range t.storage.Load() {
+ if err := t.execute(&m); err != nil {
+ return err
+ }
+ }
+
+ <-ctx.Done()
+ return nil
+}
+
+func (t *Engine) Get(nodes []string) (*model.Node, error) {
+ if len(nodes) == 0 {
+ return &t.Root, nil
+ }
+ return t.Root.Query(nodes)
+}
diff --git a/internal/tree/mutations.go b/internal/tree/mutations.go
new file mode 100644
index 0000000..ec80ebd
--- /dev/null
+++ b/internal/tree/mutations.go
@@ -0,0 +1,60 @@
+package tree
+
+import (
+ "context"
+ "fmt"
+
+ "go.neonxp.dev/djson/internal/model"
+)
+
+func (t *Engine) Mutation(ctx context.Context, mut *model.Mutation) error {
+ t.mu.Lock()
+ defer t.mu.Unlock()
+ if err := t.execute(mut); err != nil {
+ return err
+ }
+ return t.storage.Commit(ctx, *mut)
+}
+
+func (t *Engine) execute(mut *model.Mutation) error {
+ switch mut.Type {
+ case model.Create:
+ if len(mut.Path) == 0 {
+ // create root node
+ t.Root = mut.Body
+ return nil
+ }
+ key := mut.Path[len(mut.Path)-1]
+ path := mut.Path[:len(mut.Path)-1]
+ target, err := t.Root.Query(path)
+ if err != nil {
+ return err
+ }
+ return target.Set(key, mut.Body)
+ case model.Merge:
+ if len(mut.Path) == 0 {
+ // patch root node
+ return t.Root.Merge(&mut.Body)
+ }
+ target, err := t.Root.Query(mut.Path)
+ if err != nil {
+ return err
+ }
+ return target.Merge(&mut.Body)
+ case model.Remove:
+ if len(mut.Path) == 0 {
+ return fmt.Errorf("can't remove root node. Only replace or create avaliable")
+ }
+ key := mut.Path[len(mut.Path)-1]
+ if len(mut.Path) == 1 {
+ return t.Root.Remove(key)
+ }
+ path := mut.Path[:len(mut.Path)-1]
+ target, err := t.Root.Query(path)
+ if err != nil {
+ return err
+ }
+ return target.Remove(key)
+ }
+ return fmt.Errorf("invalid command type: %d", mut.Type)
+}