You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
131 lines
4.9 KiB
131 lines
4.9 KiB
<?php
|
|
|
|
namespace App\Console\Commands;
|
|
|
|
use Illuminate\Console\Command;
|
|
use Anik\Amqp\ConsumableMessage;
|
|
use PhpAmqpLib\Message\AMQPMessage;
|
|
use Throwable;
|
|
|
|
class Consumer extends Command
|
|
{
|
|
protected $signature = 'consume
|
|
{queue}
|
|
{--durable=1}
|
|
{--bind=key}
|
|
{--exchange=amq.direct}
|
|
{--type=direct}
|
|
';
|
|
|
|
protected $description = 'Run the rabbitmq consumer';
|
|
|
|
public function handle()
|
|
{
|
|
$time = -microtime(true);
|
|
$line = fn(string $string) => $string.\PHP_EOL;
|
|
|
|
$handler = function (ConsumableMessage $message, AMQPMessage $otherMessage) use ($line) {
|
|
echo $line($body = $message->getMessageBody());
|
|
|
|
try {
|
|
if (\str_contains($body, "finish")) {
|
|
echo $line("finish");
|
|
$message->ack();
|
|
$otherMessage->getChannel()->getConnection()->close();
|
|
} elseif (\str_contains($body, "requeue")) {
|
|
echo $line("requeue");
|
|
$message->reject();
|
|
} else {
|
|
echo $line("ack");
|
|
$message->ack();
|
|
}
|
|
} catch (Throwable $e) {
|
|
echo $line($e->getMessage());
|
|
}
|
|
};
|
|
|
|
$consume = [
|
|
'allowed_methods' => null,
|
|
'non_blocking' => false,
|
|
// Instead of disabling the timeout entirely, consider using a high value (for example, a few hours).
|
|
'timeout' => 3600000, // in milliseconds
|
|
];
|
|
|
|
$consumer = [
|
|
// Every consumer has an identifier that is used by client libraries to
|
|
// determine what handler to invoke for a given delivery. Their names
|
|
// vary from protocol to protocol. Consumer tags and subscription
|
|
// IDs are two most commonly used terms. RabbitMQ documentation
|
|
// tends to use the former.
|
|
'tag' => '',
|
|
// If the no-local field is set the server will not send messages to the connection that published them.
|
|
'no_local' => false,
|
|
// does not expect acknowledgements for messages
|
|
'no_ack' => false,
|
|
// Request exclusive consumer access, meaning only this consumer can access the queue.
|
|
'exclusive' => false,
|
|
// If set, the server will not respond to the method. The client should
|
|
// not wait for a reply method. If the server could not complete the
|
|
// method it will raise a channel or connection exception.
|
|
'no_wait' => false,
|
|
|
|
'arguments' => [],
|
|
'ticket' => null,
|
|
];
|
|
|
|
$exchange = [
|
|
'name' => $this->option('exchange'),
|
|
'type' => $this->option('type'),
|
|
'declare' => false, // should declare
|
|
];
|
|
|
|
$queue = [
|
|
'name' => $this->argument('queue'),
|
|
'declare' => true,
|
|
'passive' => false,
|
|
// If set when creating a new queue, the queue will be marked as durable.
|
|
// Durable queues remain active when a server restarts.
|
|
'durable' => $this->option('durable') === "1",
|
|
// Exclusive queues may only be accessed by the current connection, and
|
|
// are deleted when that connection closes. Passive declaration of an
|
|
// exclusive queue by other connections are not allowed.
|
|
'exclusive' => false,
|
|
// If set, the queue is deleted when all consumers have finished using it.
|
|
// The last consumer can be cancelled either explicitly or because its
|
|
// channel is closed. If there was no consumer ever on the queue, it
|
|
// won't be deleted.
|
|
'auto_delete' => false,
|
|
'no_wait' => false,
|
|
'arguments' => [],
|
|
'ticket' => null,
|
|
];
|
|
|
|
// Quality of service
|
|
$qos = [
|
|
'enabled' => true,
|
|
// The client can request that messages be sent in advance so that when
|
|
// the client finishes processing a message, the following message is
|
|
// already held locally, rather than needing to be sent down the
|
|
// channel. Prefetching gives a performance improvement.
|
|
'prefetch_size' => 0,
|
|
// Specifies a prefetch window in terms of whole messages.
|
|
'prefetch_count' => 2,
|
|
'global' => false,
|
|
];
|
|
|
|
app('amqp')->consume(
|
|
handler: $handler,
|
|
bindingKey: $this->option('bind'),
|
|
options: \compact(
|
|
'consume',
|
|
'consumer',
|
|
'exchange',
|
|
'queue',
|
|
'qos'
|
|
),
|
|
);
|
|
|
|
echo sprintf('%f', $time += microtime(true));
|
|
echo PHP_EOL;
|
|
}
|
|
}
|