Rather than polling a resource for all its data with every call, you may want to acquire only the data that has been newly created or updated since the last call. To acquire only new or updated data, you need to keep a persistent record of either the item that was last processed, or the time at which your flow last polled the resource. In the context of Mule flows, this persistent record is called a watermark.
Typically, Mule sets a watermark to a default value the first time the flow runs, then uses it as necessary when running a query or making an outbound request (that is, calling a resource). Depending on how the flow processes the results of the call, Mule either updates the watermark or keeps its original value. Because the value must persist across flow executions, Mule stores it in an object store. This object store is built into the poll scope, so it requires no custom logic: you configure a watermark by setting a few attributes.
Consider the following generic Mule flow.
This flow regularly polls a resource, then performs a series of operations on the resulting payload. With every poll, the application acquires only the data that has been created or updated since the last call to the resource. In this example, Mule stores the watermark in two variables: a persistent variable in the object store, and a flow variable (flowVar) that exposes the same value to the flow.
|
If you are already familiar with Mule components in general, you may find this blog post a clear explanation: it explains the watermark by replicating its behavior with a series of other Mule components.
|
The diagram below illustrates the same flow with numbered steps. The step-by-step explanation that follows describes what Mule does in the background with these two variables.
-
Mule looks in the object store for a variable whose name matches the value of the poll's Variable Name attribute. In this case, the chosen name is lastModifiedID.
-
If Mule finds a variable by this name, it exposes the value by creating a flow variable (flowVar) with the same name.
|
The first time the poll runs, no object store variable by this name exists. Mule creates the flow variable anyway and loads it with the value you provide in the Default Expression attribute. In this example, the initial value is 0.
|
-
Mule polls the resource. Connectors inside the poll should include a filter that takes the flow variable as an attribute value, as in the following code:
sinceId="#[flowVars['lastModifiedID']]"
-
Mule executes the rest of the flow.
-
When the flow has completed execution, Mule updates the value of the flow variable according to either the Update Expression or a combination of the Selector Expression and the chosen Selector. In this case, the Selector Expression is #[payload.id] and the Selector is LAST, so Mule inspects the ID attribute of each returned object and picks the last of these as the new value of the lastModifiedID flow variable.
-
Mule saves the flow variable back into the object store. If no variable existed in the object store in step 1, Mule creates one.
|
If you set the optional Object Store attribute, Mule uses the object store that it references instead of the default user object store.
|
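The steps above map onto a poll configuration roughly as in the following skeleton. It is a sketch, not a complete application: the flow name `genericWatermarkFlow` and the sub-flow `fetchItemsSinceLastModifiedID` are hypothetical, and that sub-flow stands in for whatever connector operation queries your resource and returns a collection of objects that each have an `id` field. The watermark attributes (`lastModifiedID`, default `0`, selector `LAST`, selector expression `#[payload.id]`) are the ones used in the walkthrough.

```xml
<flow name="genericWatermarkFlow" doc:name="genericWatermarkFlow" processingStrategy="synchronous">
    <poll frequency="1000" doc:name="Poll">
        <!-- Steps 1-2: read lastModifiedID from the object store, or evaluate
             default-expression on the first run, and expose it as a flow variable -->
        <watermark variable="lastModifiedID" default-expression="0"
                   selector="LAST" selector-expression="#[payload.id]"/>
        <!-- Step 3: hypothetical sub-flow that queries the resource, passing
             #[flowVars['lastModifiedID']] as its since-ID filter -->
        <flow-ref name="fetchItemsSinceLastModifiedID" doc:name="Fetch new or updated items"/>
    </poll>
    <!-- Step 4: the rest of the flow processes the returned items -->
    <logger message="#[payload]" level="INFO" doc:name="Logger"/>
    <!-- Steps 5-6 happen after the flow completes: Mule evaluates #[payload.id]
         on each returned item, keeps the LAST value, and saves it to the object store -->
</flow>
```

The flow is synchronous because Mule evaluates the selector against the flow's final result; see the processing-strategy note in the configuration steps.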
List of Watermark Attributes
| Attribute | XML element | Required? | Default | Description |
|---|---|---|---|---|
| Variable Name | `variable` | Yes | - | Identifies both the object store key that Mule uses to store the watermark and the name of the flow variable through which Mule exposes the watermark value to the user. |
| Default Expression | `default-expression` | Yes | - | If Mule cannot locate the object store key, it uses the default expression to generate a value. This is useful for the first run of the flow. |
| Update Expression | `update-expression` | No | Value of the variable attribute. | Mule uses the result of this expression to update the watermark once flow execution is complete. Use this expression instead of a selector when you need more complex update logic. |
| Selector | `selector` | No | - | The criteria Mule uses to pick the next value for the flow variable. There are four available selectors: MIN, MAX, FIRST, and LAST. If you use this attribute, you must also provide a value for Selector Expression. |
| Selector Expression | `selector-expression` | No | - | Mule executes this expression on every object returned by the poll. The Selector then collects the returned values and picks one according to the chosen criteria. If you use this attribute, you must also provide a value for Selector. |
| Object Store | `object-store-ref` | No | The default user object store. | A reference to the object store in which you want to store the watermarks. |
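As a sketch of the two optional attributes, the following watermark uses an Update Expression in place of a Selector and keeps its value in a custom object store. Several parts are assumptions: the flow name, the flow variable `newestID`, and the bean id `watermarkStore` are hypothetical; the class `org.mule.util.store.SimpleMemoryObjectStore` is assumed to be available in your Mule 3 runtime (being in-memory, it does not keep the watermark across restarts); and the `spring` namespace and a global Twitter configuration named `Twitter` are assumed to be declared in the enclosing `mule` element.

```xml
<spring:beans>
    <!-- Assumption: an in-memory store from Mule 3 core; the watermark is lost on restart -->
    <spring:bean id="watermarkStore" class="org.mule.util.store.SimpleMemoryObjectStore"/>
</spring:beans>

<flow name="updateExpressionExample" doc:name="updateExpressionExample" processingStrategy="synchronous">
    <poll frequency="1000" doc:name="Poll">
        <!-- No selector: update-expression supplies the next watermark value,
             and object-store-ref replaces the default user object store -->
        <watermark variable="lastID" default-expression="-1"
                   update-expression="#[flowVars['newestID']]"
                   object-store-ref="watermarkStore"/>
        <twitter:get-user-timeline-by-screen-name config-ref="Twitter" doc:name="Twitter"
                   screenName="MuleSoft" sinceId="#[flowVars['lastID']]"/>
    </poll>
    <!-- Hypothetical variable: keep the old watermark if no tweets arrived,
         otherwise use the ID of the first (most recent) tweet -->
    <set-variable variableName="newestID" doc:name="Variable"
                  value="#[payload.isEmpty() ? flowVars['lastID'] : payload[0].id]"/>
    <logger message="#[payload]" level="INFO" doc:name="Logger"/>
</flow>
```

In this simple case a selector would do the same job; an update expression pays off when the next watermark depends on logic that runs later in the flow.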
Configuring Polling with Watermarks
Studio Visual Editor
-
Follow the steps above to create a flow that polls Twitter for data every 1000 milliseconds, then logs the message payload.
-
Click the flow name bar to select the flow, then, in the properties editor, set the Processing Strategy to synchronous.
|
All flows use an asynchronous processing strategy by default. If you do not set the processing strategy to synchronous, polling with watermarks does not work!
|
-
Configure the Since Id attribute of the Twitter connector according to the table below.
| Attribute | Value | Description |
|---|---|---|
| Since Id | `#[flowVars['lastID']]` | Instructs the connector to return only those tweets with an ID greater than the value of the lastID variable. lastID is a flow variable that Mule creates, then updates every time the poll runs. |
-
Select the poll scope, then edit its properties according to the table below.
| Attribute | Value | Description | XML |
|---|---|---|---|
| Fixed Frequency Scheduler | 1000 | Runs the poll every 1000 milliseconds. | `frequency="1000"` |
| Start Delay | 0 | Delays the first poll by 0 milliseconds. | |
| Time Unit | MILLISECONDS | Uses milliseconds as the unit for the frequency and delay settings. | |
| Enable Watermark | true | Enables the watermark. | |
| Variable Name | lastID | Mule creates two variables: a persistent object store variable with the provided name, and a flow variable that the Twitter connector references in its sinceId filter. | `variable="lastID"` |
| Default Expression | -1 | The value that lastID uses the first time Mule executes the poll, or whenever the watermark can’t be found. | `default-expression="-1"` |
| Selector | FIRST | Picks the FIRST value returned by the Selector Expression to update the lastID variable each time flow execution completes. In this case, that is the ID of the first tweet in the generated output (that is, the most recent one). | `selector="FIRST"` |
| Selector Expression | `#[payload.id]` | Returns the ID of each object in the generated output; Mule passes these values to the Selector. | `selector-expression="#[payload.id]"` |
| Update Expression | - | Not needed, because Selector and Selector Expression are used. | |
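For reference, the scheduler settings in the table above correspond roughly to the following poll definition in XML. This is a sketch assuming a Mule 3.5 or later runtime, where the poll scope accepts a `fixed-frequency-scheduler` child element; the shorter `frequency` attribute on `poll`, used in the XML example later in this section, is an alternative for the default start delay and time unit.

```xml
<poll doc:name="Poll">
    <!-- Fixed Frequency Scheduler, Start Delay and Time Unit -->
    <fixed-frequency-scheduler frequency="1000" startDelay="0" timeUnit="MILLISECONDS"/>
    <!-- Enable Watermark = true corresponds to this child element -->
    <watermark variable="lastID" default-expression="-1"
               selector="FIRST" selector-expression="#[payload.id]"/>
    <twitter:get-user-timeline-by-screen-name config-ref="Twitter" doc:name="Twitter"
               screenName="MuleSoft" sinceId="#[flowVars['lastID']]"/>
</poll>
```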
XML Editor or Standalone
-
Follow the steps above to create a flow that polls Twitter for data every 1000 milliseconds, then logs the message payload.
-
In the flow, set the value of the processingStrategy attribute to synchronous.
|
All flows use an asynchronous processing strategy by default. If you do not set the processing strategy to synchronous, polling with watermarks does not work!
|
<flow name="test1" doc:name="test1" processingStrategy="synchronous">
-
Within the poll scope, add a watermark child element according to the table below.
| Element | Description | Sample XML |
|---|---|---|
| `watermark` | Keeps a persistent record of the last element that was processed, or the last time a sync was performed. | `<watermark variable="lastID" default-expression="-1" selector="FIRST" selector-expression="#[payload.id]"/>` |
-
Add attributes to the watermark child element according to the table below.
| Attribute name | Value | Description | Sample XML |
|---|---|---|---|
| `variable` | string | Mule creates two variables: a persistent object store variable with the provided name, and a flow variable that the Twitter connector references in its sinceId filter. | `variable="lastID"` |
| `default-expression` | integer | The value that lastID uses the first time Mule executes the poll, or whenever the watermark can’t be found. | `default-expression="-1"` |
| `selector` | FIRST | Picks the FIRST value returned by the Selector Expression to update the lastID variable each time flow execution completes. In this case, it’s the ID of the first tweet in the generated output (that is, the most recent one). | `selector="FIRST"` |
| `selector-expression` | `#[payload.id]` | Returns the ID of each object in the generated output; Mule passes these values to the Selector. | `selector-expression="#[payload.id]"` |
-
Configure the Since Id attribute of the Twitter connector according to the table below.
| Attribute | Value | Description | Sample XML |
|---|---|---|---|
| `sinceId` | string or Mule expression | Instructs the connector to return only those tweets with an ID greater than the value of the lastID variable. lastID is a flow variable that Mule creates, then updates every time the poll runs. | `sinceId="#[flowVars['lastID']]"` |
<flow name="test1" doc:name="test1" processingStrategy="synchronous">
<poll frequency="1000" doc:name="Poll">
<watermark variable="lastID" default-expression="-1" selector="FIRST" selector-expression="#[payload.id]"/>
<twitter:get-user-timeline-by-screen-name config-ref="Twitter" doc:name="Twitter" screenName="MuleSoft" sinceId="#[flowVars['lastID']]"/>
</poll>
<logger message="#[payload]" level="INFO" doc:name="Logger"/>
</flow>
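To deploy the flow above, it must sit inside a `mule` root element that declares the namespaces it uses, together with a global Twitter configuration named `Twitter`, which the connector's `config-ref` points to. The following is a sketch of a complete configuration file. The `twitter:config` attribute names and the `${...}` property placeholders for your Twitter application credentials are assumptions; check them against the version of the Twitter connector you use, and define the placeholder values yourself (for example, in `mule-app.properties`).

```xml
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns="http://www.mulesoft.org/schema/mule/core"
      xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
      xmlns:twitter="http://www.mulesoft.org/schema/mule/twitter"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="
          http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
          http://www.mulesoft.org/schema/mule/twitter http://www.mulesoft.org/schema/mule/twitter/current/mule-twitter.xsd">

    <!-- Global Twitter configuration referenced by config-ref="Twitter".
         Assumption: attribute names and property placeholders -->
    <twitter:config name="Twitter" doc:name="Twitter"
                    consumerKey="${twitter.consumerKey}" consumerSecret="${twitter.consumerSecret}"
                    accessKey="${twitter.accessKey}" accessSecret="${twitter.accessSecret}"/>

    <!-- The synchronous flow from above, unchanged -->
    <flow name="test1" doc:name="test1" processingStrategy="synchronous">
        <poll frequency="1000" doc:name="Poll">
            <watermark variable="lastID" default-expression="-1" selector="FIRST" selector-expression="#[payload.id]"/>
            <twitter:get-user-timeline-by-screen-name config-ref="Twitter" doc:name="Twitter" screenName="MuleSoft" sinceId="#[flowVars['lastID']]"/>
        </poll>
        <logger message="#[payload]" level="INFO" doc:name="Logger"/>
    </flow>
</mule>
```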