|
|
<?php
namespace App\Console\Commands;
use Illuminate\Console\Command; use Anik\Amqp\ConsumableMessage; use PhpAmqpLib\Message\AMQPMessage; use Throwable;
/**
 * Artisan command that attaches a consumer to a queue through the `amqp`
 * service and prints every delivery it receives.
 *
 * Usage: consume {queue} [--durable=1] [--bind=key] [--exchange=amq.direct] [--type=direct]
 *
 * How a delivery is treated depends on its body:
 *  - contains "finish":  the message is acked and the underlying connection is closed;
 *  - contains "requeue": the message is rejected;
 *  - anything else:      the message is acked.
 *
 * When the consume call returns, the elapsed wall-clock time in seconds is printed.
 */
class Consumer extends Command
{
    protected $signature = 'consume {queue} {--durable=1} {--bind=key} {--exchange=amq.direct} {--type=direct} ';

    protected $description = 'Run the rabbitmq consumer';

    /**
     * Build the consumer configuration from the command input, start consuming
     * and report how long the command ran.
     */
    public function handle()
    {
        $startedAt = microtime(true);

        // Appends a line break so every echoed value lands on its own line.
        $withNewline = fn(string $text) => $text . \PHP_EOL;

        $onDelivery = function (ConsumableMessage $consumable, AMQPMessage $amqpMessage) use ($withNewline) {
            // Reading the body happens outside the try block, so a failure here propagates.
            $payload = $consumable->getMessageBody();
            echo $withNewline($payload);

            try {
                if (\str_contains($payload, 'finish')) {
                    echo $withNewline('finish');
                    $consumable->ack();
                    // Closing the connection from inside the handler is how this command stops.
                    $amqpMessage->getChannel()->getConnection()->close();

                    return;
                }

                if (\str_contains($payload, 'requeue')) {
                    echo $withNewline('requeue');
                    // NOTE(review): relies on reject()'s default arguments to requeue — confirm.
                    $consumable->reject();

                    return;
                }

                echo $withNewline('ack');
                $consumable->ack();
            } catch (Throwable $failure) {
                // Best effort: report the problem and keep consuming.
                echo $withNewline($failure->getMessage());
            }
        };

        $waitSettings = [
            'allowed_methods' => null,
            'non_blocking' => false,
            // Instead of disabling the timeout entirely, consider using a high
            // value (for example, a few hours).
            // NOTE(review): originally annotated "in milliseconds" — confirm the
            // unit against the library's wait call.
            'timeout' => 3600000,
        ];

        $consumerSettings = [
            // Every consumer has an identifier that is used by client libraries
            // to determine what handler to invoke for a given delivery. Their
            // names vary from protocol to protocol. Consumer tags and
            // subscription IDs are two most commonly used terms. RabbitMQ
            // documentation tends to use the former.
            'tag' => '',
            // If the no-local field is set the server will not send messages to
            // the connection that published them.
            'no_local' => false,
            // When set, the server does not expect acknowledgements for messages.
            'no_ack' => false,
            // Request exclusive consumer access, meaning only this consumer can
            // access the queue.
            'exclusive' => false,
            // If set, the server will not respond to the method. The client
            // should not wait for a reply method. If the server could not
            // complete the method it will raise a channel or connection
            // exception.
            'no_wait' => false,
            'arguments' => [],
            'ticket' => null,
        ];

        $exchangeSettings = [
            'name' => $this->option('exchange'),
            'type' => $this->option('type'),
            // Whether the exchange should be declared.
            'declare' => false,
        ];

        $queueSettings = [
            'name' => $this->argument('queue'),
            'declare' => true,
            'passive' => false,
            // If set when creating a new queue, the queue will be marked as
            // durable. Durable queues remain active when a server restarts.
            // Only the exact string "1" turns this on.
            'durable' => $this->option('durable') === '1',
            // Exclusive queues may only be accessed by the current connection,
            // and are deleted when that connection closes. Passive declaration
            // of an exclusive queue by other connections are not allowed.
            'exclusive' => false,
            // If set, the queue is deleted when all consumers have finished
            // using it. The last consumer can be cancelled either explicitly or
            // because its channel is closed. If there was no consumer ever on
            // the queue, it won't be deleted.
            'auto_delete' => false,
            'no_wait' => false,
            'arguments' => [],
            'ticket' => null,
        ];

        // Quality of service
        $qosSettings = [
            'enabled' => true,
            // The client can request that messages be sent in advance so that
            // when the client finishes processing a message, the following
            // message is already held locally, rather than needing to be sent
            // down the channel. Prefetching gives a performance improvement.
            'prefetch_size' => 0,
            // Specifies a prefetch window in terms of whole messages.
            'prefetch_count' => 2,
            'global' => false,
        ];

        app('amqp')->consume(
            handler: $onDelivery,
            bindingKey: $this->option('bind'),
            options: [
                'consume' => $waitSettings,
                'consumer' => $consumerSettings,
                'exchange' => $exchangeSettings,
                'queue' => $queueSettings,
                'qos' => $qosSettings,
            ],
        );

        // Elapsed seconds since the command started.
        $elapsed = microtime(true) - $startedAt;
        echo sprintf('%f', $elapsed), PHP_EOL;
    }
}
|