Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions packages/shared/src/typed-event-emitter.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,31 @@ describe("TypedEventEmitter", () => {
expect(seen).toEqual(["a", "b", "a"]);
});

it("emit does not surface a rejecting async listener as an unhandled rejection", async () => {
const e = new TypedEventEmitter<Events>();
const unhandled = vi.fn();
process.on("unhandledRejection", unhandled);
try {
// An async listener whose body rejects (here: throws synchronously before
// its first await, the shape of the DB-not-initialized bug this guards).
e.on("data", async () => {
throw new Error("listener boom");
});
const other = vi.fn();
e.on("data", other);

expect(e.emit("data", { value: 1 })).toBe(true);
// Later listeners still run despite the earlier one rejecting.
expect(other).toHaveBeenCalledWith({ value: 1 });

// Give the microtask queue and the unhandledRejection task a chance.
await new Promise((resolve) => setTimeout(resolve, 0));
expect(unhandled).not.toHaveBeenCalled();
} finally {
process.off("unhandledRejection", unhandled);
}
});

it("toIterable yields events that arrive while awaiting", async () => {
const e = new TypedEventEmitter<Events>();
const result = collect(e.toIterable("data"), 2);
Expand Down
14 changes: 13 additions & 1 deletion packages/shared/src/typed-event-emitter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,19 @@ export class TypedEventEmitter<TEvents> {
return false;
}
for (const record of [...records]) {
record.fn(payload);
// An `async` listener returns a promise; if it rejects (or throws
// synchronously before its first await) the rejection would otherwise
// escape this fire-and-forget call and surface as an unhandled rejection.
// Swallow it here so one misbehaving listener can never crash the process
// or pollute error tracking — listeners that care must handle their own
// errors.
const result = record.fn(payload) as unknown;
if (
result != null &&
typeof (result as { then?: unknown }).then === "function"
) {
(result as Promise<unknown>).then(undefined, () => {});
}
}
return true;
}
Expand Down
10 changes: 10 additions & 0 deletions packages/workspace-server/src/db/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,16 @@ export class DatabaseService {
return this._db;
}

/**
* Whether the database is ready to be read or written. False during the
* startup window before `initialize()` runs and after `close()` tears the
* connection down (`@preDestroy`). Callers on best-effort paths use this to
* bail gracefully instead of letting the `db` getter throw.
*/
isInitialized(): boolean {
return this._db !== null;
}

@postConstruct()
initialize(): void {
const dbPath = path.join(this.storagePaths.appDataPath, "posthog-code.db");
Expand Down
20 changes: 20 additions & 0 deletions packages/workspace-server/src/services/workspace/workspace.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { createMockRepositoryRepository } from "../../db/repositories/repository-repository.mock";
import { createMockWorkspaceRepository } from "../../db/repositories/workspace-repository.mock";
import { createMockWorktreeRepository } from "../../db/repositories/worktree-repository.mock";
import type { DatabaseService } from "../../db/service";
import type { ProcessTrackingService } from "../process-tracking/process-tracking";
import type { SuspensionService } from "../suspension/suspension";
import { listLinkedWorktrees } from "../worktree-query/worktree-query";
Expand Down Expand Up @@ -72,6 +73,9 @@ vi.mock("@posthog/git/worktree", () => ({
}));

function createMocks() {
const databaseService = {
isInitialized: vi.fn(() => true),
} as unknown as DatabaseService;
const agent = {
cancelSessionsByTaskId: vi.fn(async () => {}),
onAgentFileActivity: vi.fn(),
Expand Down Expand Up @@ -113,6 +117,7 @@ function createMocks() {
};

return {
databaseService,
agent,
processTracking,
repositoryRepo,
Expand Down Expand Up @@ -153,6 +158,7 @@ function seedWorktreeTask(

function makeService(mocks: ReturnType<typeof createMocks>): WorkspaceService {
return new WorkspaceService(
mocks.databaseService,
mocks.agent,
mocks.processTracking,
mocks.repositoryRepo,
Expand Down Expand Up @@ -292,6 +298,20 @@ describe("WorkspaceService", () => {
expect(mocks.focus.onBranchRenamed).toHaveBeenCalledTimes(1);
expect(mocks.agent.onAgentFileActivity).toHaveBeenCalledTimes(1);
});

it("agent file activity bails without touching the db when it is not initialized", async () => {
vi.mocked(mocks.databaseService.isInitialized).mockReturnValue(false);
const findByTaskId = vi.spyOn(mocks.workspaceRepo, "findByTaskId");
service.initBranchWatcher();
const handler = vi.mocked(mocks.agent.onAgentFileActivity).mock
.calls[0][0];

await (handler({
taskId: "task-1",
branchName: "feature/x",
}) as unknown as Promise<void>);
expect(findByTaskId).not.toHaveBeenCalled();
});
});

describe("checkWorktreeBranch", () => {
Expand Down
10 changes: 10 additions & 0 deletions packages/workspace-server/src/services/workspace/workspace.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,15 @@ import {
import { ANALYTICS_EVENTS, TypedEventEmitter } from "@posthog/shared";
import { inject, injectable } from "inversify";
import {
DATABASE_SERVICE,
REPOSITORY_REPOSITORY,
WORKSPACE_REPOSITORY,
WORKTREE_REPOSITORY,
} from "../../db/identifiers";
import type { IRepositoryRepository } from "../../db/repositories/repository-repository";
import type { IWorkspaceRepository } from "../../db/repositories/workspace-repository";
import type { IWorktreeRepository } from "../../db/repositories/worktree-repository";
import type { DatabaseService } from "../../db/service";
import {
IMPORTED_SESSION_CLEANER,
type ImportedSessionCleaner,
Expand Down Expand Up @@ -124,6 +126,8 @@ export class WorkspaceService extends TypedEventEmitter<WorkspaceServiceEvents>
private readonly log: ScopedLogger;

constructor(
@inject(DATABASE_SERVICE)
private readonly databaseService: DatabaseService,
@inject(WORKSPACE_AGENT)
private readonly agent: WorkspaceAgent,
@inject(PROCESS_TRACKING_SERVICE)
Expand Down Expand Up @@ -326,6 +330,12 @@ export class WorkspaceService extends TypedEventEmitter<WorkspaceServiceEvents>
}): Promise<void> {
if (!branchName) return;

// This runs from a fire-and-forget emit on the agent side, so it can land
// during the startup/teardown window when the DB is closed or not yet
// initialized. Bail gracefully — branch association is best-effort — rather
// than letting the synchronous repo read throw into an unhandled rejection.
if (!this.databaseService.isInitialized()) return;

const dbRow = this.workspaceRepo.findByTaskId(taskId);
if (!dbRow || dbRow.mode !== "local") return;
if (!dbRow.repositoryId) return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { createMockRepositoryRepository } from "../../db/repositories/repository-repository.mock";
import { createMockWorkspaceRepository } from "../../db/repositories/workspace-repository.mock";
import { createMockWorktreeRepository } from "../../db/repositories/worktree-repository.mock";
import type { DatabaseService } from "../../db/service";
import type { ProcessTrackingService } from "../process-tracking/process-tracking";
import type { SuspensionService } from "../suspension/suspension";
import type {
Expand Down Expand Up @@ -39,6 +40,7 @@ function createService(worktreeBasePath: string) {
};

const service = new WorkspaceService(
{ isInitialized: vi.fn(() => true) } as unknown as DatabaseService,
{
cancelSessionsByTaskId: vi.fn(async () => {}),
onAgentFileActivity: vi.fn(),
Expand Down
Loading