Delve into the Symfony Messenger

This is an English translation of my article “Plongée au coeur du composant Messenger” written for AFSY (Association Francophone des Utilisateurs de Symfony).

The Messenger component makes it easy to send and receive messages between applications or between different components of a single application. I emphasise “easy” because this component provides an ingenious abstraction over the whole sending and receiving machinery. One line of code is enough to send a message:

$bus->dispatch($message);

This message can be sent asynchronously to an AMQP broker, a Redis server or an API endpoint:

Send a message

The messenger can also receive a message from the outside and direct it to an internal handler. A handler is a piece of custom code that you add to process specific messages:

Receive a message

The third scenario of using the Messenger recalls the event dispatcher of Symfony. The message is processed synchronously and does not leave the application:

Handle a message synchronously

What’s inside?

A message can be an object of any class. It contains only business data and knows nothing about transport. It is often useful, however, to attach additional information about how the message bus processes it. That is why the message is put in an envelope (an instance of the Envelope class) to which stamps (objects implementing StampInterface) can be attached. For example, ReceivedStamp marks a message received from the outside by a receiver. Senders ignore envelopes carrying ReceivedStamp, so a message that came from the outside is not sent back out. Instead, the message bus looks for a handler in the application.
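The envelope idea can be sketched in a few lines. This is only an illustration under stated assumptions: SketchEnvelope, SketchStamp, SketchReceivedStamp and OrderPlaced are hypothetical stand-ins written for this example, not the component's real classes, although with() and last() are modelled on the calls used later in this article.

```php
<?php
// Simplified stand-ins for illustration; these are NOT Symfony's classes.
interface SketchStamp {}

final class SketchReceivedStamp implements SketchStamp {}

final class SketchEnvelope
{
    private object $message;

    /** @var array<string, SketchStamp[]> stamps grouped by class name */
    private array $stamps = [];

    public function __construct(object $message)
    {
        $this->message = $message;
    }

    // Return a copy carrying one more stamp; the original stays untouched.
    public function with(SketchStamp $stamp): self
    {
        $copy = clone $this;
        $copy->stamps[get_class($stamp)][] = $stamp;

        return $copy;
    }

    // Return the most recently added stamp of the given class, or null.
    public function last(string $stampClass): ?SketchStamp
    {
        $stamps = $this->stamps[$stampClass] ?? [];

        return $stamps ? end($stamps) : null;
    }

    public function getMessage(): object
    {
        return $this->message;
    }
}

// A hypothetical message: plain data, no transport concerns.
final class OrderPlaced
{
    public int $orderId;

    public function __construct(int $orderId)
    {
        $this->orderId = $orderId;
    }
}

$envelope = new SketchEnvelope(new OrderPlaced(42));
$received = $envelope->with(new SketchReceivedStamp());

// A sender would skip $received (it carries the stamp) but not $envelope.
var_dump($envelope->last(SketchReceivedStamp::class) === null); // bool(true)
var_dump($received->last(SketchReceivedStamp::class) !== null); // bool(true)
```

Returning a copy from with() keeps envelopes immutable, which is why the code in this article always reassigns the result, as in $envelope = $envelope->with(...).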

MessageBus is the pivot of the Messenger. It accepts all messages and directs them to the recipients. Despite its great importance it is a small class with one property and one method:

Message bus class

The whole machinery of receiving and sending messages resides in the middlewares. The only real responsibility of the message bus is to select the first middleware and to pass it an envelope and all the registered middlewares.

It is worth noting that there may be more than one message bus.

All middlewares implement MiddlewareInterface. A middleware receives an envelope, processes it and then optionally passes it to the next middleware, and so on. The order of middlewares is very important. Each bus has its own stack of middlewares. When the configuration of a message bus is entrusted to the FrameworkBundle, it adds the following middlewares:

  • add_bus_name_stamp_middleware adds BusNameStamp that contains the bus name dispatching the message.
  • reject_redelivered_message_middleware is only used when a message arrives from an AMQP broker. When the message is presented the second time this middleware sends a negative ack to the AMQP broker and stops the middleware chain (hence it stops the treatment of the message). This happens when the AMQP broker delivers the message to the application and does not receive an acknowledgment (ack), then it redelivers the same message. reject_redelivered_message_middleware exists to break the infinite loop of redelivery of problematic messages.
  • dispatch_after_current_bus postpones the processing of new messages dispatched by a handler that is still processing its own message. More information in the official docs.
  • failed_message_processing_middleware. An exception may be thrown while processing a message. In this case the message is redirected to a “failed” queue with the SentToFailureTransportStamp stamp. The “messenger:failed:retry” command dispatches these messages again, and this middleware adjusts them so that they look as if they came from a normal receiver.
  • Your middlewares. If you add your own middleware in the default configuration, FrameworkBundle will add it in this position. In addition to your custom middleware you can also use those provided by Symfony: TraceableMiddleware, ValidationMiddleware. Your custom middleware can change/treat/log messages, add or delete stamps, stop the middleware chain.
  • send_message passes a message to a sender and stops the middleware chain if the message was created in the application. If there is no suitable sender for a message, it is passed to the next middleware. If the envelope carries ReceivedStamp, the send_message middleware does nothing, to avoid resending the external message to the outside.
  • handle_message chooses a suitable handler in the application and passes it the message.
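The chaining described in this list can be modelled with a toy stack. The sketch below is a deliberately simplified model, not the component's implementation: ToyMiddleware, ToySendMiddleware, ToyHandleMiddleware and runToyBus are hypothetical names, the envelope is a plain array, and stamps are plain strings.

```php
<?php
// Simplified model of a middleware stack; names are hypothetical.
interface ToyMiddleware
{
    // $next is a callable that runs the rest of the chain.
    public function handle(array $envelope, callable $next): array;
}

final class ToySendMiddleware implements ToyMiddleware
{
    /** @var string[] message classes routed to an external transport */
    private array $routed;

    public function __construct(array $routed)
    {
        $this->routed = $routed;
    }

    public function handle(array $envelope, callable $next): array
    {
        $received = in_array('received', $envelope['stamps'], true);
        $hasSender = in_array(get_class($envelope['message']), $this->routed, true);

        if (!$received && $hasSender) {
            $envelope['stamps'][] = 'sent';

            return $envelope; // stop the chain: no local handling
        }

        return $next($envelope);
    }
}

final class ToyHandleMiddleware implements ToyMiddleware
{
    public function handle(array $envelope, callable $next): array
    {
        $envelope['stamps'][] = 'handled';

        return $next($envelope);
    }
}

// Build the chain from the last middleware to the first.
// The innermost "next" simply returns the envelope as is.
function runToyBus(array $middlewares, array $envelope): array
{
    $next = static fn(array $e): array => $e;
    foreach (array_reverse($middlewares) as $middleware) {
        $next = static fn(array $e): array => $middleware->handle($e, $next);
    }

    return $next($envelope);
}

final class SendEmail {}
final class ResizeImage {}

$stack = [new ToySendMiddleware([SendEmail::class]), new ToyHandleMiddleware()];

$sent     = runToyBus($stack, ['message' => new SendEmail(), 'stamps' => []]);
$handled  = runToyBus($stack, ['message' => new ResizeImage(), 'stamps' => []]);
$incoming = runToyBus($stack, ['message' => new SendEmail(), 'stamps' => ['received']]);

echo implode(',', $sent['stamps']), PHP_EOL;     // sent
echo implode(',', $handled['stamps']), PHP_EOL;  // handled
echo implode(',', $incoming['stamps']), PHP_EOL; // received,handled
```

The three runs correspond to the three scenarios from the introduction: a message sent to the outside, a message handled synchronously, and a message received from the outside and handed to a local handler.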

Send or handle?

Messages are either sent to the outside by a sender or processed synchronously by handlers.

Handlers declare which type of messages they want to process by indicating a class, a superclass or an interface of messages. A message is passed to all interested handlers. Handlers receive only the message itself, without any stamp. When a message is processed synchronously it is possible to use HandledStamp to retrieve the result returned by a handler:

$envelope = $bus->dispatch(new Message());
$handledStamp = $envelope->last(HandledStamp::class);
$result = $handledStamp->getResult();

HandledStamp is only added by HandleMessageMiddleware middleware when several conditions are met:

  • the message reaches this middleware. That is, no previous middleware has stopped the middleware chain,
  • there is at least one handler interested in this type of message, and it processes the message without throwing an exception. The value it returns is stored in the stamp.

namespace Symfony\Component\Messenger\Middleware;

class HandleMessageMiddleware implements MiddlewareInterface
{
    private $handlersLocator;

    /**
    * {@inheritdoc}
    */
    public function handle(Envelope $envelope, StackInterface $stack): Envelope
    {
        $handler = null;
        $message = $envelope->getMessage();

        foreach ($this->handlersLocator->getHandlers($envelope) as $handlerDescriptor) {
            $handler = $handlerDescriptor->getHandler();

            // Invoking the handler.
            // The result returned by the handler is saved in HandledStamp.
            $handledStamp = HandledStamp::fromDescriptor($handlerDescriptor, $handler($message));

            // Attach the stamp to the envelope
            $envelope = $envelope->with($handledStamp);
        }

        return $stack->next()->handle($envelope, $stack);
    }
}
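The getHandlers() call above is where handlers subscribed to a class, a superclass or an interface are resolved. A minimal sketch of that matching, assuming a plain array as the handler registry; messageTypes() and the three message types are hypothetical names invented for this example, and the real locator may differ in details:

```php
<?php
// Hypothetical message hierarchy used only for this example.
interface AuditableMessage {}
class UserEvent {}
final class UserRegistered extends UserEvent implements AuditableMessage {}

// List every type under which a handler may have subscribed to the message:
// its own class, its parent classes and the interfaces it implements.
function messageTypes(object $message): array
{
    $class = get_class($message);

    return array_merge(
        [$class],
        array_values(class_parents($class)),
        array_values(class_implements($class))
    );
}

// Registry: type => list of handlers (plain callables here).
$handlers = [
    UserRegistered::class   => [fn(object $m): string => 'welcome email'],
    UserEvent::class        => [fn(object $m): string => 'user log'],
    AuditableMessage::class => [fn(object $m): string => 'audit trail'],
];

$message = new UserRegistered();
$results = [];

// Every interested handler receives the bare message, without stamps.
foreach (messageTypes($message) as $type) {
    foreach ($handlers[$type] ?? [] as $handler) {
        $results[] = $handler($message);
    }
}

echo implode(', ', $results), PHP_EOL; // welcome email, user log, audit trail
```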

If a message is sent by a sender in SendMessageMiddleware the latter stops the execution of the middleware chain and HandleMessageMiddleware does not run:

namespace Symfony\Component\Messenger\Middleware;

class SendMessageMiddleware implements MiddlewareInterface
{
    /**
    * @var SendersLocatorInterface
    */
    private $sendersLocator;

    /**
    * {@inheritdoc}
    */
    public function handle(Envelope $envelope, StackInterface $stack): Envelope
    {
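        $sender = null;

        // The redelivery stamp (if any) is expected by getSenders() below.
        $redeliveryStamp = $envelope->last(RedeliveryStamp::class);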

        if ($envelope->all(ReceivedStamp::class)) {
            // It is a received message, do not send it back.
        } else {
            foreach ($this->getSenders($envelope, $redeliveryStamp) as $alias => $sender) {
                $envelope = $envelope->with(new SentStamp(/** ... */));
                $envelope = $sender->send($envelope);
            }
        }

        // Pass the envelope to the next middleware (HandleMessageMiddleware) if
        // either condition is met:
        // - no sender is suitable for this message,
        // - the message was received from the outside (that is, the envelope
        //   carries ReceivedStamp).
        if (null === $sender) {
            return $stack->next()->handle($envelope, $stack);
        }

        // If the message was sent it should not be processed by the next
        // middleware. That is why the next middleware is not invoked.
        return $envelope;
    }

    /**
    * @return iterable|SenderInterface[]
    */
    private function getSenders(Envelope $envelope, ?RedeliveryStamp $redeliveryStamp): iterable
    {
        return $this->sendersLocator->getSenders($envelope);
    }
}

RPC with AMQP

The functionality proposed in the pull request is quite specific, so the community may well not accept the idea. Still, it is interesting to carry out a proof of concept and test the flexibility of the Messenger component. In this article, as in the pull request, I focus only on the AMQP transport.

When a message is processed synchronously, it does not leave the application. The client (the code that created and dispatched the message) has to wait for the handler to process it anyway. Hence the client can retrieve the result returned by the handler.

If the message is sent to the outside, its processing is often asynchronous. The sender just sends the message without waiting for it to be processed. Consequently, the external handler cannot hand its result back to the client. However, it is possible to use the pattern called Remote Procedure Call, or RPC:

RPC send request

The queue “messages” is an ordinary queue: several clients can send requests to it and several handlers can consume requests from it. When sending a request to the “messages” queue, the client creates an exclusive temporary queue. It is a callback queue dedicated to this client, i.e. only one consumer can be connected to it. The opaque name of the callback queue is attached to the request as the “reply_to” property. Once the request is sent the client can continue executing, then stop and wait for the response. The response will be put in the callback queue.

The handler sends its response to the queue indicated in the “reply_to” property of the request:

RPC send response

The response is delivered to the waiting client. Having received the response, the client closes its connection to the callback queue, and the AMQP broker then deletes the queue automatically.
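The whole round trip can be simulated without a broker. The sketch below uses a hypothetical in-memory ToyBroker class written for this example; it involves no AMQP at all and only illustrates the roles of the callback queue and the “reply_to” property.

```php
<?php
// In-memory stand-in for a broker, for illustration only (no AMQP involved).
final class ToyBroker
{
    /** @var array<string, array[]> queue name => pending messages */
    private array $queues = [];

    // Create a callback queue with an opaque, unique name.
    public function declareExclusiveQueue(): string
    {
        $name = 'reply.' . bin2hex(random_bytes(8));
        $this->queues[$name] = [];

        return $name;
    }

    public function publish(string $queue, string $body, ?string $replyTo = null): void
    {
        $this->queues[$queue][] = ['body' => $body, 'reply_to' => $replyTo];
    }

    // Take the oldest message from a queue, or null if there is none.
    public function pop(string $queue): ?array
    {
        if (empty($this->queues[$queue])) {
            return null;
        }

        return array_shift($this->queues[$queue]);
    }

    public function deleteQueue(string $queue): void
    {
        unset($this->queues[$queue]);
    }
}

$broker = new ToyBroker();

// Client: create the callback queue and send the request with reply_to.
$callbackQueue = $broker->declareExclusiveQueue();
$broker->publish('messages', 'ping', $callbackQueue);

// Worker: consume the request and publish the result to the reply_to queue.
$request = $broker->pop('messages');
$broker->publish($request['reply_to'], strtoupper($request['body']));

// Client: read the response, then drop the callback queue.
$response = $broker->pop($callbackQueue);
$broker->deleteQueue($callbackQueue);

echo $response['body'], PHP_EOL; // PING
```

With a real broker the client would block on the callback queue instead of polling it, and the queue would disappear when the client disconnects rather than through an explicit delete.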

Add RPC to the Messenger component

Firstly, the client needs a way to indicate that it will wait for the response from a handler. The perfect candidate for this is a new ReplyStamp stamp:

// Attach ReplyStamp to the envelope to indicate that the client will wait for
// the response.
$envelope = $messageBus->dispatch($message, [new ReplyStamp()]);

ReplyStamp will also be used to retrieve the response:

namespace Symfony\Component\Messenger\Stamp;

/**
 * Stamp used to identify that client wants a response.
 * Client gets a response from a handler via this stamp.
 */
class ReplyStamp implements StampInterface
{
    /**
    * @var mixed
    */
    private $response;

    /**
     * @param mixed $response
     */
    public function setResponse($response): void
    {
        $this->response = $response;
    }

    /**
     * @return mixed
     */
    public function getResponse()
    {
        return $this->response;
    }
}

Here is a client that is waiting for the response for the message processed synchronously (without sending it to the outside):

// Indicate that the client will wait for the response.
// The message is handled during "MessageBus::dispatch" method.
$envelope = $messageBus->dispatch($message, [new ReplyStamp()]);

// Retrieve the result.
$replyStamp = $envelope->last(ReplyStamp::class);
$result = $replyStamp->getResponse();

And here is a client that is waiting for the response for the message processed asynchronously (sent to the AMQP broker):

// Indicate that the client will wait for the response.
// The message is sent to AMQP broker during "MessageBus::dispatch" method.
$envelope = $messageBus->dispatch($message, [new ReplyStamp()]);

// Take care of other things while the request is being routed to handler and
// processed by it.

// Wait for the response. The getResponse method is blocking.
$replyStamp = $envelope->last(AmqpReplyStamp::class);
$result = $replyStamp->getResponse();

In my opinion it is important that the client is aware that the request is processed asynchronously. Code that is explicit about it helps anticipate potential problems and facilitates debugging. For this reason it is better to use a specific AmqpReplyStamp stamp rather than reusing ReplyStamp to hide the RPC.

namespace Symfony\Component\Messenger\Transport\AmqpExt;

use Symfony\Component\Messenger\Stamp\NonSendableStampInterface;

/**
 * Stamp added by @see AmqpSender when the client waits for a response from a handler.
 * The client retrieves the response from an exclusive temporary queue.
 * This stamp implements NonSendableStampInterface because it should not be sent
 * to a handler via AMQP broker. It is only the client who is interested in
 * this stamp to be able to retrieve the result.
 */
class AmqpReplyStamp implements NonSendableStampInterface
{
    /**
     * Callback queue.
     *
     * @var \AMQPQueue
     */
    private $replyQueue;

    /**
     * @param \AMQPQueue $replyQueue
     */
    public function __construct(\AMQPQueue $replyQueue)
    {
        $this->replyQueue = $replyQueue;
    }

    /**
     * @return mixed
     */
    public function getResponse()
    {
        $response = null;

        // Consume messages from the callback queue. It's a blocking call.
        $this->replyQueue->consume(function(\AMQPEnvelope $envelope) use (&$response) {
            $response = $envelope->getBody();
    
            // Stop consuming messages after receiving the response.
            return false;
        });

        return $response;
    }
}

AmqpReplyStamp must be attached to the request envelope when it is sent to the AMQP broker. To attach this stamp, a temporary callback queue must first be created. AmqpSender is a perfect place for this functionality:

namespace Symfony\Component\Messenger\Transport\AmqpExt;

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Exception\TransportException;
use Symfony\Component\Messenger\Stamp\DelayStamp;
use Symfony\Component\Messenger\Stamp\ReplyStamp;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;

/**
 * Symfony Messenger sender to send messages to AMQP brokers using PHP's AMQP extension.
 *
 * @author Samuel Roze <samuel.roze@gmail.com>
 */
class AmqpSender implements SenderInterface
{
    private $serializer;
    private $connection;

    public function __construct(Connection $connection, SerializerInterface $serializer = null)
    {
        $this->connection = $connection;
        $this->serializer = $serializer ?? new PhpSerializer();
    }

    /**
     * {@inheritdoc}
     */
    public function send(Envelope $envelope): Envelope
    {
        $encodedMessage = $this->serializer->encode($envelope);

        /** @var DelayStamp|null $delayStamp */
        $delayStamp = $envelope->last(DelayStamp::class);
        $delay = $delayStamp ? $delayStamp->getDelay() : 0;

        /** @var AmqpStamp|null $amqpStamp */
        $amqpStamp = $envelope->last(AmqpStamp::class);

+       if ($replyStamp = $envelope->last(ReplyStamp::class)) {
+           // Declare a new exclusive queue, create an AmqpReplyStamp and add it
+           // to the request's envelope.
+           $replyQueue = $this->connection->createReplyQueue();
+           $amqpReplyStamp = new AmqpReplyStamp($replyQueue);
+           $envelope = $envelope->with($amqpReplyStamp);
+
+           // Add a 'reply_to' property to the request to let the handler know
+           // which queue to send the response to.
+           $amqpStamp = AmqpStamp::createWithAttributes(['reply_to' => $replyQueue->getName()], $amqpStamp);
+       }

        // ...

        try {
            // Delegate the message sending to the $connection object.
            $this->connection->publish(
                $encodedMessage['body'],
                $encodedMessage['headers'] ?? [],
                $delay,
                $amqpStamp
            );
        } catch (\AMQPException $e) {
            throw new TransportException($e->getMessage(), 0, $e);
        }

        return $envelope;
    }
}

The $connection object contains useful methods for interacting with the AMQP broker. We need to add the Symfony\Component\Messenger\Transport\AmqpExt\Connection::createReplyQueue() method to be able to create a callback queue:

namespace Symfony\Component\Messenger\Transport\AmqpExt;

use Symfony\Component\Messenger\Exception\InvalidArgumentException;

/**
 * An AMQP connection.
 *
 * @author Samuel Roze <samuel.roze@gmail.com>
 */
class Connection
{
    /**
     * @var AmqpFactory
     */
    private $amqpFactory;

    /**
     * @var \AMQPChannel|null
     */
    private $amqpChannel;

    // ...

+   /**
+    * Create an exclusive queue to get a response from.
+    */
+   public function createReplyQueue(): \AMQPQueue
+   {
+       $queue = $this->amqpFactory->createQueue($this->channel());
+       $queue->setFlags(\AMQP_EXCLUSIVE);
+       $queue->declareQueue();
+
+       return $queue;
+   }

    public function channel(): \AMQPChannel
    {
        if (null === $this->amqpChannel) {
            // create a channel
        }

        return $this->amqpChannel;
    }

    // ...
}

On the one hand, the Messenger component now contains everything to allow clients to wait for a response. On the other hand, the result returned by a handler must be transmitted to the client that waits for it.

A handler receives the message as is, without the envelope or its stamps. Therefore it does not know whether the client is waiting for a response. It would also be a bad idea to make handlers responsible for sending the response; the Messenger component itself should take care of that. So a place for the response logic has to be found. As mentioned before, HandleMessageMiddleware takes care of choosing and invoking a suitable handler. It is the last middleware in the chain. There are therefore two choices: modify HandleMessageMiddleware or add a new middleware at the end of the chain. In my opinion, creating a new ReplyMiddleware is the best option:

namespace Symfony\Component\Messenger\Middleware;

use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Stamp\HandledStamp;
use Symfony\Component\Messenger\Stamp\ReplyStamp;
use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceivedStamp;

/**
 * Middleware responsible for sending back the result returned by a handler.
 */
class ReplyMiddleware implements MiddlewareInterface
{
    /**
    * {@inheritdoc}
    */
    public function handle(Envelope $envelope, StackInterface $stack): Envelope
    {
        // Take action if the message has been successfully handled and there is
        // a client that is waiting for a response.
        if (($handledStamp = $envelope->last(HandledStamp::class))
            && ($replyStamp = $envelope->last(ReplyStamp::class))
        ) {
            $response = $handledStamp->getResult();

            // If the message has been dispatched synchronously, then just save
            // the handler's response.
            $replyStamp->setResponse($response);

            // If the message has been received by @see AmqpReceiver, then the
            // response must be sent to the callback queue.
            if ($amqpReceivedStamp = $envelope->last(AmqpReceivedStamp::class)) {
                $replyTo = $amqpReceivedStamp->getAmqpEnvelope()->getReplyTo();
                $amqpReceivedStamp->getConnection()->reply($response, $replyTo);
            }
        }

        return $stack->next()->handle($envelope, $stack);
    }
}

ReplyMiddleware relies on Symfony\Component\Messenger\Transport\AmqpExt\Connection::reply() method that has to be added:

namespace Symfony\Component\Messenger\Transport\AmqpExt;

use Symfony\Component\Messenger\Exception\InvalidArgumentException;

/**
 * An AMQP connection.
 *
 * @author Samuel Roze <samuel.roze@gmail.com>
 */
class Connection
{
    // ...

+   /**
+    * Put a response to a callback queue.
+    *
+    * @param string $response
+    * @param string $replyTo Queue name to reply to
+    */
+   public function reply(string $response, string $replyTo)
+   {
+       $defaultExchange = new \AMQPExchange($this->channel());
+       $defaultExchange->publish($response, $replyTo);
+   }

    public function channel(): \AMQPChannel
    {
        if (null === $this->amqpChannel) {
            // create a channel
        }

        return $this->amqpChannel;
    }

    // ...
}

But first ReplyMiddleware needs access to the current AMQP connection in order to call the reply() method. Although there may be better solutions, I put the AMQP connection in AmqpReceivedStamp:

namespace Symfony\Component\Messenger\Transport\AmqpExt;

use Symfony\Component\Messenger\Stamp\NonSendableStampInterface;

/**
 * Stamp applied when a message is received from Amqp.
 */
class AmqpReceivedStamp implements NonSendableStampInterface
{
+   private $connection;
    private $amqpEnvelope;
    private $queueName;

-   public function __construct(\AMQPEnvelope $amqpEnvelope, string $queueName)
+   public function __construct(Connection $connection, \AMQPEnvelope $amqpEnvelope, string $queueName)
    {
+       $this->connection = $connection;
        $this->amqpEnvelope = $amqpEnvelope;
        $this->queueName = $queueName;
    }

+   public function getConnection(): Connection
+   {
+       return $this->connection;
+   }
+
    public function getAmqpEnvelope(): \AMQPEnvelope
    {
        return $this->amqpEnvelope;
    }

    public function getQueueName(): string
    {
        return $this->queueName;
    }
}

I dared to break backward compatibility in this pull request because it is only a proof of concept. A backward compatibility layer can be added later.

It remains to add the new middleware to the default stack of middlewares. When FrameworkBundle registers buses it takes care of their middleware stacks. We just need to add reply_middleware to the end of the default stack:

namespace Symfony\Bundle\FrameworkBundle\DependencyInjection;

use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\DependencyInjection\Loader\XmlFileLoader;
use Symfony\Component\HttpKernel\DependencyInjection\Extension;

/**
 * FrameworkExtension.
 *
 * @author Fabien Potencier <fabien@symfony.com>
 * @author Jeremy Mikola <jmikola@gmail.com>
 * @author Kévin Dunglas <dunglas@gmail.com>
 * @author Grégoire Pineau <lyrixx@lyrixx.info>
 */
class FrameworkExtension extends Extension
{
    //...

    private function registerMessengerConfiguration(array $config, ContainerBuilder $container, XmlFileLoader $loader, array $serializerConfig, array $validationConfig)
    {
        $loader->load('messenger.xml');

        $defaultMiddleware = [
            'before' => [
                ['id' => 'add_bus_name_stamp_middleware'],
                ['id' => 'reject_redelivered_message_middleware'],
                ['id' => 'dispatch_after_current_bus'],
                ['id' => 'failed_message_processing_middleware'],
            ],
            'after' => [
                ['id' => 'send_message'],
                ['id' => 'handle_message'],
+               ['id' => 'reply_middleware'],
            ],
        ];

        foreach ($config['buses'] as $busId => $bus) {
            $middleware = $bus['middleware'];

            if ($bus['default_middleware']) {
                $middleware = array_merge($defaultMiddleware['before'], $middleware, $defaultMiddleware['after']);
            }
            // ...
        }
        // ...
    }
}

FrameworkExtension configures the middleware stack of each bus in an application. For example, this configuration inserts two middlewares, MyMiddleware and ValidationMiddleware, between FailedMessageProcessingMiddleware (last in the “before” array) and SendMessageMiddleware (first in the “after” array):

framework:
    messenger:
        buses:
            messenger.bus.default:
                middleware:
                    - 'App\Middleware\MyMiddleware'
                    - validation

To put a middleware anywhere else, for example at the very end, you have to disable the default middlewares and compose the entire stack yourself:

framework:
    messenger:
        buses:
            messenger.bus.default:
                default_middleware: false
                middleware:
                    - 'App\Middleware\MyMiddleware'
                    - add_bus_name_stamp_middleware
                    - reject_redelivered_message_middleware
                    - dispatch_after_current_bus
                    - failed_message_processing_middleware
                    - send_message
                    - handle_message
                    - reply_middleware

The final touch is to declare the ReplyMiddleware as a service in Symfony/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml:

<?xml version="1.0" ?>

<container xmlns="http://symfony.com/schema/dic/services"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://symfony.com/schema/dic/services https://symfony.com/schema/dic/services/services-1.0.xsd">

    <services>
+       <service id="messenger.middleware.reply_middleware" class="Symfony\Component\Messenger\Middleware\ReplyMiddleware" />
    </services>
</container>

Play on stamps

The following diagram shows a synchronous handling of a message. Note the stamps added by the client code and by the Messenger component to make the result available to the client.

Messenger stamps. Handle synchronously

When a message is sent asynchronously, the Messenger takes care of sending a request and receiving a response. Note that AmqpSender adds AmqpReplyStamp that allows a client to wait for a response.

Messenger stamps. Send request asynchronously

The sender serializes the request together with its envelope and stamps. Stamps that implement NonSendableStampInterface are not sent; this is the case for AmqpStamp, AmqpReplyStamp and SentStamp. The new ReplyStamp is sent with the envelope to notify the recipient that the client is waiting for a response. AmqpReplyStamp holds the connection to the callback queue, so it must not be sent. The following diagram shows the processing of a request by a recipient:

Messenger stamps. Receive result asynchronously
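The filtering of non-sendable stamps mentioned above can be sketched as follows. The Demo* interfaces and classes and the sendableStamps() function are hypothetical stand-ins for this example, not the component's real types; only the marker-interface idea is taken from the article.

```php
<?php
// Simplified stand-ins, not Symfony's classes.
interface DemoStamp {}

// Marker interface: stamps implementing it must stay on the client side.
interface DemoNonSendableStamp extends DemoStamp {}

final class DemoReplyStamp implements DemoStamp {}
final class DemoSentStamp implements DemoNonSendableStamp {}
final class DemoAmqpReplyStamp implements DemoNonSendableStamp {}

// Keep only the stamps that are allowed to travel with the message.
function sendableStamps(array $stamps): array
{
    return array_values(array_filter(
        $stamps,
        static fn(DemoStamp $stamp): bool => !$stamp instanceof DemoNonSendableStamp
    ));
}

$stamps = [new DemoReplyStamp(), new DemoSentStamp(), new DemoAmqpReplyStamp()];
$onTheWire = array_map('get_class', sendableStamps($stamps));

echo implode(', ', $onTheWire), PHP_EOL; // DemoReplyStamp
```

A marker interface keeps the decision with the stamp itself: the sender needs no list of stamp classes to exclude, and a new transport-specific stamp opts out simply by implementing the interface.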