You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
134 lines
5.0 KiB
134 lines
5.0 KiB
<?php
|
|
|
|
namespace App\Console\Commands;
|
|
|
|
use Anik\Amqp\Exchanges;
|
|
use GuzzleHttp\Psr7\Header;
|
|
use Illuminate\Support\Arr;
|
|
use PhpAmqpLib\Wire\AMQPTable;
|
|
use Illuminate\Console\Command;
|
|
use Anik\Amqp\ProducibleMessage;
|
|
use Anik\Laravel\Amqp\Facades\Amqp;
|
|
use PhpAmqpLib\Message\AMQPMessage;
|
|
use Anik\Amqp\Exchanges\{Fanout, Direct, Topic, Headers, Exchange};
|
|
|
|
class Publisher extends Command
|
|
{
|
|
protected $signature = 'publish
|
|
{exchange}
|
|
{--declare=1}
|
|
{--type=direct}
|
|
{--bind=key}
|
|
{--durable=1}
|
|
|
|
{--delivery=1}
|
|
{--mandatory=1}
|
|
{--count=1}
|
|
{--usleep=1}
|
|
';
|
|
|
|
public function handle()
|
|
{
|
|
$time = -microtime(true);
|
|
echo PHP_EOL;
|
|
|
|
$requeue = "requeue #";
|
|
$requeued = rand(1, ((int) $this->option('count'))-1);
|
|
|
|
$finish = "finish #";
|
|
$finished = rand($requeued, ((int) $this->option('count')));
|
|
|
|
for ($index = 1; $index <= (int) $this->option('count'); $index++) {
|
|
$payload = "message #";
|
|
|
|
if ($index == $requeued) {
|
|
echo "HERE 1".PHP_EOL;
|
|
$payload = $requeue;
|
|
}
|
|
|
|
if ($index == $finished) {
|
|
echo "HERE 2".PHP_EOL;
|
|
$payload = $finish;
|
|
}
|
|
|
|
$message = [
|
|
'content_type' => 'text/plain',
|
|
'delivery_mode' => $this->option('delivery') === "1" ? AMQPMessage::DELIVERY_MODE_NON_PERSISTENT : AMQPMessage::DELIVERY_MODE_PERSISTENT,
|
|
'content_encoding' => 'utf-8',
|
|
];
|
|
|
|
$publish = [
|
|
// When a published message cannot be routed to any queue,
|
|
// and the publisher set the mandatory message property to
|
|
// true, the message will be returned to it. The publisher
|
|
// must have a returned message handler set up in order to
|
|
// handle the return (e.g. by logging an error or retrying with a different exchange).
|
|
'mandatory' => $this->option('mandatory') === "1",
|
|
// This flag is deprecated as of RabbitMQ 2.9 and will raise
|
|
// an exception and close the channel if used.
|
|
'immediate' => false,
|
|
'ticket' => null,
|
|
'batch_count' => 500,
|
|
];
|
|
|
|
$exchange = [
|
|
'name' => $this->argument('exchange'),
|
|
'declare' => $this->option('declare') === "1", // should declare
|
|
'passive' => false,
|
|
// By using durability (Durable, Transient) properties, we can
|
|
// make the message to survive even after server restarts. If
|
|
// we select Durable, then the message will survive even
|
|
// after server restart. In case, if we select Tansient,
|
|
// then message will not service after server restart.
|
|
'durable' => $this->option('durable') === "1",
|
|
// By using auto delete property, we can set whether an exchange
|
|
// can delete if we unbind assigned queue.
|
|
'auto_delete' => false,
|
|
// If we set this property yes, then the exchange may not be
|
|
// used directly by publishers, but only when bound to other
|
|
// exchanges.
|
|
'internal' => false,
|
|
'no_wait' => false,
|
|
'arguments' => [],
|
|
'ticket' => null,
|
|
];
|
|
|
|
$message = new ProducibleMessage(
|
|
message: $payload.$index,
|
|
properties: $publish + $message,
|
|
);
|
|
|
|
$other_options = [];
|
|
|
|
Amqp::publish(
|
|
messages: $message,
|
|
routingKey: $this->option('bind'),
|
|
// In rabbitmq, direct exchange will deliver a messages to the
|
|
// queues based on the message routing key
|
|
exchange: $this->getExchangeType($this->option('type'), $exchange),
|
|
// In rabbitmq, fanout exchange will route messages to all of
|
|
// the queues that are bound to it.
|
|
// exchange: Fanout::make($exchange),
|
|
// https://www.tutlane.com/tutorial/rabbitmq/rabbitmq-exchanges
|
|
// exchange: Topic::make($exchange),
|
|
// exchange: Headers::make($exchange),
|
|
options: $other_options,
|
|
);
|
|
|
|
usleep((int) $this->option('usleep') * 1000);
|
|
}
|
|
|
|
echo sprintf('%f', $time += microtime(true));
|
|
echo PHP_EOL;
|
|
}
|
|
|
|
public function getExchangeType(string $type, array $exchange): Exchange
|
|
{
|
|
return match ($type) {
|
|
'direct' => Direct::make($exchange),
|
|
'fanout' => Fanout::make($exchange),
|
|
'topic' => Topic::make($exchange),
|
|
'headers' => Headers::make($exchange),
|
|
};
|
|
}
|
|
}
|