Server-sent events example
A working Server-Sent Events (SSE) endpoint on MuxMaster with the opt-in PoolRequestBundle enabled. The example pairs a streaming /events/:topic route with a synchronous /publish endpoint and a periodic server-side tick so a subscriber sees live data even with no producer attached.
Why this example is pool-safe
PoolRequestBundle recycles the per-request bundle the instant the handler returns. SSE handlers do not return until the client disconnects or the server shuts down — they sit inside their for loop streaming data. The request stays alive for the entire stream, so the pooled bundle is returned only after the SSE session ends. The lifetime contract holds; pooling is safe.
This is the inverse of WebSocket / Hijack() upgrades, which transfer ownership of the connection past ServeHTTP return — pooling is unsafe there. SSE is plain HTTP over net/http and respects the contract trivially.
Step 1 — Construct the router
hub := newHub()
log := slog.New(slog.NewTextHandler(os.Stdout, nil))
mux := mm.New()
mux.PoolRequestBundle = true
mux.Pre(mw.RequestID(), mw.RecovererWithLogger(log))
mux.GET("/events/:topic", hub.stream)
mux.POST("/publish", hub.publish)
mux.GET("/", indexHTML)
go hub.tick() // emit a server-side "tick" every 5 s on the "system" topic
Pre runs once per request, before routing — RequestID and RecovererWithLogger apply to both the streaming route and the synchronous publisher.
Step 2 — The hub — a tiny topic-to-subscribers fan-out
The hub holds one map[string]map[chan string]struct{} (topic → set of subscriber channels) behind a sync.RWMutex. Reads (broadcasts) take the read lock; writes (subscribe / unsubscribe) take the write lock. When a subscriber is too slow to keep up, the message is dropped for that subscriber rather than blocking the broadcast — otherwise one stalled reader would hold up delivery to every other subscriber on the topic.
func (h *hub) broadcast(topic, msg string) int {
    h.mu.RLock()
    subs := h.topics[topic]
    delivered := 0
    for ch := range subs {
        select {
        case ch <- msg:
            delivered++
        default: // slow subscriber — drop the message rather than block
        }
    }
    h.mu.RUnlock()
    return delivered
}
subscribe returns a buffered channel (make(chan string, 16)); unsubscribe removes the entry and closes the channel.
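A minimal sketch of those two methods, assuming the hub fields implied by Step 2 (the upstream file is authoritative for the real definitions):

```go
package main

import (
    "fmt"
    "sync"
)

type hub struct {
    mu     sync.RWMutex
    topics map[string]map[chan string]struct{}
}

// subscribe registers a buffered channel under topic and returns it.
func (h *hub) subscribe(topic string) chan string {
    ch := make(chan string, 16) // buffer absorbs short bursts
    h.mu.Lock()
    defer h.mu.Unlock()
    if h.topics[topic] == nil {
        h.topics[topic] = make(map[chan string]struct{})
    }
    h.topics[topic][ch] = struct{}{}
    return ch
}

// unsubscribe removes the channel and closes it, so the reader's
// `msg, ok := <-ch` observes ok == false and exits its loop.
func (h *hub) unsubscribe(topic string, ch chan string) {
    h.mu.Lock()
    defer h.mu.Unlock()
    if subs := h.topics[topic]; subs != nil {
        if _, ok := subs[ch]; ok {
            delete(subs, ch)
            close(ch)
        }
        if len(subs) == 0 {
            delete(h.topics, topic) // drop empty topics
        }
    }
}

func main() {
    h := &hub{topics: make(map[string]map[chan string]struct{})}
    ch := h.subscribe("news")
    fmt.Println(cap(ch), len(h.topics["news"])) // 16 1
    h.unsubscribe("news", ch)
    fmt.Println(len(h.topics)) // 0
}
```

Closing the channel inside unsubscribe is what makes the `case msg, ok := <-ch` branch in the stream handler a clean exit path.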
Step 3 — The stream handler
The SSE handler writes the canonical event-stream headers, performs the initial flush to send the response head immediately, subscribes, and then loops over r.Context().Done() and the subscriber channel. r.Context() is safe to use under PoolRequestBundle because the handler has not returned yet — the bundle is still live.
func (h *hub) stream(w http.ResponseWriter, r *http.Request) {
    topic := mm.PathParam(r, "topic")
    if topic == "" {
        http.Error(w, "missing topic", http.StatusBadRequest)
        return
    }
    // Assert streaming support before writing the response head: once
    // WriteHeader has run, http.Error can no longer send a clean 500.
    flusher, ok := w.(http.Flusher)
    if !ok {
        http.Error(w, "streaming unsupported", http.StatusInternalServerError)
        return
    }
    w.Header().Set("Content-Type", "text/event-stream")
    w.Header().Set("Cache-Control", "no-cache")
    w.Header().Set("Connection", "keep-alive")
    w.Header().Set("X-Accel-Buffering", "no") // for nginx proxies
    w.WriteHeader(http.StatusOK)
    ch := h.subscribe(topic)
    defer h.unsubscribe(topic, ch)
    fmt.Fprintf(w, "event: ready\ndata: subscribed to %q\n\n", topic)
    flusher.Flush()
    for {
        select {
        case <-r.Context().Done():
            return
        case msg, ok := <-ch:
            if !ok {
                return
            }
            _, _ = fmt.Fprintf(w, "data: %s\n\n", msg)
            flusher.Flush()
        }
    }
}
Notes on the wire format: SSE separates events with a blank line. data: is the message body; event: names a custom event the client subscribes to via addEventListener (the example uses event: ready for the initial subscription confirmation). X-Accel-Buffering: no disables nginx's response buffering for the stream — without it, an nginx-fronted SSE endpoint will appear to hang for minutes.
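The framing rules can be isolated into a small helper. A sketch (the helper name sseFrame is illustrative, not part of the example program) showing how an event name and a multi-line payload map onto the wire format:

```go
package main

import (
    "fmt"
    "strings"
)

// sseFrame formats one SSE frame: an optional "event:" line, one
// "data:" line per payload line, terminated by a blank line.
func sseFrame(event, data string) string {
    var b strings.Builder
    if event != "" {
        fmt.Fprintf(&b, "event: %s\n", event)
    }
    for _, line := range strings.Split(data, "\n") {
        fmt.Fprintf(&b, "data: %s\n", line) // multi-line payloads repeat data:
    }
    b.WriteString("\n") // blank line ends the frame
    return b.String()
}

func main() {
    fmt.Print(sseFrame("ready", "subscribed"))
    fmt.Print(sseFrame("", "line one\nline two"))
}
```

A client reassembles consecutive data: lines with newline separators, so the two-line payload arrives as one message — this is the standard way to send payloads that themselves contain newlines.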
Step 4 — The publish handler
Synchronous POST endpoint that drains the body, parses the JSON, broadcasts to the named topic, and returns a delivery count. Body is drained inline with io.ReadAll(io.LimitReader(r.Body, 1<<16)) — nothing escapes the handler, so the pooled bundle recycles cleanly.
func (h *hub) publish(w http.ResponseWriter, r *http.Request) {
    body, err := io.ReadAll(io.LimitReader(r.Body, 1<<16))
    if err != nil {
        http.Error(w, "bad body", http.StatusBadRequest)
        return
    }
    var p struct {
        Topic string `json:"topic"`
        Msg   string `json:"msg"`
    }
    if err := json.Unmarshal(body, &p); err != nil {
        http.Error(w, "invalid json", http.StatusBadRequest)
        return
    }
    if p.Topic == "" || p.Msg == "" {
        http.Error(w, "topic and msg required", http.StatusUnprocessableEntity)
        return
    }
    n := h.broadcast(p.Topic, p.Msg)
    _ = json.NewEncoder(w).Encode(map[string]any{
        "delivered": n,
        "topic":     p.Topic,
    })
}
The 64 KB LimitReader cap prevents a malicious publisher from exhausting server memory with an unbounded body. The 422 status code on empty topic / msg follows RFC 9110 §15.5.21 (Unprocessable Content) — the JSON parsed but did not validate.
Step 5 — The periodic tick
A goroutine launched at boot broadcasts a "tick" message every 5 s on the system topic. It is the canonical pattern for a server-driven heartbeat — a subscriber sees activity even when no client is publishing.
func (h *hub) tick() {
    t := time.NewTicker(5 * time.Second)
    defer t.Stop()
    for {
        select {
        case <-h.done:
            return
        case now := <-t.C:
            h.broadcast("system", fmt.Sprintf("tick at %s", now.Format(time.Kitchen)))
        }
    }
}
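For completeness, a hedged sketch of the hub state the handlers above assume — the field names are inferred from the excerpts, and the upstream file is authoritative:

```go
package main

import (
    "fmt"
    "sync"
)

// hub fans messages out from topics to subscriber channels.
type hub struct {
    mu     sync.RWMutex
    topics map[string]map[chan string]struct{} // topic → set of subscribers
    done   chan struct{}                       // closed on shutdown; stops tick
}

func newHub() *hub {
    return &hub{
        topics: make(map[string]map[chan string]struct{}),
        done:   make(chan struct{}),
    }
}

// close signals tick (and any other hub goroutines) to exit.
func (h *hub) close() { close(h.done) }

func main() {
    h := newHub()
    h.close()
    _, open := <-h.done // a closed channel yields immediately
    fmt.Println(open)   // false
}
```

The done channel is the shutdown signal: closing it unblocks the `case <-h.done` branch in tick without needing to send anything.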
Try it
# Terminal 1: start the server
go run .
# Terminal 2: subscribe to the news topic
curl -N http://localhost:8080/events/news
# Terminal 3: publish to the news topic
curl -X POST http://localhost:8080/publish \
-d '{"topic":"news","msg":"hello"}'
# Or open http://localhost:8080/ in a browser — the page subscribes to /events/system.
Frequently asked questions
Why is SSE safe with PoolRequestBundle when the connection outlives the handler?
The connection outlives the call, but the handler does not return until the connection closes — it sits inside the for loop streaming events. Under PoolRequestBundle, the bundle is recycled when the handler returns; for an SSE handler, that is when the stream ends. So the pool reclaim happens after the connection terminates, never during the stream. The contract holds.
How do I bound memory if a subscriber stops reading?
The hub's broadcast uses select with a default branch (Step 2). When the subscriber's buffered channel (make(chan string, 16)) is full, the message is dropped rather than blocking the broadcast goroutine. A subscriber that stops reading loses messages once its 16-slot buffer fills but never causes head-of-line blocking for other subscribers. For stronger guarantees, swap the default branch for time.After plus an unsubscribe action.
How do I authenticate the publisher?
Wrap /publish with an auth middleware in a Group: pub := mux.Group("/publish"); pub.Use(mw.JWTAuth(cfg)); pub.POST("", hub.publish). Or, for a server-to-server publisher, use mw.APIKey with a SHA-256-hashed key lookup. Both compose at registration time — zero per-request overhead on the publish path. The stream endpoint stays unauthenticated if it should be world-readable, or guarded with the same middleware family if subscriptions are per-tenant.
Upstream source
The code excerpts above are drawn from examples/server-sent-events/main.go at the v1.1.0 tag. The upstream file also contains the full hub type with subscribe, unsubscribe, and close, plus the in-browser demo HTML — follow the link for the full program.
See also
- Maximum performance — Recipe 3 explains why long-lived streams are pool-safe.
- Reverse-proxy example — another pool-safe pattern (the proxy returns synchronously before ServeHTTP exits).
- Upload file example — the body-drain-before-spawn pattern for background work.