You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

134 lines
5.0 KiB

3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
3 years ago
  1. <?php
  2. namespace App\Console\Commands;
  3. use Anik\Amqp\Exchanges;
  4. use GuzzleHttp\Psr7\Header;
  5. use Illuminate\Support\Arr;
  6. use PhpAmqpLib\Wire\AMQPTable;
  7. use Illuminate\Console\Command;
  8. use Anik\Amqp\ProducibleMessage;
  9. use Anik\Laravel\Amqp\Facades\Amqp;
  10. use PhpAmqpLib\Message\AMQPMessage;
  11. use Anik\Amqp\Exchanges\{Fanout, Direct, Topic, Headers, Exchange};
  /**
   * Console command that publishes numbered plain-text test messages to an
   * AMQP exchange through the Amqp facade.
   *
   * Every message body is a prefix followed by its 1-based index. Most messages
   * use the prefix "message #"; one randomly chosen message uses "requeue #" and
   * one uses "finish #". The elapsed wall-clock time is printed after the last
   * message has been handed to the facade.
   */
  class Publisher extends Command
  {
      /**
       * Command signature.
       *
       *  exchange     name of the exchange to publish to
       *  --declare    "1" to ask for the exchange to be declared
       *  --type       direct | fanout | topic | headers
       *  --bind       routing key used for every message
       *  --durable    "1" to mark the exchange as durable
       *  --delivery   "1" for non-persistent delivery, anything else for persistent
       *  --mandatory  "1" to set the mandatory publish flag
       *  --count      number of messages to publish
       *  --usleep     pause after each message, in milliseconds
       *
       * @var string
       */
      protected $signature = 'publish
          {exchange}
          {--declare=1}
          {--type=direct}
          {--bind=key}
          {--durable=1}
          {--delivery=1}
          {--mandatory=1}
          {--count=1}
          {--usleep=1}
      ';

      /**
       * Publish `--count` messages and print how long the run took.
       *
       * @return void
       */
      public function handle()
      {
          $startedAt = microtime(true);
          echo PHP_EOL;

          // Everything derived purely from the command line is read once, up
          // front, instead of on every loop iteration.
          $count = max(0, (int) $this->option('count'));
          // The option is expressed in milliseconds; usleep() wants microseconds
          // and rejects negative values, hence the clamp.
          $pauseMicroseconds = max(0, (int) $this->option('usleep')) * 1000;
          $routingKey = $this->option('bind');
          $exchangeType = $this->option('type');
          $exchangeConfig = $this->exchangeConfig();
          $properties = $this->messageProperties();

          [$requeued, $finished] = $this->pickMarkerIndexes($count);

          for ($index = 1; $index <= $count; $index++) {
              $prefix = 'message #';
              if ($index === $requeued) {
                  echo "HERE 1".PHP_EOL;
                  $prefix = 'requeue #';
              }
              // Checked last on purpose: when both markers land on the same
              // index the message is sent as the finish marker.
              if ($index === $finished) {
                  echo "HERE 2".PHP_EOL;
                  $prefix = 'finish #';
              }

              Amqp::publish(
                  messages: new ProducibleMessage(
                      message: $prefix.$index,
                      properties: $properties,
                  ),
                  routingKey: $routingKey,
                  // A fresh exchange object is built for every message, exactly
                  // as before, so per-publish library behaviour is unchanged.
                  exchange: $this->getExchangeType($exchangeType, $exchangeConfig),
                  options: [],
              );

              usleep($pauseMicroseconds);
          }

          echo sprintf('%f', microtime(true) - $startedAt);
          echo PHP_EOL;
      }

      /**
       * Build the exchange object matching the requested exchange type.
       *
       * A direct exchange delivers messages to queues based on the message
       * routing key; a fanout exchange routes messages to all queues bound to
       * it. See https://www.tutlane.com/tutorial/rabbitmq/rabbitmq-exchanges
       *
       * @param string $type     one of "direct", "fanout", "topic", "headers"
       * @param array  $exchange exchange configuration passed to the factory
       *
       * @return Exchange
       *
       * @throws \UnhandledMatchError when $type is not a supported exchange type
       */
      public function getExchangeType(string $type, array $exchange): Exchange
      {
          return match ($type) {
              'direct' => Direct::make($exchange),
              'fanout' => Fanout::make($exchange),
              'topic' => Topic::make($exchange),
              'headers' => Headers::make($exchange),
              // Same error class an arm-less match would raise, but with a
              // message that tells the operator what is accepted.
              default => throw new \UnhandledMatchError(
                  sprintf('Unsupported exchange type "%s"; expected direct, fanout, topic or headers.', $type)
              ),
          };
      }

      /**
       * Choose which message indexes carry the "requeue" and "finish" markers.
       *
       * For two or more messages the requeue index is drawn from [1, count - 1]
       * and the finish index from [requeue, count]. With a single message there
       * is no room for a separate requeue marker, so only the finish marker is
       * placed. Index 0 means "no such marker" because the loop starts at 1.
       *
       * @param int $count number of messages that will be published (>= 0)
       *
       * @return array{0: int, 1: int} [requeue index, finish index]
       */
      private function pickMarkerIndexes(int $count): array
      {
          if ($count < 1) {
              return [0, 0];
          }

          $requeued = $count > 1 ? random_int(1, $count - 1) : 0;
          $finished = random_int(max(1, $requeued), $count);

          return [$requeued, $finished];
      }

      /**
       * Assemble the property array attached to every published message.
       *
       * NOTE(review): the publish-level flags (mandatory, immediate, ticket,
       * batch_count) are merged into the message properties here, as the
       * original code did, rather than being passed through the publish
       * `options` argument. Confirm that the library reads them from there.
       *
       * @return array<string, mixed>
       */
      private function messageProperties(): array
      {
          $message = [
              'content_type' => 'text/plain',
              'delivery_mode' => $this->option('delivery') === '1'
                  ? AMQPMessage::DELIVERY_MODE_NON_PERSISTENT
                  : AMQPMessage::DELIVERY_MODE_PERSISTENT,
              'content_encoding' => 'utf-8',
          ];

          $publish = [
              // When a published message cannot be routed to any queue, and the
              // publisher set the mandatory flag, the message is returned to
              // it. The publisher must have a returned-message handler set up
              // to deal with the return (e.g. by logging an error or retrying
              // with a different exchange).
              'mandatory' => $this->option('mandatory') === '1',
              // This flag is deprecated as of RabbitMQ 2.9 and will raise an
              // exception and close the channel if used.
              'immediate' => false,
              'ticket' => null,
              'batch_count' => 500,
          ];

          // Array union keeps the publish keys first and lets them win on any
          // key collision, matching the original `$publish + $message`.
          return $publish + $message;
      }

      /**
       * Assemble the exchange configuration from the command-line input.
       *
       * @return array<string, mixed>
       */
      private function exchangeConfig(): array
      {
          return [
              'name' => $this->argument('exchange'),
              'declare' => $this->option('declare') === '1', // should declare
              'passive' => false,
              // A durable exchange survives a broker restart; a transient one
              // does not and has to be declared again afterwards.
              'durable' => $this->option('durable') === '1',
              // With auto-delete enabled the exchange is removed once the
              // queues bound to it have been unbound.
              'auto_delete' => false,
              // An internal exchange may not be used directly by publishers,
              // only when bound to other exchanges.
              'internal' => false,
              'no_wait' => false,
              'arguments' => [],
              'ticket' => null,
          ];
      }
  }