Skip to content

Commit d66e7b0

Browse files
committed
core: expose projected v2 session messages
1 parent 865d7ab commit d66e7b0

14 files changed

Lines changed: 357 additions & 121 deletions

File tree

packages/opencode/src/server/routes/instance/httpapi/server.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import { QuestionApi, questionHandlers } from "./question"
2626
import { SessionApi, sessionHandlers } from "./session"
2727
import { SyncApi, syncHandlers } from "./sync"
2828
import { TuiApi, tuiHandlers } from "./tui"
29+
import { V2Api, v2Handlers } from "./v2"
2930
import { WorkspaceApi, workspaceHandlers } from "./workspace"
3031
import { disposeMiddleware } from "./lifecycle"
3132
import { memoMap } from "@opencode-ai/core/effect/memo-map"
@@ -86,6 +87,7 @@ export const routes = Layer.mergeAll(
8687
HttpApiBuilder.layer(ProviderApi).pipe(Layer.provide(providerHandlers)),
8788
HttpApiBuilder.layer(SessionApi).pipe(Layer.provide(sessionHandlers)),
8889
HttpApiBuilder.layer(SyncApi).pipe(Layer.provide(syncHandlers)),
90+
HttpApiBuilder.layer(V2Api).pipe(Layer.provide(v2Handlers)),
8991
HttpApiBuilder.layer(TuiApi).pipe(
9092
Layer.provide(tuiHandlers),
9193
Layer.provide(Session.defaultLayer),
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
import { Session as LegacySession } from "@/session/session"
2+
import { SessionID } from "@/session/schema"
3+
import { SessionMessage } from "@/v2/session-message"
4+
import { SessionV2 } from "@/v2/session"
5+
import { Effect, Layer, Schema } from "effect"
6+
import { HttpApi, HttpApiBuilder, HttpApiEndpoint, HttpApiGroup, OpenApi } from "effect/unstable/httpapi"
7+
import { Authorization } from "./auth"
8+
9+
export const V2Api = HttpApi.make("v2")
10+
.add(
11+
HttpApiGroup.make("v2")
12+
.add(
13+
HttpApiEndpoint.get("messages", "/api/session/:sessionID/message", {
14+
params: { sessionID: SessionID },
15+
success: Schema.Array(SessionMessage.Message),
16+
}).annotateMerge(
17+
OpenApi.annotations({
18+
identifier: "v2.session.messages",
19+
summary: "Get v2 session messages",
20+
description: "Retrieve projected v2 messages for a session directly from the message database.",
21+
}),
22+
),
23+
)
24+
.annotateMerge(
25+
OpenApi.annotations({
26+
title: "v2",
27+
description: "Experimental v2 routes.",
28+
}),
29+
)
30+
.middleware(Authorization),
31+
)
32+
.annotateMerge(
33+
OpenApi.annotations({
34+
title: "opencode experimental HttpApi",
35+
version: "0.0.1",
36+
description: "Experimental HttpApi surface for selected instance routes.",
37+
}),
38+
)
39+
40+
export const v2Handlers = Layer.unwrap(
41+
Effect.gen(function* () {
42+
const legacySession = yield* LegacySession.Service
43+
const session = yield* SessionV2.Service
44+
45+
const messages = Effect.fn("V2HttpApi.messages")(function* (ctx: { params: { sessionID: SessionID } }) {
46+
yield* legacySession.get(ctx.params.sessionID)
47+
return yield* session.messages(ctx.params.sessionID)
48+
})
49+
50+
return HttpApiBuilder.group(V2Api, "v2", (handlers) => handlers.handle("messages", messages))
51+
}),
52+
).pipe(Layer.provide(LegacySession.defaultLayer), Layer.provide(SessionV2.layer))
53+
54+
export * as V2HttpApi from "./v2"

packages/opencode/src/server/routes/instance/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ export const InstanceRoutes = (upgrade: UpgradeWebSocket): Hono => {
4545
if (Flag.OPENCODE_EXPERIMENTAL_HTTPAPI) {
4646
const handler = ExperimentalHttpApiServer.webHandler().handler
4747
const context = Context.empty() as Context.Context<unknown>
48+
app.all("/api/*", (c) => handler(c.req.raw, context))
4849
app.get(EventPaths.event, (c) => handler(c.req.raw, context))
4950
app.get("/question", (c) => handler(c.req.raw, context))
5051
app.post("/question/:requestID/reply", (c) => handler(c.req.raw, context))

packages/opencode/src/session/processor.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,7 @@ export const layer: Layer.Layer<
224224

225225
case "reasoning-start":
226226
if (value.id in ctx.reasoningMap) return
227+
// TODO(v2): Temporary dual-write while migrating session messages to v2 events.
227228
SyncEvent.run(SessionEvent.Reasoning.Started.Sync, {
228229
sessionID: ctx.sessionID,
229230
reasoningID: value.id,
@@ -243,6 +244,7 @@ export const layer: Layer.Layer<
243244

244245
case "reasoning-delta":
245246
if (!(value.id in ctx.reasoningMap)) return
247+
// TODO(v2): Temporary dual-write while migrating session messages to v2 events.
246248
SyncEvent.run(SessionEvent.Reasoning.Delta.Sync, {
247249
sessionID: ctx.sessionID,
248250
reasoningID: value.id,
@@ -262,6 +264,7 @@ export const layer: Layer.Layer<
262264

263265
case "reasoning-end":
264266
if (!(value.id in ctx.reasoningMap)) return
267+
// TODO(v2): Temporary dual-write while migrating session messages to v2 events.
265268
SyncEvent.run(SessionEvent.Reasoning.Ended.Sync, {
266269
sessionID: ctx.sessionID,
267270
reasoningID: value.id,
@@ -280,6 +283,7 @@ export const layer: Layer.Layer<
280283
if (ctx.assistantMessage.summary) {
281284
throw new Error(`Tool call not allowed while generating summary: ${value.toolName}`)
282285
}
286+
// TODO(v2): Temporary dual-write while migrating session messages to v2 events.
283287
SyncEvent.run(SessionEvent.Tool.Input.Started.Sync, {
284288
sessionID: ctx.sessionID,
285289
callID: value.id,
@@ -308,6 +312,7 @@ export const layer: Layer.Layer<
308312
return
309313

310314
case "tool-input-end": {
315+
// TODO(v2): Temporary dual-write while migrating session messages to v2 events.
311316
SyncEvent.run(SessionEvent.Tool.Input.Ended.Sync, {
312317
sessionID: ctx.sessionID,
313318
callID: value.id,
@@ -322,6 +327,7 @@ export const layer: Layer.Layer<
322327
throw new Error(`Tool call not allowed while generating summary: ${value.toolName}`)
323328
}
324329
const toolCall = yield* readToolCall(value.toolCallId)
330+
// TODO(v2): Temporary dual-write while migrating session messages to v2 events.
325331
SyncEvent.run(SessionEvent.Tool.Called.Sync, {
326332
sessionID: ctx.sessionID,
327333
callID: value.toolCallId,
@@ -377,6 +383,7 @@ export const layer: Layer.Layer<
377383

378384
case "tool-result": {
379385
const toolCall = yield* readToolCall(value.toolCallId)
386+
// TODO(v2): Temporary dual-write while migrating session messages to v2 events.
380387
SyncEvent.run(SessionEvent.Tool.Success.Sync, {
381388
sessionID: ctx.sessionID,
382389
callID: value.toolCallId,
@@ -404,6 +411,7 @@ export const layer: Layer.Layer<
404411

405412
case "tool-error": {
406413
const toolCall = yield* readToolCall(value.toolCallId)
414+
// TODO(v2): Temporary dual-write while migrating session messages to v2 events.
407415
SyncEvent.run(SessionEvent.Tool.Error.Sync, {
408416
sessionID: ctx.sessionID,
409417
callID: value.toolCallId,
@@ -425,6 +433,7 @@ export const layer: Layer.Layer<
425433

426434
case "start-step":
427435
if (!ctx.snapshot) ctx.snapshot = yield* snapshot.track()
436+
// TODO(v2): Temporary dual-write while migrating session messages to v2 events.
428437
SyncEvent.run(SessionEvent.Step.Started.Sync, {
429438
sessionID: ctx.sessionID,
430439
model: {
@@ -451,6 +460,7 @@ export const layer: Layer.Layer<
451460
usage: value.usage,
452461
metadata: value.providerMetadata,
453462
})
463+
// TODO(v2): Temporary dual-write while migrating session messages to v2 events.
454464
SyncEvent.run(SessionEvent.Step.Ended.Sync, {
455465
sessionID: ctx.sessionID,
456466
reason: value.finishReason,
@@ -503,6 +513,7 @@ export const layer: Layer.Layer<
503513
}
504514

505515
case "text-start":
516+
// TODO(v2): Temporary dual-write while migrating session messages to v2 events.
506517
SyncEvent.run(SessionEvent.Text.Started.Sync, {
507518
sessionID: ctx.sessionID,
508519
timestamp: DateTime.makeUnsafe(Date.now()),
@@ -545,6 +556,7 @@ export const layer: Layer.Layer<
545556
},
546557
{ text: ctx.currentText.text },
547558
)).text
559+
// TODO(v2): Temporary dual-write while migrating session messages to v2 events.
548560
SyncEvent.run(SessionEvent.Text.Ended.Sync, {
549561
sessionID: ctx.sessionID,
550562
text: ctx.currentText.text,
@@ -677,6 +689,7 @@ export const layer: Layer.Layer<
677689
SessionRetry.policy({
678690
parse,
679691
set: (info) => {
692+
// TODO(v2): Temporary dual-write while migrating session messages to v2 events.
680693
SyncEvent.run(SessionEvent.Retried.Sync, {
681694
sessionID: ctx.sessionID,
682695
attempt: info.attempt,

packages/opencode/src/session/projectors-next.ts

Lines changed: 28 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,23 @@ import * as DateTime from "effect/DateTime"
77
import { SyncEvent } from "@/sync"
88
import { SessionMessageTable } from "./session.sql"
99
import type { SessionID } from "./schema"
10+
import { Schema } from "effect"
11+
12+
const decodeMessage = Schema.decodeUnknownSync(SessionMessage.Message)
13+
type SessionMessageData = NonNullable<typeof SessionMessageTable.$inferInsert["data"]>
14+
15+
function encodeDateTimes(value: unknown): unknown {
16+
if (DateTime.isDateTime(value)) return DateTime.toEpochMillis(value)
17+
if (Array.isArray(value)) return value.map(encodeDateTimes)
18+
if (typeof value === "object" && value !== null) {
19+
return Object.fromEntries(Object.entries(value).map(([key, item]) => [key, encodeDateTimes(item)]))
20+
}
21+
return value
22+
}
23+
24+
function encodeMessageData(value: unknown): SessionMessageData {
25+
return encodeDateTimes(value) as SessionMessageData
26+
}
1027

1128
function sqlite(db: Database.TxOrDb, sessionID: SessionID): SessionMessageUpdater.Adapter<void> {
1229
return {
@@ -17,13 +34,13 @@ function sqlite(db: Database.TxOrDb, sessionID: SessionID): SessionMessageUpdate
1734
.where(and(eq(SessionMessageTable.session_id, sessionID), eq(SessionMessageTable.type, "assistant")))
1835
.orderBy(desc(SessionMessageTable.id))
1936
.all()
20-
.map((row) => ({ id: row.id, type: row.type, ...row.data }) as SessionMessage.Message)
37+
.map((row) => decodeMessage({ ...row.data, id: row.id, type: row.type }))
2138
.find((message): message is SessionMessage.Assistant => message.type === "assistant" && !message.time.completed)
2239
},
2340
updateAssistant(assistant) {
2441
const { id, type, ...data } = assistant
2542
db.update(SessionMessageTable)
26-
.set({ data })
43+
.set({ data: encodeMessageData(data) })
2744
.where(
2845
and(
2946
eq(SessionMessageTable.id, id),
@@ -36,13 +53,15 @@ function sqlite(db: Database.TxOrDb, sessionID: SessionID): SessionMessageUpdate
3653
appendMessage(message) {
3754
const { id, type, ...data } = message
3855
db.insert(SessionMessageTable)
39-
.values({
40-
id,
41-
session_id: sessionID,
42-
type,
43-
time_created: DateTime.toEpochMillis(message.time.created),
44-
data,
45-
})
56+
.values([
57+
{
58+
id,
59+
session_id: sessionID,
60+
type,
61+
time_created: DateTime.toEpochMillis(message.time.created),
62+
data: encodeMessageData(data),
63+
},
64+
])
4665
.run()
4766
},
4867
appendPending() {},

packages/opencode/src/session/prompt.ts

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,10 @@ import { InstanceState } from "@/effect/instance-state"
5353
import { TaskTool, type TaskPromptOps } from "@/tool/task"
5454
import { SessionRunState } from "./run-state"
5555
import { EffectBridge } from "@/effect/bridge"
56+
import { SessionEvent } from "@/v2/session-event"
57+
import { AgentAttachment, FileAttachment, Source } from "@/v2/session-prompt"
58+
import { SyncEvent } from "@/sync"
59+
import * as DateTime from "effect/DateTime"
5660

5761
// @ts-ignore
5862
globalThis.AI_SDK_LOG_WARNINGS = false
@@ -1236,6 +1240,69 @@ NOTE: At any point in time through this workflow you should feel free to ask the
12361240

12371241
yield* sessions.updateMessage(info)
12381242
for (const part of parts) yield* sessions.updatePart(part)
1243+
const nextPrompt = parts.reduce(
1244+
(result, part) => {
1245+
if (part.type === "text") {
1246+
if (part.synthetic) result.synthetic.push(part.text)
1247+
else result.text.push(part.text)
1248+
}
1249+
if (part.type === "file") {
1250+
result.files.push(
1251+
new FileAttachment({
1252+
uri: part.url,
1253+
mime: part.mime,
1254+
name: part.filename,
1255+
source: part.source
1256+
? new Source({
1257+
start: part.source.text.start,
1258+
end: part.source.text.end,
1259+
text: part.source.text.value,
1260+
})
1261+
: undefined,
1262+
}),
1263+
)
1264+
}
1265+
if (part.type === "agent") {
1266+
result.agents.push(
1267+
new AgentAttachment({
1268+
name: part.name,
1269+
source: part.source
1270+
? new Source({
1271+
start: part.source.start,
1272+
end: part.source.end,
1273+
text: part.source.value,
1274+
})
1275+
: undefined,
1276+
}),
1277+
)
1278+
}
1279+
return result
1280+
},
1281+
{
1282+
text: [] as string[],
1283+
files: [] as FileAttachment[],
1284+
agents: [] as AgentAttachment[],
1285+
synthetic: [] as string[],
1286+
},
1287+
)
1288+
// TODO(v2): Temporary dual-write while migrating session messages to v2 events.
1289+
SyncEvent.run(SessionEvent.Prompted.Sync, {
1290+
sessionID: input.sessionID,
1291+
timestamp: DateTime.makeUnsafe(info.time.created),
1292+
prompt: {
1293+
text: nextPrompt.text.join("\n"),
1294+
files: nextPrompt.files,
1295+
agents: nextPrompt.agents,
1296+
},
1297+
})
1298+
for (const text of nextPrompt.synthetic) {
1299+
// TODO(v2): Temporary dual-write while migrating session messages to v2 events.
1300+
SyncEvent.run(SessionEvent.Synthetic.Sync, {
1301+
sessionID: input.sessionID,
1302+
timestamp: DateTime.makeUnsafe(info.time.created),
1303+
text,
1304+
})
1305+
}
12391306

12401307
return { info, parts }
12411308
}, Effect.scoped)

packages/opencode/src/session/session.sql.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import { Timestamps } from "../storage/schema.sql"
1111

1212
type PartData = Omit<MessageV2.Part, "id" | "sessionID" | "messageID">
1313
type InfoData = Omit<MessageV2.Info, "id" | "sessionID">
14+
type SessionMessageData = Omit<(typeof SessionMessage.Message)["Encoded"], "type" | "id">
1415

1516
export const SessionTable = sqliteTable(
1617
"session",
@@ -105,7 +106,7 @@ export const SessionMessageTable = sqliteTable(
105106
.references(() => SessionTable.id, { onDelete: "cascade" }),
106107
type: text().$type<SessionMessage.Type>().notNull(),
107108
...Timestamps,
108-
data: text({ mode: "json" }).notNull().$type<Omit<SessionMessage.Message, "type" | "id">>(),
109+
data: text({ mode: "json" }).notNull().$type<SessionMessageData>(),
109110
},
110111
(table) => [
111112
index("session_message_session_idx").on(table.session_id),

packages/opencode/src/v2/session-message-updater.ts

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,16 @@ export function update<Result>(adapter: Adapter<Result>, event: SessionEvent.Eve
121121
)
122122
}
123123
},
124-
"session.next.text.ended": () => {},
124+
"session.next.text.ended": (event) => {
125+
if (currentAssistant) {
126+
adapter.updateAssistant(
127+
produce(currentAssistant, (draft) => {
128+
const match = latestText(draft)
129+
if (match) match.text = event.data.text
130+
}),
131+
)
132+
}
133+
},
125134
"session.next.tool.input.started": (event) => {
126135
if (currentAssistant) {
127136
adapter.updateAssistant(

0 commit comments

Comments
 (0)