|
|
<?php
namespace App\Console\Commands;
use Anik\Amqp\Exchanges; use GuzzleHttp\Psr7\Header; use Illuminate\Support\Arr; use PhpAmqpLib\Wire\AMQPTable; use Illuminate\Console\Command; use Anik\Amqp\ProducibleMessage; use Anik\Laravel\Amqp\Facades\Amqp; use PhpAmqpLib\Message\AMQPMessage; use Anik\Amqp\Exchanges\{Fanout, Direct, Topic, Headers, Exchange};
/**
 * Artisan command that publishes a run of numbered test messages to an AMQP exchange.
 *
 * Two messages of the run are picked at random and given special payload
 * prefixes ("requeue #" and "finish #"); every other message is "message #".
 * The elapsed wall-clock time of the run (in seconds) is printed at the end.
 */
class Publisher extends Command
{
    /**
     * Console signature: one required exchange name plus tuning options.
     *
     * @var string
     */
    protected $signature = 'publish
        {exchange : Name of the exchange to publish to}
        {--declare=1 : Declare the exchange before publishing (1/0)}
        {--type=direct : Exchange type - direct, fanout, topic or headers}
        {--bind=key : Routing key used for every message}
        {--durable=1 : Declare the exchange as durable (1/0)}
        {--delivery=1 : 1 for non-persistent delivery, any other value for persistent}
        {--mandatory=1 : Set the mandatory flag when publishing (1/0)}
        {--count=1 : Number of messages to publish}
        {--usleep=1 : Pause between messages, in milliseconds}';

    /**
     * @var string
     */
    protected $description = 'Publish numbered test messages to an AMQP exchange';

    /**
     * Publish `--count` messages, pausing `--usleep` milliseconds after each one.
     *
     * @return int Process exit code (always 0; publishing failures propagate as exceptions).
     */
    public function handle(): int
    {
        $startedAt = microtime(true);
        $this->line('');

        $count = max(0, (int) $this->option('count'));
        // A negative pause would make usleep() throw, so clamp it at zero.
        $pauseMicroseconds = max(0, (int) $this->option('usleep') * 1000);

        // Pick the positions of the two special messages. The upper bound of the
        // first range is clamped so that a count of 0 or 1 never produces an
        // inverted range (which rand() would silently swap, yielding index 0).
        $requeueAt = rand(1, max(1, $count - 1));
        $finishAt = rand($requeueAt, max($requeueAt, $count));

        $routingKey = $this->option('bind');
        $type = $this->option('type');

        // AMQP message properties, identical for every message of the run.
        $messageProperties = [
            'content_type' => 'text/plain',
            'delivery_mode' => $this->option('delivery') === '1'
                ? AMQPMessage::DELIVERY_MODE_NON_PERSISTENT
                : AMQPMessage::DELIVERY_MODE_PERSISTENT,
            'content_encoding' => 'utf-8',
        ];

        // These are publish options rather than message properties, so they are
        // handed over under the "publish" key of the options passed to
        // Amqp::publish() instead of being mixed into the message properties.
        $publishOptions = [
            // When a published message cannot be routed to any queue and the
            // publisher set the mandatory flag, the message is returned to the
            // publisher, which needs a return handler to react to it (e.g. by
            // logging an error or retrying with a different exchange).
            'mandatory' => $this->booleanOption('mandatory'),
            // This flag is deprecated as of RabbitMQ 2.9 and will raise an
            // exception and close the channel if used.
            'immediate' => false,
            'ticket' => null,
            'batch_count' => 500,
        ];

        $exchangeConfig = [
            'name' => $this->argument('exchange'),
            // Whether the exchange should be declared before publishing.
            'declare' => $this->booleanOption('declare'),
            'passive' => false,
            // A durable exchange survives a server restart; a transient one
            // does not.
            'durable' => $this->booleanOption('durable'),
            // Whether the exchange may be deleted once its queues are unbound.
            'auto_delete' => false,
            // An internal exchange may not be used directly by publishers,
            // only when bound to other exchanges.
            'internal' => false,
            'no_wait' => false,
            'arguments' => [],
            'ticket' => null,
        ];

        for ($index = 1; $index <= $count; $index++) {
            $payload = 'message #';

            if ($index === $requeueAt) {
                $this->line('HERE 1');
                $payload = 'requeue #';
            }

            // Checked second on purpose: when both positions coincide the
            // message is sent as the "finish" message.
            if ($index === $finishAt) {
                $this->line('HERE 2');
                $payload = 'finish #';
            }

            $message = new ProducibleMessage(
                message: $payload . $index,
                properties: $messageProperties,
            );

            Amqp::publish(
                messages: $message,
                routingKey: $routingKey,
                // A fresh exchange object is built for every publish call.
                exchange: $this->getExchangeType($type, $exchangeConfig),
                options: ['publish' => $publishOptions],
            );

            usleep($pauseMicroseconds);
        }

        $this->line(sprintf('%f', microtime(true) - $startedAt));

        return 0;
    }

    /**
     * Build the exchange object matching the requested exchange type.
     *
     * Overview of the exchange types:
     * https://www.tutlane.com/tutorial/rabbitmq/rabbitmq-exchanges
     *
     * @param string $type     One of "direct", "fanout", "topic" or "headers".
     * @param array  $exchange Exchange configuration passed to the exchange's make() factory.
     *
     * @return Exchange
     *
     * @throws \InvalidArgumentException When $type is not one of the supported types.
     */
    public function getExchangeType(string $type, array $exchange): Exchange
    {
        return match ($type) {
            'direct' => Direct::make($exchange),
            'fanout' => Fanout::make($exchange),
            'topic' => Topic::make($exchange),
            'headers' => Headers::make($exchange),
            default => throw new \InvalidArgumentException(
                sprintf('Unsupported exchange type "%s"; expected one of: direct, fanout, topic, headers.', $type)
            ),
        };
    }

    /**
     * Read a command option as a boolean.
     *
     * "1", "true", "on" and "yes" are treated as true; anything else
     * (including a missing value) is false.
     */
    private function booleanOption(string $name): bool
    {
        return filter_var($this->option($name), FILTER_VALIDATE_BOOLEAN);
    }
}
|