Skip to content

Commit d4d8ea6

Browse files
committed
refactor(httpapi): encode event streams with effect sse
1 parent 7198c8f commit d4d8ea6

2 files changed

Lines changed: 46 additions & 46 deletions

File tree

packages/opencode/src/server/routes/instance/httpapi/event.ts

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { Effect, Schema } from "effect"
44
import * as Stream from "effect/Stream"
55
import { HttpRouter, HttpServerResponse } from "effect/unstable/http"
66
import { HttpApi, HttpApiEndpoint, HttpApiGroup, OpenApi } from "effect/unstable/httpapi"
7+
import * as Sse from "effect/unstable/encoding/Sse"
78

89
const log = Log.create({ service: "server" })
910

@@ -27,8 +28,13 @@ export const EventApi = HttpApi.make("event").add(
2728
.annotateMerge(OpenApi.annotations({ title: "event", description: "Instance event stream route." })),
2829
)
2930

30-
function eventData(data: unknown) {
31-
return `data: ${JSON.stringify(data)}\n\n`
31+
function eventData(data: unknown): Sse.Event {
32+
return {
33+
_tag: "Event",
34+
event: "message",
35+
id: undefined,
36+
data: JSON.stringify(data),
37+
}
3238
}
3339

3440
export const eventRoute = HttpRouter.add(
@@ -47,6 +53,7 @@ export const eventRoute = HttpRouter.add(
4753
Stream.make({ type: "server.connected", properties: {} }).pipe(
4854
Stream.concat(events.pipe(Stream.merge(heartbeat, { haltStrategy: "left" }))),
4955
Stream.map(eventData),
56+
Stream.pipeThroughChannel(Sse.encode()),
5057
Stream.encodeText,
5158
Stream.ensuring(Effect.sync(() => log.info("event disconnected"))),
5259
),

packages/opencode/src/server/routes/instance/httpapi/global.ts

Lines changed: 37 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,11 @@ import { Installation } from "@/installation"
44
import { Instance } from "@/project/instance"
55
import { InstallationVersion } from "@opencode-ai/core/installation/version"
66
import * as Log from "@opencode-ai/core/util/log"
7-
import { Effect, Schema } from "effect"
7+
import { Effect, Queue, Schema } from "effect"
8+
import * as Stream from "effect/Stream"
89
import { HttpServerRequest, HttpServerResponse } from "effect/unstable/http"
910
import { HttpApi, HttpApiBuilder, HttpApiEndpoint, HttpApiGroup, OpenApi } from "effect/unstable/httpapi"
11+
import * as Sse from "effect/unstable/encoding/Sse"
1012

1113
const log = Log.create({ service: "server" })
1214

@@ -108,8 +110,13 @@ export const GlobalApi = HttpApi.make("global").add(
108110
.annotateMerge(OpenApi.annotations({ title: "global", description: "Global server routes." })),
109111
)
110112

111-
function eventData(data: unknown) {
112-
return `data: ${JSON.stringify(data)}\n\n`
113+
function eventData(data: unknown): Sse.Event {
114+
return {
115+
_tag: "Event",
116+
event: "message",
117+
id: undefined,
118+
data: JSON.stringify(data),
119+
}
113120
}
114121

115122
function parseBody(body: string) {
@@ -121,49 +128,35 @@ function parseBody(body: string) {
121128
}
122129

123130
function eventResponse() {
124-
const encoder = new TextEncoder()
125-
let heartbeat: ReturnType<typeof setInterval> | undefined
126-
let unsubscribe = () => {}
127-
let done = false
128-
129-
const cleanup = () => {
130-
if (done) return
131-
done = true
132-
if (heartbeat) clearInterval(heartbeat)
133-
unsubscribe()
134-
log.info("global event disconnected")
135-
}
136-
137131
log.info("global event connected")
138-
return HttpServerResponse.raw(
139-
new Response(
140-
new ReadableStream<Uint8Array>({
141-
start(controller) {
142-
const write = (data: unknown) => {
143-
if (done) return
144-
try {
145-
controller.enqueue(encoder.encode(eventData(data)))
146-
} catch {
147-
cleanup()
148-
}
149-
}
150-
const handler = (event: GlobalBusEvent) => write(event)
151-
unsubscribe = () => GlobalBus.off("event", handler)
152-
GlobalBus.on("event", handler)
153-
write({ payload: { type: "server.connected", properties: {} } })
154-
heartbeat = setInterval(() => write({ payload: { type: "server.heartbeat", properties: {} } }), 10_000)
155-
},
156-
cancel: cleanup,
157-
}),
158-
{
159-
headers: {
160-
"Cache-Control": "no-cache, no-transform",
161-
"Content-Type": "text/event-stream",
162-
"X-Accel-Buffering": "no",
163-
"X-Content-Type-Options": "nosniff",
164-
},
165-
},
132+
const events = Stream.callback<GlobalBusEvent>((queue) => {
133+
const handler = (event: GlobalBusEvent) => Queue.offerUnsafe(queue, event)
134+
return Effect.acquireRelease(
135+
Effect.sync(() => GlobalBus.on("event", handler)),
136+
() => Effect.sync(() => GlobalBus.off("event", handler)),
137+
)
138+
})
139+
const heartbeat = Stream.tick("10 seconds").pipe(
140+
Stream.drop(1),
141+
Stream.map(() => ({ payload: { type: "server.heartbeat", properties: {} } })),
142+
)
143+
144+
return HttpServerResponse.stream(
145+
Stream.make({ payload: { type: "server.connected", properties: {} } }).pipe(
146+
Stream.concat(events.pipe(Stream.merge(heartbeat, { haltStrategy: "left" }))),
147+
Stream.map(eventData),
148+
Stream.pipeThroughChannel(Sse.encode()),
149+
Stream.encodeText,
150+
Stream.ensuring(Effect.sync(() => log.info("global event disconnected"))),
166151
),
152+
{
153+
contentType: "text/event-stream",
154+
headers: {
155+
"Cache-Control": "no-cache, no-transform",
156+
"X-Accel-Buffering": "no",
157+
"X-Content-Type-Options": "nosniff",
158+
},
159+
},
167160
)
168161
}
169162

0 commit comments

Comments
 (0)