Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 1 addition & 35 deletions packages/core/src/asyncContext/index.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
import type { Carrier } from './../carrier';
import { getMainCarrier, getSentryCarrier } from './../carrier';
import type { Scope } from './../scope';
import { _setSpanForScope } from './../utils/spanOnScope';
import { getStackAsyncContextStrategy } from './stackStrategy';
import type { AsyncContextStrategy, TracingChannelBinding } from './types';
import type { AsyncContextStrategy } from './types';

/**
* @private Private API with no semver guarantees!
Expand Down Expand Up @@ -31,35 +29,3 @@ export function getAsyncContextStrategy(carrier: Carrier): AsyncContextStrategy
// Otherwise, use the default one (stack)
return getStackAsyncContextStrategy();
}

/**
* Get the runtime binding needed to connect tracing channels to async context.
*/
export function getTracingChannelBinding(): TracingChannelBinding | undefined {
return getAsyncContextStrategy(getMainCarrier()).getTracingChannelBinding?.();
}

/**
* Build the default {@link TracingChannelBinding} shared by AsyncLocalStorage-based strategies.
*
* The ALS instance is supplied by the caller (kept as `unknown`).
* The binding clones the current scope, plants the span on it, and reuses the existing isolation scope.
*
* The OpenTelemetry strategy does not use this: its store value is an OTel context, not a
* `{ scope, isolationScope }` pair.
*/
export function _INTERNAL_createTracingChannelBinding(
asyncLocalStorage: unknown,
getScopes: () => { scope: Scope; isolationScope: Scope },
): TracingChannelBinding {
return {
asyncLocalStorage,
getStoreWithActiveSpan: span => {
const { scope, isolationScope } = getScopes();
const activeScope = scope.clone();
_setSpanForScope(activeScope, span);

return { scope: activeScope, isolationScope };
},
};
}
59 changes: 59 additions & 0 deletions packages/core/src/asyncContext/tracing-channel-binding.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import { getMainCarrier } from '../carrier';
import type { Scope } from '../scope';
import { _setSpanForScope } from '../utils/spanOnScope';
import { safeUnref } from '../utils/timer';
import { getAsyncContextStrategy } from './index';
import type { TracingChannelBinding } from './types';

/**
* Execute a callback whenever the tracing channel binding is available.
* If it is not available after retry, the callback is not executed.
*/
export function waitForTracingChannelBinding(callback: () => void, retries = 1): void {
const binding = getAsyncContextStrategy(getMainCarrier()).getTracingChannelBinding?.();

if (binding) {
callback();
return;
}

if (!retries) {
return;
}

// It is possible that the binding is not available yet when this is initially called
// This happens when users use a custom OTEL setup
// In this case, we wait for a tick and try again afterwards
// If it still fails, we bail and do nothing
// `safeUnref` so this retry timer never keeps the process alive on its own (Node server runtimes).
safeUnref(
setTimeout(() => {
waitForTracingChannelBinding(callback, retries - 1);
}, 1),
);
}

/**
* Build the default {@link TracingChannelBinding} shared by AsyncLocalStorage-based strategies.
*
* The ALS instance is supplied by the caller (kept as `unknown`).
* The binding clones the current scope, plants the span on it, and reuses the existing isolation scope.
*
* The OpenTelemetry strategy does not use this: its store value is an OTel context, not a
* `{ scope, isolationScope }` pair.
*/
export function _INTERNAL_createTracingChannelBinding(
asyncLocalStorage: unknown,
getScopes: () => { scope: Scope; isolationScope: Scope },
): TracingChannelBinding {
return {
asyncLocalStorage,
getStoreWithActiveSpan: span => {
const { scope, isolationScope } = getScopes();
const activeScope = scope.clone();
_setSpanForScope(activeScope, span);

return { scope: activeScope, isolationScope };
},
};
}
6 changes: 3 additions & 3 deletions packages/core/src/shared-exports.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,11 @@ export {
hasExternalPropagationContext,
} from './currentScopes';
export { getDefaultCurrentScope, getDefaultIsolationScope } from './defaultScopes';
export { setAsyncContextStrategy, getAsyncContextStrategy } from './asyncContext';
export {
setAsyncContextStrategy,
getTracingChannelBinding as _INTERNAL_getTracingChannelBinding,
waitForTracingChannelBinding,
_INTERNAL_createTracingChannelBinding,
} from './asyncContext';
} from './asyncContext/tracing-channel-binding';
export { getGlobalSingleton, getMainCarrier } from './carrier';
export { makeSession, closeSession, updateSession } from './session';
export { Scope } from './scope';
Expand Down
123 changes: 123 additions & 0 deletions packages/core/test/lib/asyncContext/tracing-channel-binding.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
import { getAsyncContextStrategy, setAsyncContextStrategy } from '../../../src/asyncContext';
import { waitForTracingChannelBinding } from '../../../src/asyncContext/tracing-channel-binding';
import type { TracingChannelBinding } from '../../../src/asyncContext/types';
import { getMainCarrier } from '../../../src/carrier';

const FAKE_BINDING: TracingChannelBinding = {
asyncLocalStorage: {},
getStoreWithActiveSpan: () => ({}),
};

/** Install an async context strategy whose `getTracingChannelBinding` is driven by `provider`. */
function setBindingProvider(provider: (() => TracingChannelBinding | undefined) | undefined): void {
setAsyncContextStrategy({
...getAsyncContextStrategy(getMainCarrier()),
getTracingChannelBinding: provider,
});
}

describe('waitForTracingChannelBinding', () => {
beforeEach(() => {
vi.useFakeTimers();
setAsyncContextStrategy(undefined);
});

afterEach(() => {
setAsyncContextStrategy(undefined);
vi.useRealTimers();
});

it('runs the callback synchronously when the binding is already available', () => {
const getBinding = vi.fn(() => FAKE_BINDING);
setBindingProvider(getBinding);

const callback = vi.fn();
waitForTracingChannelBinding(callback);

expect(callback).toHaveBeenCalledTimes(1);
// Resolved on the first attempt, so no retry should be scheduled.
expect(getBinding).toHaveBeenCalledTimes(1);
vi.runAllTimers();
expect(getBinding).toHaveBeenCalledTimes(1);
expect(callback).toHaveBeenCalledTimes(1);
});

it('retries on the next tick and runs the callback once the binding becomes available', () => {
const getBinding = vi.fn<[], TracingChannelBinding | undefined>(() => FAKE_BINDING);
getBinding.mockReturnValueOnce(undefined);
setBindingProvider(getBinding);

const callback = vi.fn();
waitForTracingChannelBinding(callback);

// Not available on the first (synchronous) attempt.
expect(callback).not.toHaveBeenCalled();

vi.advanceTimersByTime(1);

expect(callback).toHaveBeenCalledTimes(1);
});

it('does not run the callback if the binding never becomes available (default single retry)', () => {
const getBinding = vi.fn(() => undefined);
setBindingProvider(getBinding);

const callback = vi.fn();
waitForTracingChannelBinding(callback);

expect(callback).not.toHaveBeenCalled();

vi.advanceTimersByTime(1);
expect(callback).not.toHaveBeenCalled();

// The single retry is exhausted — no further attempts are scheduled.
expect(getBinding).toHaveBeenCalledTimes(2);
vi.runAllTimers();
expect(getBinding).toHaveBeenCalledTimes(2);
expect(callback).not.toHaveBeenCalled();
});

it('does not retry when retries is 0', () => {
const getBinding = vi.fn(() => undefined);
setBindingProvider(getBinding);

const callback = vi.fn();
waitForTracingChannelBinding(callback, 0);

expect(callback).not.toHaveBeenCalled();
expect(getBinding).toHaveBeenCalledTimes(1);

// No retry is scheduled when no retries remain.
vi.runAllTimers();
expect(getBinding).toHaveBeenCalledTimes(1);
expect(callback).not.toHaveBeenCalled();
});

it('honors a custom retry count', () => {
const getBinding = vi.fn<[], TracingChannelBinding | undefined>(() => FAKE_BINDING);
getBinding.mockReturnValueOnce(undefined).mockReturnValueOnce(undefined).mockReturnValue(FAKE_BINDING);
setBindingProvider(getBinding);

const callback = vi.fn();
waitForTracingChannelBinding(callback, 2);

expect(callback).not.toHaveBeenCalled(); // attempt 1 (sync): undefined

vi.advanceTimersByTime(1);
expect(callback).not.toHaveBeenCalled(); // attempt 2: undefined

vi.advanceTimersByTime(1);
expect(callback).toHaveBeenCalledTimes(1); // attempt 3: available
});

it('does nothing when the strategy exposes no `getTracingChannelBinding`', () => {
// The default (stack) strategy has no tracing-channel binding support.
setAsyncContextStrategy(undefined);

const callback = vi.fn();
waitForTracingChannelBinding(callback, 0);

expect(callback).not.toHaveBeenCalled();
});
});
2 changes: 1 addition & 1 deletion packages/node-core/src/sdk/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ function _init(
initializeEsmLoader();
}

setOpenTelemetryContextAsyncContextStrategy();
setOpenTelemetryContextAsyncContextStrategy(options);

const scope = getCurrentScope();
scope.update(options.initialScope);
Expand Down
7 changes: 4 additions & 3 deletions packages/node/src/integrations/tracing/redis/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
SEMANTIC_ATTRIBUTE_SENTRY_OP,
spanToJSON,
truncate,
waitForTracingChannelBinding,
} from '@sentry/core';
import * as dc from 'node:diagnostics_channel';
import { subscribeRedisDiagnosticChannels, type RedisTracingChannelFactory } from '@sentry/server-utils';
Expand Down Expand Up @@ -128,9 +129,9 @@ export const instrumentRedis = Object.assign(
// so defer to the next tick.
// Check this here to ensure this does not fail at runtime for Node <= 18.18.0
if (dc.tracingChannel) {
void Promise.resolve().then(() =>
subscribeRedisDiagnosticChannels(dc.tracingChannel as RedisTracingChannelFactory, cacheResponseHook),
);
waitForTracingChannelBinding(() => {
subscribeRedisDiagnosticChannels(dc.tracingChannel as RedisTracingChannelFactory, cacheResponseHook);
});
}

// todo: implement them gradually
Expand Down
28 changes: 5 additions & 23 deletions packages/opentelemetry/src/asyncContextStrategy.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import * as api from '@opentelemetry/api';
import type { Scope, Span, withActiveSpan as defaultWithActiveSpan } from '@sentry/core';
import type { Scope, TracingChannelBinding, withActiveSpan as defaultWithActiveSpan } from '@sentry/core';
import { getDefaultCurrentScope, getDefaultIsolationScope, setAsyncContextStrategy } from '@sentry/core';
import {
SENTRY_FORK_ISOLATION_SCOPE_CONTEXT_KEY,
Expand All @@ -13,19 +13,13 @@ import { getActiveSpan } from './utils/getActiveSpan';
import { getTraceData } from './utils/getTraceData';
import { suppressTracing } from './utils/suppressTracing';

interface ContextApi {
_getContextManager(): {
getAsyncLocalStorageLookup(): {
asyncLocalStorage: unknown;
};
};
}

/**
* Sets the async context strategy to use follow the OTEL context under the hood.
* We handle forking a hub inside of our custom OTEL Context Manager (./otelContextManager.ts)
*/
export function setOpenTelemetryContextAsyncContextStrategy(): void {
export function setOpenTelemetryContextAsyncContextStrategy(options?: {
getTracingChannelBinding?: () => TracingChannelBinding | undefined;
}): void {
function getScopes(): CurrentScopes {
const ctx = api.context.active();
const scopes = getScopesFromContext(ctx);
Expand Down Expand Up @@ -116,18 +110,6 @@ export function setOpenTelemetryContextAsyncContextStrategy(): void {
// The types here don't fully align, because our own `Span` type is narrower
// than the OTEL one - but this is OK for here, as we now we'll only have OTEL spans passed around
withActiveSpan: withActiveSpan as typeof defaultWithActiveSpan,
getTracingChannelBinding: () => {
try {
const contextManager = (api.context as unknown as ContextApi)._getContextManager();
const lookup = contextManager.getAsyncLocalStorageLookup();

return {
asyncLocalStorage: lookup.asyncLocalStorage,
getStoreWithActiveSpan: (span: Span) => api.trace.setSpan(api.context.active(), span as api.Span),
};
} catch {
return undefined;
}
},
getTracingChannelBinding: options?.getTracingChannelBinding,
});
}
10 changes: 8 additions & 2 deletions packages/opentelemetry/src/asyncLocalStorageContextManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,11 @@ import type { Context, ContextManager } from '@opentelemetry/api';
import { ROOT_CONTEXT } from '@opentelemetry/api';
import { AsyncLocalStorage } from 'node:async_hooks';
import { EventEmitter } from 'node:events';
import { SENTRY_SCOPES_CONTEXT_KEY } from './constants';
import type { AsyncLocalStorageLookup } from './contextManager';
import { SENTRY_SCOPES_CONTEXT_KEY } from './constants';
import { buildContextWithSentryScopes } from './utils/buildContextWithSentryScopes';
import { setIsSetup } from './utils/setupCheck';
import { getAsyncContextStrategy, getMainCarrier } from '@sentry/core';

type ListenerFn = (...args: unknown[]) => unknown;

Expand All @@ -44,13 +45,18 @@ const ADD_LISTENER_METHODS = ['addListener', 'on', 'once', 'prependListener', 'p
* Semantics match `@opentelemetry/context-async-hooks` (function `bind` + `EventEmitter` patching).
*/
export class SentryAsyncLocalStorageContextManager implements ContextManager {
protected readonly _asyncLocalStorage = new AsyncLocalStorage<Context>();
protected readonly _asyncLocalStorage: AsyncLocalStorage<Context>;

private readonly _kOtListeners = Symbol('OtListeners');
private _wrapped = false;

public constructor() {
setIsSetup('SentryContextManager');
// Pick the instance from the async context strategy
// this should normally always be there, but if it is not for whatever reason, we fall back to a new instance
this._asyncLocalStorage =
(getAsyncContextStrategy(getMainCarrier()).getTracingChannelBinding?.()
?.asyncLocalStorage as AsyncLocalStorage<Context>) ?? new AsyncLocalStorage<Context>();
}

public active(): Context {
Expand Down
1 change: 0 additions & 1 deletion packages/opentelemetry/src/exports.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ export { suppressTracing } from './utils/suppressTracing';

export { setupEventContextTrace } from './setupEventContextTrace';

export { setOpenTelemetryContextAsyncContextStrategy } from './asyncContextStrategy';
// eslint-disable-next-line typescript/no-deprecated
export { wrapContextManagerClass } from './contextManager';

Expand Down
3 changes: 3 additions & 0 deletions packages/opentelemetry/src/index.browser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ export class SentryAsyncLocalStorageContextManager {
}
}

// This is the generic, non-node specific async context strategy
export { setOpenTelemetryContextAsyncContextStrategy } from './asyncContextStrategy';

export type AsyncLocalStorageLookup = {
asyncLocalStorage: unknown;
contextSymbol: symbol;
Expand Down
3 changes: 3 additions & 0 deletions packages/opentelemetry/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,6 @@ export * from './exports';
// Node-specific exports
export { SentryAsyncLocalStorageContextManager } from './asyncLocalStorageContextManager';
export type { AsyncLocalStorageLookup } from './contextManager';

// We export the node-specific variant here that uses async local storage
export { setNodeOpenTelemetryContextAsyncContextStrategy as setOpenTelemetryContextAsyncContextStrategy } from './nodeAsyncContextStrategy';
Comment thread
cursor[bot] marked this conversation as resolved.
Loading
Loading