We are going to write the simplest possible Python program to process data with Apache Storm. We will take the quick start example from Apache Storm and write our own version of it. The goal is for our explanation here to be simpler to understand than the Apache Storm one. Or you can use this one to help you understand the other one.
We are going to use the StreamParse Python API to write a program that creates a spout and a bolt to process streaming data. Read our Storm overview to get an understanding of the basic Storm concepts.
Note: Storm is designed to work with streaming data. We do not have any streaming data as input yet, so we will just pick random values in a continuous loop and process those. The whole concept of streaming data needs further explanation, so we will explain how to put Apache Kafka in front of Storm in another post.
Our program is composed of two simple parts. These two parts are the minimum you need to use Storm:
spout: read the temperature in Celsius for a few cities.
bolt: output that and add the temperature in Fahrenheit as another value in the tuple.
We tie the spout and bolt together as a topology, meaning we explicitly define which Python class is our spout and which is our bolt.
Then we hand all of this over to StreamParse, which submits the topology to Zookeeper, Nimbus, and the Supervisor without us having to worry about any of the details.
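To make the data flow concrete before bringing Storm into it, here is a minimal sketch in plain Python with no StreamParse involved. The names fake_spout, fake_bolt, and run_pipeline are hypothetical stand-ins for illustration only. They show the shape of the data (a list of values in, a longer list of values out), not how Storm schedules or distributes the work.

```python
import random

# Sample readings in Celsius, the same values used later in this post.
CITIES = [["Berlin", 21], ["New_York", 22], ["Paris", 23], ["Bogota", 24]]


def fake_spout(count, rng=random):
    """Yield `count` [city, tempC] readings picked at random.

    A real spout never stops; the count only keeps this sketch finite.
    """
    for _ in range(count):
        yield list(rng.choice(CITIES))


def fake_bolt(reading):
    """Return [city, tempC, tempF] for one [city, tempC] reading."""
    city, temp_c = reading
    return [city, temp_c, temp_c * 1.8 + 32]


def run_pipeline(count):
    """Feed every reading from the spout through the bolt."""
    return [fake_bolt(reading) for reading in fake_spout(count)]


if __name__ == "__main__":
    for city, temp_c, temp_f in run_pipeline(5):
        print("%s: C=%s F=%.1f" % (city, temp_c, temp_f))
```

In the real topology the spout and bolt are separate classes that Storm wires together, but the contract between them is the same: the bolt receives whatever list of values the spout emitted.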
Requirements
- You need a working Apache Storm system. You can follow our overview to get an understanding of what it is, then install Zookeeper and Storm following these instructions from Apache Storm. You do not need to start Nimbus, Zookeeper, or the Supervisor yourself, as the StreamParse API will do all that.
- Python and Pip
- Install the StreamParse API. Just type:
pip install streamparse
- Install Leiningen. Do not use apt-get install for this, as that version (for Ubuntu) is too old. Instead copy this source file exactly, rename it “lein”, and put it in your path. Then run it. After it is installed you can type this to test it:
lein version
- Now, run these 3 commands to set up the WordCount example. You can go to these instructions to understand what that does.
sparse quickstart wordcount
cd wordcount
sparse run
So, here is what quickstart wordcount did. It created several files and the directory structure that you need. Here are the key files it created:
wordcount/topologies/wordcount.clj
wordcount/src/spouts/words.py
wordcount/src/bolts/wordcount.py
- We will use the same directory structure for our sample here, as the StreamParse API requires that.
As you can see, words.py is the spout and wordcount.py is the bolt. wordcount.clj defines the topology, meaning it says which spout and which bolt to run. wordcount.clj is written in the Clojure programming language, which the StreamParse project says is easier to use than installing a bunch of software that you would need to define the topology using Python.
Now, let's go through the pieces.
Topology: topologies/weather.clj
It is pretty easy to follow the wordcount/topologies/wordcount.clj example to make your own topology file. Below we highlight some items.
- spouts.weatherSpout.WeatherSpout gives the directory, the Python file (module), and the Python class name.
- The names weather, weather-spout, and weather-boltcount-bolt do not matter. They can be anything.
- ["city" "temp"] and ["city" "tempC" "tempF"] are the lists of field names for the tuples that the spout and the bolt emit. Note: the first time we worked through this example we made the mistake of using tuples and not lists and got "not implemented" error messages that were hard to track down. The API documentation spells out the argument types for the functions used in the Python code below.
(ns weather
  (:use [streamparse.specs])
  (:gen-class))

(defn weather [options]
  [
    ;; spout configuration
    {"weather-spout" (python-spout-spec
          options
          "spouts.weatherSpout.WeatherSpout"
          ["city" "temp"]
          )
    }
    ;; bolt configuration
    {"weather-boltcount-bolt" (python-bolt-spec
          options
          {"weather-spout" :shuffle}
          "bolts.weatherBolt.WeatherBolt"
          ["city" "tempC" "tempF"]
          )
    }
  ]
)
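One way to picture the field lists in the topology file: each name labels the value at the same position in the list that a component emits. The sketch below is plain Python for illustration only. The helper label_values is hypothetical and not part of StreamParse; it just pairs names with values the way the field declarations imply.

```python
# Field names as declared in topologies/weather.clj.
SPOUT_FIELDS = ["city", "temp"]
BOLT_FIELDS = ["city", "tempC", "tempF"]


def label_values(fields, values):
    """Pair each declared field name with the value in the same position.

    Hypothetical helper for illustration; it raises if the counts differ,
    which is the kind of mismatch worth checking for in your own code.
    """
    if len(fields) != len(values):
        raise ValueError(
            "expected %d values for fields %r, got %d"
            % (len(fields), fields, len(values))
        )
    return dict(zip(fields, values))


if __name__ == "__main__":
    print(label_values(SPOUT_FIELDS, ["Paris", 23]))
    print(label_values(BOLT_FIELDS, ["Paris", 23, 73.4]))
```

The practical point is that the number and order of values you emit in Python should line up with the field names you declare in the .clj file.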
Spout: src/spouts/weatherSpout.py
The important thing to note here is that after the initialize method, Storm will call the next_tuple method forever, or until you press Control-C or otherwise cancel the program. That is what makes this suitable for working with streaming data, meaning data that never stops coming in.
You use emit to hand a list of values to Storm as a tuple, which the bolt will then consume:
from random import randint

from streamparse.spout import Spout


class WeatherSpout(Spout):

    def initialize(self, stormconf, context):
        # Fixed sample readings: [city, temperature in Celsius]
        self.cities = (
            ["Berlin", 21],
            ["New_York", 22],
            ["Paris", 23],
            ["Bogota", 24],
        )

    def next_tuple(self):
        # Pick one of the four cities at random (randint includes both ends)
        i = randint(0, 3)
        citytemp = self.cities[i]
        city = citytemp[0]
        temp = citytemp[1]
        # Emit a list whose values match the ["city" "temp"] fields in the topology
        self.emit([city, temp])
        self.log('SPOUT %s: %s' % (city, temp))
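If you want to check the spout's logic without a running Storm cluster, one option is to drive the same two methods by hand. The sketch below is a stand-in under stated assumptions, not StreamParse itself: StubSpout is a hypothetical class that only records what emit and log receive, and the for loop imitates Storm calling next_tuple repeatedly (Storm would not stop after five calls).

```python
from random import randint


class StubSpout(object):
    """Hypothetical stand-in for a spout base class; it only records calls."""

    def __init__(self):
        self.emitted = []
        self.logged = []

    def emit(self, values):
        self.emitted.append(values)

    def log(self, message):
        self.logged.append(message)


class LocalWeatherSpout(StubSpout):
    """Same initialize/next_tuple logic as WeatherSpout, on the stub base."""

    def initialize(self, stormconf, context):
        self.cities = (
            ["Berlin", 21],
            ["New_York", 22],
            ["Paris", 23],
            ["Bogota", 24],
        )

    def next_tuple(self):
        city, temp = self.cities[randint(0, 3)]
        self.emit([city, temp])
        self.log('SPOUT %s: %s' % (city, temp))


if __name__ == "__main__":
    spout = LocalWeatherSpout()
    spout.initialize({}, None)   # called once, before any tuples
    for _ in range(5):           # Storm would keep calling this indefinitely
        spout.next_tuple()
    for line in spout.logged:
        print(line)
```

Because the stub keeps everything it was given in spout.emitted, you can inspect the emitted lists directly instead of reading Storm's logs.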
Bolt: src/bolts/weatherBolt.py
This takes each tuple emitted by the weatherSpout.py spout and emits a new tuple with the Fahrenheit value added. So it emulates a continuously running process, like reading streaming data, such as Tweets, and then running analytics on that or handing it off to some kind of storage, such as Hadoop.
from streamparse.bolt import Bolt


class WeatherBolt(Bolt):

    def process(self, tup):
        # tup.values holds the [city, temp] list emitted by the spout
        city = tup.values[0]
        tempC = tup.values[1]
        # Convert Celsius to Fahrenheit: F = C * 1.8 + 32
        tempF = (int(tempC) * 1.8) + 32
        self.emit([city, tempC, tempF])
        self.log('BOLT city %s: C=%s F=%s' % (city, tempC, tempF))
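The only arithmetic in the bolt is the Celsius-to-Fahrenheit conversion, F = C * 1.8 + 32. As a quick sanity check, here it is pulled out into a standalone function and applied to the four sample temperatures. The function name celsius_to_fahrenheit is ours, for illustration only, and the results are rounded to one decimal place because floating-point multiplication by 1.8 can leave tiny trailing digits.

```python
def celsius_to_fahrenheit(temp_c):
    """Convert Celsius to Fahrenheit using the same formula as the bolt."""
    return (int(temp_c) * 1.8) + 32


if __name__ == "__main__":
    samples = [("Berlin", 21), ("New_York", 22), ("Paris", 23), ("Bogota", 24)]
    for city, temp_c in samples:
        temp_f = round(celsius_to_fahrenheit(temp_c), 1)
        print("%s: C=%s F=%s" % (city, temp_c, temp_f))
```

Rounded to one decimal place, the four sample values come out as 69.8, 71.6, 73.4, and 75.2 degrees Fahrenheit. Note that int() drops any fractional part of the input, so a reading of 21.9 would be converted as if it were 21.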
Run it
Now, when you run:
sparse run -n weather
you should see the SPOUT and BOLT messages from the two log calls in the code above.
So that is a basic example of how to use StreamParse to write an Apache Storm spout, bolt, and topology. You could, of course, use Java, or Ruby or other languages if you investigate the APIs for those.
If you get anything wrong, the stack trace it generates is helpful in solving most errors. Other than that, just go back and compare your code to this code and the StreamParse documentation to see where you have made a mistake.