You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

131 lines
4.9 KiB

3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
  1. <?php
  2. namespace App\Console\Commands;
  3. use Illuminate\Console\Command;
  4. use Anik\Amqp\ConsumableMessage;
  5. use PhpAmqpLib\Message\AMQPMessage;
  6. use Throwable;
  /**
   * Console command `consume`.
   *
   * Attaches a message handler to a RabbitMQ queue through the `amqp` service,
   * prints every message body it receives, and finally prints how long the
   * consume call took (in seconds, as a float).
   */
  class Consumer extends Command
  {
      /**
       * Command signature: one required `queue` argument plus the `durable`,
       * `bind`, `exchange` and `type` options with their defaults.
       */
      protected $signature = 'consume
          {queue}
          {--durable=1}
          {--bind=key}
          {--exchange=amq.direct}
          {--type=direct}
      ';

      /** Short description of the command. */
      protected $description = 'Run the rabbitmq consumer';

      /**
       * Consume from the requested queue and report the elapsed wall-clock time.
       */
      public function handle()
      {
          $startedAt = microtime(true);

          $handler = $this->makeMessageHandler();
          $options = $this->buildConsumeOptions();

          app('amqp')->consume(
              handler: $handler,
              bindingKey: $this->option('bind'),
              options: $options,
          );

          $elapsed = microtime(true) - $startedAt;
          echo sprintf('%f', $elapsed);
          echo PHP_EOL;
      }

      /**
       * Build the per-message callback handed to the consumer.
       *
       * The callback echoes the message body, then chooses an action from the
       * body's content:
       *  - contains "finish":  acknowledge, then close the connection;
       *  - contains "requeue": reject the message;
       *  - anything else:      acknowledge.
       * Any Throwable raised while acting is caught and only its message is echoed.
       */
      private function makeMessageHandler(): \Closure
      {
          // Appends a line break; the string type on the parameter is kept on purpose.
          $withNewline = fn (string $text) => $text.\PHP_EOL;

          return function (ConsumableMessage $message, AMQPMessage $otherMessage) use ($withNewline) {
              $body = $message->getMessageBody();
              echo $withNewline($body);

              try {
                  // "finish" is tested before "requeue", so it wins when both appear.
                  $action = match (true) {
                      \str_contains($body, 'finish') => 'finish',
                      \str_contains($body, 'requeue') => 'requeue',
                      default => 'ack',
                  };
                  echo $withNewline($action);

                  if ($action === 'requeue') {
                      $message->reject();

                      return;
                  }

                  $message->ack();

                  if ($action === 'finish') {
                      // Closing the underlying connection after the ack.
                      $otherMessage->getChannel()->getConnection()->close();
                  }
              } catch (Throwable $e) {
                  echo $withNewline($e->getMessage());
              }
          };
      }

      /**
       * Assemble the options array passed to the consume call, keyed by
       * `consume`, `consumer`, `exchange`, `queue` and `qos` (in that order).
       */
      private function buildConsumeOptions(): array
      {
          return [
              'consume' => [
                  'allowed_methods' => null,
                  'non_blocking' => false,
                  // A large finite timeout (in milliseconds) rather than no timeout at all.
                  'timeout' => 3600000,
              ],

              'consumer' => [
                  // Consumer tag: the identifier client libraries use to pick the
                  // handler for a delivery. Left empty here.
                  'tag' => '',
                  // When set, the server does not deliver messages to the
                  // connection that published them.
                  'no_local' => false,
                  // When set, no acknowledgements are expected for messages.
                  'no_ack' => false,
                  // When set, only this consumer may access the queue.
                  'exclusive' => false,
                  // When set, the server does not reply to the method and the
                  // client should not wait for a reply; failures surface as a
                  // channel or connection exception.
                  'no_wait' => false,
                  'arguments' => [],
                  'ticket' => null,
              ],

              'exchange' => [
                  'name' => $this->option('exchange'),
                  'type' => $this->option('type'),
                  // The exchange is not declared by this command.
                  'declare' => false,
              ],

              'queue' => [
                  'name' => $this->argument('queue'),
                  'declare' => true,
                  'passive' => false,
                  // Durable queues stay active across a server restart. Only the
                  // exact option value "1" turns this on.
                  'durable' => '1' === $this->option('durable'),
                  // Exclusive queues are usable only by the current connection
                  // and are deleted when that connection closes.
                  'exclusive' => false,
                  // When set, the queue is deleted once all of its consumers are
                  // gone; a queue that never had a consumer is not deleted.
                  'auto_delete' => false,
                  'no_wait' => false,
                  'arguments' => [],
                  'ticket' => null,
              ],

              // Quality of service.
              'qos' => [
                  'enabled' => true,
                  // Prefetching lets the next message already be held locally
                  // when the current one finishes processing.
                  'prefetch_size' => 0,
                  // Prefetch window, counted in whole messages.
                  'prefetch_count' => 2,
                  'global' => false,
              ],
          ];
      }
  }