The Messenger component makes it easy to send and receive messages between
applications or different components of a single application. I emphasise
“easily” because this component provides an ingenious abstraction of a whole
sending and receiving machinery. One line of code is enough to send a message:
This message can be sent asynchronously to an AMQP broker, a Redis server, an
The messenger can also receive a message from the outside and direct it to an
internal handler. A handler is a piece of custom code that you add to process
The third scenario of using the Messenger recalls the event dispatcher of
Symfony. The message is processed synchronously and does not leave the
A message can be an object of any class. Message contains only useful
information without regard to transport issues. Usually, it is very useful to
attach additional information to a message about its processing by the message
bus. That is why the message is put in an envelope (an instance of Envelope
class) to which we can attach the stamps (instances of Stamp class). For
example ReceivedStamp marks the message received from the outside by a
receiver. Envelopes containing ReceivedStamp will be ignored by senders so
that the message from the outside will not be resent to the outside. Instead,
the bus message will look for a handler in the application.
MessageBus is a pivot of the Messenger. It accepts all messages and directs
them to the recipients. Despite its great importance it is a small class that
with one property and one method:
The whole machinery of receiving and sending messages resides in the multiple
middlewares. The only real responsibility of the bus message is to select the
first middleware and to pass it an envelope and all the registered middlewares.
It is worth noting that there may be more than one message bus.
All middlewares implement MiddlewareInterface. Middleware receives a message,
processes it and then optionally passes it to the next middleware. And so on.
The order of middlewares is very important. Each bus has its own stack of
middlewares. When the configuration of a message bus is entrusted to the
FrameworkBundle it adds the following middlewares:
add_bus_name_stamp_middleware adds BusNameStamp that contains the bus name
dispatching the message.
reject_redelivered_message_middleware is only used when a message arrives
from an AMQP broker. When the message is presented the second time this
middleware sends a negative ack to the AMQP broker and stops the middleware
chain (hence it stops the treatment of the message). This happens when the
AMQP broker delivers the message to the application and does not receive an
acknowledgment (ack), then it redelivers the same message.
reject_redelivered_message_middleware exists to break the infinite loop of
redelivery of problematic messages.
dispatch_after_current_bus retains the processing of new messages sent by a
handler (who is already processing its message). More information in the
failed_message_processing_middleware. An exception may be thrown while
processing a message. In this case the message will be redirected to a
“failed” queue with the SentToFailureTransportStamp stamp. The
“messenger:failed:retry” command will re-dispatch these messages once again
and this middleware will correct these messages as if they came from a normal
Your middlewares. If you add your own middleware in the default
configuration, FrameworkBundle will add it in this position. In addition to
your custom middleware you can also use those provided by Symfony:
TraceableMiddleware, ValidationMiddleware. Your custom middleware can
change/treat/log messages, add or delete stamps, stop the middleware chain.
send_message passes a message to a sender and stops the middleware chain
if it is the message created in the application. If there is no suitable
sender for a message, it will be passed to the next middleware. If the
envelope carries ReceivedStamp then send_message middleware does nothing
to avoid resending the external message to the outside.
handle_message chooses a suitable handler in the application passes it the
Send or handle ?
Messages are either sent to the outside by a sender or processed synchronously
Handlers declare which type of messages they want to process by indicating a
class, a superclass or an interface of messages. A message will be passed to the
all interested handlers. Handlers receive only the message itself without any
stamp. When a message is processed synchronously it is possible to use
HandledStamp to retrieve the result returned by a handler:
HandledStamp is only added by HandleMessageMiddleware middleware when
several conditions are met:
the message reaches this middleware. That is, no previous middleware has
stopped the middleware chain,
there is at least one handler that is interested in this type of message. The
handler treats a message without throwing an exception and returns a value.
If a message is sent by a sender in SendMessageMiddleware the latter stops the
execution of the middleware chain and HandleMessageMiddleware does not run:
RPC with AMQP
The functionality proposed in the pull request is quite specific, hence it is
possible that this idea is not accepted by the community. However, it is
interesting to carry out a proof of concept and test the flexibility of the
Messenger component. In this article as well as in the pull request I will
focus only on AMQP transport.
When a message is processed synchronously, it does not quit the application.
A client (that generated and sent the message) will anyway wait for the handler
to treat the message. Hence the client can retrieve the result returned by the
If the message gets sent to the outside, its processing is
often asynchronous. The sender just sends the message without waiting for the
message to be treated. Consequently, the external handler is not able to provide
the result to the client. However, it is possible to use the pattern called
Remote Procedure Call or RPC:
The queue “messages” is an ordinary queue: several customers can send requests
to it, several handlers can consume requests from it. During sending a request
to a queue “messages” the client creates an exclusive temporary queue. It is a
callback queue dedicated exclusively for this client, i.e. there can only be one
consumer connected to this queue. The opaque name of a callback queue is
attached to the request as the “reply_to” property. Once the request is sent
the client can continue executing, then it can stop and wait for the response.
The response will be put in the callback queue.
The handler sends its response in a queue indicated in the “reply_to” property
of the request:
The response gets dispatched to the waiting client. Having received the response
the client destroys the connection to the callback queue. Therefore AMQP broker
automatically deletes it.
Add PRC to the Messenger component
Firstly, give the customer the opportunity to indicate that it will wait for the
response from a handler. The perfect candidate for this is a new ReplyStamp
ReplyStamp will also be used to retrieve the response:
Here is a client that is waiting for the response for the message processed
synchronously (without sending it to the outside):
And here is a client that is waiting for the response for the message processed
asynchronously (sent to the AMQP broker):
In my opinion it is important that the client is aware that the request is
treated asynchronously. The code that is explicit about it helps anticipate
potential problems and facilitate debugging. For this reason it is better to use
a specific AmqpReplyStamp stamp rather than reusing ReplyStamp to hide RPC.
AmqpReplyStamp must be attached to the request envelope when it is sent to
AMQP broker. To attach this stamp, you must first create a temporary callback
queue. AmqpSender is a perfect place for this functionality:
The $connection object contains useful methods for interacting with AMQP
broker. We need to add the
Symfony\Component\Messenger\Transport\AmqpExt::createReplyQueue() method to be
able to create a callback queue:
On the one hand, the Messenger component now contains everything to allow
clients to wait for a response. On the other hand, the result returned by a
handler must be transmitted to the client that waits for it.
A handler receives the message as is, without envelopes nor stamps. Therefore it
is not aware if the client is waiting for a response or not. It is also a bad
idea to make handlers responsible for sending the response. It is rather the
Messenger component itself needs to take care of that. So, a place for response
logic have to be found. As mentioned before HandleMessageMiddleware takes care
of choosing and invoking a suitable handler. This is the latest middleware in
the middleware chain. There are therefore two choices: modify
HandleMessageMiddleware or add new middleware at the end of the middleware
chain. In my opinion creating a new ReplyMiddleware is the best option:
ReplyMiddleware relies on
Symfony\Component\Messenger\Transport\AmqpExt\Connection::reply() method that
has to be added:
But first ReplyMiddleware needs to access the current AMQP connection in order
to call the reply() method. Although there may be better solutions I put the
AMQP connection in AmqpReceivedStamp:
I dared to break a backward compatibility in the context of this pull request
because ot is rather proof of concept. The layer of backward compatibility can
be added later.
It remains to add the new middleware to the default stack of middlewares. When
FrameworkBundle registers buses it takes care of theirs stacks of middleware.
We just need to add reply_middleware to the end of the default stack of
FrameworkExtension helps to configure middleware stacks for each bus in an
application. For example this configuration will insert two middlewares
MyMiddleware and ValidationMiddleware between
FailedMessageProcessingMiddleware(last in “before” array) and
SendMessageMiddleware (first in “after” array):
To add a middleware at the end you have to compose an entire middleware stack:
The final touch is to declare the ReplyMiddleware as a service in
Play on stamps
The following diagram shows a synchronous handling of a message. Note the stamps
added by the client code and by the Messenger component to make the result
available to the client.
When a message is sent asynchronously, the Messenger takes care of sending a
request and receiving a response. Note that AmqpSender adds AmqpReplyStamp
that allows a client to wait for a response.
The request is serialized with its envelope and stamps by sender. It should be
mentioned that stamps that implement NonSendableStampInterface are not sent.
This is particularly the case for AmqpStamp, AmapReplyStamp, SentStamp.
The new ReplyStamp is sent with the envelope to notify a recipient handler
that the client is waiting for a response. AmqpReplyStamp contain the the
connection to callback queue, it should not be sent. The following diagram shows
the processing of a request by a recipient: