Creating Message Sources

Some connectors need a Message Source rather than a Message Processor. A Message Source receives or generates new messages for Mule to process.

One use case for Message Sources is implementing streaming APIs. The @Source annotation marks a method inside a @Module- or @Connector-annotated class as callable from a Mule flow and capable of generating Mule events. A Message Source is generated for each marked method. The method must accept a SourceCallback as one of its arguments; the callback represents the next message processor in the chain. The position of this parameter does not matter, as long as it is present in the method signature.
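The callback pattern described above can be tried out without Mule on the classpath. The following is a minimal plain-Java sketch, not DevKit code: `DemoCallback`, `emitGreetings`, and `run` are hypothetical names invented for illustration, with `DemoCallback` standing in for SourceCallback. It only shows the idea that a source produces messages on its own and pushes each one to whatever comes next in the chain.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SourceSketch {

    /** Hypothetical stand-in for SourceCallback: hands a payload to the next step. */
    interface DemoCallback {
        Object process(Object payload) throws Exception;
    }

    /**
     * Plays the role of a @Source method: it generates messages itself and
     * pushes each one to the callback. The callback is deliberately the first
     * parameter here, since its position in the signature is not significant.
     */
    static void emitGreetings(DemoCallback callback, List<String> names) {
        for (String name : names) {
            try {
                callback.process("Hello, " + name);
            } catch (Exception e) {
                // A failure on one message should not stop the source.
                System.err.println("Failed to process message: " + e.getMessage());
            }
        }
    }

    /** Runs the source with a callback that records every payload it receives. */
    static List<Object> run(List<String> names) {
        final List<Object> received = new ArrayList<>();
        emitGreetings(payload -> {
            received.add(payload);
            return payload;
        }, names);
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("Ada", "Linus")));
    }
}
```

In a real connector the recording lambda is replaced by the SourceCallback that Mule passes in, so each payload continues into the rest of the flow instead of being collected in a list.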

Example

The Salesforce Connector supports the Salesforce Streaming API, in which users subscribe to topics and receive a notification whenever a new event related to that topic occurs.

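    @Source
    public void subscribeTopic(String topic, final SourceCallback callback) {
        // Register a listener that is called for every message published on the topic.
        getBayeuxClient().subscribe(topic, new ClientSessionChannel.MessageListener() {
            @Override
            public void onMessage(ClientSessionChannel channel, Message message) {
                try {
                    // Hand the message data to the next message processor in the flow.
                    callback.process(message.getData());
                } catch (Exception e) {
                    // Log the failure with its stack trace and keep the subscription alive.
                    LOGGER.error("Failed to process message from topic " + topic, e);
                }
            }
        });
    }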

This method can be invoked from Mule as follows:

    <flow name="myFlow">
        <sfdc:subscribe-topic topic="/someTopic"/>
        <logger level="INFO" message="#[payload]"/>
        ...
    </flow>

The flow subscribes to the topic named in the topic attribute. Each time an update arrives, the source passes it to the next message processor in the chain, which in this case is a logger.