Contact Us 1-800-596-4880

Batch Processing Reference

Mule runtime engine version 3.8 reached its End of Life on November 16, 2021. For more information, contact your Customer Success Manager to determine how to migrate to the latest Mule version.

Enterprise, CloudHub

Terminology

Term Description Element

Batch

A group of records which Mule processes individually, by record.

n/a

Batch Commit

A scope which accumulates records into chunks to prepare bulk upserts to external source or service.

batch:commit

Batch Job

The top-level element in an application in which Mule processes a message payload as a batch of records. The term batch job is inclusive of all four phases of processing: Input, Load and Dispatch, Process, and On Complete.

batch:job

Batch Job Instance

An occurrence in a Mule application resulting from the execution of a batch job in a Mule flow. you can let Mule create the batch job instance in the Load and Dispatch phase for you, or you can specify a distinct job instance Id through a Mule Expression. This instance exists eternally.

job-instance-id

Batch Job Result

POJO which contains information about the processing results of a batch job instance. You can use it to get information about execution progress.

BatchJobResult

Batch Message Processor

An element in a Mule flow which triggers execution of a batch job.

batch:execute

Batch Phase

Sequentially ordered stages through which batches pass as Mule processes them.

batch:input
batch:process-records
`batch:on-complete `

Batch Step

A child element of the batch job within which multiple message processors act upon records in a batch.

batch:step

Record

A small part of a large message’s payload; a single instance of the result of Mule’s action to split a serialized message payload (i.e. a collection or array) into pieces for processing.

n/a

Block size

Batch records are queued and scheduled in blocks. This element determines the size of the block that will be used for the job’s instances.

block-size

Batch Elements

Studio Palette XML Editor Use

Batch

batch:job

Defines a batch "flow".

Batch Commit

batch:commit

Accumulates records into chunks to prepare bulk upserts to external source or service.

Batch Reference

batch:execute

Set within a Mule flow, triggers start of batch job.

Batch Threading Profile

batch:threading-profile

Configures details regarding threads upon which Mule processes batch jobs.

Record Variable

batch:record-variable-transformer

Sets or removes recordVars on records. Valid only within batch steps inside the Process batch phase.

Elements and Attributes

Element Attribute Value Attribute Req’d Attribute Description

batch:commit

doc:name

unique name for message processor

x

Defines unique identifier for batch commit wrapper.

size

integer

x

Defines number of records to collect before initiating upsert chunk of records to external source or service.

Element Attribute Value Attribute Req’d Attribute Description

batch:execute

name

batch job name

x

Identifies the batch job to execute.

doc:name

unique name for message processor

x

Defines unique identifier for batch reference message processor; can be an expression.

Element Attribute Value Attribute Req’d Attribute Description

batch:input

none

n/a

n/a

Element Attribute Value Attribute Req’d Attribute Description

batch:job

name

unique name for job

x

Defines unique identifier for job.

max-failed-records

0
-1
other integer

0 = tolerates no failures, stops processing batch immediately.
-1= tolerates all failures, never stops processing because of failed record.
integer = defines maximum number of failed records a batch tolerates before stopping processing.

Element Attribute Value Attribute Req’d Attribute Description

batch:on-complete

none

n/a

n/a

Element Attribute Value Attribute Req’d Attribute Description

batch:process-records

none

n/a

n/a

Element Attribute Value Attribute Req’d Attribute Description

batch:remove-record-variable-transformer

doc:name

unique name for message processor

x

Defines unique identifier for batch reference message processor.

variableName

name for record-level variable

x

Identifies record-level variable for removal.

Element Attribute Value Attribute Req’d Attribute Description

batch:set-record-variable-transformer

doc:name

unique name for message processor

x

Defines unique identifier for batch reference message processor.

value

MEL expression

x

Defines value of named variable.

variableName

name for record-level variable

x

Defines unique name for record-level variable.

Element Attribute Value Attribute Req’d Attribute Description

batch:step

name

unique name for step

x

Defines unique identifier for step inside the batch job.

accept-policy

ALL
FAILURES_ONLY
NO_FAILURES

ALL = step processes all records, failed and successful.
FAILURES_ONLY = step processes only records which failed in a preceding step.
NO_FAILURES = step processes only records which succeeded in all preceding steps.

accept-expression

MEL expression

Step processes only those records which, relative to the expression, evaluate to true (evaluate to false = skip record).

Element Attribute Value Attribute Req’d Attribute Description

batch:threading-profile

poolExhaustedAction

WAIT
WAIT
DISCARD
DISCARD_OLDEST
ABORT
RUN

Defines what a batch job should do if all threads are active.
WAIT = (Default) wait until next thread is available
DISCARD = discard waiting batch job
DISCARD_OLDEST = discard the oldest waiting batch job
ABORT = abort processing the batch job
RUN = don’t wait for a thread to become available, run the batch job synchronously

maxThreadsActive

integer

Defines the maximum number of active threads upon which Mule processes batch jobs.

maxThreadsIdle

integer

Defines the minimum number of active threads upon which Mule processes batch jobs.

threadTTL

integer

Defines, in milliseconds, the time a thread should live and remain idle before becoming inactive.

threadWaitTimeout

integer

Defines how long a batch job should wait for a thread to become available before timing out.

maxBufferSize

integer

Defines the size of the "overflow" memory which holds batch jobs while waiting for a thread to become available.

Batch Commit Connectors

Several Anypoint Connectors have the ability to handle record-level errors without failing a whole batch commit (i.e. upsert). At runtime, these connectors keep track of which records were successfully accepted by the target resource, and which failed to upsert. Thus, rather than failing a complete group of records during a commit activity, the connector simply upserts as many records as it can, and tracks any failures for notification. The short – but soon to grow – list of such connectors follows:

  • Salesforce

  • NetSuite

BatchJobResult Processing Statistics

Statistic Description

batchJobInstanceId

A String indicating the id of the executed job instance.

elapsedTimeInMillis

A long indicating the number of milliseconds the batch job spent in executing state.

failedOnCompletePhase

A boolean indicating whether an exception was found on the on the complete phase.

failedOnInputPhase

A boolean indicating whether an exception was found on the on the input phase.

failedOnLoadingPhase

A boolean indicating whether an exception was found on the on the input phase.

failedRecords

A long indicating the number of records that failed processing.

inputPhaseException

If an exception was found in the input phase, then that Exception is returned; otherwise null is returned. Notice that there’s a correlation between this statistic and failedOnInputPhase.

loadedRecords

A long indicating the number of records loaded so far. Once the loading phase is completed, it should be equal to totalRecords.

loadingPhaseException

If an exception was found in the loading phase, then that Exception is returned; otherwise null is returned. Notice that there’s a correlation between this statistic and failedOnLoadingPhase.

onCompletePhaseException

If an exception was found in the on complete phase, then that Exception is returned; otherwise null is returned. Notice that there’s a correlation between this statistic and failedOnCompletePhase.

processedRecords

A long indicating the number of records processed so far. It equals successfulRecords failedRecords, but it could be lower than totalRecords if the job is not finished.

successfulRecords

A long indicating the number of records processed so far.

totalRecords

Total number of records in the batch.

Example

For a full description of the example and steps the batch job takes in each phase of processing, see Batch Processing.

XML Editor

If you copy-paste the code into your instance of Studio, be sure to enter your own values for the the global Salesforce connector:

  • username

  • password

  • security token

How do I get a Salesforce security token?

  1. Log in to your Salesforce account. From your account menu (your account is labeled with your name), select Setup.

  2. In the left navigation bar, under the My Settings heading, click to expand the Personal folder.

  3. Click Reset My Security Token. Salesforce resets the token and emails you the new one.

  4. Access the email that Salesforce sent and copy the new token onto your local clipboard.

  5. In the application in your instance of Anypoint Studio, click the Global Elements tab.

  6. Double-click the Salesforce global element to open its Global Element Properties panel. In the Security Token field, paste the new Salesforce token you copied from the email. Alternatively, configure the global element in the XML Editor.

<mule xmlns:batch="http://www.mulesoft.org/schema/mule/batch" xmlns:data-mapper="http://www.mulesoft.org/schema/mule/ee/data-mapper" xmlns:sfdc="http://www.mulesoft.org/schema/mule/sfdc" xmlns:file="http://www.mulesoft.org/schema/mule/file" xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation" xmlns:spring="http://www.springframework.org/schema/beans"  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd

http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd

http://www.mulesoft.org/schema/mule/file http://www.mulesoft.org/schema/mule/file/current/mule-file.xsd

http://www.mulesoft.org/schema/mule/batch http://www.mulesoft.org/schema/mule/batch/current/mule-batch.xsd

http://www.mulesoft.org/schema/mule/ee/data-mapper http://www.mulesoft.org/schema/mule/ee/data-mapper/current/mule-data-mapper.xsd

http://www.mulesoft.org/schema/mule/sfdc http://www.mulesoft.org/schema/mule/sfdc/current/mule-sfdc.xsd">

    <sfdc:config name="Salesforce" username="username" password="password" securityToken="SpBdsf98af9tTR3m3YVcm4Y5q0y0R" doc:name="Salesforce">
        <sfdc:connection-pooling-profile initialisationPolicy="INITIALISE_ONE" exhaustedAction="WHEN_EXHAUSTED_GROW"/>
    </sfdc:config>

    <data-mapper:config name="new_mapping_grf" transformationGraphPath="new_mapping.grf" doc:name="DataMapper"/>

    <data-mapper:config name="new_mapping_1_grf" transformationGraphPath="new_mapping_1.grf" doc:name="DataMapper"/>

    <data-mapper:config name="leads_grf" transformationGraphPath="leads.grf" doc:name="DataMapper"/>

    <data-mapper:config name="csv_to_lead_grf" transformationGraphPath="csv-to-lead.grf" doc:name="DataMapper"/>

    <batch:job max-failed-records="1000" name="Create Leads" doc:name="Create Leads">
        <batch:threading-profile poolExhaustedAction="WAIT"/>
        <batch:input>
            <file:inbound-endpoint path="src/test/resources/input" moveToDirectory="src/test/resources/output" responseTimeout="10000" doc:name="File"/>
            <data-mapper:transform config-ref="csv_to_lead_grf" doc:name="CSV to Lead"/>
        </batch:input>

        <batch:process-records>
            <batch:step name="lead-check" doc:name="Lead Check">
                <enricher source="#[payload.size() &gt; 0]" target="#[recordVars['exists']]" doc:name="Message Enricher">
                    <sfdc:query config-ref="Salesforce" query="dsql:SELECT Id FROM Lead WHERE Email = '#[payload[&quot;Email&quot;]]'" doc:name="Find Lead"/>
                </enricher>
            </batch:step>
            <batch:step name="insert-lead"  doc:name="Insert Lead" accept-expression="#[recordVars['exists']]">
                <logger message="Got Record #[payload], it exists #[recordVars['exists']]" level="INFO" doc:name="Logger"/>
                <batch:commit size="200" doc:name="Batch Commit">
                    <sfdc:create config-ref="Salesforce" type="Lead" doc:name="Insert Lead">
                        <sfdc:objects ref="#[payload]"/>
                    </sfdc:create>
                </batch:commit>
            </batch:step>
            <batch:step name="log-failures" accept-policy="ONLY_FAILURES" doc:name="Log Failures">
                <logger message="Got Failure #[payload]" level="INFO" doc:name="Log Failure"/>
            </batch:step>
        </batch:process-records>

        <batch:on-complete>
            <logger message="#[payload.loadedRecords] Loaded Records #[payload.failedRecords] Failed Records" level="INFO" doc:name="Log Results"/>
        </batch:on-complete>
    </batch:job>
</mule>

See Also