Splitter Flow Control Examples

Mule runtime engine version 3.8 reached its End of Life on November 16, 2021. For more information, contact your Customer Success Manager to determine how to migrate to the latest Mule version.

Example Splitting and Aggregating with Asynchronous Flows

This example builds upon the Splitter basic example. Follow the steps below to run message fragments in asynchronous flows and then aggregate them back into a single message.

Studio Visual Editor

  1. Drag a VM connector to the end of the flow.

  2. Drag a second VM connector outside the existing flow, below it. This creates a new flow.

  3. Drag the existing aggregator and logger you had in the first flow to the new second flow, after the VM connector.

  4. Configure the two VM connectors. Set the Queue Path of both to step2.

    After you configure both VM connectors with the same Queue Path, they are linked. Messages that arrive at the first VM connector continue their path out of the second.

    What you have at this point appears to work identically to what you built in the first example. There is, however, one key difference: the fragments of the message are processed concurrently rather than in sequence. If you deploy your app to a cluster of servers, the fragments can be processed on different servers, which can significantly improve performance.

Run the Mule project. You should see four messages logged to the console: the first three should be short, one for every "actor" XML element (notice the ID attribute in each message). After these first three messages there should be a fourth, longer message, which is logged after the aggregator has run. Notice two things:

  • Although the aggregator was triggered three times, once for every fragment of the message that reached it, it produced a single output message, and only once all of the fragments were in place.

  • The aggregator assembles the message in the order in which the fragments arrive, so the final message may be shuffled. If maintaining the original sequence is important to you, see Example Reordering Before Aggregating on this page.

XML Editor

  1. Add a second flow to your project.

    <flow name="SplitterExampleFlow1" >
      <file:inbound-endpoint path="./src/main/resources/" connector-ref="File" pollingFrequency="5000" responseTimeout="10000" doc:name="File">
        <file:filename-regex-filter pattern="vip.xml" caseSensitive="true"/>
      </file:inbound-endpoint>
      <splitter expression="#[xpath3('//*:actor/text()',payload,'NODESET')]" doc:name="Splitter"/>
      <logger message="#[payload]" level="INFO" doc:name="Logger"/>
      <collection-aggregator failOnTimeout="true" doc:name="Collection Aggregator"/>
      <logger message="#[payload]" level="INFO" doc:name="Logger"/>
    </flow>
    
    <flow name="xpathFlow">
    
    </flow>
  2. Move both loggers and the Collection Aggregator to the second flow. Then link the two flows through a pair of VM connectors: an outbound endpoint at the end of the first flow and an inbound endpoint at the start of the second flow.

    <flow name="SplitterExampleFlow1" >
      <file:inbound-endpoint path="./src/main/resources/" connector-ref="File" pollingFrequency="5000" responseTimeout="10000" doc:name="File">
        <file:filename-regex-filter pattern="vip.xml" caseSensitive="true"/>
      </file:inbound-endpoint>
      <splitter expression="#[xpath3('//*:actor/text()',payload,'NODESET')]" doc:name="Splitter"/>
      <vm:outbound-endpoint exchange-pattern="one-way" path="step2" connector-ref="VM" doc:name="VM"/>
    </flow>
    <flow name="xpathFlow">
      <vm:inbound-endpoint exchange-pattern="one-way" path="step2" connector-ref="VM" doc:name="VM"/>
      <logger message="#[payload]" level="INFO" doc:name="Logger"/>
      <collection-aggregator failOnTimeout="true" doc:name="Collection Aggregator"/>
      <logger message="#[payload]" level="INFO" doc:name="Logger"/>
    </flow>

    Provide these same attributes for both VM connectors:

    Attribute          Value
    exchange-pattern   one-way

    After both VM connectors share the same Queue Path, they are linked. Messages that arrive at the first VM connector continue their path out of the second. What you have at this point appears to work identically to what you built in the first example. There is, however, one key difference: the fragments of the message are processed concurrently rather than in sequence. If you deploy your app to a cluster of servers, the fragments can be processed on different servers, which can significantly improve performance.

Run the Mule project. You should see four messages logged to the console: the first three should be short, one for every "actor" XML element (notice the ID attribute in each message). After these first three messages there should be a fourth, longer message, which is logged after the aggregator has run. Notice two things:

  • Although the aggregator was triggered three times, once for every fragment of the message that reached it, it produced a single output message, and only once all of the fragments were in place.

  • The aggregator assembles the message in the order in which the fragments arrive, so the final message may be shuffled. If maintaining the original sequence is important to you, see Example Reordering Before Aggregating on this page.
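
The aggregator releases its output only when every fragment of the group has arrived, so it can help to bound how long it waits. The following is a minimal sketch, not part of the original example: it assumes the `timeout` attribute (in milliseconds) that Mule 3 aggregators accept alongside `failOnTimeout`, and the 30000 value is an arbitrary figure chosen for illustration.

```xml
<!-- Sketch only: the timeout value below is an assumed, illustrative figure -->
<flow name="xpathFlow">
  <vm:inbound-endpoint exchange-pattern="one-way" path="step2" connector-ref="VM" doc:name="VM"/>
  <logger message="#[payload]" level="INFO" doc:name="Logger"/>
  <!-- Wait at most 30 seconds for the whole correlation group to arrive.
       With failOnTimeout="true", an incomplete group is treated as an error
       rather than being released as a partial collection. -->
  <collection-aggregator timeout="30000" failOnTimeout="true" doc:name="Collection Aggregator"/>
  <logger message="#[payload]" level="INFO" doc:name="Logger"/>
</flow>
```

If a partial result is acceptable in your use case, setting `failOnTimeout` to false is the usual alternative; check the behavior against the aggregator reference for your Mule version before relying on it.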

Full Example Code

<?xml version="1.0" encoding="UTF-8"?>

<mule xmlns:file="http://www.mulesoft.org/schema/mule/file" xmlns:dw="http://www.mulesoft.org/schema/mule/ee/dw" xmlns:tracking="http://www.mulesoft.org/schema/mule/ee/tracking" xmlns:vm="http://www.mulesoft.org/schema/mule/vm"
xmlns:mulexml="http://www.mulesoft.org/schema/mule/xml" xmlns:http="http://www.mulesoft.org/schema/mule/http" xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation" xmlns:spring="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.mulesoft.org/schema/mule/vm http://www.mulesoft.org/schema/mule/vm/current/mule-vm.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd
http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
http://www.mulesoft.org/schema/mule/xml http://www.mulesoft.org/schema/mule/xml/current/mule-xml.xsd
http://www.mulesoft.org/schema/mule/ee/tracking http://www.mulesoft.org/schema/mule/ee/tracking/current/mule-tracking-ee.xsd
http://www.mulesoft.org/schema/mule/file http://www.mulesoft.org/schema/mule/file/current/mule-file.xsd
http://www.mulesoft.org/schema/mule/ee/dw http://www.mulesoft.org/schema/mule/ee/dw/current/dw.xsd">

<file:connector name="File" autoDelete="false" streaming="true" validateConnections="true" doc:name="File"/>
<vm:connector name="VM" validateConnections="true" doc:name="VM"/>

<flow name="SplitterExampleFlow1" >
  <file:inbound-endpoint path="./src/main/resources/" connector-ref="File" pollingFrequency="5000" responseTimeout="10000" doc:name="File">
    <file:filename-regex-filter pattern="vip.xml" caseSensitive="true"/>
  </file:inbound-endpoint>
  <splitter expression="#[xpath3('//*:actor/text()',payload,'NODESET')]" doc:name="Splitter"/>
  <vm:outbound-endpoint exchange-pattern="one-way" path="step2" connector-ref="VM" doc:name="VM"/>
</flow>
<flow name="xpathFlow">
  <vm:inbound-endpoint exchange-pattern="one-way" path="step2" connector-ref="VM" doc:name="VM"/>
  <logger message="#[payload]" level="INFO" doc:name="Logger"/>
  <collection-aggregator failOnTimeout="true" doc:name="Collection Aggregator"/>
  <logger message="#[payload]" level="INFO" doc:name="Logger"/>
</flow>
</mule>

Example Reordering Before Aggregating

This example builds upon the previous example.

If fragments of the message are processed in parallel on different servers, they are likely to take different lengths of time to process and consequently fall out of order. The following example solves that problem.

Follow the steps below to:

  • Run message fragments in asynchronous flows.

  • Arrange them back into the original sequence.

  • Aggregate them back into a single message that follows the original sequence.

Studio Visual Editor

  1. Add a Resequencer Flow Control before the aggregator.

    The Resequencer waits for all of the messages in the group to arrive (keeping track of MULE_CORRELATION_ID and MULE_CORRELATION_GROUP_SIZE) and then reorders them according to their MULE_CORRELATION_SEQUENCE index.

    The Resequencer outputs three distinct messages, so the Aggregator is still needed to merge them into one.

  2. Run the Mule project.

With the Resequencer in place, messages now reach the aggregator in the correct order and are assembled accordingly.

To really take advantage of splitting the message, you should deploy your app to a cluster of servers. By following the steps below, you can simulate the random delays of a cluster of servers.

The following is not an implementable solution but rather a proof of concept that highlights what occurs in the flow.

  1. Add a Groovy component in the second flow, between the VM and the logger.

  2. Copy the following code into the Groovy Component:

    random = new Random()
    randomInt = random.nextInt(10)*1000
    Thread.sleep(randomInt)
    return payload

    This snippet introduces a random delay of 0 to 9 seconds, in whole-second steps (nextInt(10) returns an integer from 0 to 9). Because each message runs asynchronously, this delay can alter the order in which messages move on to the next step, simulating what could happen in a real implementation where parallel servers process each fragment of the message.

XML Editor

  1. Add a Resequencer Flow Control before the aggregator.

    <resequencer failOnTimeout="true" doc:name="Resequencer"/>

    Attribute       Value
    failOnTimeout   true
    doc:name        Resequencer

    The Resequencer waits for all of the messages in the group to arrive (keeping track of MULE_CORRELATION_ID and MULE_CORRELATION_GROUP_SIZE) and then reorders them according to their MULE_CORRELATION_SEQUENCE index.
    The Resequencer outputs three distinct messages, so the Aggregator is still needed to merge them into one.

With the Resequencer in place, messages now reach the aggregator in the correct order and are assembled accordingly.

To really take advantage of splitting the message, you should deploy your app to a cluster of servers. By following the steps below, you can simulate the random delays of a cluster of servers.

The following is not an implementable solution but rather a proof of concept that highlights what occurs in the flow.

  1. Add a Groovy component in the second flow, between the VM and the first logger.

    <scripting:component doc:name="Groovy">
        <scripting:script engine="Groovy">
            <![CDATA[
                random = new Random()
                randomInt = random.nextInt(10)*1000
                Thread.sleep(randomInt)
                return payload
            ]]>
        </scripting:script>
    </scripting:component>

    This snippet introduces a random delay of 0 to 9 seconds, in whole-second steps (nextInt(10) returns an integer from 0 to 9). Because each message runs asynchronously, this delay can alter the order in which messages move on to the next step, simulating what could happen in a real implementation where parallel servers process each fragment of the message.

Run the project.

You should now see three messages logged to the console, one for every "actor" XML element. These are likely to arrive with their MULE_CORRELATION_SEQUENCE indexes out of order, due to the random delays introduced by the Groovy code.

Below these, you see a fourth, longer message in which the Resequencer has put the fragments back in order.

INFO  YYYY-DD-MM HH:MM:SS,SSS [] org.mule.api.processor.LoggerMessageProcessor: [#text: Will Ferrell]
INFO  YYYY-DD-MM HH:MM:SS,SSS [] org.mule.api.processor.LoggerMessageProcessor: [#text: Christian Bale]
INFO  YYYY-DD-MM HH:MM:SS,SSS [] org.mule.api.processor.LoggerMessageProcessor: [#text: Liam Neeson]
INFO  YYYY-DD-MM HH:MM:SS,SSS [] org.mule.api.processor.LoggerMessageProcessor: [[#text: Christian Bale], [#text: Liam Neeson], [#text: Will Ferrell]]
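
Because the loggers in this example print only the payload, the sequence indexes themselves do not appear in the console. As a minimal sketch, assuming the MEL `message` context object exposes `correlationSequence` and `correlationGroupSize` as described in the Mule 3 expression language reference, the first logger in xpathFlow could be replaced with one that prefixes each fragment with its position in the group:

```xml
<!-- Sketch: drop-in replacement for the first logger in xpathFlow.
     Assumes message.correlationSequence and message.correlationGroupSize
     are available in MEL for messages produced by the Splitter. -->
<logger message="Fragment #[message.correlationSequence] of #[message.correlationGroupSize]: #[payload]" level="INFO" doc:name="Logger"/>
```

Placing this logger before the Resequencer shows the arrival order, which makes the reordering easier to confirm against the final aggregated message.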

Full Example Code

<?xml version="1.0" encoding="UTF-8"?>

<mule xmlns:scripting="http://www.mulesoft.org/schema/mule/scripting" xmlns:file="http://www.mulesoft.org/schema/mule/file" xmlns:dw="http://www.mulesoft.org/schema/mule/ee/dw" xmlns:tracking="http://www.mulesoft.org/schema/mule/ee/tracking" xmlns:vm="http://www.mulesoft.org/schema/mule/vm" xmlns:mulexml="http://www.mulesoft.org/schema/mule/xml" xmlns:http="http://www.mulesoft.org/schema/mule/http" xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation" xmlns:spring="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.mulesoft.org/schema/mule/vm http://www.mulesoft.org/schema/mule/vm/current/mule-vm.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd
http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
http://www.mulesoft.org/schema/mule/xml http://www.mulesoft.org/schema/mule/xml/current/mule-xml.xsd
http://www.mulesoft.org/schema/mule/ee/tracking http://www.mulesoft.org/schema/mule/ee/tracking/current/mule-tracking-ee.xsd
http://www.mulesoft.org/schema/mule/file http://www.mulesoft.org/schema/mule/file/current/mule-file.xsd
http://www.mulesoft.org/schema/mule/ee/dw http://www.mulesoft.org/schema/mule/ee/dw/current/dw.xsd
http://www.mulesoft.org/schema/mule/scripting http://www.mulesoft.org/schema/mule/scripting/current/mule-scripting.xsd">

<file:connector name="File" autoDelete="false" streaming="true" validateConnections="true" doc:name="File"/>
<vm:connector name="VM" validateConnections="true" doc:name="VM"/>

<flow name="SplitterExampleFlow1" >
  <file:inbound-endpoint path="./src/main/resources/" connector-ref="File" pollingFrequency="15000" responseTimeout="10000" doc:name="File">
    <file:filename-regex-filter pattern="vip.xml" caseSensitive="true"/>
  </file:inbound-endpoint>
  <splitter expression="#[xpath3('//*:actor/text()',payload,'NODESET')]" doc:name="Splitter"/>
  <vm:outbound-endpoint exchange-pattern="one-way" path="step2" connector-ref="VM" doc:name="VM"/>
</flow>

<flow name="xpathFlow">
  <vm:inbound-endpoint exchange-pattern="one-way" path="step2" connector-ref="VM" doc:name="VM"/>
  <scripting:component doc:name="Groovy">
    <scripting:script engine="Groovy">
      <![CDATA[random = new Random()
      randomInt = random.nextInt(10)*1000
      Thread.sleep(randomInt)
      return payload]]>
    </scripting:script>
  </scripting:component>
  <logger message="#[payload]" level="INFO" doc:name="Logger"/>
  <resequencer failOnTimeout="true" doc:name="Resequencer"/>
  <collection-aggregator failOnTimeout="true" doc:name="Collection Aggregator"/>
  <logger message="#[payload]" level="INFO" doc:name="Logger"/>
</flow>
</mule>