USE SFtoDB_Example;
CREATE TABLE contact (
ID INT(11) NOT NULL AUTO_INCREMENT,
email varchar(255) NOT NULL,
first_name varchar(255) DEFAULT NULL,
last_name varchar(255) DEFAULT NULL,
last_modified varchar(255) NOT NULL,
PRIMARY KEY (email)
);
INSERT INTO contacts VALUES (NULL, "leonardmule@mulesoft.com", "Leonard", "Mule", "");
Salesforce to Database Example
Mule Runtime Engine versions 3.5, 3.6, and 3.7 reached End of Life on or before January 25, 2020. For more information, contact your Customer Success Manager to determine how you can migrate to the latest Mule version. |
Enterprise, CloudHub
This application illustrates how to use batch processing to synchronize Salesforce information with a database.
Batch
Batch Processing allows you to split a payload into individual elements to process each individually. This functionality is particularly useful when working with streaming input or when engineering "near real-time" data integration between SaaS applications.
Database Connector
The Database Connector provides a standardized way to access to any JDBC relational database, consistently using one same interface for every case. This connector allows you to run diverse SQL operations on your database, including Select, Insert, Update, Delete, and even Stored Procedures.
Message Enriching
Mule uses Message Enrichers to enrich message payloads with data (i.e. add to the payload), rather than changing payload contents. Mule enriches a message’s payload so that other message processors in the application can access the original payload.
Assumptions
This document assumes that you are familiar with Mule ESB and the Anypoint Studio interface.
This document describes the details of the example within the context of Anypoint Studio, Mule ESB’s graphical user interface (GUI), and includes configuration details for both the visual and XML editors.
Example Use Case
This application queries a Salesforce account for new or updated contacts at a regular interval, then processes the returned payload one record at a time. It checks to see if a contact currently exists in the database, then updates or creates a new contact accordingly. Once the process is complete for the entire batch, a success message is logged.
Although this use case could be met without using batch processing – treating the entire list of contacts returned by Salesforce as a whole – using batch makes this process more reliable as any errors that occur in single record will not propagate beyond record level.
Set Up and Run the Example
Complete the following procedure to create, then run this example in your own instance of Anypoint Studio. You can create template applications straight out of the box in Anypoint Studio and tweak the configurations of this use case-based template to create your own customized applications in Mule.
-
Create the example application in Anypoint Studio.
-
Set up Salesforce credentials:
-
Log in to your Salesforce account. From your account menu (your account is labeled with your name), select Setup.
-
In the left navigation bar, under the Personal Setup heading, click to expand the My Personal Information folder.
-
Click Reset My Security Token. Salesforce resets the token and emails you the new one.
-
Access the email that Salesforce sent and copy the new token onto your local clipboard.
-
In the package explorer, open
src/main/resources/connector.properties
-
Complete the file with your own username, password, and security token.
-
-
Create a Database and set up credentials:
-
Create a new MySQL Database
If you do not have a MySQL database available for your use, you can install MySQL on your local computer. Please visit dev.MySQL.com to download and install a free version. It is a good idea to also install the MySQL workbench. Please also configure a MySQL username and password for use with this project. -
The project requires the following database configuration:
-
MySQL Database Schema: SFtoDB_Example
-
One table: contact
-
Four fields: email, first_name, last_name, last_modified
-
One or more rows of data should be inserted into the table
-
A user that must have read and write access to this data.
-
-
You can execute the following SQL statement to produce this schema and populate one row of data. See the green box below for tips on executing this SQL.
If you are using the MySQL Workbench, first create the schema SFtoDB_Example , and then execute the above SQL statements from within the MySQL Workbench SQL Editor.
Alternatively, you can use the MySQL command-line tool, as follows:
-
Execute this command:
CREATE DATABASE SFtoDB_Example;
. This will create the database schema. -
Save the above SQL code-block to a text file, SQL_for_example.sql
-
Then execute the SQL with this command:
mysql SFtoDB_Example < SQL_for_example.sql
In either case, you may need to assign a user for access to the schema as well. Please see MySQL Documentation for more information on using the MySQL Workbench or the command-line tool.
-
-
In the package explorer, open
src/main/resources/connector.properties
-
Complete the file with your own DB credentials
-
-
In the Package Explorer, right-click the connect-with-salesforce project name, then select Run As > Mule Application. Studio runs the application on the embedded server.
How It Works
Unlike typical Mule projects that are organized into Flows, this project runs a Batch Process. The process is divided into three stages where actions have different scopes:
Stage | Activities |
---|---|
Input |
Polls Salesforce at regular intervals for new contacts. |
Process Records |
Checks if record exists in DB, then updates/creates DB record. |
On Complete |
Logs a success message. |
The Process Records stage is divided into two separate batch steps: the first step checks if the record exists in the DB, the second adds/updates these in the DB. If, while processing a record, the the first step fails, the second step does not process the failed record.
Input
Every 30 minutes, the Poll scope triggers a new request to the Salesforce connector. The Salesforce connector is set to perform the query below, where the timestamp
flow variable is periodically updated to the time of the last iteration of the poll:
SELECT Email,FirstName,LastModifiedDate,LastName FROM Contact WHERE LastModifiedDate > #[flowVars['timestamp']]
The response returned by the Salesforce connector is a list of contacts.
Studio Visual Editor
XML Editor
<batch:input>
<poll doc:name="Poll">
<fixed-frequency-scheduler frequency="30" startDelay="10" timeUnit="MINUTES"/>
<watermark default-expression="#['1900-12-11T14:16:00.000Z']" selector="MAX" selector-expression="#[payload.LastModifiedDate]" variable="timestamp"/>
<sfdc:query config-ref="Salesforce_Configuration" doc:name="Query Salesforce" query="dsql:SELECT Email,FirstName,LastModifiedDate,LastName FROM Contact WHERE LastModifiedDate > #[flowVars['timestamp']]"/>
</poll>
</batch:input>
Process Records
The process records stage of the batch job process the records – each representing a single contact – one at a time. If one of these records fails, the entire task will not fail with it; Mule skips the record, moving on to process the next one.
Batch Step 1
In this step, the DataMapper first renames the fields so that they match those in the database. The Database connector issues the following query to the database:
SELECT first_name,last_name,email FROM contact WHERE email=#[payload.email]
Because the Database connector is inside a message enricher scope, Mule does not overwrite the payload with the response from the database query, rather, it adds the response to the message as an additional variable. Thus, all of the information that had originated from Salesforce is retained and can be passed on to the next step.
The message enricher creates two record variables:
-
dbRecord
: stores the response of the database query -
exists
: indicates whether a contact already exists in the database, according to the response to the query
Studio Visual Editor
XML Editor
<batch:step name="Batch_Step1">
<data-mapper:transform config-ref="contact_to_map" doc:name="Contact To Map"/>
<enricher doc:name="Message Enricher">
<db:select config-ref="MySQL_Configuration" doc:name="Check existence in Database">
<db:parameterized-query><![CDATA[SELECT first_name,last_name,email FROM contact WHERE email=#[payload.email]]]></db:parameterized-query>
</db:select>
<enrich source="#[payload.size() > 0]" target="#[recordVars['exists']]"/>
<enrich source="#[payload]" target="#[recordVars['dbRecord']]"/>
</enricher>
</batch:step>
Batch Step 2
Mule executes the second batch step only if the first step is successful. Depending on the value the message enricher stored in the flowVar exists
(true
- the contact exists; false
- the contact does not exist) a choice router routes the flow to one of the following processing paths:
-
exists =
false
: the contact must be added as a new contact. The following insert query is carried out in the database:
INSERT INTO contact (first_name, last_name, email) VALUES (#[payload.first_name],#[payload.last_name],#[payload.email])
-
exists = true
: Mule populates the recordVardbRecord
. The following update query is carried out in the database:
UPDATE contact SET first_name=#[payload.first_name],last_name=#[payload.last_name] WHERE email = #[payload.email]
-
If neither of these conditions is met, an error has occurred, so Mule logs a message to announce this error.
Studio Visual Editor
XML Editor
<batch:step name="Batch_Stepx">
<choice doc:name="Choice">
<when expression="#[recordVars['exists']==false]">
<db:insert config-ref="MySQL_Configuration" doc:name="Create contact">
<db:parameterized-query><![CDATA[INSERT INTO contact (first_name, last_name, email) VALUES (#[payload.first_name],#[payload.last_name],#[payload.email])]]></db:parameterized-query>
</db:insert>
</when>
<when expression="#[recordVars['exists']==true and recordVars['dbRecord'] != null]">
<db:update config-ref="MySQL_Configuration" doc:name="Update Contact">
<db:parameterized-query><![CDATA[UPDATE contact SET first_name=#[payload.first_name],last_name=#[payload.last_name] WHERE email = #[payload.email]]]></db:parameterized-query>
</db:update>
</when>
<otherwise>
<logger doc:name="Logger" level="INFO" message="Error with #[payload.email] contact"/>
</otherwise>
</choice>
</batch:step>
On Complete
The On Complete
stage of the batch process executes once, after all of the records have been processed, whether successful, failed or skipped. In this case, a logger announces the completion of the task.
Studio Visual Editor
XML Editor
<batch:on-complete>
<logger doc:name="Log completion" level="INFO" message="Batch sf->db has finished"/>
</batch:on-complete>
Complete Code
Studio Visual Editor
XML Editor
<mule xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:batch="http://www.mulesoft.org/schema/mule/batch" xmlns:context="http://www.springframework.org/schema/context" xmlns:data-mapper="http://www.mulesoft.org/schema/mule/ee/data-mapper" xmlns:db="http://www.mulesoft.org/schema/mule/db" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation" xmlns:http="http://www.mulesoft.org/schema/mule/http" xmlns:json="http://www.mulesoft.org/schema/mule/json" xmlns:sap="http://www.mulesoft.org/schema/mule/sap" xmlns:sfdc="http://www.mulesoft.org/schema/mule/sfdc" xmlns:spring="http://www.springframework.org/schema/beans" xmlns:tracking="http://www.mulesoft.org/schema/mule/ee/tracking" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd
http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
http://www.mulesoft.org/schema/mule/sfdc http://www.mulesoft.org/schema/mule/sfdc/current/mule-sfdc.xsd
http://www.mulesoft.org/schema/mule/json http://www.mulesoft.org/schema/mule/json/current/mule-json.xsd
http://www.mulesoft.org/schema/mule/db http://www.mulesoft.org/schema/mule/db/current/mule-db.xsd
http://www.mulesoft.org/schema/mule/batch http://www.mulesoft.org/schema/mule/batch/current/mule-batch.xsd
http://www.mulesoft.org/schema/mule/ee/tracking http://www.mulesoft.org/schema/mule/ee/tracking/current/mule-tracking-ee.xsd
http://www.mulesoft.org/schema/mule/ee/data-mapper http://www.mulesoft.org/schema/mule/ee/data-mapper/current/mule-data-mapper.xsd
http://www.mulesoft.org/schema/mule/sap http://www.mulesoft.org/schema/mule/sap/current/mule-sap.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-current.xsd">
<data-mapper:config doc:name="contact_to_map" name="contact_to_map" transformationGraphPath="contact_to_map.grf"/>
<context:property-placeholder location="connectors.properties"/>
<sfdc:config doc:name="Salesforce" name="Salesforce_Configuration" password="${sfdc.password}" securityToken="${sfdc.securityToken}" username="${sfdc.user}">
<sfdc:connection-pooling-profile exhaustedAction="WHEN_EXHAUSTED_GROW" initialisationPolicy="INITIALISE_ONE"/>
</sfdc:config>
<db:mysql-config database="${mysql.database}" doc:name="MySQL Configuration" host="${mysql.host}" name="MySQL_Configuration" password="${mysql.password}" port="3306" user="${mysql.user}"/>
<batch:job name="salesforce-to-database-Batch1">
<batch:threading-profile poolExhaustedAction="WAIT"/>
<batch:input>
<poll doc:name="Poll">
<fixed-frequency-scheduler frequency="30" startDelay="10" timeUnit="MINUTES"/>
<watermark default-expression="#['1900-12-11T14:16:00.000Z']" selector="MAX" selector-expression="#[payload.LastModifiedDate]" variable="timestamp"/>
<sfdc:query config-ref="Salesforce_Configuration" doc:name="Query Salesforce" query="dsql:SELECT Email,FirstName,LastModifiedDate,LastName FROM Contact WHERE LastModifiedDate > #[flowVars['timestamp']]"/>
</poll>
</batch:input>
<batch:process-records>
<batch:step name="Batch_Step1">
<data-mapper:transform config-ref="contact_to_map" doc:name="Contact To Map"/>
<enricher doc:name="Message Enricher">
<db:select config-ref="MySQL_Configuration" doc:name="Check existence in Database">
<db:parameterized-query><![CDATA[SELECT first_name,last_name,email FROM contact WHERE email=#[payload.email]]]></db:parameterized-query>
</db:select>
<enrich source="#[payload.size() > 0]" target="#[recordVars['exists']]"/>
<enrich source="#[payload]" target="#[recordVars['dbRecord']]"/>
</enricher>
</batch:step>
<batch:step name="Batch_Stepx">
<choice doc:name="Choice">
<when expression="#[recordVars['exists']==false]">
<db:insert config-ref="MySQL_Configuration" doc:name="Create contact">
<db:parameterized-query><![CDATA[INSERT INTO contact (first_name, last_name, email) VALUES (#[payload.first_name],#[payload.last_name],#[payload.email])]]></db:parameterized-query>
</db:insert>
</when>
<when expression="#[recordVars['exists']==true and recordVars['dbRecord'] != null]">
<db:update config-ref="MySQL_Configuration" doc:name="Update Contact">
<db:parameterized-query><![CDATA[UPDATE contact SET first_name=#[payload.first_name],last_name=#[payload.last_name] WHERE email = #[payload.email]]]></db:parameterized-query>
</db:update>
</when>
<otherwise>
<logger doc:name="Logger" level="INFO" message="Error with #[payload.email] contact"/>
</otherwise>
</choice>
</batch:step>
</batch:process-records>
<batch:on-complete>
<logger doc:name="Log completion" level="INFO" message="Batch sf->db has finished"/>
</batch:on-complete>
</batch:job>
</mule>
See Also
-
Learn more about the Database Connector.
-
Understand Batch Processing.
-
Learn more about the Datamapper.
-
Learn about Anypoint Connectors.
-
Understand the Poll Scope.
-
Read more about the Choice Flow Control.
-
Read more about the Message Enricher.
-
Learn more about DataSense Query Language to write queries in Mule connectors which support DSQL.