|
1 | 1 | import { Config } from "@/config/config" |
2 | | -import { Schema } from "effect" |
3 | | -import { HttpApi, HttpApiEndpoint, HttpApiGroup, OpenApi } from "effect/unstable/httpapi" |
| 2 | +import { GlobalBus, type GlobalEvent as GlobalBusEvent } from "@/bus/global" |
| 3 | +import { Installation } from "@/installation" |
| 4 | +import { Instance } from "@/project/instance" |
| 5 | +import { InstallationVersion } from "@opencode-ai/core/installation/version" |
| 6 | +import * as Log from "@opencode-ai/core/util/log" |
| 7 | +import { Effect, Schema } from "effect" |
| 8 | +import { HttpServerRequest, HttpServerResponse } from "effect/unstable/http" |
| 9 | +import { HttpApi, HttpApiBuilder, HttpApiEndpoint, HttpApiGroup, OpenApi } from "effect/unstable/httpapi" |
| 10 | + |
| 11 | +const log = Log.create({ service: "server" }) |
4 | 12 |
|
5 | 13 | const GlobalHealth = Schema.Struct({ |
6 | 14 | healthy: Schema.Literal(true), |
7 | 15 | version: Schema.String, |
8 | 16 | }).annotate({ identifier: "GlobalHealth" }) |
9 | 17 |
|
10 | | -const GlobalEvent = Schema.Struct({ |
| 18 | +const GlobalEventSchema = Schema.Struct({ |
11 | 19 | directory: Schema.String, |
12 | 20 | project: Schema.optional(Schema.String), |
13 | 21 | workspace: Schema.optional(Schema.String), |
@@ -50,7 +58,7 @@ export const GlobalApi = HttpApi.make("global").add( |
50 | 58 | }), |
51 | 59 | ), |
52 | 60 | HttpApiEndpoint.get("event", GlobalPaths.event, { |
53 | | - success: GlobalEvent, |
| 61 | + success: GlobalEventSchema, |
54 | 62 | }).annotateMerge( |
55 | 63 | OpenApi.annotations({ |
56 | 64 | identifier: "global.event", |
@@ -99,3 +107,153 @@ export const GlobalApi = HttpApi.make("global").add( |
99 | 107 | ) |
100 | 108 | .annotateMerge(OpenApi.annotations({ title: "global", description: "Global server routes." })), |
101 | 109 | ) |
| 110 | + |
| 111 | +function eventData(data: unknown) { |
| 112 | + return `data: ${JSON.stringify(data)}\n\n` |
| 113 | +} |
| 114 | + |
| 115 | +function parseBody(body: string) { |
| 116 | + try { |
| 117 | + return JSON.parse(body || "{}") as unknown |
| 118 | + } catch { |
| 119 | + return undefined |
| 120 | + } |
| 121 | +} |
| 122 | + |
| 123 | +function eventResponse() { |
| 124 | + const encoder = new TextEncoder() |
| 125 | + let heartbeat: ReturnType<typeof setInterval> | undefined |
| 126 | + let unsubscribe = () => {} |
| 127 | + let done = false |
| 128 | + |
| 129 | + const cleanup = () => { |
| 130 | + if (done) return |
| 131 | + done = true |
| 132 | + if (heartbeat) clearInterval(heartbeat) |
| 133 | + unsubscribe() |
| 134 | + log.info("global event disconnected") |
| 135 | + } |
| 136 | + |
| 137 | + log.info("global event connected") |
| 138 | + return HttpServerResponse.raw( |
| 139 | + new Response( |
| 140 | + new ReadableStream<Uint8Array>({ |
| 141 | + start(controller) { |
| 142 | + const write = (data: unknown) => { |
| 143 | + if (done) return |
| 144 | + try { |
| 145 | + controller.enqueue(encoder.encode(eventData(data))) |
| 146 | + } catch { |
| 147 | + cleanup() |
| 148 | + } |
| 149 | + } |
| 150 | + const handler = (event: GlobalBusEvent) => write(event) |
| 151 | + unsubscribe = () => GlobalBus.off("event", handler) |
| 152 | + GlobalBus.on("event", handler) |
| 153 | + write({ payload: { type: "server.connected", properties: {} } }) |
| 154 | + heartbeat = setInterval(() => write({ payload: { type: "server.heartbeat", properties: {} } }), 10_000) |
| 155 | + }, |
| 156 | + cancel: cleanup, |
| 157 | + }), |
| 158 | + { |
| 159 | + headers: { |
| 160 | + "Cache-Control": "no-cache, no-transform", |
| 161 | + "Content-Type": "text/event-stream", |
| 162 | + "X-Accel-Buffering": "no", |
| 163 | + "X-Content-Type-Options": "nosniff", |
| 164 | + }, |
| 165 | + }, |
| 166 | + ), |
| 167 | + ) |
| 168 | +} |
| 169 | + |
| 170 | +export const globalHandlers = HttpApiBuilder.group(GlobalApi, "global", (handlers) => |
| 171 | + Effect.gen(function* () { |
| 172 | + const config = yield* Config.Service |
| 173 | + const installation = yield* Installation.Service |
| 174 | + |
| 175 | + const health = Effect.fn("GlobalHttpApi.health")(function* () { |
| 176 | + return { healthy: true as const, version: InstallationVersion } |
| 177 | + }) |
| 178 | + |
| 179 | + const event = Effect.fn("GlobalHttpApi.event")(function* () { |
| 180 | + return eventResponse() |
| 181 | + }) |
| 182 | + |
| 183 | + const configGet = Effect.fn("GlobalHttpApi.configGet")(function* () { |
| 184 | + return yield* config.getGlobal() |
| 185 | + }) |
| 186 | + |
| 187 | + const configUpdate = Effect.fn("GlobalHttpApi.configUpdate")(function* (ctx) { |
| 188 | + return yield* config.updateGlobal(ctx.payload) |
| 189 | + }) |
| 190 | + |
| 191 | + const dispose = Effect.fn("GlobalHttpApi.dispose")(function* () { |
| 192 | + yield* Effect.promise(() => Instance.disposeAll()) |
| 193 | + GlobalBus.emit("event", { |
| 194 | + directory: "global", |
| 195 | + payload: { type: "global.disposed", properties: {} }, |
| 196 | + }) |
| 197 | + return true |
| 198 | + }) |
| 199 | + |
| 200 | + const upgrade = Effect.fn("GlobalHttpApi.upgrade")(function* (ctx: { payload: typeof GlobalUpgradeInput.Type }) { |
| 201 | + const method = yield* installation.method() |
| 202 | + if (method === "unknown") { |
| 203 | + return { |
| 204 | + status: 400, |
| 205 | + body: { success: false as const, error: "Unknown installation method" }, |
| 206 | + } |
| 207 | + } |
| 208 | + const target = ctx.payload.target || (yield* installation.latest(method)) |
| 209 | + const result = yield* installation.upgrade(method, target).pipe( |
| 210 | + Effect.as({ status: 200, body: { success: true as const, version: target } }), |
| 211 | + Effect.catch((err) => |
| 212 | + Effect.succeed({ |
| 213 | + status: 500, |
| 214 | + body: { |
| 215 | + success: false as const, |
| 216 | + error: err instanceof Error ? err.message : String(err), |
| 217 | + }, |
| 218 | + }), |
| 219 | + ), |
| 220 | + ) |
| 221 | + if (!result.body.success) return result |
| 222 | + GlobalBus.emit("event", { |
| 223 | + directory: "global", |
| 224 | + payload: { |
| 225 | + type: Installation.Event.Updated.type, |
| 226 | + properties: { version: target }, |
| 227 | + }, |
| 228 | + }) |
| 229 | + return result |
| 230 | + }) |
| 231 | + |
| 232 | + const upgradeRaw = Effect.fn("GlobalHttpApi.upgradeRaw")(function* (ctx: { |
| 233 | + request: HttpServerRequest.HttpServerRequest |
| 234 | + }) { |
| 235 | + const body = yield* Effect.orDie(ctx.request.text) |
| 236 | + const json = parseBody(body) |
| 237 | + if (json === undefined) { |
| 238 | + return HttpServerResponse.jsonUnsafe({ success: false, error: "Invalid request body" }, { status: 400 }) |
| 239 | + } |
| 240 | + const payload = yield* Schema.decodeUnknownEffect(GlobalUpgradeInput)(json).pipe( |
| 241 | + Effect.map((payload) => ({ valid: true as const, payload })), |
| 242 | + Effect.catch(() => Effect.succeed({ valid: false as const })), |
| 243 | + ) |
| 244 | + if (!payload.valid) { |
| 245 | + return HttpServerResponse.jsonUnsafe({ success: false, error: "Invalid request body" }, { status: 400 }) |
| 246 | + } |
| 247 | + const result = yield* upgrade({ payload: payload.payload }) |
| 248 | + return HttpServerResponse.jsonUnsafe(result.body, { status: result.status }) |
| 249 | + }) |
| 250 | + |
| 251 | + return handlers |
| 252 | + .handle("health", health) |
| 253 | + .handleRaw("event", event) |
| 254 | + .handle("configGet", configGet) |
| 255 | + .handle("configUpdate", configUpdate) |
| 256 | + .handle("dispose", dispose) |
| 257 | + .handleRaw("upgrade", upgradeRaw) |
| 258 | + }), |
| 259 | +) |
0 commit comments