@@ -4,9 +4,11 @@ import { Installation } from "@/installation"
44import { Instance } from "@/project/instance"
55import { InstallationVersion } from "@opencode-ai/core/installation/version"
66import * as Log from "@opencode-ai/core/util/log"
7- import { Effect , Schema } from "effect"
7+ import { Effect , Queue , Schema } from "effect"
8+ import * as Stream from "effect/Stream"
89import { HttpServerRequest , HttpServerResponse } from "effect/unstable/http"
910import { HttpApi , HttpApiBuilder , HttpApiEndpoint , HttpApiGroup , OpenApi } from "effect/unstable/httpapi"
11+ import * as Sse from "effect/unstable/encoding/Sse"
1012
1113const log = Log . create ( { service : "server" } )
1214
@@ -108,8 +110,13 @@ export const GlobalApi = HttpApi.make("global").add(
108110 . annotateMerge ( OpenApi . annotations ( { title : "global" , description : "Global server routes." } ) ) ,
109111)
110112
111- function eventData ( data : unknown ) {
112- return `data: ${ JSON . stringify ( data ) } \n\n`
113+ function eventData ( data : unknown ) : Sse . Event {
114+ return {
115+ _tag : "Event" ,
116+ event : "message" ,
117+ id : undefined ,
118+ data : JSON . stringify ( data ) ,
119+ }
113120}
114121
115122function parseBody ( body : string ) {
@@ -121,49 +128,35 @@ function parseBody(body: string) {
121128}
122129
123130function eventResponse ( ) {
124- const encoder = new TextEncoder ( )
125- let heartbeat : ReturnType < typeof setInterval > | undefined
126- let unsubscribe = ( ) => { }
127- let done = false
128-
129- const cleanup = ( ) => {
130- if ( done ) return
131- done = true
132- if ( heartbeat ) clearInterval ( heartbeat )
133- unsubscribe ( )
134- log . info ( "global event disconnected" )
135- }
136-
137131 log . info ( "global event connected" )
138- return HttpServerResponse . raw (
139- new Response (
140- new ReadableStream < Uint8Array > ( {
141- start ( controller ) {
142- const write = ( data : unknown ) => {
143- if ( done ) return
144- try {
145- controller . enqueue ( encoder . encode ( eventData ( data ) ) )
146- } catch {
147- cleanup ( )
148- }
149- }
150- const handler = ( event : GlobalBusEvent ) => write ( event )
151- unsubscribe = ( ) => GlobalBus . off ( "event" , handler )
152- GlobalBus . on ( "event" , handler )
153- write ( { payload : { type : "server.connected" , properties : { } } } )
154- heartbeat = setInterval ( ( ) => write ( { payload : { type : "server.heartbeat" , properties : { } } } ) , 10_000 )
155- } ,
156- cancel : cleanup ,
157- } ) ,
158- {
159- headers : {
160- "Cache-Control" : "no-cache, no-transform" ,
161- "Content-Type" : "text/event-stream" ,
162- "X-Accel-Buffering" : "no" ,
163- "X-Content-Type-Options" : "nosniff" ,
164- } ,
165- } ,
132+ const events = Stream . callback < GlobalBusEvent > ( ( queue ) => {
133+ const handler = ( event : GlobalBusEvent ) => Queue . offerUnsafe ( queue , event )
134+ return Effect . acquireRelease (
135+ Effect . sync ( ( ) => GlobalBus . on ( "event" , handler ) ) ,
136+ ( ) => Effect . sync ( ( ) => GlobalBus . off ( "event" , handler ) ) ,
137+ )
138+ } )
139+ const heartbeat = Stream . tick ( "10 seconds" ) . pipe (
140+ Stream . drop ( 1 ) ,
141+ Stream . map ( ( ) => ( { payload : { type : "server.heartbeat" , properties : { } } } ) ) ,
142+ )
143+
144+ return HttpServerResponse . stream (
145+ Stream . make ( { payload : { type : "server.connected" , properties : { } } } ) . pipe (
146+ Stream . concat ( events . pipe ( Stream . merge ( heartbeat , { haltStrategy : "left" } ) ) ) ,
147+ Stream . map ( eventData ) ,
148+ Stream . pipeThroughChannel ( Sse . encode ( ) ) ,
149+ Stream . encodeText ,
150+ Stream . ensuring ( Effect . sync ( ( ) => log . info ( "global event disconnected" ) ) ) ,
166151 ) ,
152+ {
153+ contentType : "text/event-stream" ,
154+ headers : {
155+ "Cache-Control" : "no-cache, no-transform" ,
156+ "X-Accel-Buffering" : "no" ,
157+ "X-Content-Type-Options" : "nosniff" ,
158+ } ,
159+ } ,
167160 )
168161}
169162
0 commit comments