
Real-time streaming data aggregation



28th July 2011, 08:45 PM   #1
News Bot
 

Dear Kettle users,

Most of you usually use a data integration engine to process data in a batch-oriented way. Pentaho Data Integration (Kettle) is typically deployed to run monthly, nightly or hourly workloads. Sometimes folks run micro-batches of work every minute or so. However, it’s less well known that our beloved transformation engine can also be used to stream data indefinitely (never ending) from a source to a target. This sort of data integration is sometimes referred to as “streaming”, “real-time”, “near real-time”, “continuous” and so on. Typical examples of situations where you have a never-ending supply of data that needs to be processed the instant it becomes available are JMS (Java Message Service), RDBMS log sniffing, on-line fraud analysis, web or application log sniffing or of course … Twitter! Since Twitter is easily accessed, it’s common for examples to pop up regarding its usage, and in this blog post too we will use this service to demo the Pentaho Data Integration capabilities with respect to processing streaming data.

Here’s what we want to do:
  1. Continuously read all the tweets that are being sent on Twitter.
  2. Extract all the hash-tags used.
  3. Count the number of hash-tags used in a one-minute time-window.
  4. Report on all the tags that are being used more than once.
  5. Put the output in a browser window, continuously updated every minute.
This is a very generic example, but the same logic can be applied to different fields like JMS, HL7, log sniffing and so on. It differs from the excellent work that Vincent from Open-BI described earlier this week on his blog in that his Talend job finishes whereas ours never ends, and ours does time-based aggregation rather than aggregation over a finite data set.
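Steps 2 to 4 above can be sketched outside of Kettle in plain Java. This is only an illustration of the windowing idea, not the transformation described in this post: the class name `HashtagWindowCounter`, its methods and the `#(\w+)` pattern are all hypothetical, and tags are lower-cased before counting, which is an assumption. The window is closed by the first tweet that arrives after it has expired, which works for a busy stream but would need a timer for a quiet one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Kettle: counts hash-tags per time window.
class HashtagWindowCounter {
    // Assumption: a tag is '#' followed by ASCII word characters.
    private static final Pattern HASHTAG = Pattern.compile("#(\\w+)");

    private final long windowMillis;
    private long windowStart = -1;                        // -1 means no tweet seen yet
    private final Map<String, Integer> counts = new TreeMap<>();

    HashtagWindowCounter(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    // Step 2: extract all hash-tags from one tweet text (lower-cased).
    static List<String> extractTags(String text) {
        List<String> tags = new ArrayList<>();
        Matcher m = HASHTAG.matcher(text);
        while (m.find()) {
            tags.add(m.group(1).toLowerCase(Locale.ROOT));
        }
        return tags;
    }

    // Step 3: add one tweet. Returns the report for the window that just
    // closed, or null while the current window is still open.
    Map<String, Integer> add(long timestampMillis, String text) {
        Map<String, Integer> report = null;
        if (windowStart < 0) {
            windowStart = timestampMillis;
        }
        if (timestampMillis - windowStart >= windowMillis) {
            report = flush();
            windowStart = timestampMillis;                // next window starts here
        }
        for (String tag : extractTags(text)) {
            counts.merge(tag, 1, Integer::sum);
        }
        return report;
    }

    // Step 4: keep only the tags used more than once, then reset the window.
    Map<String, Integer> flush() {
        Map<String, Integer> report = new TreeMap<>();
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            if (e.getValue() > 1) {
                report.put(e.getKey(), e.getValue());
            }
        }
        counts.clear();
        return report;
    }

    public static void main(String[] args) {
        HashtagWindowCounter counter = new HashtagWindowCounter(60_000);
        counter.add(0, "Loving #Kettle and #ETL");
        counter.add(10_000, "More #kettle streaming");
        counter.add(30_000, "#etl #etl everywhere");
        // The first tweet after the minute has passed closes the window.
        Map<String, Integer> report = counter.add(61_000, "new window #pdi");
        System.out.println(report);                       // prints {etl=3, kettle=2}
    }
}
```

Returning the report from `add` keeps the sketch single-threaded: the caller decides what to do with each closed window, for example writing it to the page that the browser refreshes every minute.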

Also note that in order for Kettle to fully support multiple streaming data sources we would have to implement support for “windowed” (time-based) joins and other nifty things. We’ve seen very little demand for this sort of requirement in the past, perhaps because people don’t know it’s possible with Kettle. In any case, if you currently are in need of full streaming data support, have a look at SQLStream; they can help you. SQLStream is co-founded by Pentaho’s Julian Hyde of Mondrian fame.

OK, let’s see how we can solve our little problem with Kettle instead…

1. Continuously read all the tweets that are being sent on Twitter.

For this we are going to use one of the public Twitter web services, one that delivers a never-ending stream of JSON messages:
http://stream.twitter.com/1/statuses...limited=length

Since the output is never-ending and specific in nature, I wrote a small “User Defined Java Class” script:

    public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException
    {
      HttpClient client = SlaveConnectionManager.getInstance().createHttpClient();
      client.setTimeout(10000);
      client.setConnectionTimeout(10000);
      Credentials creds = new UsernamePasswordCredentials(getParameter("USERNAME"), getParameter("PASSWORD"));
      client.getState().setCredentials(AuthScope.ANY, creds);
      client.getParams().setAuthenticationPreemptive(true);

      HttpMethod method = new PostMethod("http://stream.twitter.com/1/statuses/sample.json?delimited=length");

      // Execute request
      //
      InputStream inputStream=null;
      BufferedInputStream bufferedInputStream=null;
      try {
        int result = client.executeMethod(method);

        // the response
        //
        inputStream = method.getResponseBodyAsStream();
        bufferedInputStream = new BufferedInputStream(inputStream, 1000);

        StringBuffer bodyBuffer = new StringBuffer();
        int opened=0;
        int c;
        while ( (c=bufferedInputStream.read())!=-1 && !isStopped()) {
          char ch = (char)c;
          bodyBuffer.append(ch);
          if (ch=='{') opened++; else if (ch=='}') opened--;
          if (ch=='}' && opened==0) {
            // one JSON block, pass it on!
            //
            Object[] r = createOutputRow(new Object[0], data.outputRowMeta.size());
            String jsonString = bodyBuffer.toString();
            int startIndex = jsonString.indexOf("{");
            if (startIndex
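The brace-counting idea in the script above (append characters, track how many `{` are open, and pass a block on when the count returns to zero) can be tried in isolation. The following is a minimal sketch under stated assumptions, not the remainder of the script: the class `JsonStreamSplitter` and its `split` method are hypothetical names, it reads from a `Reader` rather than a raw byte stream, and it additionally skips braces that appear inside JSON string values, which is not visible in the part of the script shown here.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.util.function.Consumer;

// Hypothetical helper: splits a character stream into top-level JSON objects.
class JsonStreamSplitter {

    // Reads until the stream ends and hands every complete {...} block to onBlock.
    static void split(Reader in, Consumer<String> onBlock) throws IOException {
        StringBuilder buffer = new StringBuilder();
        int depth = 0;              // number of currently open '{'
        boolean inString = false;   // inside a JSON string literal
        boolean escaped = false;    // previous character was a backslash
        int c;
        while ((c = in.read()) != -1) {
            char ch = (char) c;
            if (depth == 0 && ch != '{') {
                continue;           // skip length prefixes and line breaks between blocks
            }
            buffer.append(ch);
            if (inString) {
                if (escaped) {
                    escaped = false;
                } else if (ch == '\\') {
                    escaped = true;
                } else if (ch == '"') {
                    inString = false;
                }
            } else if (ch == '"') {
                inString = true;
            } else if (ch == '{') {
                depth++;
            } else if (ch == '}') {
                depth--;
                if (depth == 0) {   // one JSON block, pass it on
                    onBlock.accept(buffer.toString());
                    buffer.setLength(0);
                }
            }
        }
    }

    public static void main(String[] args) throws IOException {
        // Two made-up messages, each preceded by a length line.
        String sample = "33\r\n{\"text\":\"a } b\",\"user\":{\"id\":1}}\r\n"
                      + "12\r\n{\"text\":\"x\"}\r\n";
        split(new StringReader(sample), System.out::println);
        // prints:
        // {"text":"a } b","user":{"id":1}}
        // {"text":"x"}
    }
}
```

The callback takes the place of handing a row to the next step; inside a step, each completed block would be emitted as an output row instead of printed.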


