FAILURE
Until Successful Scope
Mule Runtime Engine versions 3.5, 3.6, and 3.7 reached End of Life on or before January 25, 2020. For more information, contact your Customer Success Manager to determine how you can migrate to the latest Mule version. |
The until-successful scope processes messages through its processors until the process succeeds. By default, until-successful’s processing occurs asynchronously from the main flow. After passing a message into the until-successful scope, the main flow immediately regains control of the thread. However, you can configure until-successful to run synchronously relative to the main flow.
Until-successful repeatedly retries to process a message that is attempting to complete an activity such as:
-
Dispatching to outbound endpoints, for example, when calling a remote web service that may have availability issues.
-
Executing a component method, for example, when executing on a Spring bean that may depend on unreliable resources.
-
A sub-flow execution, to keep re-executing several actions until they all succeed,
-
Any other message processor execution, to allow more complex scenarios.
The until successful component requires a mandatory object store to work. However Anypoint Studio doesn’t enforce this. If an object store is not declared, Studio throws an InitialisationException. For more information, see Until-Successful and Object Store. The object store needs to be exclusive for each until-successful instance. |
See Also:
Success and Failure
As this scope continues to processes messages until successful, it is important to understand the definition of successful in context of Mule message processing.
Result | Description |
---|---|
A message processor within the until-successful scope throws an exception or contains an exception payload. Also, if an expression is provided in the attribute |
|
SUCCESS |
None of the message processors within the until-successful scope throw any exceptions or contain an exception payload, or they do not return any message at all (i.e. the flow ends in a one-way outbound endpoint) |
conditional |
Where you have configured a failure expression (see below), Mule evaluates the return message against the expression to dynamically determine if the action has failed or succeeded. |
Basic Attributes
Below is a table with a description of the main configurable attributes of the element:
Attribute | Description |
---|---|
objectStore-ref |
Reference to the org.mule.api.store.ListableObjectStore that is used to store events pending to process or reprocess |
maxRetries |
Specifies the maximum number of retries that will be attempted |
millisBetweenRetries |
Specifies the minimum interval between two attempts to process, in milliseconds. The actual interval depends on the previous execution, but should not exceed twice this number. The default value is 60000 milliseconds (one minute) |
failureExpression |
Specifies an expression that, when it evaluated to true, determines that the processing of one route was a failure. If no expression is provided, only an exception will be treated as a processing failure. |
ackExpression |
Specifies an expression that, when evaluated to true, determines the synchronous response of until-successful. |
deadLetterQueue-ref |
The endpoint or message processor to which undeliverable messages are sent after all retries have been executed unsuccessfully. |
Configuring failureExpression
If the scope fails, a RetryPolicyExhaustedException
is created, wrapped as a MessagingException
and passed to the exception handler of the flow that contains the until-successful
element.
The following illustrates how to configure the failureExpression
returned by an until-successful scope:
<until-successful objectStore-ref="objectStore"
failureExpression="#[message.inboundProperties['http.status'] != 202]"
maxRetries="6" secondsBetweenRetries="600">
<http:outbound-endpoint host="localhost" port="${port2}" path="flakey"
method="POST" exchange-pattern="one-way" />
</until-successful>
When all Else has Failed
If message processing keeps failing and the maximum number of retries is exceeded, the default behavior of the Until Successful message processor consists in logging the message details and dropping it.
Should you want to perform a specific action on the discarded message (for example, storing it in a file or database), it is possible to configure a Dead Letter Queue endpoint” where dropped messages are sent.
For more information, see Configuring a Dead Letter Queue.
Configuring a Dead Letter Queue
To manage messages which have exhausted the number of maxRetries
within the until-successful scope, you can define a DLQ (dead letter queue) endpoint to which Mule can send such messages. Refer to the code sample below for an example configuration.
<until-successful objectStore-ref="objectStore"
deadLetterQueue-ref="dlqChannel"
maxRetries="3"
secondsBetweenRetries="10">
...
</until-successful>
Asynchronous Until-Successful
By default, until-successful processes messages asynchronously relative to the main flow in which it resides. The section below describe configurations you can customize in your asynchronous until-successful.
Until-Successful and Object Store
This message processor needs an ListableObjectStore instance in order to persist messages pending (re)processing. There are several implementations available in Mule, including the following:
-
DefaultInMemoryObjectStore: default in-memory store
-
DefaultPersistentObjectStore: default persistent store
-
FileObjectStore: file-based store
-
QueuePersistenceObjectStore: global queue store
-
SimpleMemoryObjectStore: in-memory store
See Mule Object Stores for further information about object stores in Mule. The following code sample illustrates how to configure an in-memory store:
<spring:bean id="objectStore" class="org.mule.util.store.SimpleMemoryObjectStore" />
Customizing the Threading Profile of Asynchronous Until-Successful
This feature enables you to customize the threading profile of an asynchronous until-successful scope.
Studio Visual Editor
-
In the Properties Editor of the Until Successful Scope in your flow, click to access the Threading tab.
-
Click to select the Configure threading profile radio button.
-
Enter values in the threading profile fields to customize the threading behavior.
Attribute |
Type |
Required |
Default Value |
Description |
Max Buffer Size |
integer |
no |
Determines how many requests are queued when the pool is at maximum usage capacity and the pool exhausted action is WAIT. The buffer is used as an overflow.* |
|
Max Active Threads |
integer |
no |
The maximum number of threads that will be used. |
|
Max Idle Threads |
integer |
no |
The maximum number of idle or inactive threads that can be in the pool before they are destroyed. |
|
Pool Exhausted Action |
WAIT/DISCARD/DISCARD_OLDEST/ABORT/RUN |
no |
When the maximum pool size or queue size is bounded, this value determines how to handle incoming tasks. Possible values are: WAIT (wait until a thread becomes available; don’t use this value if the minimum number of threads is zero, in which case a thread may never become available), DISCARD (throw away the current request and return), DISCARD_OLDEST (throw away the oldest request and return), ABORT (throw a RuntimeException), and RUN (the default; the thread making the execute request runs the task itself, which helps guard against lockup). |
|
Thread TTL |
integer |
no |
Determines how long an inactive thread is kept in the pool before being discarded. |
|
Thread Wait Timeout |
integer |
no |
How long to wait in milliseconds when the pool exhausted action is WAIT. If the value is negative, it will wait indefinitely. |
Any BlockingQueue may be used to transfer and hold submitted tasks. The use of this queue interacts with pool sizing: * If fewer than corePoolSize threads are running, the Executor always prefers adding a new thread rather than queuing. * *If corePoolSize or more threads are running, the Executor always prefers queuing a request rather than adding a new thread. * If a request cannot be queued, a new thread is created unless this would exceed maximumPoolSize, in which case, the task will be rejected.
If you configure a threading profile with poolExhaustedAction=WAIT and a maxBufferSize of a positive value, the thread pool does not grow from maxThreadsIdle (corePoolSize) towards maxThreadsActive (maxPoolSize) unless the queue is completely filled up.
XML Editor or Standalone
To the until-successful element, add child element threading-profile
. Configure the attributes of the child element according to the table below.
<until-successful>
<threading-profile maxThreadsActive="1" maxThreadsIdle="1" poolExhaustedAction="RUN"/>
<set-payload/>
<until-successful>
Attribute |
Type |
Required |
Default Value |
Description |
maxBufferSize |
integer |
no |
Determines how many requests are queued when the pool is at maximum usage capacity and the pool exhausted action is WAIT. The buffer is used as an overflow.* |
|
maxThreadsActive |
integer |
no |
The maximum number of threads that will be used. |
|
maxThreadsIdle |
integer |
no |
The maximum number of idle or inactive threads that can be in the pool before they are destroyed. |
|
poolExhaustedAction |
WAIT/DISCARD/DISCARD_OLDEST/ABORT/RUN |
no |
When the maximum pool size or queue size is bounded, this value determines how to handle incoming tasks. Possible values are: WAIT (wait until a thread becomes available; don’t use this value if the minimum number of threads is zero, in which case a thread may never become available), DISCARD (throw away the current request and return), DISCARD_OLDEST (throw away the oldest request and return), ABORT (throw a RuntimeException), and RUN (the default; the thread making the execute request runs the task itself, which helps guard against lockup). |
|
threadTTL |
integer |
no |
Determines how long an inactive thread is kept in the pool before being discarded. |
|
threadWaitTimeout |
integer |
no |
How long to wait in milliseconds when the pool exhausted action is WAIT. If the value is negative, it will wait indefinitely. |
*Any BlockingQueue may be used to transfer and hold submitted tasks. The use of this queue interacts with pool sizing:
-
If fewer than corePoolSize threads are running, the Executor always prefers adding a new thread rather than queuing.
-
If corePoolSize or more threads are running, the Executor always prefers queuing a request rather than adding a new thread.
-
If a request cannot be queued, a new thread is created unless this would exceed maximumPoolSize, in which case, the task will be rejected.
If you configure a threading profile with poolExhaustedAction=WAIT and a maxBufferSize of a positive value, the thread pool does not grow from maxThreadsIdle (corePoolSize) towards maxThreadsActive (maxPoolSize) unless the queue is completely filled up.
Synchronous Until-Successful
Out of the box, the until-successful scope processes messages asynchronously. After passing a message into the until-successful scope, the main flow immediately regains control of the thread thus prohibiting any returned response from the processing activities which occur within the scope.
However, in some situations, you may need until-successful to process messages synchronously so that the main flow waits for processing within the scope to complete before continuing processing. To address these needs, the Mule enables you to configure the scope to process messages synchronously.
When set to process message synchronously, until-successful executes within the thread of the main flow, then returns the result scope’s processing on the same thread.
Studio Visual Editor
In the Threading tab of the Until Successful’s Properties Editor, click to select Synchronous.
XML Editor or Standalone
To the until-successful element, add the synchronous
attribute with the value set to true
.
<until-successful synchronous="true">
<set-payload/>
</until-successful>
When set to process synchronously, the until-successful scope does not accept the configuration of the following child element and attributes:
-
threading-profile
(synchronous until-successful does not need a ThreadPool) -
objectStore-ref
(synchronous until-successful is not required to persist messages between retries) -
deadLetterQueue-ref
(when the retry count is exhausted, Mule executes the exception strategy)
Example - Until-Successful Usage
<until-successful objectStore-ref="objectStore" maxRetries="5" secondsBetweenRetries="60" doc:name="Until Successful">
<http:request config-ref="HTTP_Request_Configuration" path="submit" method="POST" doc:name="HTTP"/>
</until-successful>
Example - Retry Sending to an HTTP-based Service
This example demonstrates how to retry sending to an HTTP-based service until success:
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:vm="http://www.mulesoft.org/schema/mule/vm"
xmlns:http="http://www.mulesoft.org/schema/mule/http"
xmlns:tracking="http://www.mulesoft.org/schema/mule/ee/tracking"
xmlns="http://www.mulesoft.org/schema/mule/core"
xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
xmlns:spring="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-current.xsd
http://www.mulesoft.org/schema/mule/core
http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/http
http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
http://www.mulesoft.org/schema/mule/ee/tracking
http://www.mulesoft.org/schema/mule/ee/tracking/current/mule-tracking-ee.xsd
http://www.mulesoft.org/schema/mule/vm
http://www.mulesoft.org/schema/mule/vm/current/mule-vm.xsd">
<http:request-config name="HTTP_Request_Configuration"
host="http://acme.com/api/flakey" port="8082"
doc:name="HTTP Request Configuration"/>
<spring:bean id="objectStore"
class="org.mule.util.store.SimpleMemoryObjectStore" />
<flow name="retrying-http-bridge">
<vm:inbound-endpoint exchange-pattern="one-way"
path="acme-bridge" doc:name="VM"/>
<until-successful objectStore-ref="objectStore" maxRetries="5"
failureExpression="#[header:INBOUND:http.status != 202]"
doc:name="Until Successful">
<http:request config-ref="HTTP_Request_Configuration"
path="/" method="POST" doc:name="HTTP"/>
</until-successful>
</flow>
</mule>
The Until Successful message processor relies on Mule ObjectStore for persisting the events it processes. In this example, we use an in-memory implementation: a persistent implementation would be required in order to ensure that nothing gets lost in case of a restart or crash.
This example retries every 10 minutes for an hour. Afterwards, the message is discarded.
This example interacts synchronously (request-response) with the outbound HTTP endpoint to ensure the remote web service correctly accepted the POSTed message (that is that it replied with a 202 status code).
Example - Retry Other Flows
The following example shows that other flows can be retried the same way:
<flow name="subflow-retrier">
<vm:inbound-endpoint path="signup"
exchange-pattern="request-response"/>
<until-successful objectStore-ref="objectStore"
ackExpression="#[message:correlationId]"
maxRetries="3"
secondsBetweenRetries="10">
<flow-ref name="signup-flow" />
</until-successful>
</flow>
Notice how the Until Successful message processor has been configured to synchronously acknowledge it has accepted the inbound event for processing by returning the current message correlation ID. Sending to the “signup” VM endpoint therefore returns the correlation ID of the message whose processing by the sub-flow named “signup-flow” is tried (and retried).
Example - Configure Object Stores
The following example demonstrates how to configure object stores in the following three situations:
-
idempotent filter with an in-memory object store
-
idempotent filter with a persistent object store
-
Until a successful scope occurs with an in-memory object store
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:http="http://www.mulesoft.org/schema/mule/http" xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation" xmlns:spring="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd">
<!-- Global object store definition for a Listable Object Store, used in Flow 3 below. -->
<spring:beans>
<spring:bean id="myListableObjectStore" class="org.mule.util.store.SimpleMemoryObjectStore"/>
</spring:beans>
<http:listener-config name="HTTP_Listener_Configuration" host="localhost" port="8081" doc:name="HTTP Listener Configuration"/>
<!-- Idempotent Filter with In Memory Object Store -->
<flow name="Flow1_idempotentWithInMemoryStore" doc:name="Flow1_idempotentWithInMemoryStore">
<http:listener config-ref="HTTP_Listener_Configuration" path="idempotentInMemory" doc:name="HTTP"/>
<idempotent-message-filter idExpression="#[message.payload]" throwOnUnaccepted="true" storePrefix="Idempotent_Message" doc:name="Idempotent Message">
<in-memory-store name="myInMemoryObjectStore" entryTTL="120" expirationInterval="3600" maxEntries="60000" />
</idempotent-message-filter>
<set-payload value="YAY!" doc:name="Set Payload" />
<catch-exception-strategy doc:name="Catch Exception Strategy">
<set-payload value="NAY!" doc:name="Set Payload" />
</catch-exception-strategy>
</flow>
<!-- Idempotent Filter with Persistent File Store -->
<flow name="Flow2_idempotentWithTextFileStore" doc:name="Flow2_idempotentWithTextFileStore">
<http:listener config-ref="HTTP_Listener_Configuration" path="idempotentTextFile" doc:name="HTTP"/>
<idempotent-message-filter idExpression="#[message.payload]" throwOnUnaccepted="true" storePrefix="Idempotent_Message" doc:name="Idempotent Message">
<simple-text-file-store name="mySimpleTextFileStore" directory="#[server.tmpDir + '/objectstore']" entryTTL="120" expirationInterval="3600" maxEntries="60000" />
</idempotent-message-filter>
<set-payload value="YAY!" doc:name="Set Payload" />
<catch-exception-strategy doc:name="Catch Exception Strategy">
<set-payload value="NAY!" doc:name="Set Payload" />
</catch-exception-strategy>
</flow>
<!-- Until Successful Scope with In Memory Object Store -->
<flow name="Flow3_UntilSuccessfulWithListableObjectStore" doc:name="UntilSuccessfulWithListableObjectStore">
<http:listener config-ref="HTTP_Listener_Configuration" path="hey" doc:name="HTTP"/>
<until-successful objectStore-ref="myListableObjectStore" maxRetries="15" secondsBetweenRetries="1" doc:name="Until Successful">
<processor-chain doc:name="Processor Chain">
<message-filter throwOnUnaccepted="true">
<expression-filter expression="return Math.random() < 0.1" doc:name="Expression" />
</message-filter>
<logger message="This eventually happens." doc:name="Logger" />
</processor-chain>
</until-successful>
<set-payload value="Completed" doc:name="Set Payload" />
</flow>
</mule>
See Also
-
Learn more about Tuning Performance in Mule.
-
Learn more about Scopes in Mule in general.
-
Learn more about Flows and Subflows