summaryrefslogblamecommitdiff
path: root/internal/handler/events.go
blob: 43de3c4a200b1585c053bb5ce5826c5d46a1a567 (plain) (tree)
1
2
3
4
5
6
7
8
9
10


               
                       





                                             

                                              




                                                                          

                                                                          











                                                                                       

                                                                          














                                                                                   
                                          







                                                              
                                         










                                                 
package handler

import (
	"encoding/json"
	"fmt"
	"net/http"
	"strings"

	"github.com/go-chi/chi/v5"
	"github.com/go-chi/chi/v5/middleware"

	"go.neonxp.dev/djson/internal/command"
)

// HandleEvents registers the event-streaming endpoints on r.
//
// Two wildcard GET routes are mounted. The remainder of the URL after the
// route prefix is handed to parsePath and used as the subscription path.
//
//   - /sse/*  streams Server-Sent Events: an "event: <type>" line followed by
//     a "data:" line carrying {"path":...,"data":...} and a blank line.
//   - /json/* streams newline-delimited JSON, one
//     {"event":...,"path":...,"data":...} object per line.
//
// Payloads are produced with json.Marshal as a whole so that the path, the
// event name and the data are all correctly escaped and the data is embedded
// as a JSON value rather than as a quoted string.
func (h *handler) HandleEvents(r chi.Router) {
	type ssePayload struct {
		Path string      `json:"path"`
		Data interface{} `json:"data"`
	}
	type jsonPayload struct {
		Event string      `json:"event"`
		Path  string      `json:"path"`
		Data  interface{} `json:"data"`
	}

	r.Route("/sse", func(r chi.Router) {
		r.Get("/*", func(w http.ResponseWriter, r *http.Request) {
			h.streamEvents(w, r, "text/event-stream", func(ev *command.Mutation) {
				payload, err := json.Marshal(ssePayload{
					Path: strings.Join(ev.Path, "/"),
					Data: ev.Data,
				})
				if err != nil {
					// An unencodable mutation is skipped rather than
					// emitted as a malformed frame.
					return
				}
				// Interpreted string literal: the frame must contain real
				// newlines, and a blank line terminates the event.
				// json.Marshal output is compact, so data fits on one line.
				fmt.Fprintf(w, "event: %s\ndata: %s\n\n", ev.Type.String(), payload)
			})
		})
	})
	r.Route("/json", func(r chi.Router) {
		r.Get("/*", func(w http.ResponseWriter, r *http.Request) {
			h.receiveEvents(w, r, func(ev *command.Mutation) {
				line, err := json.Marshal(jsonPayload{
					Event: ev.Type.String(),
					Path:  strings.Join(ev.Path, "/"),
					Data:  ev.Data,
				})
				if err != nil {
					// Skip instead of writing an invalid NDJSON record.
					return
				}
				fmt.Fprintf(w, "%s\n", line)
			})
		})
	})
}

// receiveEvents subscribes the request to mutations under the requested path
// and streams them to the client as newline-delimited JSON, calling render
// for every mutation. It blocks until the request context is done.
//
// It is a thin wrapper around streamEvents that fixes the media type to
// application/x-ndjson.
func (h *handler) receiveEvents(
	w http.ResponseWriter,
	r *http.Request,
	render func(ev *command.Mutation),
) {
	h.streamEvents(w, r, "application/x-ndjson", render)
}

// streamEvents is the shared implementation behind the streaming endpoints.
//
// It sets the streaming response headers (with contentType as the media
// type), subscribes a fresh channel under the chi route path and the request
// ID, and then calls render followed by a flush for every mutation received.
// The subscription is removed when the function returns.
//
// render is always invoked on the handler goroutine, so it never touches w
// after the handler has returned.
func (h *handler) streamEvents(
	w http.ResponseWriter,
	r *http.Request,
	contentType string,
	render func(ev *command.Mutation),
) {
	flusher, ok := w.(http.Flusher)
	if !ok {
		// Without flushing nothing would reach the client incrementally;
		// fail the request instead of panicking on the type assertion.
		http.Error(w, "streaming unsupported", http.StatusInternalServerError)
		return
	}

	header := w.Header()
	header.Set("Content-Type", contentType)
	header.Set("Cache-Control", "no-cache")
	header.Set("Connection", "keep-alive")
	header.Set("Access-Control-Allow-Origin", "*")

	ctx := r.Context()
	path := parsePath(chi.RouteContext(ctx).RoutePath)
	reqID := middleware.GetReqID(ctx)

	ch := make(chan command.Mutation)
	h.events.Subscribe(path, reqID, ch)
	defer h.events.Unsubscribe(path, reqID)

	for {
		select {
		case <-ctx.Done():
			// Keep receiving until the channel is closed so that a
			// publisher blocked on this unbuffered channel is released
			// while the deferred Unsubscribe runs.
			// NOTE(review): this goroutine only exits if the channel is
			// eventually closed (presumably by Unsubscribe) - confirm.
			go func() {
				for range ch {
				}
			}()
			return
		case ev, ok := <-ch:
			if !ok {
				// Subscription channel was closed; nothing more to send.
				return
			}
			render(&ev)
			flusher.Flush()
		}
	}
}