Skip to content

Commit 1554e24

Browse files
committed
feat(session): project next session events
1 parent f75bff5 commit 1554e24

4 files changed

Lines changed: 218 additions & 5 deletions

File tree

packages/core/src/util/log.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
export * as Log from "./log"
2+
13
import path from "path"
24
import fs from "fs/promises"
35
import { createWriteStream } from "fs"

packages/opencode/src/session/processor.ts

Lines changed: 112 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@ import { Question } from "@/question"
2020
import { errorMessage } from "@/util/error"
2121
import * as Log from "@opencode-ai/core/util/log"
2222
import { isRecord } from "@/util/record"
23+
import { SyncEvent } from "@/sync"
24+
import { SessionEvent } from "@/v2/session-event"
25+
import * as DateTime from "effect/DateTime"
2326

2427
const DOOM_LOOP_THRESHOLD = 3
2528
const log = Log.create({ service: "session.processor" })
@@ -221,6 +224,10 @@ export const layer: Layer.Layer<
221224

222225
case "reasoning-start":
223226
if (value.id in ctx.reasoningMap) return
227+
SyncEvent.run(SessionEvent.Reasoning.Started.Sync, {
228+
sessionID: ctx.sessionID,
229+
timestamp: DateTime.makeUnsafe(Date.now()),
230+
})
224231
ctx.reasoningMap[value.id] = {
225232
id: PartID.ascending(),
226233
messageID: ctx.assistantMessage.id,
@@ -248,6 +255,11 @@ export const layer: Layer.Layer<
248255

249256
case "reasoning-end":
250257
if (!(value.id in ctx.reasoningMap)) return
258+
SyncEvent.run(SessionEvent.Reasoning.Ended.Sync, {
259+
sessionID: ctx.sessionID,
260+
text: ctx.reasoningMap[value.id].text,
261+
timestamp: DateTime.makeUnsafe(Date.now()),
262+
})
251263
// oxlint-disable-next-line no-self-assign -- reactivity trigger
252264
ctx.reasoningMap[value.id].text = ctx.reasoningMap[value.id].text
253265
ctx.reasoningMap[value.id].time = { ...ctx.reasoningMap[value.id].time, end: Date.now() }
@@ -260,6 +272,12 @@ export const layer: Layer.Layer<
260272
if (ctx.assistantMessage.summary) {
261273
throw new Error(`Tool call not allowed while generating summary: ${value.toolName}`)
262274
}
275+
SyncEvent.run(SessionEvent.Tool.Input.Started.Sync, {
276+
sessionID: ctx.sessionID,
277+
callID: value.id,
278+
name: value.toolName,
279+
timestamp: DateTime.makeUnsafe(Date.now()),
280+
})
263281
const part = yield* session.updatePart({
264282
id: ctx.toolcalls[value.id]?.partID ?? PartID.ascending(),
265283
messageID: ctx.assistantMessage.id,
@@ -281,13 +299,32 @@ export const layer: Layer.Layer<
281299
case "tool-input-delta":
282300
return
283301

284-
case "tool-input-end":
302+
case "tool-input-end": {
303+
SyncEvent.run(SessionEvent.Tool.Input.Ended.Sync, {
304+
sessionID: ctx.sessionID,
305+
callID: value.id,
306+
text: "",
307+
timestamp: DateTime.makeUnsafe(Date.now()),
308+
})
285309
return
310+
}
286311

287312
case "tool-call": {
288313
if (ctx.assistantMessage.summary) {
289314
throw new Error(`Tool call not allowed while generating summary: ${value.toolName}`)
290315
}
316+
const toolCall = yield* readToolCall(value.toolCallId)
317+
SyncEvent.run(SessionEvent.Tool.Called.Sync, {
318+
sessionID: ctx.sessionID,
319+
callID: value.toolCallId,
320+
tool: value.toolName,
321+
input: value.input,
322+
provider: {
323+
executed: toolCall?.part.metadata?.providerExecuted === true,
324+
...(value.providerMetadata ? { metadata: value.providerMetadata } : {}),
325+
},
326+
timestamp: DateTime.makeUnsafe(Date.now()),
327+
})
291328
yield* updateToolCall(value.toolCallId, (match) => ({
292329
...match,
293330
tool: value.toolName,
@@ -331,11 +368,47 @@ export const layer: Layer.Layer<
331368
}
332369

333370
case "tool-result": {
371+
const toolCall = yield* readToolCall(value.toolCallId)
372+
SyncEvent.run(SessionEvent.Tool.Success.Sync, {
373+
sessionID: ctx.sessionID,
374+
callID: value.toolCallId,
375+
title: value.output.title,
376+
output: value.output.output,
377+
attachments: value.output.attachments?.map((item: MessageV2.FilePart) => ({
378+
uri: item.url,
379+
mime: item.mime,
380+
...(item.filename ? { name: item.filename } : {}),
381+
...(item.source
382+
? {
383+
source: {
384+
start: item.source.text.start,
385+
end: item.source.text.end,
386+
text: item.source.text.value,
387+
},
388+
}
389+
: {}),
390+
})),
391+
provider: {
392+
executed: toolCall?.part.metadata?.providerExecuted === true,
393+
metadata: value.output.metadata,
394+
},
395+
timestamp: DateTime.makeUnsafe(Date.now()),
396+
})
334397
yield* completeToolCall(value.toolCallId, value.output)
335398
return
336399
}
337400

338401
case "tool-error": {
402+
const toolCall = yield* readToolCall(value.toolCallId)
403+
SyncEvent.run(SessionEvent.Tool.Error.Sync, {
404+
sessionID: ctx.sessionID,
405+
callID: value.toolCallId,
406+
error: errorMessage(value.error),
407+
provider: {
408+
executed: toolCall?.part.metadata?.providerExecuted === true,
409+
},
410+
timestamp: DateTime.makeUnsafe(Date.now()),
411+
})
339412
yield* failToolCall(value.toolCallId, value.error)
340413
return
341414
}
@@ -345,6 +418,15 @@ export const layer: Layer.Layer<
345418

346419
case "start-step":
347420
if (!ctx.snapshot) ctx.snapshot = yield* snapshot.track()
421+
SyncEvent.run(SessionEvent.Step.Started.Sync, {
422+
sessionID: ctx.sessionID,
423+
model: {
424+
id: ctx.model.id,
425+
providerID: ctx.model.providerID,
426+
variant: input.assistantMessage.variant,
427+
},
428+
timestamp: DateTime.makeUnsafe(Date.now()),
429+
})
348430
yield* session.updatePart({
349431
id: PartID.ascending(),
350432
messageID: ctx.assistantMessage.id,
@@ -360,6 +442,13 @@ export const layer: Layer.Layer<
360442
usage: value.usage,
361443
metadata: value.providerMetadata,
362444
})
445+
SyncEvent.run(SessionEvent.Step.Ended.Sync, {
446+
sessionID: ctx.sessionID,
447+
reason: value.finishReason,
448+
cost: usage.cost,
449+
tokens: usage.tokens,
450+
timestamp: DateTime.makeUnsafe(Date.now()),
451+
})
363452
ctx.assistantMessage.finish = value.finishReason
364453
ctx.assistantMessage.cost += usage.cost
365454
ctx.assistantMessage.tokens = usage.tokens
@@ -404,6 +493,10 @@ export const layer: Layer.Layer<
404493
}
405494

406495
case "text-start":
496+
SyncEvent.run(SessionEvent.Text.Started.Sync, {
497+
sessionID: ctx.sessionID,
498+
timestamp: DateTime.makeUnsafe(Date.now()),
499+
})
407500
ctx.currentText = {
408501
id: PartID.ascending(),
409502
messageID: ctx.assistantMessage.id,
@@ -442,6 +535,11 @@ export const layer: Layer.Layer<
442535
},
443536
{ text: ctx.currentText.text },
444537
)).text
538+
SyncEvent.run(SessionEvent.Text.Ended.Sync, {
539+
sessionID: ctx.sessionID,
540+
text: ctx.currentText.text,
541+
timestamp: DateTime.makeUnsafe(Date.now()),
542+
})
445543
{
446544
const end = Date.now()
447545
ctx.currentText.time = { start: ctx.currentText.time?.start ?? end, end }
@@ -568,13 +666,23 @@ export const layer: Layer.Layer<
568666
Effect.retry(
569667
SessionRetry.policy({
570668
parse,
571-
set: (info) =>
572-
status.set(ctx.sessionID, {
669+
set: (info) => {
670+
SyncEvent.run(SessionEvent.Retried.Sync, {
671+
sessionID: ctx.sessionID,
672+
attempt: info.attempt,
673+
error: {
674+
message: info.message,
675+
isRetryable: true,
676+
},
677+
timestamp: DateTime.makeUnsafe(Date.now()),
678+
})
679+
return status.set(ctx.sessionID, {
573680
type: "retry",
574681
attempt: info.attempt,
575682
message: info.message,
576683
next: info.next,
577-
}),
684+
})
685+
},
578686
}),
579687
),
580688
Effect.catch(halt),
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
import { and, desc, eq } from "@/storage"
2+
import type { Database } from "@/storage"
3+
import { SessionEntry } from "@/v2/session-entry"
4+
import { SessionEntryStepper } from "@/v2/session-entry-stepper"
5+
import { SessionEvent } from "@/v2/session-event"
6+
import * as DateTime from "effect/DateTime"
7+
import { SyncEvent } from "@/sync"
8+
import { SessionEntryTable } from "./session.sql"
9+
import type { SessionID } from "./schema"
10+
11+
function sqlite(db: Database.TxOrDb, sessionID: SessionID): SessionEntryStepper.Adapter<void> {
12+
return {
13+
getCurrentAssistant() {
14+
return db
15+
.select()
16+
.from(SessionEntryTable)
17+
.where(and(eq(SessionEntryTable.session_id, sessionID), eq(SessionEntryTable.type, "assistant")))
18+
.orderBy(desc(SessionEntryTable.id))
19+
.all()
20+
.map((row) => ({ id: row.id, type: row.type, ...row.data }) as SessionEntry.Entry)
21+
.find((entry): entry is SessionEntry.Assistant => entry.type === "assistant" && !entry.time.completed)
22+
},
23+
updateAssistant(assistant) {
24+
const { id, type, ...data } = assistant
25+
db.update(SessionEntryTable)
26+
.set({ data })
27+
.where(and(eq(SessionEntryTable.id, id), eq(SessionEntryTable.session_id, sessionID), eq(SessionEntryTable.type, type)))
28+
.run()
29+
},
30+
appendEntry(entry) {
31+
const { id, type, ...data } = entry
32+
db.insert(SessionEntryTable)
33+
.values({
34+
id,
35+
session_id: sessionID,
36+
type,
37+
time_created: DateTime.toEpochMillis(entry.time.created),
38+
data,
39+
})
40+
.run()
41+
},
42+
appendPending() {},
43+
finish() {},
44+
}
45+
}
46+
47+
function step(db: Database.TxOrDb, event: SessionEvent.Event) {
48+
SessionEntryStepper.stepWith(sqlite(db, event.data.sessionID), event)
49+
}
50+
51+
export default [
52+
SyncEvent.project(SessionEvent.Prompted.Sync, (db, data) => {
53+
step(db, { type: "session.next.prompted", data })
54+
}),
55+
SyncEvent.project(SessionEvent.Synthetic.Sync, (db, data) => {
56+
step(db, { type: "session.next.synthetic", data })
57+
}),
58+
SyncEvent.project(SessionEvent.Step.Started.Sync, (db, data) => {
59+
step(db, { type: "session.next.step.started", data })
60+
}),
61+
SyncEvent.project(SessionEvent.Step.Ended.Sync, (db, data) => {
62+
step(db, { type: "session.next.step.ended", data })
63+
}),
64+
SyncEvent.project(SessionEvent.Text.Started.Sync, (db, data) => {
65+
step(db, { type: "session.next.text.started", data })
66+
}),
67+
SyncEvent.project(SessionEvent.Text.Delta.Sync, () => {}),
68+
SyncEvent.project(SessionEvent.Text.Ended.Sync, (db, data) => {
69+
step(db, { type: "session.next.text.ended", data })
70+
}),
71+
SyncEvent.project(SessionEvent.Tool.Input.Started.Sync, (db, data) => {
72+
step(db, { type: "session.next.tool.input.started", data })
73+
}),
74+
SyncEvent.project(SessionEvent.Tool.Input.Delta.Sync, () => {}),
75+
SyncEvent.project(SessionEvent.Tool.Input.Ended.Sync, (db, data) => {
76+
step(db, { type: "session.next.tool.input.ended", data })
77+
}),
78+
SyncEvent.project(SessionEvent.Tool.Called.Sync, (db, data) => {
79+
step(db, { type: "session.next.tool.called", data })
80+
}),
81+
SyncEvent.project(SessionEvent.Tool.Success.Sync, (db, data) => {
82+
step(db, { type: "session.next.tool.success", data })
83+
}),
84+
SyncEvent.project(SessionEvent.Tool.Error.Sync, (db, data) => {
85+
step(db, { type: "session.next.tool.error", data })
86+
}),
87+
SyncEvent.project(SessionEvent.Reasoning.Started.Sync, (db, data) => {
88+
step(db, { type: "session.next.reasoning.started", data })
89+
}),
90+
SyncEvent.project(SessionEvent.Reasoning.Delta.Sync, () => {}),
91+
SyncEvent.project(SessionEvent.Reasoning.Ended.Sync, (db, data) => {
92+
step(db, { type: "session.next.reasoning.ended", data })
93+
}),
94+
SyncEvent.project(SessionEvent.Retried.Sync, (db, data) => {
95+
step(db, { type: "session.next.retried", data })
96+
}),
97+
SyncEvent.project(SessionEvent.Compacted.Sync, (db, data) => {
98+
step(db, { type: "session.next.compacted", data })
99+
}),
100+
]

packages/opencode/src/session/projectors.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@ import { SyncEvent } from "@/sync"
55
import * as Session from "./session"
66
import { MessageV2 } from "./message-v2"
77
import { SessionTable, MessageTable, PartTable } from "./session.sql"
8-
import * as Log from "@opencode-ai/core/util/log"
8+
import { Log } from "@opencode-ai/core/util/log"
9+
import nextProjectors from "./projectors-next"
910

1011
const log = Log.create({ service: "session.projector" })
1112

@@ -136,4 +137,6 @@ export default [
136137
log.warn("ignored late part update", { partID: id, messageID, sessionID })
137138
}
138139
}),
140+
141+
...nextProjectors,
139142
]

0 commit comments

Comments
 (0)