summaryrefslogtreecommitdiff
path: root/internal/handler/events.go
diff options
context:
space:
mode:
authorNeonXP <i@neonxp.dev>2022-12-09 03:36:06 +0300
committerNeonXP <i@neonxp.dev>2022-12-09 03:36:06 +0300
commit2055e57de00865607223401a890125909080f4a3 (patch)
tree73a74ec3370f7211601302bd77bada10b75ff665 /internal/handler/events.go
parent9d46ca252151a2c48434f9ec201bcb3c9133ec78 (diff)
events added
Diffstat (limited to 'internal/handler/events.go')
-rw-r--r--internal/handler/events.go67
1 files changed, 67 insertions, 0 deletions
diff --git a/internal/handler/events.go b/internal/handler/events.go
new file mode 100644
index 0000000..299971d
--- /dev/null
+++ b/internal/handler/events.go
@@ -0,0 +1,67 @@
+package handler
+
+import (
+ "fmt"
+ "net/http"
+ "strings"
+
+ "github.com/go-chi/chi/v5"
+ "github.com/go-chi/chi/v5/middleware"
+ dmodel "go.neonxp.dev/djson/internal/model"
+)
+
+func (h *handler) HandleEvents(r chi.Router) {
+ r.Route("/sse", func(r chi.Router) {
+ r.Get("/*", func(w http.ResponseWriter, r *http.Request) {
+ h.receiveEvents(w, r, func(ev *dmodel.Mutation) {
+ message, _ := ev.Body.MarshalJSON()
+ fmt.Fprintf(
+ w,
+ `event: %s\ndata: {"path":"%s","data":%s}\n\n`,
+ ev.Type.String(),
+ strings.Join(ev.Path, "/"),
+ message,
+ )
+ })
+ })
+ })
+ r.Route("/json", func(r chi.Router) {
+ r.Get("/*", func(w http.ResponseWriter, r *http.Request) {
+ h.receiveEvents(w, r, func(ev *dmodel.Mutation) {
+ message, _ := ev.Body.MarshalJSON()
+ fmt.Fprintf(
+ w,
+ `{"event":"%s","path":"%s","data":"%s"}\n`,
+ ev.Type.String(),
+ strings.Join(ev.Path, "/"),
+ message,
+ )
+ })
+ })
+ })
+}
+
+func (h *handler) receiveEvents(
+ w http.ResponseWriter,
+ r *http.Request,
+ render func(ev *dmodel.Mutation),
+) {
+ flusher := w.(http.Flusher)
+ w.Header().Set("Content-Type", "application/x-ndjson")
+ w.Header().Set("Cache-Control", "no-cache")
+ w.Header().Set("Connection", "keep-alive")
+ w.Header().Set("Access-Control-Allow-Origin", "*")
+ rctx := chi.RouteContext(r.Context())
+ path := parsePath(rctx.RoutePath)
+ ch := make(chan dmodel.Mutation)
+ reqID := middleware.GetReqID(r.Context())
+ h.events.Subscribe(path, reqID, ch)
+ defer h.events.Unsubscribe(path, reqID)
+ go func() {
+ for ev := range ch {
+ render(&ev)
+ flusher.Flush()
+ }
+ }()
+ <-r.Context().Done()
+}