Salesforce to Database Example

Mule Runtime Engine versions 3.5, 3.6, and 3.7 reached End of Life on or before January 25, 2020. For more information, contact your Customer Success Manager to determine how you can migrate to the latest Mule version.

Enterprise, CloudHub

This application illustrates how to use batch processing to synchronize Salesforce information with a database.

Batch

Batch processing lets you split a payload into individual records and process each record separately. This functionality is particularly useful when working with streaming input or when engineering "near real-time" data integration between SaaS applications.

Database Connector

The Database Connector provides a standardized way to access any JDBC relational database through a single, consistent interface. This connector allows you to run diverse SQL operations on your database, including Select, Insert, Update, Delete, and even Stored Procedures.

Message Enriching

Mule uses Message Enrichers to enrich message payloads with data (i.e. add to the payload), rather than changing payload contents. Mule enriches a message’s payload so that other message processors in the application can access the original payload.

Assumptions

This document assumes that you are familiar with Mule ESB and the Anypoint Studio interface.

This document describes the details of the example within the context of Anypoint Studio, Mule ESB’s graphical user interface (GUI), and includes configuration details for both the visual and XML editors.

Example Use Case

This application queries a Salesforce account for new or updated contacts at a regular interval, then processes the returned payload one record at a time. It checks to see if a contact currently exists in the database, then updates or creates a new contact accordingly. Once the process is complete for the entire batch, a success message is logged.

Although this use case could be met without batch processing, by treating the entire list of contacts returned by Salesforce as a whole, using batch makes the process more reliable: an error that occurs in a single record does not propagate beyond the record level.
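
As a minimal sketch of how this record-level tolerance can be tuned, the fragment below sets the batch module's max-failed-records attribute on a batch job. The example application does not set this attribute, and the job and step names here are hypothetical. The fragment is meant to sit inside a Mule 3 configuration that already declares the batch and doc namespaces, such as the one under Complete Code.

```xml
<!-- Hypothetical job, for illustration only; not part of the example application. -->
<!-- As far as the Mule 3 batch documentation describes it, max-failed-records="-1"
     lets the job keep going however many records fail, while the default of 0
     stops the job at the first failed record. -->
<batch:job name="tolerant-batch-sketch" max-failed-records="-1">
    <batch:process-records>
        <batch:step name="Sketch_Step">
            <!-- A failure here is recorded against this record only. -->
            <logger doc:name="Logger" level="INFO" message="Processing #[payload]"/>
        </batch:step>
    </batch:process-records>
</batch:job>
```

A positive value, for example max-failed-records="10", would instead stop the job once that many records have failed.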

Set Up and Run the Example

Complete the following procedure to create, then run this example in your own instance of Anypoint Studio. You can create template applications straight out of the box in Anypoint Studio and tweak the configurations of this use case-based template to create your own customized applications in Mule.

  1. Create the example application in Anypoint Studio.

  2. Set up Salesforce credentials:

    1. Log in to your Salesforce account. From your account menu (your account is labeled with your name), select Setup.

    2. In the left navigation bar, under the Personal Setup heading, click to expand the My Personal Information folder.

    3. Click Reset My Security Token. Salesforce resets the token and emails you the new one.

    4. Access the email that Salesforce sent and copy the new token onto your local clipboard.

    5. In the package explorer, open src/main/resources/connectors.properties.

    6. Complete the file with your own username, password, and security token.

  3. Create a Database and set up credentials:

    1. Create a new MySQL Database

      If you do not have a MySQL database available for your use, you can install MySQL on your local computer. Visit dev.mysql.com to download and install a free version. It is a good idea to also install MySQL Workbench. Also configure a MySQL username and password for use with this project.

    2. The project requires the following database configuration:

      • MySQL Database Schema: SFtoDB_Example

      • One table: contact

      • Five columns: ID, email, first_name, last_name, last_modified

      • One or more rows of data should be inserted into the table

      • A user with read and write access to this data

    3. You can execute the following SQL statements to create this table and populate one row of data. See the tips after the statements for ways to execute this SQL.

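      USE SFtoDB_Example;
      CREATE TABLE contact (
          -- MySQL requires an AUTO_INCREMENT column to be indexed, hence the UNIQUE KEY below.
          ID INT(11) NOT NULL AUTO_INCREMENT,
          email varchar(255) NOT NULL,
          first_name varchar(255) DEFAULT NULL,
          last_name varchar(255) DEFAULT NULL,
          -- Defaults to '' because the application's INSERT statement does not set this column.
          last_modified varchar(255) NOT NULL DEFAULT '',
          PRIMARY KEY (email),
          UNIQUE KEY (ID)
      );
      -- The table is named contact (singular). Passing NULL lets MySQL assign the ID.
      INSERT INTO contact VALUES (NULL, 'leonardmule@mulesoft.com', 'Leonard', 'Mule', '');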

      If you are using the MySQL Workbench, first create the schema SFtoDB_Example, and then execute the above SQL statements from within the MySQL Workbench SQL Editor.

      Alternatively, you can use the MySQL command-line tool, as follows:

      1. Execute this command to create the database schema: CREATE DATABASE SFtoDB_Example;

      2. Save the above SQL code block to a text file, SQL_for_example.sql.

      3. Then execute the SQL with this command: mysql SFtoDB_Example < SQL_for_example.sql

      In either case, you may need to assign a user for access to the schema as well. Please see MySQL Documentation for more information on using the MySQL Workbench or the command-line tool.

    4. In the package explorer, open src/main/resources/connectors.properties.

    5. Complete the file with your own database credentials.

  4. In the Package Explorer, right-click the connect-with-salesforce project name, then select Run As > Mule Application. Studio runs the application on the embedded server.

How It Works

Unlike typical Mule projects that are organized into Flows, this project runs a Batch Process. The process is divided into three stages where actions have different scopes:

Stage             Activities
Input             Polls Salesforce at regular intervals for new contacts.
Process Records   Checks if record exists in DB, then updates/creates DB record.
On Complete       Logs a success message.

The Process Records stage is divided into two separate batch steps: the first step checks whether the record exists in the DB, and the second adds or updates the record in the DB. If the first step fails while processing a record, the second step does not process the failed record.
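
A rough sketch of how that filtering can be made explicit: batch steps in Mule 3 take an accept-policy attribute, and the default policy, NO_FAILURES, is what keeps failed records out of later steps. The hypothetical third step below, which is not part of the example application, uses ONLY_FAILURES to pick up just the records that failed earlier. It assumes the job is configured to continue after failures.

```xml
<!-- Hypothetical extra step, for illustration only. It would go after the two
     existing steps inside batch:process-records. -->
<!-- accept-policy="ONLY_FAILURES": this step receives only records that failed
     in a previous step. Steps without the attribute use NO_FAILURES. -->
<batch:step name="Log_Failed_Records" accept-policy="ONLY_FAILURES">
    <logger doc:name="Log failure" level="WARN" message="Could not sync record: #[payload]"/>
</batch:step>
```

The logger prints the whole payload rather than a single field because a record may have failed before its fields were renamed.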

[Image: the complete batch job in the Studio visual editor]

Input

Every 30 minutes, the Poll scope triggers a new request to the Salesforce connector, which performs the query below. The watermark keeps the timestamp flow variable up to date: after each poll, it is set to the latest LastModifiedDate among the returned records, so the next poll retrieves only the contacts modified since then:

SELECT Email,FirstName,LastModifiedDate,LastName FROM Contact WHERE LastModifiedDate > #[flowVars['timestamp']]

The response returned by the Salesforce connector is a list of contacts.

Studio Visual Editor

[Image: the Input stage in the Studio visual editor]

XML Editor

<batch:input>
    <poll doc:name="Poll">
        <fixed-frequency-scheduler frequency="30" startDelay="10" timeUnit="MINUTES"/>
        <watermark default-expression="#['1900-12-11T14:16:00.000Z']" selector="MAX" selector-expression="#[payload.LastModifiedDate]" variable="timestamp"/>
        <sfdc:query config-ref="Salesforce_Configuration" doc:name="Query Salesforce" query="dsql:SELECT Email,FirstName,LastModifiedDate,LastName FROM Contact WHERE LastModifiedDate > #[flowVars['timestamp']]"/>
    </poll>
</batch:input>

Process Records

The Process Records stage of the batch job processes the records, each representing a single contact, one at a time. If one of these records fails, the entire job does not fail with it; Mule skips the record and moves on to the next one.

[Image: the Process Records stage in the Studio visual editor]

Batch Step 1

In this step, the DataMapper first renames the fields so that they match those in the database. The Database connector issues the following query to the database:

SELECT first_name,last_name,email FROM contact WHERE email=#[payload.email]

Because the Database connector is inside a message enricher scope, Mule does not overwrite the payload with the response from the database query. Instead, it adds the response to the record as additional variables. Thus, all of the information that originated from Salesforce is retained and can be passed on to the next step.

The message enricher creates two record variables:

  • dbRecord: stores the response of the database query

  • exists: indicates whether a contact already exists in the database, according to the response to the query
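
For comparison, record variables can also be set directly, without an enricher. The sketch below uses the batch module's set-record-variable element; the step name and the source variable are hypothetical and do not appear in the example application.

```xml
<!-- Illustration only: the step name and the "source" variable are made up for this sketch. -->
<batch:step name="Sketch_Record_Variables">
    <!-- Attach a value to the current record without touching its payload. -->
    <batch:set-record-variable variableName="source" value="#['salesforce']" doc:name="Record Variable"/>
    <!-- Record variables travel with the record, so later processors and steps can read them. -->
    <logger doc:name="Logger" level="INFO" message="Record came from #[recordVars['source']]"/>
</batch:step>
```

The enricher is the better fit in this example because the value being stored is the result of another processor, the database query, whose output would otherwise replace the payload.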

Studio Visual Editor

[Image: Batch Step 1 in the Studio visual editor]

XML Editor

<batch:step name="Batch_Step1">
    <data-mapper:transform config-ref="contact_to_map" doc:name="Contact To Map"/>
    <enricher doc:name="Message Enricher">
        <db:select config-ref="MySQL_Configuration" doc:name="Check existence in Database">
            <db:parameterized-query><![CDATA[SELECT first_name,last_name,email FROM contact WHERE email=#[payload.email]]]></db:parameterized-query>
        </db:select>
        <enrich source="#[payload.size() > 0]" target="#[recordVars['exists']]"/>
        <enrich source="#[payload]" target="#[recordVars['dbRecord']]"/>
    </enricher>
</batch:step>

Batch Step 2

Mule executes the second batch step only if the first step is successful. Depending on the value that the message enricher stored in the record variable exists (true: the contact exists; false: the contact does not exist), a choice router routes the record to one of the following processing paths:

  • exists = false: the contact must be added as a new contact. The following insert query is carried out in the database:

INSERT INTO contact (first_name, last_name, email) VALUES (#[payload.first_name],#[payload.last_name],#[payload.email])

  • exists = true and the record variable dbRecord is populated: the existing contact must be updated. The following update query is carried out in the database:

UPDATE contact SET first_name=#[payload.first_name],last_name=#[payload.last_name] WHERE email = #[payload.email]

  • If neither of these conditions is met, an error has occurred, so Mule logs a message to announce this error.

Studio Visual Editor

[Image: Batch Step 2 in the Studio visual editor]

XML Editor

<batch:step name="Batch_Stepx">
    <choice doc:name="Choice">
        <when expression="#[recordVars['exists']==false]">
            <db:insert config-ref="MySQL_Configuration" doc:name="Create contact">
                <db:parameterized-query><![CDATA[INSERT INTO contact (first_name, last_name, email) VALUES (#[payload.first_name],#[payload.last_name],#[payload.email])]]></db:parameterized-query>
            </db:insert>
        </when>
        <when expression="#[recordVars['exists']==true and recordVars['dbRecord'] != null]">
            <db:update config-ref="MySQL_Configuration" doc:name="Update Contact">
                <db:parameterized-query><![CDATA[UPDATE contact SET first_name=#[payload.first_name],last_name=#[payload.last_name] WHERE email = #[payload.email]]]></db:parameterized-query>
            </db:update>
        </when>
        <otherwise>
            <logger doc:name="Logger" level="INFO" message="Error with #[payload.email] contact"/>
        </otherwise>
    </choice>
</batch:step>
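
One possible alternative to the choice router, sketched here under the assumption that a step's accept-expression attribute can read record variables, is to give each outcome its own batch step. The step names are hypothetical; the queries and the MySQL_Configuration reference are copied from the step above.

```xml
<!-- Hypothetical variant, not the example application's actual configuration. -->
<!-- Each step only receives the records for which its accept-expression is true. -->
<batch:step name="Insert_New_Contact" accept-expression="#[recordVars['exists'] == false]">
    <db:insert config-ref="MySQL_Configuration" doc:name="Create contact">
        <db:parameterized-query><![CDATA[INSERT INTO contact (first_name, last_name, email) VALUES (#[payload.first_name],#[payload.last_name],#[payload.email])]]></db:parameterized-query>
    </db:insert>
</batch:step>
<batch:step name="Update_Existing_Contact" accept-expression="#[recordVars['exists'] == true]">
    <db:update config-ref="MySQL_Configuration" doc:name="Update Contact">
        <db:parameterized-query><![CDATA[UPDATE contact SET first_name=#[payload.first_name],last_name=#[payload.last_name] WHERE email = #[payload.email]]]></db:parameterized-query>
    </db:update>
</batch:step>
```

This layout has no equivalent of the otherwise branch, so a record that matches neither expression passes through both steps without being logged. The choice router in the example handles that case explicitly.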

On Complete

The On Complete stage of the batch process executes once, after all of the records have been processed, whether successful, failed or skipped. In this case, a logger announces the completion of the task.

Studio Visual Editor

[Image: the On Complete stage in the Studio visual editor]

XML Editor

<batch:on-complete>
    <logger doc:name="Log completion" level="INFO" message="Batch sf->db has finished"/>
</batch:on-complete>
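
In the On Complete stage, the payload is a summary of the finished job rather than a record. A minimal sketch of a more informative completion message follows, assuming the Mule 3 batch job result exposes the successfulRecords, failedRecords and totalRecords properties; the example application itself logs only the fixed text shown above.

```xml
<batch:on-complete>
    <!-- Assumption: the payload here is the batch job result object, whose
         counters can be read with MEL property access. -->
    <logger doc:name="Log summary" level="INFO"
            message="Batch sf->db has finished: #[payload.successfulRecords] succeeded, #[payload.failedRecords] failed, #[payload.totalRecords] total"/>
</batch:on-complete>
```

Logging the counts makes it easier to notice runs in which some contacts were skipped.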

Complete Code

Studio Visual Editor

[Image: the complete batch job in the Studio visual editor]

XML Editor

<mule xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:batch="http://www.mulesoft.org/schema/mule/batch" xmlns:context="http://www.springframework.org/schema/context" xmlns:data-mapper="http://www.mulesoft.org/schema/mule/ee/data-mapper" xmlns:db="http://www.mulesoft.org/schema/mule/db" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation" xmlns:http="http://www.mulesoft.org/schema/mule/http" xmlns:json="http://www.mulesoft.org/schema/mule/json" xmlns:sap="http://www.mulesoft.org/schema/mule/sap" xmlns:sfdc="http://www.mulesoft.org/schema/mule/sfdc" xmlns:spring="http://www.springframework.org/schema/beans" xmlns:tracking="http://www.mulesoft.org/schema/mule/ee/tracking" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd
http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
http://www.mulesoft.org/schema/mule/sfdc http://www.mulesoft.org/schema/mule/sfdc/current/mule-sfdc.xsd
http://www.mulesoft.org/schema/mule/json http://www.mulesoft.org/schema/mule/json/current/mule-json.xsd
http://www.mulesoft.org/schema/mule/db http://www.mulesoft.org/schema/mule/db/current/mule-db.xsd
http://www.mulesoft.org/schema/mule/batch http://www.mulesoft.org/schema/mule/batch/current/mule-batch.xsd
http://www.mulesoft.org/schema/mule/ee/tracking http://www.mulesoft.org/schema/mule/ee/tracking/current/mule-tracking-ee.xsd
http://www.mulesoft.org/schema/mule/ee/data-mapper http://www.mulesoft.org/schema/mule/ee/data-mapper/current/mule-data-mapper.xsd
http://www.mulesoft.org/schema/mule/sap http://www.mulesoft.org/schema/mule/sap/current/mule-sap.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-current.xsd">


    <data-mapper:config doc:name="contact_to_map" name="contact_to_map" transformationGraphPath="contact_to_map.grf"/>

    <context:property-placeholder location="connectors.properties"/>
    <sfdc:config doc:name="Salesforce" name="Salesforce_Configuration" password="${sfdc.password}" securityToken="${sfdc.securityToken}" username="${sfdc.user}">
        <sfdc:connection-pooling-profile exhaustedAction="WHEN_EXHAUSTED_GROW" initialisationPolicy="INITIALISE_ONE"/>
    </sfdc:config>
    <db:mysql-config database="${mysql.database}" doc:name="MySQL Configuration" host="${mysql.host}" name="MySQL_Configuration" password="${mysql.password}" port="3306" user="${mysql.user}"/>
    <batch:job name="salesforce-to-database-Batch1">
        <batch:threading-profile poolExhaustedAction="WAIT"/>
        <batch:input>
            <poll doc:name="Poll">
                <fixed-frequency-scheduler frequency="30" startDelay="10" timeUnit="MINUTES"/>
                <watermark default-expression="#['1900-12-11T14:16:00.000Z']" selector="MAX" selector-expression="#[payload.LastModifiedDate]" variable="timestamp"/>
                <sfdc:query config-ref="Salesforce_Configuration" doc:name="Query Salesforce" query="dsql:SELECT Email,FirstName,LastModifiedDate,LastName FROM Contact WHERE LastModifiedDate > #[flowVars['timestamp']]"/>
            </poll>
        </batch:input>
        <batch:process-records>
            <batch:step name="Batch_Step1">
                <data-mapper:transform config-ref="contact_to_map" doc:name="Contact To Map"/>
                <enricher doc:name="Message Enricher">
                    <db:select config-ref="MySQL_Configuration" doc:name="Check existence in Database">
                        <db:parameterized-query><![CDATA[SELECT first_name,last_name,email FROM contact WHERE email=#[payload.email]]]></db:parameterized-query>
                    </db:select>
                    <enrich source="#[payload.size() > 0]" target="#[recordVars['exists']]"/>
                    <enrich source="#[payload]" target="#[recordVars['dbRecord']]"/>
                </enricher>
            </batch:step>
            <batch:step name="Batch_Stepx">
                <choice doc:name="Choice">
                    <when expression="#[recordVars['exists']==false]">
                        <db:insert config-ref="MySQL_Configuration" doc:name="Create contact">
                            <db:parameterized-query><![CDATA[INSERT INTO contact (first_name, last_name, email) VALUES (#[payload.first_name],#[payload.last_name],#[payload.email])]]></db:parameterized-query>
                        </db:insert>
                    </when>
                    <when expression="#[recordVars['exists']==true and recordVars['dbRecord'] != null]">
                        <db:update config-ref="MySQL_Configuration" doc:name="Update Contact">
                            <db:parameterized-query><![CDATA[UPDATE contact SET first_name=#[payload.first_name],last_name=#[payload.last_name] WHERE email = #[payload.email]]]></db:parameterized-query>
                        </db:update>
                    </when>
                    <otherwise>
                        <logger doc:name="Logger" level="INFO" message="Error with #[payload.email] contact"/>
                    </otherwise>
                </choice>
            </batch:step>
        </batch:process-records>
        <batch:on-complete>
            <logger doc:name="Log completion" level="INFO" message="Batch sf->db has finished"/>
        </batch:on-complete>
    </batch:job>

</mule>

See Also