option('count'))-1); $finish = "finish #"; $finished = rand($requeued, ((int) $this->option('count'))); for ($index = 1; $index <= (int) $this->option('count'); $index++) { $payload = "message #"; if ($index == $requeued) { echo "HERE 1".PHP_EOL; $payload = $requeue; } if ($index == $finished) { echo "HERE 2".PHP_EOL; $payload = $finish; } $message = [ 'content_type' => 'text/plain', 'delivery_mode' => $this->option('delivery') === "1" ? AMQPMessage::DELIVERY_MODE_NON_PERSISTENT : AMQPMessage::DELIVERY_MODE_PERSISTENT, 'content_encoding' => 'utf-8', ]; $publish = [ // When a published message cannot be routed to any queue, // and the publisher set the mandatory message property to // true, the message will be returned to it. The publisher // must have a returned message handler set up in order to // handle the return (e.g. by logging an error or retrying with a different exchange). 'mandatory' => $this->option('mandatory') === "1", // This flag is deprecated as of RabbitMQ 2.9 and will raise // an exception and close the channel if used. 'immediate' => false, 'ticket' => null, 'batch_count' => 500, ]; $exchange = [ 'name' => $this->argument('exchange'), 'declare' => $this->option('declare') === "1", // should declare 'passive' => false, // By using durability (Durable, Transient) properties, we can // make the message to survive even after server restarts. If // we select Durable, then the message will survive even // after server restart. In case, if we select Tansient, // then message will not service after server restart. 'durable' => $this->option('durable') === "1", // By using auto delete property, we can set whether an exchange // can delete if we unbind assigned queue. 'auto_delete' => false, // If we set this property yes, then the exchange may not be // used directly by publishers, but only when bound to other // exchanges. 'internal' => false, 'no_wait' => false, 'arguments' => [], 'ticket' => null, ]; $message = new ProducibleMessage( message: $payload.$index, properties: $publish + $message, ); $other_options = []; Amqp::publish( messages: $message, routingKey: $this->option('bind'), // In rabbitmq, direct exchange will deliver a messages to the // queues based on the message routing key exchange: $this->getExchangeType($this->option('type'), $exchange), // In rabbitmq, fanout exchange will route messages to all of // the queues that are bound to it. // exchange: Fanout::make($exchange), // https://www.tutlane.com/tutorial/rabbitmq/rabbitmq-exchanges // exchange: Topic::make($exchange), // exchange: Headers::make($exchange), options: $other_options, ); usleep((int) $this->option('usleep') * 1000); } echo sprintf('%f', $time += microtime(true)); echo PHP_EOL; } public function getExchangeType(string $type, array $exchange): Exchange { return match ($type) { 'direct' => Direct::make($exchange), 'fanout' => Fanout::make($exchange), 'topic' => Topic::make($exchange), 'headers' => Headers::make($exchange), }; } }