Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 20 additions & 10 deletions src/Handlers/RabbitMQHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,9 @@ class RabbitMQHandler extends BaseHandler
{
private readonly AbstractConnection $connection;
private readonly AMQPChannel $channel;
private array $declaredQueues = [];
private array $declaredExchanges = [];
private array $declaredLogicalQueues = [];
private array $declaredQueues = [];
private array $declaredExchanges = [];

public function __construct(protected QueueConfig $config)
{
Expand Down Expand Up @@ -273,8 +274,12 @@ public function clear(?string $queue = null): bool
{
try {
if ($queue === null) {
// Clear all configured queues
foreach (array_keys($this->config->queuePriorities) as $queueName) {
$queueNames = array_unique([
...array_keys($this->config->queuePriorities),
...array_keys($this->declaredLogicalQueues),
]);

foreach ($queueNames as $queueName) {
$this->clearQueue($queueName);
}
} else {
Expand All @@ -300,7 +305,8 @@ public function clear(?string $queue = null): bool
*/
private function declareQueue(string $queue): void
{
$priorities = $this->config->queuePriorities[$queue] ?? ['default'];
$this->declaredLogicalQueues[$queue] = true;
$priorities = $this->config->queuePriorities[$queue] ?? ['default'];

foreach ($priorities as $priority) {
$queueName = $this->getQueueName($queue, $priority);
Expand Down Expand Up @@ -459,16 +465,20 @@ private function publishWithOptionalConfirm(AMQPMessage $message, string $exchan
*/
private function clearQueue(string $queue): void
{
// Purging a queue that does not exist closes the AMQP channel.
$this->declareQueue($queue);

$priorities = $this->config->queuePriorities[$queue] ?? ['default'];

foreach ($priorities as $priority) {
$queueName = $this->getQueueName($queue, $priority);

try {
$this->channel->queue_purge($queueName);
} catch (Throwable) {
// Queue might not exist, ignore
}
$this->channel->queue_purge($queueName);
}

$delayQueueName = $this->getDelayQueueName($queue);
if (isset($this->declaredQueues[$delayQueueName])) {
$this->channel->queue_purge($delayQueueName);
}
}

Expand Down
76 changes: 40 additions & 36 deletions tests/PushAndPopWithDelayTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -44,43 +44,47 @@ public function testPushAndPopWithDelay(string $name, string $class): void
Time::setTestNow('2023-12-29 14:15:16');

$handler = new $class($this->config);
$result = $handler->setDelay(MINUTE)->push('queue-delay', 'success', ['key1' => 'value1']);

$this->assertNotNull($result);

$result = $handler->push('queue-delay', 'success', ['key2' => 'value2']);

$this->assertNotNull($result);

if ($name === 'database') {
$this->seeInDatabase('queue_jobs', [
'queue' => 'queue-delay',
'payload' => json_encode(['job' => 'success', 'data' => ['key1' => 'value1'], 'metadata' => []]),
'available_at' => 1703859376,
]);

$this->seeInDatabase('queue_jobs', [
'queue' => 'queue-delay',
'payload' => json_encode(['job' => 'success', 'data' => ['key2' => 'value2'], 'metadata' => []]),
'available_at' => 1703859316,
]);
$handler->clear('queue-delay');

try {
$result = $handler->setDelay(MINUTE)->push('queue-delay', 'success', ['key1' => 'value1']);
$this->assertTrue($result->getStatus());

$result = $handler->push('queue-delay', 'success', ['key2' => 'value2']);
$this->assertTrue($result->getStatus());

if ($name === 'database') {
$this->seeInDatabase('queue_jobs', [
'queue' => 'queue-delay',
'payload' => json_encode(['job' => 'success', 'data' => ['key1' => 'value1'], 'metadata' => []]),
'available_at' => 1703859376,
]);

$this->seeInDatabase('queue_jobs', [
'queue' => 'queue-delay',
'payload' => json_encode(['job' => 'success', 'data' => ['key2' => 'value2'], 'metadata' => []]),
'available_at' => 1703859316,
]);
}

$result = $handler->pop('queue-delay', ['default']);
$this->assertInstanceOf(QueueJob::class, $result);
$payload = ['job' => 'success', 'data' => ['key2' => 'value2'], 'metadata' => []];
$this->assertSame($payload, $result->payload);

$result = $handler->pop('queue-delay', ['default']);
$this->assertNull($result);

// add 1 minute
Time::setTestNow('2023-12-29 14:16:16');

$result = $handler->pop('queue-delay', ['default']);
$this->assertInstanceOf(QueueJob::class, $result);
$payload = ['job' => 'success', 'data' => ['key1' => 'value1'], 'metadata' => []];
$this->assertSame($payload, $result->payload);
} finally {
$handler->clear('queue-delay');
}

$result = $handler->pop('queue-delay', ['default']);
$this->assertInstanceOf(QueueJob::class, $result);
$payload = ['job' => 'success', 'data' => ['key2' => 'value2'], 'metadata' => []];
$this->assertSame($payload, $result->payload);

$result = $handler->pop('queue-delay', ['default']);
$this->assertNull($result);

// add 1 minute
Time::setTestNow('2023-12-29 14:16:16');

$result = $handler->pop('queue-delay', ['default']);
$this->assertInstanceOf(QueueJob::class, $result);
$payload = ['job' => 'success', 'data' => ['key1' => 'value1'], 'metadata' => []];
$this->assertSame($payload, $result->payload);
}

public static function providePushAndPopWithDelay(): iterable
Expand Down
51 changes: 34 additions & 17 deletions tests/RabbitMQDelayTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
use CodeIgniter\Queue\Entities\QueueJob;
use CodeIgniter\Queue\Handlers\RabbitMQHandler;
use CodeIgniter\Queue\QueuePushResult;
use CodeIgniter\Test\ReflectionHelper;
use PhpAmqpLib\Connection\AMQPConnectionFactory;
use Tests\Support\Config\Queue as QueueConfig;
use Tests\Support\TestCase;
Expand All @@ -29,13 +30,17 @@
*/
final class RabbitMQDelayTest extends TestCase
{
use ReflectionHelper;

private ?RabbitMQHandler $handler = null;
private string $queue;

protected function setUp(): void
{
parent::setUp();

$config = config(QueueConfig::class);
$config = config(QueueConfig::class);
$this->queue = 'delay-test-queue-' . bin2hex(random_bytes(6));

// Skip tests if RabbitMQ is not available
if (! $this->isRabbitMQAvailable()) {
Expand All @@ -52,9 +57,8 @@ protected function setUp(): void
protected function tearDown(): void
{
if ($this->handler !== null) {
// Clear test queues
try {
$this->handler->clear('delay-test-queue');
$this->deleteDeclaredResources();
} catch (Throwable) {
// Ignore cleanup errors
}
Expand All @@ -70,31 +74,31 @@ public function testDelayedMessageWithRealTiming(): void
$startTime = time();

// Push a delayed job
$result = $this->handler->setDelay($delaySeconds)->push('delay-test-queue', 'success', ['type' => 'delayed']);
$result = $this->handler->setDelay($delaySeconds)->push($this->queue, 'success', ['type' => 'delayed']);
$this->assertInstanceOf(QueuePushResult::class, $result);
$this->assertTrue($result->getStatus());

// Push an immediate job
$result = $this->handler->push('delay-test-queue', 'success', ['type' => 'immediate']);
$result = $this->handler->push($this->queue, 'success', ['type' => 'immediate']);
$this->assertInstanceOf(QueuePushResult::class, $result);
$this->assertTrue($result->getStatus());

// Should get immediate job first
$job = $this->handler->pop('delay-test-queue', ['default']);
$job = $this->handler->pop($this->queue, ['default']);
$this->assertInstanceOf(QueueJob::class, $job);
$this->assertSame('immediate', $job->payload['data']['type']);
$this->handler->done($job);

// Should not get delayed job yet (within first second)
$job = $this->handler->pop('delay-test-queue', ['default']);
$job = $this->handler->pop($this->queue, ['default']);
$this->assertNull($job);

// Wait for delay to expire (with a small buffer)
$waitTime = $delaySeconds + 1;
sleep($waitTime);

// Should now get the delayed job
$job = $this->handler->pop('delay-test-queue', ['default']);
$job = $this->handler->pop($this->queue, ['default']);
$this->assertInstanceOf(QueueJob::class, $job);
$this->assertSame('delayed', $job->payload['data']['type']);

Expand All @@ -109,34 +113,34 @@ public function testDelayedMessageWithRealTiming(): void
public function testMultipleDelayedJobsWithDifferentDelays(): void
{
// Push jobs with different delays
$result1 = $this->handler->setDelay(1)->push('delay-test-queue', 'success', ['order' => 'first', 'delay' => 1]);
$result2 = $this->handler->setDelay(3)->push('delay-test-queue', 'success', ['order' => 'second', 'delay' => 3]);
$result3 = $this->handler->push('delay-test-queue', 'success', ['order' => 'immediate', 'delay' => 0]);
$result1 = $this->handler->setDelay(1)->push($this->queue, 'success', ['order' => 'first', 'delay' => 1]);
$result2 = $this->handler->setDelay(3)->push($this->queue, 'success', ['order' => 'second', 'delay' => 3]);
$result3 = $this->handler->push($this->queue, 'success', ['order' => 'immediate', 'delay' => 0]);

$this->assertTrue($result1->getStatus());
$this->assertTrue($result2->getStatus());
$this->assertTrue($result3->getStatus());

// Should get immediate job first
$job = $this->handler->pop('delay-test-queue', ['default']);
$job = $this->handler->pop($this->queue, ['default']);
$this->assertInstanceOf(QueueJob::class, $job);
$this->assertSame('immediate', $job->payload['data']['order']);
$this->handler->done($job);

// Wait 2 seconds - should get first delayed job
sleep(2);
$job = $this->handler->pop('delay-test-queue', ['default']);
$job = $this->handler->pop($this->queue, ['default']);
$this->assertInstanceOf(QueueJob::class, $job);
$this->assertSame('first', $job->payload['data']['order']);
$this->handler->done($job);

// Should not get second job yet
$job = $this->handler->pop('delay-test-queue', ['default']);
$job = $this->handler->pop($this->queue, ['default']);
$this->assertNull($job);

// Wait another 2 seconds - should get second delayed job
sleep(2);
$job = $this->handler->pop('delay-test-queue', ['default']);
$job = $this->handler->pop($this->queue, ['default']);
$this->assertInstanceOf(QueueJob::class, $job);
$this->assertSame('second', $job->payload['data']['order']);
$this->handler->done($job);
Expand All @@ -145,11 +149,11 @@ public function testMultipleDelayedJobsWithDifferentDelays(): void
public function testZeroDelayWorksImmediately(): void
{
// Jobs with 0 delay should work immediately
$result = $this->handler->setDelay(0)->push('delay-test-queue', 'success', ['type' => 'zero-delay']);
$result = $this->handler->setDelay(0)->push($this->queue, 'success', ['type' => 'zero-delay']);
$this->assertTrue($result->getStatus());

// Should be able to pop immediately
$job = $this->handler->pop('delay-test-queue', ['default']);
$job = $this->handler->pop($this->queue, ['default']);
$this->assertInstanceOf(QueueJob::class, $job);
$this->assertSame('zero-delay', $job->payload['data']['type']);

Expand All @@ -163,4 +167,17 @@ private function isRabbitMQAvailable(): bool
{
return class_exists(AMQPConnectionFactory::class);
}

private function deleteDeclaredResources(): void
{
$channel = self::getPrivateProperty($this->handler, 'channel');

foreach (array_keys(self::getPrivateProperty($this->handler, 'declaredQueues')) as $queue) {
$channel->queue_delete($queue);
}

foreach (array_keys(self::getPrivateProperty($this->handler, 'declaredExchanges')) as $exchange) {
$channel->exchange_delete($exchange);
}
}
}
Loading
Loading