$string.\PHP_EOL; $handler = function (ConsumableMessage $message, AMQPMessage $otherMessage) use ($line) { echo $line($body = $message->getMessageBody()); try { if (\str_contains($body, "finish")) { echo $line("finish"); $message->ack(); $otherMessage->getChannel()->getConnection()->close(); } elseif (\str_contains($body, "requeue")) { echo $line("requeue"); $message->reject(); } else { echo $line("ack"); $message->ack(); } } catch (Throwable $e) { echo $line($e->getMessage()); } }; $consume = [ 'allowed_methods' => null, 'non_blocking' => false, // Instead of disabling the timeout entirely, consider using a high value (for example, a few hours). 'timeout' => 3600000, // in milliseconds ]; $consumer = [ // Every consumer has an identifier that is used by client libraries to // determine what handler to invoke for a given delivery. Their names // vary from protocol to protocol. Consumer tags and subscription // IDs are two most commonly used terms. RabbitMQ documentation // tends to use the former. 'tag' => '', // If the no-local field is set the server will not send messages to the connection that published them. 'no_local' => false, // does not expect acknowledgements for messages 'no_ack' => false, // Request exclusive consumer access, meaning only this consumer can access the queue. 'exclusive' => false, // If set, the server will not respond to the method. The client should // not wait for a reply method. If the server could not complete the // method it will raise a channel or connection exception. 'no_wait' => false, 'arguments' => [], 'ticket' => null, ]; $exchange = [ 'name' => $this->option('exchange'), 'type' => $this->option('type'), 'declare' => false, // should declare ]; $queue = [ 'name' => $this->argument('queue'), 'declare' => true, 'passive' => false, // If set when creating a new queue, the queue will be marked as durable. // Durable queues remain active when a server restarts. 'durable' => $this->option('durable') === "1", // Exclusive queues may only be accessed by the current connection, and // are deleted when that connection closes. Passive declaration of an // exclusive queue by other connections are not allowed. 'exclusive' => false, // If set, the queue is deleted when all consumers have finished using it. // The last consumer can be cancelled either explicitly or because its // channel is closed. If there was no consumer ever on the queue, it // won't be deleted. 'auto_delete' => false, 'no_wait' => false, 'arguments' => [], 'ticket' => null, ]; // Quality of service $qos = [ 'enabled' => true, // The client can request that messages be sent in advance so that when // the client finishes processing a message, the following message is // already held locally, rather than needing to be sent down the // channel. Prefetching gives a performance improvement. 'prefetch_size' => 0, // Specifies a prefetch window in terms of whole messages. 'prefetch_count' => 2, 'global' => false, ]; app('amqp')->consume( handler: $handler, bindingKey: $this->option('bind'), options: \compact( 'consume', 'consumer', 'exchange', 'queue', 'qos' ), ); echo sprintf('%f', $time += microtime(true)); echo PHP_EOL; } }