package handler
import (
"encoding/json"
"fmt"
"net/http"
"strings"
"github.com/go-chi/chi/v5"
"github.com/go-chi/chi/v5/middleware"
"go.neonxp.dev/djson/internal/command"
)
func (h *handler) HandleEvents(r chi.Router) {
r.Route("/sse", func(r chi.Router) {
r.Get("/*", func(w http.ResponseWriter, r *http.Request) {
h.receiveEvents(w, r, func(ev *command.Mutation) {
message, _ := json.Marshal(ev.Data)
fmt.Fprintf(
w,
`event: %s\ndata: {"path":"%s","data":%s}\n\n`,
ev.Type.String(),
strings.Join(ev.Path, "/"),
message,
)
})
})
})
r.Route("/json", func(r chi.Router) {
r.Get("/*", func(w http.ResponseWriter, r *http.Request) {
h.receiveEvents(w, r, func(ev *command.Mutation) {
message, _ := json.Marshal(ev.Data)
fmt.Fprintf(
w,
`{"event":"%s","path":"%s","data":"%s"}\n`,
ev.Type.String(),
strings.Join(ev.Path, "/"),
message,
)
})
})
})
}
func (h *handler) receiveEvents(
w http.ResponseWriter,
r *http.Request,
render func(ev *command.Mutation),
) {
flusher := w.(http.Flusher)
w.Header().Set("Content-Type", "application/x-ndjson")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("Access-Control-Allow-Origin", "*")
rctx := chi.RouteContext(r.Context())
path := parsePath(rctx.RoutePath)
ch := make(chan command.Mutation)
reqID := middleware.GetReqID(r.Context())
h.events.Subscribe(path, reqID, ch)
defer h.events.Unsubscribe(path, reqID)
go func() {
for ev := range ch {
render(&ev)
flusher.Flush()
}
}()
<-r.Context().Done()
}