<flow name="asynchronousToSynchronous">
<vm:inbound-endpoint path="anyUniqueEndpointName" exchange-pattern="one-way"/>
<vm:outbound-endpoint path="output" exchange-pattern="one-way"/>
</flow>
Flow Processing Strategies
A flow processing strategy determines how Mule implements message processing for a given flow. There are several decisions that have to be made:
-
Should the message be processed synchronously (on a single thread) or asynchronously (writing the message to a queue for another thread to run)?
-
If asynchronously, what are the properties of the pool of threads used to process messages found in the queue?
-
Again, if asynchronously, what are the properties of the queue?
Mule features two main flow processing strategies, each one optimal for certain flows. This page introduces both of these processing strategies. It then covers the criteria Mule uses to determine the optimal processing strategy for each flow. Next, this page explains the circumstances under which you may want to override Mule’s initial choice of processing strategy. Finally, this page covers the parameters you can change or fine-tune in the processing strategy used for a given flow.
About the Synchronous Flow Processing Strategy
The synchronous approach is used to processes each message on the same thread that initially received the message. After the flow receives a message, all processing, including the processing of the response, is done in that same thread The one exception is the processing for sections explicitly marked with the async
element. The synchronous strategy is ideally suited to flows where:
-
The sender of the message expects a response. This is known as a "request-response" exchange pattern.
-
The flow needs to meet the requirements of transactional processing. In other words, all the steps in the flow are considered a single unit, which must succeed entirely or fail entirely. Additionally, appropriate parties (such as the sender of the message or the administrator of the business process encapsulated by the flow) must be notified of the result. This means that a transactional flow must not hand off processing to other threads, where errors can occur after the transaction is completed.
-
The flow’s inbound endpoint must be notified of all errors that occur during the processing of the message. This situation is discussed further in Reliability Patterns.
About the Queued-Asynchronous Flow Processing Strategy
The queued-asynchronous approach uses a queue to decouple the flow’s receiver from the other steps in the flow. This means that once the receiver places a message into a queue, it can immediately return and accept a new incoming message. Furthermore, each message waiting in the queue can be assigned a different thread from the pool of threads. This is managed by a work manager, which manages multiple threads and dynamically chooses a thread to process the message taken from the queue. In this way, all assigned threads can execute simultaneously. Such parallel processing is ideal for situations where the receiver can, at peak times, accept messages significantly faster than the rest of the flow can process those messages.
The specific type of queue implemented for the queued-asynchronous flow processing strategy is known as a SEDA queue .
|
Under the queued-asynchronous processing strategy, the receiver does not have to wait before accepting the next message, and the processing speed for the rest of the steps in the flow is effectively multiplied, because multiple messages are being processed at the same time.
However, the increased throughput facilitated by the asynchronous approach comes at the cost of transactional reliability. Also, the queued-asynchronous approach, which uses two threads to process each message, is not suitable for request-response exchange patterns, which need to be performed entirely on a single thread.
How Mule Selects a Flow Processing Strategy
Each flow varies in the degree to which it can benefit from the transactional reliability of synchronous processing, as opposed to the high throughput of the queued-asynchronous alternative. Mule automatically weighs key factors specific to each flow and decides whether to set up synchronous or asynchronous processing.
To select a processing strategy for a given flow, Mule evaluates that flow’s exchange pattern.
Flows employ one of two exchange patterns, as follows (note that a flow’s exchange pattern is determined by the exchange pattern of its inbound endpoint):
-
Request-Response exchange patterns involve a message submitted to the flow’s receiver by some external sender, who then waits for a response from the flow.
-
One-way exchange patterns.
In addition, either of these exchange patterns might involve a transaction.
As the following table details, Mule chooses a flow processing strategy based on the exchange pattern used by the flow and whether or not the flow is transactional:
Exchange Pattern | Transactional? | Flow Processing Strategy |
---|---|---|
Request-Response |
yes |
Synchronous |
Request-Response |
no |
Synchronous |
One-way |
yes |
Synchronous |
One-way |
no |
Queued-Asynchronous |
Changing the Processing Strategy for Your Flow
Mule’s choice of processing strategy is almost always optimal for the flow to which it is applied. However, the following options exist:
-
In cases where Mule has selected queued-asynchronous processing for a flow, you can specify a synchronous flow instead (see Forcing a Specific Flow Processing Strategy). You might want to specify a synchronous flow to achieve reliability. Recall that the synchronous strategy is ideally suited for flows where the flow’s inbound endpoint must be notified of all errors that occur during the processing of the message. This situation is discussed further in Reliability Patterns.
Note that you cannot force a synchronous flow to become asynchronous. -
You can accept Mule’s choice of flow processing strategy, but then proceed to fine-tune that strategy as well. See Fine-Tuning a Processing Strategy. Note that you can only fine-tune a queued-asynchronous strategy. You cannot do any fine-tuning for a synchronous flow.
-
You can create a custom flow processing strategy to fit your exact needs. For instance, you might prefer a queued-asynchronous flow that uses an increased number of threads to handle high peak loads. See Creating a Processing Strategy.
Forcing a Specific Flow Processing Strategy
The procedure to change a processing strategy for an individual flow is straightforward – you don’t need to set queuing profiles or service threading profiles. The only change allowed is to force a flow that would otherwise be asynchronous to be synchronous. Flows configured for request-response or transactional processing cannot be converted from synchronous to asynchronous. To force a flow to be synchronous, add the processingStrategy attribute to the flow that you want to change and set it to synchronous
. This is illustrated in the code examples below:
The original configuration
The modified configuration:
<flow name="asynchronousToSynchronous" processingStrategy="synchronous">
<vm:inbound-endpoint path="anyUniqueEndpointName" exchange-pattern="one-way"/>
<vm:outbound-endpoint path="output" exchange-pattern="one-way"/>
</flow>
Fine-Tuning a Processing Strategy
You can fine-tune a queued-asynchronous processing strategy by:
-
Changing the number of threads available to the flow.
-
Limiting the number of messages that can be queued.
-
Specifying a queue store to persist data.
You achieve this fine-tuning by specifying parameters for a global processing strategy, then referencing the parameters within the flow or flows you wish to fine-tune. If you don’t specify a certain configuration parameter at either the global or local levels, Mule sets a default value for that parameter.
The following example defines a global processing strategy (asynchronous-processing-strategy
), which specifies maxThreads="500"
. Together, this parameter and its value specify the maximum number of threads available for use by the queue. The example also presents a flow which references the global processing strategy. This flow:
-
Will be asynchronous, because it refers to the asynchronous-processing strategy.
-
Will allow up to 500 concurrent threads, because of the value set for
maxThreads
.
<queued-asynchronous-processing-strategy name="allow500Threads" maxThreads="500"/>
<flow name="manyThreads" processingStrategy="allow500Threads">
<vm:inbound-endpoint path="manyThreads" exchange-pattern="one-way"/>
<vm:outbound-endpoint path="output" exchange-pattern="one-way"/>
</flow>
The following table lists the configuration parameters you can specify for the queued-asynchronous strategy. (The synchronous processing strategy cannot be configured):
name | type | queued only | description | optional |
---|---|---|---|---|
maxBufferSize |
integer |
no |
Determines how many requests are queued when the pool reaches maximum capacity and the pool exhausted action is WAIT. The buffer is used as an overflow. |
yes |
maxQueueSize |
integer |
yes |
The maximum number of messages that can be queued. |
yes |
maxThreads |
integer |
no |
The maximum number of threads that can be used. |
yes |
minThreads |
integer |
no |
The number of idle threads kept in the pool when there is no load. |
yes |
poolExhaustedAction |
enum |
no |
When the maximum pool size or queue size is bounded, this value determines how to handle incoming tasks |
yes |
queueTimeout |
integer |
yes |
The timeout used when taking events from the queue. |
yes |
threadTTL |
integer |
no |
Determines how long an inactive thread is kept in the pool before being discarded. |
yes |
threadWaitTimeout |
integer |
no |
How long to wait in milliseconds when the pool exhausted action is WAIT. If the value is negative, the wait is infinite. |
yes |
Configuring the Queue Object store
For the queued-asynchronous strategy, you can implement message persistence by specifying a queue store. If you don’t specify an object store, Mule provides a default in-memory store. However, for a cluster, Mule creates the default in-memory store in the shared memory grid. For details, see Mule Object Stores.
Creating a Processing Strategy
If neither the synchronous nor asynchronous processing strategies fit your needs, and fine-tuning the asynchronous strategy is not sufficient, you can create a custom processing strategy. You create the custom strategy through the <custom-processing-strategy>
element and configure it using Spring bean properties. This custom processing strategy must implement the org.mule.api.processor.ProcessingStrategy
interface.
The following code example illustrates a custom processing strategy:
<custom-processing-strategy name="customStrategy" class="org.mule.CustomProcessingStrategy">
<spring:property name="threads" value="500"/>
</custom-processing-strategy>
Reusing Processing Strategies
You can use a named processing strategy, such as the ones created in the previous two sections, on as many flows in an application as you like. Simply:
-
Declare the processing strategy, as in:
<queued-asynchronous-processing-strategy name="allow500Threads" maxThreads="500"/>
-
Refer to it in appropriate flows, for instance:
<flow name="acceptOrders" processingStrategy="allow500Threads">
<vm:inbound-endpoint path="acceptOrders" exchange-pattern="one-way"/>
<vm:outbound-endpoint path="commonProcessing" exchange-pattern="one-way"/>
</flow>
<flow name="processNewEmployee" processingStrategy="allow500Threads">
<vm:inbound-endpoint path="processNewEmployee" exchange-pattern="one-way"/>
<vm:outbound-endpoint path="commonProcessing" exchange-pattern="one-way"/>
</flow>
<flow name="receiveInvoice" processingStrategy="allow500Threads">
<vm:inbound-endpoint path="receiveInvoice" exchange-pattern="one-way"/>
<vm:outbound-endpoint path="commonProcessing" exchange-pattern="one-way"/>
</flow>
See Also
-
Check out the Asynchronous Message Cheat Sheet on our MuleSoft Blog.