You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

134 lines
5.0 KiB

<?php
namespace App\Console\Commands;
use Anik\Amqp\Exchanges;
use GuzzleHttp\Psr7\Header;
use Illuminate\Support\Arr;
use PhpAmqpLib\Wire\AMQPTable;
use Illuminate\Console\Command;
use Anik\Amqp\ProducibleMessage;
use Anik\Laravel\Amqp\Facades\Amqp;
use PhpAmqpLib\Message\AMQPMessage;
use Anik\Amqp\Exchanges\{Fanout, Direct, Topic, Headers, Exchange};
class Publisher extends Command
{
protected $signature = 'publish
{exchange}
{--declare=1}
{--type=direct}
{--bind=key}
{--durable=1}
{--delivery=1}
{--mandatory=1}
{--count=1}
{--usleep=1}
';
public function handle()
{
$time = -microtime(true);
echo PHP_EOL;
$requeue = "requeue #";
$requeued = rand(1, ((int) $this->option('count'))-1);
$finish = "finish #";
$finished = rand($requeued, ((int) $this->option('count')));
for ($index = 1; $index <= (int) $this->option('count'); $index++) {
$payload = "message #";
if ($index == $requeued) {
echo "HERE 1".PHP_EOL;
$payload = $requeue;
}
if ($index == $finished) {
echo "HERE 2".PHP_EOL;
$payload = $finish;
}
$message = [
'content_type' => 'text/plain',
'delivery_mode' => $this->option('delivery') === "1" ? AMQPMessage::DELIVERY_MODE_NON_PERSISTENT : AMQPMessage::DELIVERY_MODE_PERSISTENT,
'content_encoding' => 'utf-8',
];
$publish = [
// When a published message cannot be routed to any queue,
// and the publisher set the mandatory message property to
// true, the message will be returned to it. The publisher
// must have a returned message handler set up in order to
// handle the return (e.g. by logging an error or retrying with a different exchange).
'mandatory' => $this->option('mandatory') === "1",
// This flag is deprecated as of RabbitMQ 2.9 and will raise
// an exception and close the channel if used.
'immediate' => false,
'ticket' => null,
'batch_count' => 500,
];
$exchange = [
'name' => $this->argument('exchange'),
'declare' => $this->option('declare') === "1", // should declare
'passive' => false,
// By using durability (Durable, Transient) properties, we can
// make the message to survive even after server restarts. If
// we select Durable, then the message will survive even
// after server restart. In case, if we select Tansient,
// then message will not service after server restart.
'durable' => $this->option('durable') === "1",
// By using auto delete property, we can set whether an exchange
// can delete if we unbind assigned queue.
'auto_delete' => false,
// If we set this property yes, then the exchange may not be
// used directly by publishers, but only when bound to other
// exchanges.
'internal' => false,
'no_wait' => false,
'arguments' => [],
'ticket' => null,
];
$message = new ProducibleMessage(
message: $payload.$index,
properties: $publish + $message,
);
$other_options = [];
Amqp::publish(
messages: $message,
routingKey: $this->option('bind'),
// In rabbitmq, direct exchange will deliver a messages to the
// queues based on the message routing key
exchange: $this->getExchangeType($this->option('type'), $exchange),
// In rabbitmq, fanout exchange will route messages to all of
// the queues that are bound to it.
// exchange: Fanout::make($exchange),
// https://www.tutlane.com/tutorial/rabbitmq/rabbitmq-exchanges
// exchange: Topic::make($exchange),
// exchange: Headers::make($exchange),
options: $other_options,
);
usleep((int) $this->option('usleep') * 1000);
}
echo sprintf('%f', $time += microtime(true));
echo PHP_EOL;
}
public function getExchangeType(string $type, array $exchange): Exchange
{
return match ($type) {
'direct' => Direct::make($exchange),
'fanout' => Fanout::make($exchange),
'topic' => Topic::make($exchange),
'headers' => Headers::make($exchange),
};
}
}