If you have been following some of my recent blogs you will have noticed that I managed to get:
- Mule running on a raspberry pi
- A bash script to turn on an LED on a raspberry pi
- Mule executing the above bash script
I thought I’d bring all these concepts together and see what I could come up with. The result? Using Twitter, I can send a hashtag command and control the LED on my raspberry pi via the open source ESB platform Mulesoft.
This demo brings together a lot of integration concepts and technologies and expands on the concept of Integration in a Box, the Internet of Things and also APIs.
Firstly, the LED can be replaced with a relay. This allows it to control lights in your how. It the relay can also control servo motor to open curtains or sunroofs or even to switch on your air con or TV.
Secondly, from a different angle, the Raspberry Pi + Mule combination can be thought of the API layer between a user’s mobile device and the device they are wanting to control.
I grabbed some inspiration from the polling and watermark training video from Mulesoft and this blog https://www.appnovation.com/blog/polling-and-watermarking-mule from Ryan Carter expanding on this concept in relation to Twitter.
How it works?
You know the saying about pictures and 1000 words so I’ll walk through the flow above.
- Poll my Twitter account (@cloudnthings) making use of the watermark feature so only new tweets are obtained.
- Log the maxId for reference.
- Put the tweets into my own variable
- Loop through each tweet and for each individual tweet
- Assign the actual tweet itself to a variable
- Log the tweet for reference.
- Search the tweet for the key hashtag (#on or #off)
- Execute the associated bash script via the Groovy component or do nothing
- Catch any errors via a reference exception strategy.
The XML source code is provided at the bottom in case anyone is interested.
See it in action!
The integration searches for the hash tag #on and #off. You’ll notice that I have Mule polling every 2 sec for demo purposes. This is not recommended as you’ll hit a 429 limit exceeded after a few minutes. (https://dev.twitter.com/rest/public/rate-limits)
You’ll also see that after I hit send at around the 12s mark, it takes ~14s for the LED to turn on at the 36s mark.
Tips and Tricks and Commentary
The payload has maxId and sinceId which can be used for watermarking. I couldn’t get the watermark working using update-expression=”#[payload.sinceId]” as in the example blog. It took a bit of time but it finally worked with update-expression=”#[payload.maxId]”. Maybe the API has been updated or changed slightly. Not sure.
The way watermarking works is that each tweet has a unique id. Mule will remember the latest one via maxId and in subsequent polls, it will add since_id=maxId which prevents already processed tweets from surfacing. This is a very neat feature.
My sinceId was always zero so I ended up assigning my sinceId the value of maxId. This made it work.
<twitter:search config-ref="Twitter" query="@cloudnthings" sinceId="#[flowVars.max_id]" doc:name="Twitter"/>
Out of interest, when I used update-expression=”#[payload.sinceId]”, upon inspecting the payload, the nextResults string was incorrect hence I kept getting the same set of 15 tweets over and over.
Accessing The Tweet Payload
The Twitter payload is interesting and I struggled with this at first. To access the Twitter text from the search operation the syntax is:
However, had I used a different Twitter operation such as “Get user timeline by screen name” then the syntax would be:
payload.accessLevel or payload.text
I was interested in using regular expressions within the choice filter but couldn’t get it working so decided to stick to using the contains keyword:
#[flowVars.tweet contains '#on']
If I do figure out how to use regex in the choice filter (no the regex filter itself) I’ll update this write up.
It would be good if the documentation (http://mulesoft.github.io/twitter-connector/3.2.0/mule/twitter-config.html) specified how many results are returned from the search operation or if there is a limit on the result set.
One thing to note is that if the Groovy component throws an exception and your exception strategy handles this, the watermark id will be lost and you’ll end up with:
ERROR 2015-10-09 15:27:51,593 [pool-15-thread-1] org.mule.transport.polling.watermark.Watermark: Exception found updating watermark org.mule.api.expression.ExpressionRuntimeException: Execution of the expression "payload.maxId" failed.
As you can see in the video, I set the polling frequency to 2 seconds and after sending the tweet, it took ~14 seconds for the message to be picked up by Mule and for the LED to turn on.
I achieved “Integration in a Box” so to speak and will upgrade to a wifi dongle soon to increase portability. My Raspberry Pi with Mule combined is my interface (ie my API if you will) between my smartphone and any other device that I connect my Pi to.
<?xml version="1.0" encoding="UTF-8"?> <mule xmlns:scripting="http://www.mulesoft.org/schema/mule/scripting" xmlns:tracking="http://www.mulesoft.org/schema/mule/ee/tracking" xmlns:json="http://www.mulesoft.org/schema/mule/json" xmlns:http="http://www.mulesoft.org/schema/mule/http" xmlns:twitter="http://www.mulesoft.org/schema/mule/twitter" xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation" xmlns:spring="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd http://www.mulesoft.org/schema/mule/twitter http://www.mulesoft.org/schema/mule/twitter/current/mule-twitter.xsd http://www.mulesoft.org/schema/mule/json http://www.mulesoft.org/schema/mule/json/current/mule-json.xsd http://www.mulesoft.org/schema/mule/ee/tracking http://www.mulesoft.org/schema/mule/ee/tracking/current/mule-tracking-ee.xsd http://www.mulesoft.org/schema/mule/scripting http://www.mulesoft.org/schema/mule/scripting/current/mule-scripting.xsd"> <http:listener-config name="HTTP_Listener_Configuration" host="0.0.0.0" port="8081" doc:name="HTTP Listener Configuration"/> <twitter:config name="Twitter" accessKey="24947988-INlVUIgQZopAMhbaiNXBfhRyEbwsvLz4JCe10Sgs" accessSecret="KEHCcbFIQnvqMlrmL29oAXE8HHvS1plNV0WsH5Hrwc" consumerKey="xxxx" consumerSecret="xxxx" doc:name="Twitter"/> <flow name="twitterFlow" processingStrategy="synchronous"> <poll doc:name="Poll"> <fixed-frequency-scheduler frequency="0" timeUnit="SECONDS"/> <watermark variable="max_id" default-expression="0" update-expression="#[payload.maxId]"/> <twitter:search config-ref="Twitter" query="@cloudnthings" sinceId="#[flowVars.max_id]" doc:name="Twitter"/> </poll> <logger message="#['maxId is: ' + payload.maxId]" level="INFO" doc:name="Twitter maxId"/> <set-variable variableName="array_of_tweets" value="#[payload.tweets]" doc:name="Set Tweet Array Variable"/> <foreach collection="#[flowVars.array_of_tweets]" doc:name="For Each"> <set-variable variableName="tweet" value="#[payload.text]" doc:name="Single tweet"/> <logger message="#[flowVars.tweet]" level="INFO" doc:name="Print out tweet"/> <choice doc:name="Choice"> <when expression="#[flowVars.tweet contains '#on']"> <scripting:component doc:name="turnon.sh"> <scripting:script engine="Groovy"><![CDATA["./turnon.sh".execute()]]></scripting:script> </scripting:component> <logger message="On command found. Script executed." level="INFO" doc:name="Log: found on"/> </when> <when expression="#[flowVars.tweet contains '#off']"> <scripting:component doc:name="turnoff.sh"> <scripting:script engine="Groovy"><![CDATA["./turnoff.sh".execute()]]></scripting:script> </scripting:component> <logger level="INFO" doc:name="Log: found off" message="Off command found. Script executed."/> </when> <otherwise> <logger message="No command found. Going back to sleep." level="INFO" doc:name="Sleep"/> </otherwise> </choice> </foreach> <exception-strategy ref="globalChoice_Exception_Strategy" doc:name="Reference Exception Strategy"/> </flow> </mule>