diff --git a/packages/shared/src/typed-event-emitter.test.ts b/packages/shared/src/typed-event-emitter.test.ts index 7160e94a6..b60249f54 100644 --- a/packages/shared/src/typed-event-emitter.test.ts +++ b/packages/shared/src/typed-event-emitter.test.ts @@ -112,6 +112,31 @@ describe("TypedEventEmitter", () => { expect(seen).toEqual(["a", "b", "a"]); }); + it("emit does not surface a rejecting async listener as an unhandled rejection", async () => { + const e = new TypedEventEmitter(); + const unhandled = vi.fn(); + process.on("unhandledRejection", unhandled); + try { + // An async listener whose body rejects (here: throws synchronously before + // its first await, the shape of the DB-not-initialized bug this guards). + e.on("data", async () => { + throw new Error("listener boom"); + }); + const other = vi.fn(); + e.on("data", other); + + expect(e.emit("data", { value: 1 })).toBe(true); + // Later listeners still run despite the earlier one rejecting. + expect(other).toHaveBeenCalledWith({ value: 1 }); + + // Give the microtask queue and the unhandledRejection task a chance. + await new Promise((resolve) => setTimeout(resolve, 0)); + expect(unhandled).not.toHaveBeenCalled(); + } finally { + process.off("unhandledRejection", unhandled); + } + }); + it("toIterable yields events that arrive while awaiting", async () => { const e = new TypedEventEmitter(); const result = collect(e.toIterable("data"), 2); diff --git a/packages/shared/src/typed-event-emitter.ts b/packages/shared/src/typed-event-emitter.ts index 333964ace..14ef8346c 100644 --- a/packages/shared/src/typed-event-emitter.ts +++ b/packages/shared/src/typed-event-emitter.ts @@ -155,7 +155,19 @@ export class TypedEventEmitter { return false; } for (const record of [...records]) { - record.fn(payload); + // An `async` listener returns a promise; if it rejects (or throws + // synchronously before its first await) the rejection would otherwise + // escape this fire-and-forget call and surface as an unhandled rejection. + // Swallow it here so one misbehaving listener can never crash the process + // or pollute error tracking — listeners that care must handle their own + // errors. + const result = record.fn(payload) as unknown; + if ( + result != null && + typeof (result as { then?: unknown }).then === "function" + ) { + (result as Promise).then(undefined, () => {}); + } } return true; } diff --git a/packages/workspace-server/src/db/service.ts b/packages/workspace-server/src/db/service.ts index dd1a0dfe5..075d88f32 100644 --- a/packages/workspace-server/src/db/service.ts +++ b/packages/workspace-server/src/db/service.ts @@ -32,6 +32,16 @@ export class DatabaseService { return this._db; } + /** + * Whether the database is ready to be read or written. False during the + * startup window before `initialize()` runs and after `close()` tears the + * connection down (`@preDestroy`). Callers on best-effort paths use this to + * bail gracefully instead of letting the `db` getter throw. + */ + isInitialized(): boolean { + return this._db !== null; + } + @postConstruct() initialize(): void { const dbPath = path.join(this.storagePaths.appDataPath, "posthog-code.db"); diff --git a/packages/workspace-server/src/services/workspace/workspace.test.ts b/packages/workspace-server/src/services/workspace/workspace.test.ts index d13d543f1..d450e55c8 100644 --- a/packages/workspace-server/src/services/workspace/workspace.test.ts +++ b/packages/workspace-server/src/services/workspace/workspace.test.ts @@ -16,6 +16,7 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import { createMockRepositoryRepository } from "../../db/repositories/repository-repository.mock"; import { createMockWorkspaceRepository } from "../../db/repositories/workspace-repository.mock"; import { createMockWorktreeRepository } from "../../db/repositories/worktree-repository.mock"; +import type { DatabaseService } from "../../db/service"; import type { ProcessTrackingService } from "../process-tracking/process-tracking"; import type { SuspensionService } from "../suspension/suspension"; import { listLinkedWorktrees } from "../worktree-query/worktree-query"; @@ -72,6 +73,9 @@ vi.mock("@posthog/git/worktree", () => ({ })); function createMocks() { + const databaseService = { + isInitialized: vi.fn(() => true), + } as unknown as DatabaseService; const agent = { cancelSessionsByTaskId: vi.fn(async () => {}), onAgentFileActivity: vi.fn(), @@ -113,6 +117,7 @@ function createMocks() { }; return { + databaseService, agent, processTracking, repositoryRepo, @@ -153,6 +158,7 @@ function seedWorktreeTask( function makeService(mocks: ReturnType): WorkspaceService { return new WorkspaceService( + mocks.databaseService, mocks.agent, mocks.processTracking, mocks.repositoryRepo, @@ -292,6 +298,20 @@ describe("WorkspaceService", () => { expect(mocks.focus.onBranchRenamed).toHaveBeenCalledTimes(1); expect(mocks.agent.onAgentFileActivity).toHaveBeenCalledTimes(1); }); + + it("agent file activity bails without touching the db when it is not initialized", async () => { + vi.mocked(mocks.databaseService.isInitialized).mockReturnValue(false); + const findByTaskId = vi.spyOn(mocks.workspaceRepo, "findByTaskId"); + service.initBranchWatcher(); + const handler = vi.mocked(mocks.agent.onAgentFileActivity).mock + .calls[0][0]; + + await (handler({ + taskId: "task-1", + branchName: "feature/x", + }) as unknown as Promise); + expect(findByTaskId).not.toHaveBeenCalled(); + }); }); describe("checkWorktreeBranch", () => { diff --git a/packages/workspace-server/src/services/workspace/workspace.ts b/packages/workspace-server/src/services/workspace/workspace.ts index 299884009..34cc50f25 100644 --- a/packages/workspace-server/src/services/workspace/workspace.ts +++ b/packages/workspace-server/src/services/workspace/workspace.ts @@ -27,6 +27,7 @@ import { import { ANALYTICS_EVENTS, TypedEventEmitter } from "@posthog/shared"; import { inject, injectable } from "inversify"; import { + DATABASE_SERVICE, REPOSITORY_REPOSITORY, WORKSPACE_REPOSITORY, WORKTREE_REPOSITORY, @@ -34,6 +35,7 @@ import { import type { IRepositoryRepository } from "../../db/repositories/repository-repository"; import type { IWorkspaceRepository } from "../../db/repositories/workspace-repository"; import type { IWorktreeRepository } from "../../db/repositories/worktree-repository"; +import type { DatabaseService } from "../../db/service"; import { IMPORTED_SESSION_CLEANER, type ImportedSessionCleaner, @@ -124,6 +126,8 @@ export class WorkspaceService extends TypedEventEmitter private readonly log: ScopedLogger; constructor( + @inject(DATABASE_SERVICE) + private readonly databaseService: DatabaseService, @inject(WORKSPACE_AGENT) private readonly agent: WorkspaceAgent, @inject(PROCESS_TRACKING_SERVICE) @@ -326,6 +330,12 @@ export class WorkspaceService extends TypedEventEmitter }): Promise { if (!branchName) return; + // This runs from a fire-and-forget emit on the agent side, so it can land + // during the startup/teardown window when the DB is closed or not yet + // initialized. Bail gracefully — branch association is best-effort — rather + // than letting the synchronous repo read throw into an unhandled rejection. + if (!this.databaseService.isInitialized()) return; + const dbRow = this.workspaceRepo.findByTaskId(taskId); if (!dbRow || dbRow.mode !== "local") return; if (!dbRow.repositoryId) return; diff --git a/packages/workspace-server/src/services/workspace/workspace.verify.test.ts b/packages/workspace-server/src/services/workspace/workspace.verify.test.ts index 67b45244f..a919a9bad 100644 --- a/packages/workspace-server/src/services/workspace/workspace.verify.test.ts +++ b/packages/workspace-server/src/services/workspace/workspace.verify.test.ts @@ -8,6 +8,7 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import { createMockRepositoryRepository } from "../../db/repositories/repository-repository.mock"; import { createMockWorkspaceRepository } from "../../db/repositories/workspace-repository.mock"; import { createMockWorktreeRepository } from "../../db/repositories/worktree-repository.mock"; +import type { DatabaseService } from "../../db/service"; import type { ProcessTrackingService } from "../process-tracking/process-tracking"; import type { SuspensionService } from "../suspension/suspension"; import type { @@ -39,6 +40,7 @@ function createService(worktreeBasePath: string) { }; const service = new WorkspaceService( + { isInitialized: vi.fn(() => true) } as unknown as DatabaseService, { cancelSessionsByTaskId: vi.fn(async () => {}), onAgentFileActivity: vi.fn(),