Skip to content

Commit c50d65b

Browse files
authored
refactor(sync): make session events schema-first (#24019)
1 parent 353532b commit c50d65b

9 files changed

Lines changed: 141 additions & 81 deletions

File tree

packages/opencode/src/bus/index.ts

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import z from "zod"
2-
import { Effect, Exit, Layer, PubSub, Scope, Context, Stream } from "effect"
2+
import { Effect, Exit, Layer, PubSub, Scope, Context, Stream, Schema as EffectSchema, Types } from "effect"
33
import { EffectBridge } from "@/effect"
44
import { Log } from "../util"
55
import { BusEvent } from "./bus-event"
@@ -9,6 +9,12 @@ import { makeRuntime } from "@/effect/run-service"
99

1010
const log = Log.create({ service: "bus" })
1111

12+
type BusProperties<D extends BusEvent.Definition = BusEvent.Definition> = D extends {
13+
effectProperties: infer Properties extends EffectSchema.Top
14+
}
15+
? Types.DeepMutable<EffectSchema.Schema.Type<Properties>>
16+
: z.infer<D["properties"]>
17+
1218
export const InstanceDisposed = BusEvent.define(
1319
"server.instance.disposed",
1420
z.object({
@@ -18,7 +24,7 @@ export const InstanceDisposed = BusEvent.define(
1824

1925
type Payload<D extends BusEvent.Definition = BusEvent.Definition> = {
2026
type: D["type"]
21-
properties: z.infer<D["properties"]>
27+
properties: BusProperties<D>
2228
}
2329

2430
type State = {
@@ -29,7 +35,7 @@ type State = {
2935
export interface Interface {
3036
readonly publish: <D extends BusEvent.Definition>(
3137
def: D,
32-
properties: z.output<D["properties"]>,
38+
properties: BusProperties<D>,
3339
) => Effect.Effect<void>
3440
readonly subscribe: <D extends BusEvent.Definition>(def: D) => Stream.Stream<Payload<D>>
3541
readonly subscribeAll: () => Stream.Stream<Payload>
@@ -79,7 +85,10 @@ export const layer = Layer.effect(
7985
})
8086
}
8187

82-
function publish<D extends BusEvent.Definition>(def: D, properties: z.output<D["properties"]>) {
88+
function publish<D extends BusEvent.Definition>(
89+
def: D,
90+
properties: BusProperties<D>,
91+
) {
8392
return Effect.gen(function* () {
8493
const s = yield* InstanceState.get(state)
8594
const payload: Payload = { type: def.type, properties }
@@ -175,13 +184,16 @@ const { runPromise, runSync } = makeRuntime(Service, layer)
175184

176185
// runSync is safe here because the subscribe chain (InstanceState.get, PubSub.subscribe,
177186
// Scope.make, Effect.forkScoped) is entirely synchronous. If any step becomes async, this will throw.
178-
export async function publish<D extends BusEvent.Definition>(def: D, properties: z.output<D["properties"]>) {
187+
export async function publish<D extends BusEvent.Definition>(
188+
def: D,
189+
properties: BusProperties<D>,
190+
) {
179191
return runPromise((svc) => svc.publish(def, properties))
180192
}
181193

182194
export function subscribe<D extends BusEvent.Definition>(
183195
def: D,
184-
callback: (event: { type: D["type"]; properties: z.infer<D["properties"]> }) => unknown,
196+
callback: (event: Payload<D>) => unknown,
185197
) {
186198
return runSync((svc) => svc.subscribeCallback(def, callback))
187199
}

packages/opencode/src/server/projectors.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import z from "zod"
21
import sessionProjectors from "../session/projectors"
32
import { SyncEvent } from "@/sync"
43
import { Session } from "@/session"
@@ -10,7 +9,7 @@ export function initProjectors() {
109
projectors: sessionProjectors,
1110
convertEvent: (type, data) => {
1211
if (type === "session.updated") {
13-
const id = (data as z.infer<typeof Session.Event.Updated.schema>).sessionID
12+
const id = (data as SyncEvent.Event<typeof Session.Event.Updated>["data"]).sessionID
1413
const row = Database.use((db) => db.select().from(SessionTable).where(eq(SessionTable.id, id)).get())
1514

1615
if (!row) return data

packages/opencode/src/session/message-v2.ts

Lines changed: 26 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -576,34 +576,46 @@ export const Info = Object.assign(_Info, {
576576
})
577577
export type Info = User | Assistant
578578

579+
const UpdatedEventSchema = Schema.Struct({
580+
sessionID: SessionID,
581+
info: _Info,
582+
})
583+
584+
const RemovedEventSchema = Schema.Struct({
585+
sessionID: SessionID,
586+
messageID: MessageID,
587+
})
588+
589+
const PartUpdatedEventSchema = Schema.Struct({
590+
sessionID: SessionID,
591+
part: _Part,
592+
time: Schema.Number,
593+
})
594+
595+
const PartRemovedEventSchema = Schema.Struct({
596+
sessionID: SessionID,
597+
messageID: MessageID,
598+
partID: PartID,
599+
})
600+
579601
export const Event = {
580602
Updated: SyncEvent.define({
581603
type: "message.updated",
582604
version: 1,
583605
aggregate: "sessionID",
584-
schema: z.object({
585-
sessionID: SessionID.zod,
586-
info: Info.zod,
587-
}),
606+
schema: UpdatedEventSchema,
588607
}),
589608
Removed: SyncEvent.define({
590609
type: "message.removed",
591610
version: 1,
592611
aggregate: "sessionID",
593-
schema: z.object({
594-
sessionID: SessionID.zod,
595-
messageID: MessageID.zod,
596-
}),
612+
schema: RemovedEventSchema,
597613
}),
598614
PartUpdated: SyncEvent.define({
599615
type: "message.part.updated",
600616
version: 1,
601617
aggregate: "sessionID",
602-
schema: z.object({
603-
sessionID: SessionID.zod,
604-
part: Part.zod,
605-
time: z.number(),
606-
}),
618+
schema: PartUpdatedEventSchema,
607619
}),
608620
PartDelta: BusEvent.define(
609621
"message.part.delta",
@@ -619,11 +631,7 @@ export const Event = {
619631
type: "message.part.removed",
620632
version: 1,
621633
aggregate: "sessionID",
622-
schema: z.object({
623-
sessionID: SessionID.zod,
624-
messageID: MessageID.zod,
625-
partID: PartID.zod,
626-
}),
634+
schema: PartRemovedEventSchema,
627635
}),
628636
}
629637

packages/opencode/src/session/projectors.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ export default [
7171
const info = data.info
7272
const row = db
7373
.update(SessionTable)
74-
.set(toPartialRow(info))
74+
.set(toPartialRow(info as Session.Patch))
7575
.where(eq(SessionTable.id, data.sessionID))
7676
.returning()
7777
.get()

packages/opencode/src/session/session.ts

Lines changed: 43 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ import { PartTable, SessionTable } from "./session.sql"
1515
import { ProjectTable } from "../project/project.sql"
1616
import { Storage } from "@/storage"
1717
import { Log } from "../util"
18-
import { updateSchema } from "../util/update-schema"
1918
import { MessageV2 } from "./message-v2"
2019
import { Instance } from "../project/instance"
2120
import { InstanceState } from "@/effect"
@@ -28,7 +27,7 @@ import type { Provider } from "@/provider"
2827
import { Permission } from "@/permission"
2928
import { Global } from "@/global"
3029
import { Effect, Layer, Option, Context, Schema, Types } from "effect"
31-
import { zod, zodObject } from "@/util/effect-zod"
30+
import { zod } from "@/util/effect-zod"
3231
import { withStatics } from "@/util/schema"
3332

3433
const log = Log.create({ service: "session" })
@@ -215,40 +214,62 @@ export const MessagesInput = Schema.Struct({
215214
limit: Schema.optional(Schema.Number),
216215
}).pipe(withStatics((s) => ({ zod: zod(s) })))
217216

217+
const CreatedEventSchema = Schema.Struct({
218+
sessionID: SessionID,
219+
info: Info,
220+
})
221+
222+
const UpdatedShare = Schema.Struct({
223+
url: Schema.optional(Schema.NullOr(Schema.String)),
224+
})
225+
226+
const UpdatedTime = Schema.Struct({
227+
created: Schema.optional(Schema.NullOr(Schema.Number)),
228+
updated: Schema.optional(Schema.NullOr(Schema.Number)),
229+
compacting: Schema.optional(Schema.NullOr(Schema.Number)),
230+
archived: Schema.optional(Schema.NullOr(Schema.Number)),
231+
})
232+
233+
const UpdatedInfo = Schema.Struct({
234+
id: Schema.optional(Schema.NullOr(SessionID)),
235+
slug: Schema.optional(Schema.NullOr(Schema.String)),
236+
projectID: Schema.optional(Schema.NullOr(ProjectID)),
237+
workspaceID: Schema.optional(Schema.NullOr(WorkspaceID)),
238+
directory: Schema.optional(Schema.NullOr(Schema.String)),
239+
parentID: Schema.optional(Schema.NullOr(SessionID)),
240+
summary: Schema.optional(Schema.NullOr(Summary)),
241+
share: Schema.optional(UpdatedShare),
242+
title: Schema.optional(Schema.NullOr(Schema.String)),
243+
version: Schema.optional(Schema.NullOr(Schema.String)),
244+
time: Schema.optional(UpdatedTime),
245+
permission: Schema.optional(Schema.NullOr(Permission.Ruleset)),
246+
revert: Schema.optional(Schema.NullOr(Revert)),
247+
})
248+
249+
const UpdatedEventSchema = Schema.Struct({
250+
sessionID: SessionID,
251+
info: UpdatedInfo,
252+
})
253+
218254
export const Event = {
219255
Created: SyncEvent.define({
220256
type: "session.created",
221257
version: 1,
222258
aggregate: "sessionID",
223-
schema: z.object({
224-
sessionID: SessionID.zod,
225-
info: Info.zod,
226-
}),
259+
schema: CreatedEventSchema,
227260
}),
228261
Updated: SyncEvent.define({
229262
type: "session.updated",
230263
version: 1,
231264
aggregate: "sessionID",
232-
schema: z.object({
233-
sessionID: SessionID.zod,
234-
info: updateSchema(zodObject(Info)).extend({
235-
share: updateSchema(zodObject(Share)).optional(),
236-
time: updateSchema(zodObject(Time)).optional(),
237-
}),
238-
}),
239-
busSchema: z.object({
240-
sessionID: SessionID.zod,
241-
info: Info.zod,
242-
}),
265+
schema: UpdatedEventSchema,
266+
busSchema: CreatedEventSchema,
243267
}),
244268
Deleted: SyncEvent.define({
245269
type: "session.deleted",
246270
version: 1,
247271
aggregate: "sessionID",
248-
schema: z.object({
249-
sessionID: SessionID.zod,
250-
info: Info.zod,
251-
}),
272+
schema: CreatedEventSchema,
252273
}),
253274
Diff: BusEvent.define(
254275
"session.diff",
@@ -394,7 +415,7 @@ export interface Interface {
394415

395416
export class Service extends Context.Service<Service, Interface>()("@opencode/Session") {}
396417

397-
type Patch = z.infer<typeof Event.Updated.schema>["info"]
418+
export type Patch = Types.DeepMutable<SyncEvent.Event<typeof Event.Updated>["data"]["info"]>
398419

399420
const db = <T>(fn: (d: Parameters<typeof Database.use>[0] extends (trx: infer D) => any ? D : never) => T) =>
400421
Effect.sync(() => Database.use(fn))

packages/opencode/src/share/share-next.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ export const layer = Layer.effect(
181181

182182
yield* watch(Session.Event.Updated, (evt) =>
183183
Effect.gen(function* () {
184-
const info = yield* session.get(evt.properties.sessionID)
184+
const info = evt.properties.info
185185
yield* sync(info.id, [{ type: "session", data: info }])
186186
}),
187187
)

packages/opencode/src/sync/index.ts

Lines changed: 32 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import z from "zod"
2-
import type { ZodObject } from "zod"
32
import { Database, eq } from "@/storage"
43
import { GlobalBus } from "@/bus/global"
54
import { Bus as ProjectBus } from "@/bus"
@@ -9,11 +8,16 @@ import { EventSequenceTable, EventTable } from "./event.sql"
98
import { WorkspaceContext } from "@/control-plane/workspace-context"
109
import { EventID } from "./schema"
1110
import { Flag } from "@/flag/flag"
11+
import { Schema as EffectSchema, Types } from "effect"
12+
import { zodObject } from "@/util/effect-zod"
13+
import { isRecord } from "@/util/record"
1214

13-
export type Definition = {
15+
export type Definition<Schema extends EffectSchema.Top = EffectSchema.Top, BusSchema extends EffectSchema.Top = Schema> = {
1416
type: string
1517
version: number
1618
aggregate: string
19+
effectSchema: Schema
20+
effectProperties: BusSchema
1721
schema: z.ZodObject
1822

1923
// This is temporary and only exists for compatibility with bus
@@ -25,9 +29,13 @@ export type Event<Def extends Definition = Definition> = {
2529
id: string
2630
seq: number
2731
aggregateID: string
28-
data: z.infer<Def["schema"]>
32+
data: Types.DeepMutable<EffectSchema.Schema.Type<Def["effectSchema"]>>
2933
}
3034

35+
export type Properties<Def extends Definition = Definition> = Types.DeepMutable<
36+
EffectSchema.Schema.Type<Def["effectProperties"]>
37+
>
38+
3139
export type SerializedEvent<Def extends Definition = Definition> = Event<Def> & { type: string }
3240

3341
type ProjectorFunc = (db: Database.TxOrDb, data: unknown) => void
@@ -36,7 +44,12 @@ export const registry = new Map<string, Definition>()
3644
let projectors: Map<Definition, ProjectorFunc> | undefined
3745
const versions = new Map<string, number>()
3846
let frozen = false
39-
let convertEvent: (type: string, event: Event["data"]) => Promise<Record<string, unknown>> | Record<string, unknown>
47+
let convertEvent: (type: string, event: Event["data"]) => Promise<unknown> | unknown
48+
49+
function asRecord(input: unknown) {
50+
if (isRecord(input)) return input
51+
throw new Error(`SyncEvent.convertEvent must return an object, got: ${JSON.stringify(input)}`)
52+
}
4053

4154
export function reset() {
4255
frozen = false
@@ -54,7 +67,7 @@ export function init(input: { projectors: Array<[Definition, ProjectorFunc]>; co
5467
for (let [type, version] of versions.entries()) {
5568
let def = registry.get(versionedType(type, version))!
5669

57-
BusEvent.define(def.type, def.properties || def.schema)
70+
BusEvent.define(def.type, def.properties)
5871
}
5972

6073
// Freeze the system so it clearly errors if events are defined
@@ -72,19 +85,26 @@ export function versionedType(type: string, version?: number) {
7285
export function define<
7386
Type extends string,
7487
Agg extends string,
75-
Schema extends ZodObject<Record<Agg, z.ZodType<string>>>,
76-
BusSchema extends ZodObject = Schema,
77-
>(input: { type: Type; version: number; aggregate: Agg; schema: Schema; busSchema?: BusSchema }) {
88+
Schema extends EffectSchema.Top,
89+
BusSchema extends EffectSchema.Top = Schema,
90+
>(input: { type: Type; version: number; aggregate: Agg; schema: Schema; busSchema?: BusSchema }): Definition<
91+
Schema,
92+
BusSchema
93+
> {
7894
if (frozen) {
7995
throw new Error("Error defining sync event: sync system has been frozen")
8096
}
8197

98+
const effectProperties = (input.busSchema ?? input.schema) as BusSchema
99+
82100
const def = {
83101
type: input.type,
84102
version: input.version,
85103
aggregate: input.aggregate,
86-
schema: input.schema,
87-
properties: input.busSchema ? input.busSchema : input.schema,
104+
effectSchema: input.schema,
105+
effectProperties,
106+
schema: zodObject(input.schema),
107+
properties: zodObject(effectProperties),
88108
}
89109

90110
versions.set(def.type, Math.max(def.version, versions.get(def.type) || 0))
@@ -143,10 +163,10 @@ function process<Def extends Definition>(def: Def, event: Event<Def>, options: {
143163
const result = convertEvent(def.type, event.data)
144164
if (result instanceof Promise) {
145165
void result.then((data) => {
146-
void ProjectBus.publish({ type: def.type, properties: def.schema }, data)
166+
void ProjectBus.publish({ type: def.type, properties: def.properties }, asRecord(data))
147167
})
148168
} else {
149-
void ProjectBus.publish({ type: def.type, properties: def.schema }, result)
169+
void ProjectBus.publish({ type: def.type, properties: def.properties }, asRecord(result))
150170
}
151171

152172
GlobalBus.emit("event", {

0 commit comments

Comments
 (0)