<mule xmlns:batch="http://www.mulesoft.org/schema/mule/batch" xmlns:data-mapper="http://www.mulesoft.org/schema/mule/ee/data-mapper" xmlns:sfdc="http://www.mulesoft.org/schema/mule/sfdc" xmlns:file="http://www.mulesoft.org/schema/mule/file" xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation" xmlns:spring="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd
http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/file http://www.mulesoft.org/schema/mule/file/current/mule-file.xsd
http://www.mulesoft.org/schema/mule/batch http://www.mulesoft.org/schema/mule/batch/current/mule-batch.xsd
http://www.mulesoft.org/schema/mule/ee/data-mapper http://www.mulesoft.org/schema/mule/ee/data-mapper/current/mule-data-mapper.xsd
http://www.mulesoft.org/schema/mule/sfdc http://www.mulesoft.org/schema/mule/sfdc/current/mule-sfdc.xsd">
<sfdc:config name="Salesforce" username="username" password="password" securityToken="SpBdsf98af9tTR3m3YVcm4Y5q0y0R" doc:name="Salesforce">
<sfdc:connection-pooling-profile initialisationPolicy="INITIALISE_ONE" exhaustedAction="WHEN_EXHAUSTED_GROW"/>
</sfdc:config>
<data-mapper:config name="new_mapping_grf" transformationGraphPath="new_mapping.grf" doc:name="DataMapper"/>
<data-mapper:config name="new_mapping_1_grf" transformationGraphPath="new_mapping_1.grf" doc:name="DataMapper"/>
<data-mapper:config name="leads_grf" transformationGraphPath="leads.grf" doc:name="DataMapper"/>
<data-mapper:config name="csv_to_lead_grf" transformationGraphPath="csv-to-lead.grf" doc:name="DataMapper"/>
<batch:job max-failed-records="1000" name="Create Leads" doc:name="Create Leads">
<batch:threading-profile poolExhaustedAction="WAIT"/>
<batch:input>
<file:inbound-endpoint path="src/test/resources/input" moveToDirectory="src/test/resources/output" responseTimeout="10000" doc:name="File"/>
<data-mapper:transform config-ref="csv_to_lead_grf" doc:name="CSV to Lead"/>
</batch:input>
<batch:process-records>
<batch:step name="lead-check" doc:name="Lead Check">
<enricher source="#[payload.size() > 0]" target="#[recordVars['exists']]" doc:name="Message Enricher">
<sfdc:query config-ref="Salesforce" query="dsql:SELECT Id FROM Lead WHERE Email = '#[payload[&quot;Email&quot;]]'" doc:name="Find Lead"/>
</enricher>
</batch:step>
<batch:step name="insert-lead" doc:name="Insert Lead" accept-expression="#[!recordVars['exists']]">
<logger message="Got Record #[payload], it exists #[recordVars['exists']]" level="INFO" doc:name="Logger"/>
<batch:commit size="200" doc:name="Batch Commit">
<sfdc:create config-ref="Salesforce" type="Lead" doc:name="Insert Lead">
<sfdc:objects ref="#[payload]"/>
</sfdc:create>
</batch:commit>
</batch:step>
<batch:step name="log-failures" accept-policy="ONLY_FAILURES" doc:name="Log Failures">
<logger message="Got Failure #[payload]" level="INFO" doc:name="Log Failure"/>
</batch:step>
</batch:process-records>
<batch:on-complete>
<logger message="#[payload.loadedRecords] Loaded Records #[payload.failedRecords] Failed Records" level="INFO" doc:name="Log Results"/>
</batch:on-complete>
</batch:job>
</mule>
Batch Processing Reference
Mule runtime engine version 3.8 reached its End of Life on November 16, 2021. For more information, contact your Customer Success Manager to determine how to migrate to the latest Mule version.
Enterprise, CloudHub
Terminology
Term | Description | Element |
---|---|---|
Batch | A group of records that Mule processes individually, record by record. | n/a |
Batch Commit | A scope that accumulates records into chunks to prepare bulk upserts to an external source or service. | |
Batch Job | The top-level element in an application in which Mule processes a message payload as a batch of records. The term batch job covers all four phases of processing: Input, Load and Dispatch, Process, and On Complete. | |
Batch Job Instance | An occurrence in a Mule application resulting from the execution of a batch job in a Mule flow. You can let Mule create the batch job instance in the Load and Dispatch phase for you, or you can specify a distinct job instance ID through a Mule expression. This instance exists eternally. | |
Batch Job Result | A POJO that contains information about the processing results of a batch job instance. You can use it to get information about execution progress. | |
Batch Message Processor | An element in a Mule flow that triggers execution of a batch job. | |
Batch Phase | Sequentially ordered stages through which batches pass as Mule processes them. | |
Batch Step | A child element of the batch job within which multiple message processors act upon records in a batch. | |
Record | A small part of a large message's payload; a single piece produced when Mule splits a serialized message payload (i.e. a collection or array) for processing. | n/a |
Block size | Batch records are queued and scheduled in blocks. This element determines the size of the block used for the job's instances. | |
Batch Elements
Studio Palette | XML Editor | Use |
---|---|---|
Batch | | Defines a batch "flow". |
Batch Commit | | Accumulates records into chunks to prepare bulk upserts to an external source or service. |
Batch Reference | | Set within a Mule flow; triggers the start of a batch job. |
Batch Threading Profile | | Configures the threads on which Mule processes batch jobs. |
Record Variable | | Sets or removes a record-level variable. |
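The elements listed above can be combined roughly as follows. This is a minimal sketch, not a definitive configuration: the names `exampleBatchJob`, `exampleStep`, and `triggerExampleBatchJob` are hypothetical, and the sketch assumes that the Batch Reference entry corresponds to the `batch:execute` element.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns="http://www.mulesoft.org/schema/mule/core"
      xmlns:batch="http://www.mulesoft.org/schema/mule/batch"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="
        http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
        http://www.mulesoft.org/schema/mule/batch http://www.mulesoft.org/schema/mule/batch/current/mule-batch.xsd">

    <!-- Hypothetical batch job: a top-level element, declared alongside flows -->
    <batch:job name="exampleBatchJob">
        <batch:process-records>
            <!-- Each step runs its message processors once per record -->
            <batch:step name="exampleStep">
                <logger message="Processing record #[payload]" level="INFO"/>
            </batch:step>
        </batch:process-records>
        <batch:on-complete>
            <!-- In this phase the payload is the batch job result -->
            <logger message="#[payload.loadedRecords] Loaded Records #[payload.failedRecords] Failed Records" level="INFO"/>
        </batch:on-complete>
    </batch:job>

    <!-- Hypothetical flow that hands a collection to the job (assumed: batch:execute).
         It has no message source, so something else must invoke it, e.g. a flow-ref. -->
    <flow name="triggerExampleBatchJob">
        <set-payload value="#[['a', 'b', 'c']]"/>
        <batch:execute name="exampleBatchJob"/>
    </flow>
</mule>
```

Because this job has no `batch:input` phase, the collection set in the triggering flow is what Mule splits into records.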
Elements and Attributes
Element | Attribute | Value | Attribute Req’d | Attribute Description |
---|---|---|---|---|
|
|
unique name for message processor |
x |
Defines unique identifier for batch commit wrapper. |
|
integer |
x |
Defines number of records to collect before initiating upsert chunk of records to external source or service. |
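As an illustration of the commit size described above, the following sketch wraps a single message processor in a batch commit. It assumes the table describes the `batch:commit` element and its `size` attribute, as used in the example configuration on this page; the job and step names are hypothetical, and a logger stands in for a real bulk upsert.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns="http://www.mulesoft.org/schema/mule/core"
      xmlns:batch="http://www.mulesoft.org/schema/mule/batch"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="
        http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
        http://www.mulesoft.org/schema/mule/batch http://www.mulesoft.org/schema/mule/batch/current/mule-batch.xsd">

    <batch:job name="commitExampleJob">
        <batch:process-records>
            <batch:step name="bulkStep">
                <!-- Collect up to 200 records, then run the enclosed processors once per chunk -->
                <batch:commit size="200">
                    <!-- Inside the commit scope the payload is the accumulated list of records -->
                    <logger message="Committing #[payload.size()] records" level="INFO"/>
                </batch:commit>
            </batch:step>
        </batch:process-records>
    </batch:job>
</mule>
```

In a real job, the logger would typically be replaced by a connector operation that accepts a list, such as the `sfdc:create` call in the example configuration.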
Element | Attribute | Value | Attribute Req’d | Attribute Description |
---|---|---|---|---|
|
|
batch job name |
x |
Identifies the batch job to execute. |
|
unique name for message processor |
x |
Defines unique identifier for batch reference message processor; can be an expression. |
Element | Attribute | Value | Attribute Req’d | Attribute Description |
---|---|---|---|---|
|
|
n/a |
n/a |
Element | Attribute | Value | Attribute Req’d | Attribute Description |
---|---|---|---|---|
|
|
unique name for job |
x |
Defines unique identifier for job. |
|
|
|
Element | Attribute | Value | Attribute Req’d | Attribute Description |
---|---|---|---|---|
|
|
n/a |
n/a |
Element | Attribute | Value | Attribute Req’d | Attribute Description |
---|---|---|---|---|
|
|
n/a |
n/a |
Element | Attribute | Value | Attribute Req’d | Attribute Description |
---|---|---|---|---|
|
|
unique name for message processor |
x |
Defines unique identifier for batch reference message processor. |
|
name for record-level variable |
x |
Identifies record-level variable for removal. |
Element | Attribute | Value | Attribute Req’d | Attribute Description |
---|---|---|---|---|
|
|
unique name for message processor |
x |
Defines unique identifier for batch reference message processor. |
|
MEL expression |
x |
Defines value of named variable. |
|
|
name for record-level variable |
x |
Defines unique name for record-level variable. |
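The two record-variable tables above can be read together as in the sketch below. It assumes the elements are `batch:set-record-variable` and `batch:remove-record-variable`, each with a `variableName` attribute (the tables above lost their element and attribute names); the variable name `processed` and the step names are hypothetical.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns="http://www.mulesoft.org/schema/mule/core"
      xmlns:batch="http://www.mulesoft.org/schema/mule/batch"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="
        http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
        http://www.mulesoft.org/schema/mule/batch http://www.mulesoft.org/schema/mule/batch/current/mule-batch.xsd">

    <batch:job name="recordVarExampleJob">
        <batch:process-records>
            <batch:step name="flagStep">
                <!-- Attach a variable to the current record; the value is a MEL expression -->
                <batch:set-record-variable variableName="processed" value="#[true]"/>
            </batch:step>
            <batch:step name="cleanupStep">
                <!-- The variable travels with the record into later steps -->
                <logger message="processed = #[recordVars['processed']]" level="INFO"/>
                <!-- Remove the variable once it is no longer needed -->
                <batch:remove-record-variable variableName="processed"/>
            </batch:step>
        </batch:process-records>
    </batch:job>
</mule>
```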
Element | Attribute | Value | Attribute Req’d | Attribute Description |
---|---|---|---|---|
|
|
unique name for step |
x |
Defines unique identifier for step inside the batch job. |
|
ALL |
ALL = step processes all records, failed and successful. |
||
|
MEL expression |
Step processes only those records which, relative to the expression, evaluate to true (evaluate to false = skip record). |
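The step filters described above might be combined as in this sketch. The attribute names `accept-policy` and `accept-expression` are taken from the example configuration on this page; the step names are hypothetical, and the expression assumes each record is a map with an `Email` key.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns="http://www.mulesoft.org/schema/mule/core"
      xmlns:batch="http://www.mulesoft.org/schema/mule/batch"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="
        http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
        http://www.mulesoft.org/schema/mule/batch http://www.mulesoft.org/schema/mule/batch/current/mule-batch.xsd">

    <batch:job name="acceptExampleJob">
        <batch:process-records>
            <!-- Expression filter: records for which the expression is false skip this step -->
            <batch:step name="withEmail" accept-expression="#[payload['Email'] != null]">
                <logger message="Has email: #[payload]" level="INFO"/>
            </batch:step>
            <!-- ALL: the step sees both successful and failed records -->
            <batch:step name="auditAll" accept-policy="ALL">
                <logger message="Audit: #[payload]" level="INFO"/>
            </batch:step>
            <!-- ONLY_FAILURES: the step sees only records that failed in an earlier step -->
            <batch:step name="onlyFailures" accept-policy="ONLY_FAILURES">
                <logger message="Failed: #[payload]" level="INFO"/>
            </batch:step>
        </batch:process-records>
    </batch:job>
</mule>
```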
Element | Attribute | Value | Attribute Req’d | Attribute Description |
---|---|---|---|---|
|
|
WAIT |
Defines what a batch job should do if all threads are active. |
|
|
integer |
Defines the maximum number of active threads upon which Mule processes batch jobs. |
||
|
integer |
Defines the minimum number of active threads upon which Mule processes batch jobs. |
||
|
integer |
Defines, in milliseconds, the time a thread should live and remain idle before becoming inactive. |
||
|
integer |
Defines how long a batch job should wait for a thread to become available before timing out. |
||
|
integer |
Defines the size of the "overflow" memory which holds batch jobs while waiting for a thread to become available. |
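A threading profile with every setting from the table above could look like the sketch below. Only `poolExhaustedAction="WAIT"` appears in the example configuration on this page; the other attribute names (`maxThreadsActive`, `maxThreadsIdle`, `threadTTL`, `threadWaitTimeout`, `maxBufferSize`) are assumed from Mule's general threading-profile conventions, and all numeric values are purely illustrative.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns="http://www.mulesoft.org/schema/mule/core"
      xmlns:batch="http://www.mulesoft.org/schema/mule/batch"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="
        http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
        http://www.mulesoft.org/schema/mule/batch http://www.mulesoft.org/schema/mule/batch/current/mule-batch.xsd">

    <batch:job name="threadingExampleJob">
        <!-- The threading profile is declared before the processing phases -->
        <batch:threading-profile
            poolExhaustedAction="WAIT"
            maxThreadsActive="8"
            maxThreadsIdle="2"
            threadTTL="60000"
            threadWaitTimeout="30000"
            maxBufferSize="10"/>
        <batch:process-records>
            <batch:step name="workStep">
                <logger message="Processing record #[payload]" level="INFO"/>
            </batch:step>
        </batch:process-records>
    </batch:job>
</mule>
```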
Batch Commit Connectors
Several Anypoint Connectors can handle record-level errors without failing a whole batch commit (i.e. upsert). At runtime, these connectors keep track of which records the target resource accepted and which failed to upsert. Thus, rather than failing a complete group of records during a commit, the connector upserts as many records as it can and tracks any failures for notification. The short, but soon to grow, list of such connectors follows:
- Salesforce
- NetSuite
BatchJobResult Processing Statistics
Statistic | Description |
---|---|
| | A String indicating the ID of the executed job instance. |
| | A long indicating the number of milliseconds the batch job spent in the executing state. |
| | A boolean indicating whether an exception was found in the On Complete phase. |
| | A boolean indicating whether an exception was found in the Input phase. |
| | A boolean indicating whether an exception was found in the Loading phase. |
| | A long indicating the number of records that failed processing. |
| | If an exception was found in the Input phase, then that Exception is returned; otherwise |
| | A long indicating the number of records loaded so far. Once the Loading phase is completed, it should be equal to totalRecords. |
| | If an exception was found in the Loading phase, then that Exception is returned; otherwise |
| | If an exception was found in the On Complete phase, then that Exception is returned; otherwise |
| | A long indicating the number of records processed so far. It equals successfulRecords + failedRecords, but it could be lower than totalRecords if the job is not finished. |
| | A long indicating the number of records processed successfully so far. |
| | A long indicating the total number of records in the batch. |
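These statistics are typically read in the On Complete phase, where the payload is the batch job result. The sketch below uses only property names that appear elsewhere on this page (`loadedRecords`, `failedRecords`, `successfulRecords`, `totalRecords`); the job and step names are hypothetical.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns="http://www.mulesoft.org/schema/mule/core"
      xmlns:batch="http://www.mulesoft.org/schema/mule/batch"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="
        http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
        http://www.mulesoft.org/schema/mule/batch http://www.mulesoft.org/schema/mule/batch/current/mule-batch.xsd">

    <batch:job name="statsExampleJob">
        <batch:process-records>
            <batch:step name="workStep">
                <logger message="Processing record #[payload]" level="INFO"/>
            </batch:step>
        </batch:process-records>
        <batch:on-complete>
            <!-- Summarize the run from the batch job result -->
            <logger level="INFO"
                    message="Total: #[payload.totalRecords], loaded: #[payload.loadedRecords], successful: #[payload.successfulRecords], failed: #[payload.failedRecords]"/>
        </batch:on-complete>
    </batch:job>
</mule>
```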
Example
For a full description of the example and the steps the batch job takes in each phase of processing, see Batch Processing.
XML Editor
If you copy-paste the code into your instance of Studio, be sure to enter your own values for the global Salesforce connector: username, password, and security token.
See Also
- Learn more about filters in batch processing.
- Learn more about batch commit.
- Learn more about setting and removing record-level variables.
- Learn MEL expressions you can use in Batch jobs to simplify error handling.
- Review the basic anatomy of batch processing in Mule.