You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

131 lines
4.9 KiB

<?php
namespace App\Console\Commands;
use Illuminate\Console\Command;
use Anik\Amqp\ConsumableMessage;
use PhpAmqpLib\Message\AMQPMessage;
use Throwable;
class Consumer extends Command
{
protected $signature = 'consume
{queue}
{--durable=1}
{--bind=key}
{--exchange=amq.direct}
{--type=direct}
';
protected $description = 'Run the rabbitmq consumer';
public function handle()
{
$time = -microtime(true);
$line = fn(string $string) => $string.\PHP_EOL;
$handler = function (ConsumableMessage $message, AMQPMessage $otherMessage) use ($line) {
echo $line($body = $message->getMessageBody());
try {
if (\str_contains($body, "finish")) {
echo $line("finish");
$message->ack();
$otherMessage->getChannel()->getConnection()->close();
} elseif (\str_contains($body, "requeue")) {
echo $line("requeue");
$message->reject();
} else {
echo $line("ack");
$message->ack();
}
} catch (Throwable $e) {
echo $line($e->getMessage());
}
};
$consume = [
'allowed_methods' => null,
'non_blocking' => false,
// Instead of disabling the timeout entirely, consider using a high value (for example, a few hours).
'timeout' => 3600000, // in milliseconds
];
$consumer = [
// Every consumer has an identifier that is used by client libraries to
// determine what handler to invoke for a given delivery. Their names
// vary from protocol to protocol. Consumer tags and subscription
// IDs are two most commonly used terms. RabbitMQ documentation
// tends to use the former.
'tag' => '',
// If the no-local field is set the server will not send messages to the connection that published them.
'no_local' => false,
// does not expect acknowledgements for messages
'no_ack' => false,
// Request exclusive consumer access, meaning only this consumer can access the queue.
'exclusive' => false,
// If set, the server will not respond to the method. The client should
// not wait for a reply method. If the server could not complete the
// method it will raise a channel or connection exception.
'no_wait' => false,
'arguments' => [],
'ticket' => null,
];
$exchange = [
'name' => $this->option('exchange'),
'type' => $this->option('type'),
'declare' => false, // should declare
];
$queue = [
'name' => $this->argument('queue'),
'declare' => true,
'passive' => false,
// If set when creating a new queue, the queue will be marked as durable.
// Durable queues remain active when a server restarts.
'durable' => $this->option('durable') === "1",
// Exclusive queues may only be accessed by the current connection, and
// are deleted when that connection closes. Passive declaration of an
// exclusive queue by other connections are not allowed.
'exclusive' => false,
// If set, the queue is deleted when all consumers have finished using it.
// The last consumer can be cancelled either explicitly or because its
// channel is closed. If there was no consumer ever on the queue, it
// won't be deleted.
'auto_delete' => false,
'no_wait' => false,
'arguments' => [],
'ticket' => null,
];
// Quality of service
$qos = [
'enabled' => true,
// The client can request that messages be sent in advance so that when
// the client finishes processing a message, the following message is
// already held locally, rather than needing to be sent down the
// channel. Prefetching gives a performance improvement.
'prefetch_size' => 0,
// Specifies a prefetch window in terms of whole messages.
'prefetch_count' => 2,
'global' => false,
];
app('amqp')->consume(
handler: $handler,
bindingKey: $this->option('bind'),
options: \compact(
'consume',
'consumer',
'exchange',
'queue',
'qos'
),
);
echo sprintf('%f', $time += microtime(true));
echo PHP_EOL;
}
}