Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
services:
redis:
image: redis:8
restart: always
container_name: e2e-tests-node-bullmq-redis
ports:
- '6379:6379'
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import { execSync } from 'child_process';
import { dirname } from 'path';
import { fileURLToPath } from 'url';

const __dirname = dirname(fileURLToPath(import.meta.url));

export default async function globalSetup() {
// Start Redis via Docker Compose
execSync('docker compose up -d --wait', {
cwd: __dirname,
stdio: 'inherit',
});
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import { execSync } from 'child_process';
import { dirname } from 'path';
import { fileURLToPath } from 'url';

const __dirname = dirname(fileURLToPath(import.meta.url));

export default async function globalTeardown() {
// Stop Redis and remove containers
execSync('docker compose down --volumes', {
cwd: __dirname,
stdio: 'inherit',
});
}
25 changes: 25 additions & 0 deletions dev-packages/e2e-tests/test-applications/node-bullmq/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
{
"name": "node-bullmq",
"version": "0.0.1",
"private": true,
"type": "module",
"scripts": {
"start": "node src/app.mjs",
"clean": "npx rimraf node_modules pnpm-lock.yaml",
"test": "playwright test",
"test:build": "pnpm install",
"test:assert": "pnpm test"
},
"dependencies": {
"@sentry/node": "file:../../packed/sentry-node-packed.tgz",
"bullmq": "^5.0.0",
"express": "^4.21.0"
},
"devDependencies": {
"@playwright/test": "~1.56.0",
"@sentry-internal/test-utils": "link:../../../test-utils"
},
"volta": {
"extends": "../../package.json"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import { getPlaywrightConfig } from '@sentry-internal/test-utils';

const config = getPlaywrightConfig({
startCommand: `pnpm start`,
});

export default {
...config,
globalSetup: './global-setup.mjs',
globalTeardown: './global-teardown.mjs',
};
58 changes: 58 additions & 0 deletions dev-packages/e2e-tests/test-applications/node-bullmq/src/app.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import './instrument.mjs';

import * as Sentry from '@sentry/node';
import { Queue, Worker } from 'bullmq';
import express from 'express';

const app = express();
const port = 3030;

const connection = { host: '127.0.0.1', port: 6379 };
const telemetry = new Sentry.BullMQTelemetry();

const testQueue = new Queue('test-queue', { connection, telemetry });

const worker = new Worker(
'test-queue',
async job => {
if (job.name === 'fail-job') {
throw new Error('Test error from BullMQ processor');
}

if (job.name === 'breadcrumb-job') {
Sentry.addBreadcrumb({ message: 'breadcrumb-from-bullmq-processor' });
}

return { success: true };
},
{ connection, telemetry },
);

worker.on('error', err => {
console.error('Worker error:', err);
});

app.get('/enqueue/success', async (req, res) => {
await testQueue.add('success-job', { data: 'test' });
res.send('Job enqueued');
});

app.get('/enqueue/fail', async (req, res) => {
await testQueue.add('fail-job', { data: 'test' });
res.send('Job enqueued');
});

app.get('/enqueue/breadcrumb-test', async (req, res) => {
await testQueue.add('breadcrumb-job', { data: 'test' });
res.send('Job enqueued');
});

app.get('/check-isolation', async (req, res) => {
res.send('Isolation check');
});

Sentry.setupExpressErrorHandler(app);

app.listen(port, () => {
console.log(`App listening on port ${port}`);
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import * as Sentry from '@sentry/node';

Sentry.init({
dsn: process.env.E2E_TEST_DSN,
tunnel: 'http://localhost:3031/',
tracesSampleRate: 1.0,
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import { startEventProxyServer } from '@sentry-internal/test-utils';

startEventProxyServer({
port: 3031,
proxyServerName: 'node-bullmq',
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
import { expect, test } from '@playwright/test';
import { waitForError, waitForMetric, waitForTransaction } from '@sentry-internal/test-utils';

test('Creates a queue.submit span when adding a job', async ({ baseURL }) => {
const transactionPromise = waitForTransaction('node-bullmq', transactionEvent => {
return transactionEvent.transaction === 'GET /enqueue/success';
});

await fetch(`${baseURL}/enqueue/success`);

const transaction = await transactionPromise;

const submitSpan = transaction.spans?.find(span => span.op === 'queue.submit');
expect(submitSpan).toBeDefined();
expect(submitSpan!.origin).toBe('auto.queue.bullmq.producer');
expect(submitSpan!.data?.['messaging.system']).toBe('bullmq');
});

test('Creates a transaction for queue.task when processing a job', async ({ baseURL }) => {
const transactionPromise = waitForTransaction('node-bullmq', transactionEvent => {
return transactionEvent.contexts?.trace?.op === 'queue.task';
});

await fetch(`${baseURL}/enqueue/success`);

const transaction = await transactionPromise;

expect(transaction.contexts?.trace?.op).toBe('queue.task');
expect(transaction.contexts?.trace?.origin).toBe('auto.queue.bullmq.consumer');
expect(transaction.contexts?.trace?.data?.['messaging.system']).toBe('bullmq');
});

test('Sends exception to Sentry on error in job processor', async ({ baseURL }) => {
const errorEventPromise = waitForError('node-bullmq', event => {
return (
!event.type &&
event.exception?.values?.[0]?.value === 'Test error from BullMQ processor' &&
event.exception?.values?.[0]?.mechanism?.type === 'auto.queue.bullmq'
);
});

await fetch(`${baseURL}/enqueue/fail`);

const errorEvent = await errorEventPromise;

expect(errorEvent.exception?.values).toHaveLength(1);
expect(errorEvent.exception?.values?.[0]?.mechanism).toEqual({
handled: false,
type: 'auto.queue.bullmq',
});
});

test('BullMQ processor breadcrumbs do not leak into subsequent HTTP requests', async ({ baseURL }) => {
const processTransactionPromise = waitForTransaction('node-bullmq', transactionEvent => {
return transactionEvent.contexts?.trace?.op === 'queue.task';
});

await fetch(`${baseURL}/enqueue/breadcrumb-test`);

await processTransactionPromise;

const transactionPromise = waitForTransaction('node-bullmq', transactionEvent => {
return transactionEvent.transaction === 'GET /check-isolation';
});

await fetch(`${baseURL}/check-isolation`);

const transaction = await transactionPromise;

const leakedBreadcrumb = (transaction.breadcrumbs || []).find(
(b: { message?: string }) => b.message === 'breadcrumb-from-bullmq-processor',
);
expect(leakedBreadcrumb).toBeUndefined();
});

test('Links consumer transaction to producer span via sentry.previous_trace', async ({ baseURL }) => {
const httpTransactionPromise = waitForTransaction('node-bullmq', transactionEvent => {
return transactionEvent.transaction === 'GET /enqueue/success';
});

const consumerTransactionPromise = waitForTransaction('node-bullmq', transactionEvent => {
return transactionEvent.contexts?.trace?.op === 'queue.task';
});

await fetch(`${baseURL}/enqueue/success`);

const httpTransaction = await httpTransactionPromise;
const consumerTransaction = await consumerTransactionPromise;

const producerSpan = httpTransaction.spans?.find(span => span.op === 'queue.submit');
expect(producerSpan).toBeDefined();

const previousTrace = consumerTransaction.contexts?.trace?.data?.['sentry.previous_trace'];
expect(previousTrace).toBeDefined();
expect(previousTrace).toContain(httpTransaction.contexts?.trace?.trace_id);
});

test('Emits bullmq.jobs.completed counter metric on successful job', async ({ baseURL }) => {
const metricPromise = waitForMetric('node-bullmq', metric => {
return metric.name === 'bullmq.jobs.completed' && metric.type === 'counter';
});

await fetch(`${baseURL}/enqueue/success`);

const metric = await metricPromise;

expect(metric.name).toBe('bullmq.jobs.completed');
expect(metric.type).toBe('counter');
expect(metric.value).toEqual(expect.any(Number));
});

test('Emits bullmq.job.duration histogram metric on job completion', async ({ baseURL }) => {
const metricPromise = waitForMetric('node-bullmq', metric => {
return metric.name === 'bullmq.job.duration' && metric.type === 'distribution';
});

await fetch(`${baseURL}/enqueue/success`);

const metric = await metricPromise;

expect(metric.name).toBe('bullmq.job.duration');
expect(metric.type).toBe('distribution');
expect(metric.value).toEqual(expect.any(Number));
});
1 change: 1 addition & 0 deletions dev-packages/node-integration-tests/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
"@types/pg": "^8.6.5",
"ai": "^4.3.16",
"amqplib": "^0.10.9",
"bullmq": "^5.79.1",
"body-parser": "^2.2.2",
"connect": "^3.7.0",
"consola": "^3.2.3",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
services:
db:
image: redis:latest
restart: always
container_name: integration-tests-bullmq-redis
ports:
- '6380:6379'
healthcheck:
test: ['CMD-SHELL', 'redis-cli ping | grep -q PONG']
interval: 2s
timeout: 3s
retries: 30
start_period: 5s
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import * as Sentry from '@sentry/node';
import { loggingTransport } from '@sentry-internal/node-integration-tests';

Sentry.init({
dsn: 'https://public@dsn.ingest.sentry.io/1337',
release: '1.0',
tracesSampleRate: 1.0,
transport: loggingTransport,
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import * as Sentry from '@sentry/node';
import { Queue, Worker } from 'bullmq';

const telemetry = new Sentry.BullMQTelemetry();
const connection = { host: '127.0.0.1', port: 6380 };

async function run() {
const queue = new Queue('test-queue', { connection, telemetry });

const worker = new Worker(
'test-queue',
async () => {
// job processed
},
{ connection, telemetry },
);

const jobProcessed = new Promise(resolve => {
worker.on('completed', () => resolve());
});

await Sentry.startSpan({ name: 'enqueue test-job' }, async () => {
await queue.add('test-job', { data: 'test-data' });
});

await jobProcessed;
await worker.close();
await queue.close();
await Sentry.flush();
}

run();
Loading
Loading