You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

125 lines
4.6 KiB

3 years ago
3 years ago
3 years ago
  1. <?php
  2. namespace App\Console\Commands;
  3. use Illuminate\Console\Command;
  4. use Anik\Amqp\ConsumableMessage;
  5. use PhpAmqpLib\Message\AMQPMessage;
  6. use Throwable;
  /**
   * Artisan command that attaches a consumer to a RabbitMQ queue through the
   * `amqp` container binding and echoes every message body it receives.
   *
   * Two message bodies act as control commands:
   *  - "finish"  closes the channel the message arrived on;
   *  - "requeue" negatively acknowledges the message.
   * Every other body is acknowledged.
   *
   * Usage: php artisan consume {queue} [--durable=1] [--bind=key]
   *        [--exchange=amq.direct] [--type=direct]
   */
  class Consumer extends Command
  {
      /**
       * Command name, arguments and options.
       *
       * Left untyped on purpose: the parent class declares this property
       * without a type, so a typed redeclaration would be a fatal error.
       */
      protected $signature = 'consume
          {queue}
          {--durable=1}
          {--bind=key}
          {--exchange=amq.direct}
          {--type=direct}
      ';

      /** Short description of the command. */
      protected $description = 'Run the rabbitmq consumer';

      /**
       * Build the consumer configuration, start consuming, and print the
       * elapsed wall-clock time (in seconds) once consume() returns.
       *
       * @return void
       */
      public function handle()
      {
          $startedAt = microtime(true);

          // Invoked once per delivered message.
          $handler = function (ConsumableMessage $message, AMQPMessage $otherMessage): void {
              $body = $message->getMessageBody();
              echo $body, \PHP_EOL;

              try {
                  // match compares strictly (===) against the subject, so the
                  // arms must be the command strings themselves, not boolean
                  // expressions. The default arm acknowledges the message;
                  // previously the ack was unreachable behind a `return`.
                  match ($body) {
                      // NOTE(review): the "finish" message is not acknowledged
                      // before the channel is closed, so the broker will
                      // presumably redeliver it to the next consumer — confirm
                      // this is intended.
                      'finish' => $otherMessage->getChannel()->close(),
                      'requeue' => $message->nack(),
                      default => $message->ack(),
                  };
              } catch (Throwable $e) {
                  // Report the failure instead of hiding it, then hand the
                  // message back to the broker.
                  $this->error($e->getMessage());
                  $message->nack();
              }
          };

          $consume = [
              'allowed_methods' => null,
              'non_blocking' => false,
              // Instead of disabling the timeout entirely, consider using a
              // high value (for example, a few hours).
              'timeout' => 3600000, // in milliseconds
          ];

          $consumer = [
              // Every consumer has an identifier that client libraries use to
              // determine which handler to invoke for a given delivery. The
              // RabbitMQ documentation calls it a consumer tag.
              'tag' => '',
              // If no-local is set, the server will not send messages to the
              // connection that published them.
              'no_local' => false,
              // false: the broker expects an acknowledgement for each message.
              'no_ack' => false,
              // Request exclusive consumer access, meaning only this consumer
              // can access the queue.
              'exclusive' => false,
              // If set, the server will not respond to the method and the
              // client should not wait for a reply; on failure the server
              // raises a channel or connection exception.
              'no_wait' => false,
              'arguments' => [],
              'ticket' => null,
          ];

          $exchange = [
              'name' => $this->option('exchange'),
              'type' => $this->option('type'),
              // Whether the exchange should be declared.
              'declare' => false,
          ];

          $queue = [
              'name' => $this->argument('queue'),
              'declare' => true,
              'passive' => false,
              // Durable queues remain active when a server restarts. Option
              // values arrive as strings, hence the comparison with "1".
              'durable' => $this->option('durable') === '1',
              // Exclusive queues may only be accessed by the current
              // connection and are deleted when that connection closes.
              'exclusive' => false,
              // If set, the queue is deleted when all consumers have finished
              // using it. A queue that never had a consumer is not deleted.
              'auto_delete' => false,
              'no_wait' => false,
              'arguments' => [],
              'ticket' => null,
          ];

          // Quality of service.
          $qos = [
              'enabled' => true,
              // 0 places no byte-size limit on the prefetch window.
              'prefetch_size' => 0,
              // Prefetch window in whole messages: at most this many
              // unacknowledged messages are held by the consumer at once.
              'prefetch_count' => 2,
              'global' => false,
          ];

          app('amqp')->consume(
              handler: $handler,
              bindingKey: $this->option('bind'),
              options: [
                  'consume' => $consume,
                  'consumer' => $consumer,
                  'exchange' => $exchange,
                  'queue' => $queue,
                  'qos' => $qos,
              ],
          );

          // Elapsed wall-clock time in seconds.
          echo sprintf('%f', microtime(true) - $startedAt), PHP_EOL;
      }
  }