diff options
-rw-r--r-- | cmd/api/main.go | 68 | ||||
-rw-r--r-- | go.mod | 14 | ||||
-rw-r--r-- | go.sum | 16 | ||||
-rw-r--r-- | internal/api/handler.go | 155 | ||||
-rw-r--r-- | internal/model/commandtype_string.go | 25 | ||||
-rw-r--r-- | internal/model/mutations.go | 36 | ||||
-rw-r--r-- | internal/storage/file.go | 66 | ||||
-rw-r--r-- | internal/storage/storage.go | 14 | ||||
-rw-r--r-- | internal/tree/engine.go | 42 | ||||
-rw-r--r-- | internal/tree/mutations.go | 60 |
10 files changed, 496 insertions, 0 deletions
diff --git a/cmd/api/main.go b/cmd/api/main.go new file mode 100644 index 0000000..f64b43c --- /dev/null +++ b/cmd/api/main.go @@ -0,0 +1,68 @@ +package main + +import ( + "context" + "flag" + "log" + "net/http" + "os" + "os/signal" + + "go.neonxp.dev/djson/internal/api" + "go.neonxp.dev/djson/internal/storage" + "go.neonxp.dev/djson/internal/tree" + "golang.org/x/sync/errgroup" +) + +var ( + cluster = flag.String("cluster", "", "Master node address (ex: 192.168.1.10:5657)") + clusterAddr = flag.String("cluster_addr", ":5657", "Self address for cluster") + apiAddr = flag.String("api_addr", ":5656", "API address") +) + +func main() { + flag.Parse() + ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill) + defer cancel() + + if cluster != nil { + log.Printf("Connecting to cluster %s", *cluster) + } else { + log.Printf("Creating cluster at %s", *clusterAddr) + } + storage, err := storage.NewFileStorage("test.wal") + if err != nil { + log.Fatal(err) + } + service := tree.New( + storage, + ) + + server := http.Server{ + Addr: *apiAddr, + Handler: api.NewHandler(service), + } + + eg, ctx := errgroup.WithContext(ctx) + + eg.Go(func() error { + return service.Run(ctx) + }) + + eg.Go(func() error { + log.Printf("Server started at %s", *apiAddr) + if err := server.ListenAndServe(); err != http.ErrServerClosed { + return err + } + return nil + }) + + eg.Go(func() error { + <-ctx.Done() + return server.Close() + }) + + if err := eg.Wait(); err != nil { + log.Fatal(err) + } +} @@ -0,0 +1,14 @@ +module go.neonxp.dev/djson + +go 1.19 + +require ( + github.com/vmihailenco/msgpack/v5 v5.3.5 + go.neonxp.dev/json v0.0.2 + golang.org/x/sync v0.1.0 +) + +require ( + github.com/gohobby/deepcopy v1.0.1 + github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect +) @@ -0,0 +1,16 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/gohobby/deepcopy v1.0.1 h1:PTUkRUVlg+Mph0f0RIKEmJJRcOClBrNJlk0weRtMcIs= +github.com/gohobby/deepcopy v1.0.1/go.mod h1:J/RNFAQlLyrU0ZfJ86LUh9o7vdvn9C6HRvHfIsV1YFk= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/vmihailenco/msgpack/v5 v5.3.5 h1:5gO0H1iULLWGhs2H5tbAHIZTV8/cYafcFOr9znI5mJU= +github.com/vmihailenco/msgpack/v5 v5.3.5/go.mod h1:7xyJ9e+0+9SaZT0Wt1RGleJXzli6Q/V5KbhBonMG9jc= +github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g= +github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds= +go.neonxp.dev/json v0.0.2 h1:wrq3Xm70qWrAVUQdfpCR5+cwzJ2QaVFMiGTzQn2dtAw= +go.neonxp.dev/json v0.0.2/go.mod h1:d+el/XRG46QCbCsly5PgUqyiwuU9GzBxFTGCuGHyd0w= +golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/api/handler.go b/internal/api/handler.go new file mode 100644 index 0000000..0e51807 --- /dev/null +++ b/internal/api/handler.go @@ -0,0 +1,155 @@ +package api + +import ( + "context" + "fmt" + "io" + "log" + "net/http" + "path/filepath" + "strings" + "time" + + "go.neonxp.dev/djson/internal/model" + "go.neonxp.dev/djson/internal/tree" + "go.neonxp.dev/json" +) + +func NewHandler(tree *tree.Engine) *Handler { + return &Handler{tree: tree} +} + +type Handler struct { + tree *tree.Engine +} + +func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + switch r.Method { + case http.MethodGet: + res, err := h.get(r.Context(), r.URL.Path) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + log.Println(err.Error()) + return + } + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + log.Println(err.Error()) + return + } + result, err := res.MarshalJSON() + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + log.Println(err.Error()) + return + } + w.WriteHeader(http.StatusOK) + _, _ = w.Write(result) + case http.MethodPost: + jsonBody, err := io.ReadAll(r.Body) + r.Body.Close() + + if err := h.post(r.Context(), r.URL.Path, jsonBody); err != nil { + w.WriteHeader(http.StatusInternalServerError) + log.Println(err.Error()) + return + } + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + log.Println(err.Error()) + return + } + w.WriteHeader(http.StatusCreated) + case http.MethodPatch: + jsonBody, err := io.ReadAll(r.Body) + r.Body.Close() + + if err := h.patch(r.Context(), r.URL.Path, jsonBody); err != nil { + w.WriteHeader(http.StatusInternalServerError) + log.Println(err.Error()) + return + } + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + log.Println(err.Error()) + return + } + w.WriteHeader(http.StatusOK) + case http.MethodDelete: + if err := h.remove(r.Context(), r.URL.Path); err != nil { + w.WriteHeader(http.StatusInternalServerError) + log.Println(err.Error()) + return + } + w.WriteHeader(http.StatusNoContent) + default: + w.WriteHeader(http.StatusMethodNotAllowed) + _, _ = w.Write([]byte(fmt.Sprintf("Unknown method: %s", r.Method))) + } +} + +func (h *Handler) get(ctx context.Context, p string) (Marshaller, error) { + return h.tree.Get(parsePath(p)) +} + +func (h *Handler) post(ctx context.Context, p string, jsonBody []byte) error { + node, err := json.Unmarshal(jsonBody) + if err != nil { + return err + } + + return h.tree.Mutation(ctx, &model.Mutation{ + Date: time.Now(), + Type: model.Create, + Path: parsePath(p), + Body: *node, + }) +} + +func (h *Handler) patch(ctx context.Context, p string, jsonBody []byte) error { + node, err := json.Unmarshal(jsonBody) + if err != nil { + return err + } + + return h.tree.Mutation(ctx, &model.Mutation{ + Date: time.Now(), + Type: model.Merge, + Path: parsePath(p), + Body: *node, + }) +} + +func (h *Handler) remove(ctx context.Context, p string) error { + return h.tree.Mutation(ctx, &model.Mutation{ + Date: time.Now(), + Type: model.Remove, + Path: parsePath(p), + }) +} + +type ErrorStruct struct { + Error string `json:"error"` +} + +func newError(err error) *ErrorStruct { + return &ErrorStruct{ + Error: err.Error(), + } +} + +type Marshaller interface { + MarshalJSON() ([]byte, error) +} + +func parsePath(nodePath string) []string { + nodePath = filepath.Clean(nodePath) + nodePath = strings.Trim(nodePath, "/") + arr := []string{} + for _, v := range strings.Split(nodePath, "/") { + if v != "" { + arr = append(arr, v) + } + } + return arr +} diff --git a/internal/model/commandtype_string.go b/internal/model/commandtype_string.go new file mode 100644 index 0000000..1ad4635 --- /dev/null +++ b/internal/model/commandtype_string.go @@ -0,0 +1,25 @@ +// Code generated by "stringer -type=CommandType"; DO NOT EDIT. + +package model + +import "strconv" + +func _() { + // An "invalid array index" compiler error signifies that the constant values have changed. + // Re-run the stringer command to generate them again. + var x [1]struct{} + _ = x[Create-0] + _ = x[Merge-1] + _ = x[Remove-2] +} + +const _CommandType_name = "CreateMergeRemove" + +var _CommandType_index = [...]uint8{0, 6, 11, 17} + +func (i CommandType) String() string { + if i < 0 || i >= CommandType(len(_CommandType_index)-1) { + return "CommandType(" + strconv.FormatInt(int64(i), 10) + ")" + } + return _CommandType_name[_CommandType_index[i]:_CommandType_index[i+1]] +} diff --git a/internal/model/mutations.go b/internal/model/mutations.go new file mode 100644 index 0000000..6463823 --- /dev/null +++ b/internal/model/mutations.go @@ -0,0 +1,36 @@ +package model + +import ( + "fmt" + "strings" + "time" + + "go.neonxp.dev/json/model" +) + +type Mutation struct { + Date time.Time + Type CommandType + Path []string + Body model.Node +} + +func (m *Mutation) String() string { + body, _ := m.Body.MarshalJSON() + return fmt.Sprintf( + "Date=%s Type=%s Path='%s' Body=%s", + m.Date.Format(time.RFC3339), + m.Type, + strings.Join(m.Path, "/"), + string(body), + ) +} + +//go:generate stringer -type=CommandType +type CommandType int + +const ( + Create CommandType = iota + Merge + Remove +) diff --git a/internal/storage/file.go b/internal/storage/file.go new file mode 100644 index 0000000..7619e32 --- /dev/null +++ b/internal/storage/file.go @@ -0,0 +1,66 @@ +package storage + +import ( + "context" + "encoding/gob" + "fmt" + "io" + "log" + "os" + + "go.neonxp.dev/djson/internal/model" +) + +type FileStorage struct { + enc *gob.Encoder + dec *gob.Decoder + fh *os.File + fileName string + mutationsLog []model.Mutation +} + +func NewFileStorage(fileName string) (*FileStorage, error) { + fh, err := os.OpenFile(fileName, os.O_CREATE|os.O_APPEND|os.O_RDWR, 0o666) + if err != nil { + return nil, err + } + + return &FileStorage{ + fileName: fileName, + fh: fh, + enc: gob.NewEncoder(fh), + dec: gob.NewDecoder(fh), + mutationsLog: []model.Mutation{}, + }, nil +} + +func (fs *FileStorage) Commit(ctx context.Context, mut model.Mutation) error { + if fs.enc == nil { + return fmt.Errorf("file storage not initiated") + } + return fs.enc.Encode(mut) +} + +func (fs *FileStorage) Load() chan model.Mutation { + ch := make(chan model.Mutation) + go func() { + for { + m := model.Mutation{} + if err := fs.dec.Decode(&m); err != nil { + if err != io.EOF { + log.Println(err.Error()) + } + close(ch) + return + } + log.Println("Loaded from fs", m.String()) + fs.mutationsLog = append(fs.mutationsLog, m) + ch <- m + } + }() + return ch +} + +func (fs *FileStorage) Close() error { + return fs.fh.Close() +} diff --git a/internal/storage/storage.go b/internal/storage/storage.go new file mode 100644 index 0000000..5847bcc --- /dev/null +++ b/internal/storage/storage.go @@ -0,0 +1,14 @@ +package storage + +import ( + "context" + "io" + + "go.neonxp.dev/djson/internal/model" +) + +type Storage interface { + io.Closer + Commit(ctx context.Context, mut model.Mutation) error + Load() chan model.Mutation +} diff --git a/internal/tree/engine.go b/internal/tree/engine.go new file mode 100644 index 0000000..0f28eb7 --- /dev/null +++ b/internal/tree/engine.go @@ -0,0 +1,42 @@ +package tree + +import ( + "context" + "sync" + + "go.neonxp.dev/djson/internal/storage" + "go.neonxp.dev/json/model" +) + +type Engine struct { + Root model.Node + mu sync.RWMutex + storage storage.Storage +} + +func New(storage storage.Storage) *Engine { + return &Engine{ + Root: model.Node{}, + mu: sync.RWMutex{}, + storage: storage, + } +} + +func (t *Engine) Run(ctx context.Context) error { + // Load initial mutations + for m := range t.storage.Load() { + if err := t.execute(&m); err != nil { + return err + } + } + + <-ctx.Done() + return nil +} + +func (t *Engine) Get(nodes []string) (*model.Node, error) { + if len(nodes) == 0 { + return &t.Root, nil + } + return t.Root.Query(nodes) +} diff --git a/internal/tree/mutations.go b/internal/tree/mutations.go new file mode 100644 index 0000000..ec80ebd --- /dev/null +++ b/internal/tree/mutations.go @@ -0,0 +1,60 @@ +package tree + +import ( + "context" + "fmt" + + "go.neonxp.dev/djson/internal/model" +) + +func (t *Engine) Mutation(ctx context.Context, mut *model.Mutation) error { + t.mu.Lock() + defer t.mu.Unlock() + if err := t.execute(mut); err != nil { + return err + } + return t.storage.Commit(ctx, *mut) +} + +func (t *Engine) execute(mut *model.Mutation) error { + switch mut.Type { + case model.Create: + if len(mut.Path) == 0 { + // create root node + t.Root = mut.Body + return nil + } + key := mut.Path[len(mut.Path)-1] + path := mut.Path[:len(mut.Path)-1] + target, err := t.Root.Query(path) + if err != nil { + return err + } + return target.Set(key, mut.Body) + case model.Merge: + if len(mut.Path) == 0 { + // patch root node + return t.Root.Merge(&mut.Body) + } + target, err := t.Root.Query(mut.Path) + if err != nil { + return err + } + return target.Merge(&mut.Body) + case model.Remove: + if len(mut.Path) == 0 { + return fmt.Errorf("can't remove root node. Only replace or create avaliable") + } + key := mut.Path[len(mut.Path)-1] + if len(mut.Path) == 1 { + return t.Root.Remove(key) + } + path := mut.Path[:len(mut.Path)-1] + target, err := t.Root.Query(path) + if err != nil { + return err + } + return target.Remove(key) + } + return fmt.Errorf("invalid command type: %d", mut.Type) +} |