You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

118 lines
4.6 KiB

3 years ago
  1. <?php
  2. namespace App\Console\Commands;
  3. use Anik\Amqp\Exchanges;
  4. use GuzzleHttp\Psr7\Header;
  5. use Illuminate\Support\Arr;
  6. use PhpAmqpLib\Wire\AMQPTable;
  7. use Illuminate\Console\Command;
  8. use Anik\Amqp\ProducibleMessage;
  9. use Anik\Laravel\Amqp\Facades\Amqp;
  10. use PhpAmqpLib\Message\AMQPMessage;
  11. use Anik\Amqp\Exchanges\{Fanout, Direct, Topic, Headers, Exchange};
  12. class Publisher extends Command
  13. {
  14. protected $signature = 'publish
  15. {exchange}
  16. {--declare=1}
  17. {--type=direct}
  18. {--bind=key}
  19. {--durable=1}
  20. {--delivery=1}
  21. {--mandatory=1}
  22. {--count=1}
  23. {--usleep=1}
  24. ';
  25. public function handle()
  26. {
  27. $time = -microtime(true);
  28. echo PHP_EOL;
  29. for ($index = 1; $index <= (int) $this->option('count'); $index++) {
  30. $payload = rand(1,100) <= 20 ? "finish" : "Message #$index";
  31. $message = [
  32. 'content_type' => 'text/plain',
  33. 'delivery_mode' => $this->option('delivery') === "1" ? AMQPMessage::DELIVERY_MODE_NON_PERSISTENT : AMQPMessage::DELIVERY_MODE_PERSISTENT,
  34. 'content_encoding' => 'utf-8',
  35. ];
  36. $publish = [
  37. // When a published message cannot be routed to any queue,
  38. // and the publisher set the mandatory message property to
  39. // true, the message will be returned to it. The publisher
  40. // must have a returned message handler set up in order to
  41. // handle the return (e.g. by logging an error or retrying with a different exchange).
  42. 'mandatory' => $this->option('mandatory') === "1",
  43. // This flag is deprecated as of RabbitMQ 2.9 and will raise
  44. // an exception and close the channel if used.
  45. 'immediate' => false,
  46. 'ticket' => null,
  47. 'batch_count' => 500,
  48. ];
  49. $exchange = [
  50. 'name' => $this->argument('exchange'),
  51. 'declare' => $this->option('declare') === "1", // should declare
  52. 'passive' => false,
  53. // By using durability (Durable, Transient) properties, we can
  54. // make the message to survive even after server restarts. If
  55. // we select Durable, then the message will survive even
  56. // after server restart. In case, if we select Tansient,
  57. // then message will not service after server restart.
  58. 'durable' => $this->option('durable') === "1",
  59. // By using auto delete property, we can set whether an exchange
  60. // can delete if we unbind assigned queue.
  61. 'auto_delete' => false,
  62. // If we set this property yes, then the exchange may not be
  63. // used directly by publishers, but only when bound to other
  64. // exchanges.
  65. 'internal' => false,
  66. 'no_wait' => false,
  67. 'arguments' => [],
  68. 'ticket' => null,
  69. ];
  70. $message = new ProducibleMessage(
  71. message: json_encode($payload),
  72. properties: $publish + $message,
  73. );
  74. $other_options = [];
  75. Amqp::publish(
  76. messages: $message,
  77. routingKey: $this->option('bind'),
  78. // In rabbitmq, direct exchange will deliver a messages to the
  79. // queues based on the message routing key
  80. exchange: $this->getExchangeType($this->option('type'), $exchange),
  81. // In rabbitmq, fanout exchange will route messages to all of
  82. // the queues that are bound to it.
  83. // exchange: Fanout::make($exchange),
  84. // https://www.tutlane.com/tutorial/rabbitmq/rabbitmq-exchanges
  85. // exchange: Topic::make($exchange),
  86. // exchange: Headers::make($exchange),
  87. options: $other_options,
  88. );
  89. usleep((int) $this->option('usleep') * 1000);
  90. }
  91. echo sprintf('%f', $time += microtime(true));
  92. echo PHP_EOL;
  93. }
  94. public function getExchangeType(string $type, array $exchange): Exchange
  95. {
  96. return match ($type) {
  97. 'direct' => Direct::make($exchange),
  98. 'fanout' => Fanout::make($exchange),
  99. 'topic' => Topic::make($exchange),
  100. 'headers' => Headers::make($exchange),
  101. };
  102. }
  103. }