An Introduction to Oracle Stream Analytics and Stream Processing Kafka Data

Version 2

    By Robin Moffatt

     

    Oracle Stream Analytics (OSA) is a graphical tool that provides “Business Insight into Fast Data”. In layman's terms, that translates into an intuitive web-based interface for exploring, analysing, and manipulating streaming data sources in realtime. These sources include REST, JMS queues, and Kafka. The inclusion of Kafka opens OSA up to integration with the many new-build data pipelines that use Kafka as their backbone.


    Previously known as Oracle Stream Explorer, OSA is part of the SOA component of Fusion Middleware (just as OBIEE and ODI are also part of FMW). In a recent blog post it was positioned as “[…] part of Oracle Data Integration And Governance Platform.” Its Big Data credentials include support for Kafka as both source and target, as well as the option to use Spark to execute across multiple nodes, scaling performance and capacity.

     

    I’ve been exploring OSA from the comfort of my own Mac, courtesy of Docker and a Docker image for OSA created by Guido Schmutz. The benefits of Docker are many and covered elsewhere, but what I loved about it in this instance was that I didn’t have to download a VM that was tens of GB. Nor did I have to spend time learning how to install OSA from scratch, which, whilst interesting, wasn’t a priority compared to just trying the tool out and seeing what it could do. [Update] It turns out that installation is a piece of cake, and the download is less than 1GB… but in general the principle still stands: Docker is a great way to get up and running quickly with something.

    In this article we’ll take OSA for a spin, looking at some of the functionality and terminology, and then at real examples using live Twitter data.

     

    To start with, we sign in to Oracle Stream Analytics:


    From here, click on the Catalog link, where all the resources are listed. Some of these resource types include:

    • Streams - definitions of sources of data such as Kafka, JMS, and a dummy data generator (event generator)
    • Connections - servers etc. from which Streams are defined
    • Explorations - front-end for seeing the contents of Streams in realtime, as well as applying light transformations
    • Targets - destinations for transformed streams

    Viewing Realtime Twitter Data with OSA

    The first example I’ll show is the canonical big data/streaming example everywhere: Twitter. Twitter is even built into OSA as a Stream source. If you go to https://dev.twitter.com you can get yourself a set of credentials enabling you to query the live Twitter firehose for given hashtags or users.

    With my Twitter dev credentials, I create a new Connection in OSA:

     


     

    Now we have an entry in the Catalog for the Twitter connection:

     


     

    ...from which we can create a Stream, using the connection and a set of hashtags or users for whom we want to stream tweets:

     


     

    The Shape is basically the schema, or data model, that is applied to the stream. There is a built-in one for Twitter, which we’ll use here:


    If you get the error Unable to deploy OEP application when you click Save, check the OSA log file for causes such as being unable to reach Twitter, or invalid credentials.

     

    Assuming the Stream is created successfully, you are then prompted to create an Exploration, from which you can see the Stream in realtime:


    Explorations can have multiple stream sources, and be used to transform the contents, which we’ll see later. For now, after clicking Create, we get our Exploration window, which shows the contents of the stream in realtime:


    At the bottom of the screen there’s the option to plot one or more charts of any numeric fields in the stream, as can be seen in the animation above.

    I’ll leave this example here for now, but first I use the Publish option from the Actions menu, which makes the Exploration available as a source for subsequent analyses.

     

    Adding Lookup Data to Streams

     

    Let's look now at some more of the options available for transforming and 'wrangling' streaming data with OSA. Here I’m going to show how two streams can be joined together (but not crossed) based on a common field, and the resulting stream used as the input for a subsequent process. The data is simulated, using a CSV file (read by OSA on a loop) and OSA's Event Generator.

     

    From the Catalog page I create a new Stream, using Event Generator as the Type:


    On the second page of the setup I define how frequently I want the dummy events to be generated, and the specification for the dummy data:


    The last bit of setup for the stream is to define the Shape, which is the schema of data that I’d like generated:

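    To make the Event Generator and Shape idea concrete, here is a minimal Ruby sketch. It is not how OSA works internally: the Shape is modelled as a hash of field names to types, and the field names in `SHAPE` and the `generate_events` helper are hypothetical.

```ruby
# Sketch of a dummy event generator driven by a "shape" (field name -> type).
# SHAPE and its field names are hypothetical; OSA's generator is configured in the GUI.
SHAPE = {
  "attr_id" => :integer,
  "reading" => :double,
  "sensor"  => :string
}.freeze

# Produce a random value for one field type.
def random_value(type, rng)
  case type
  when :integer then rng.rand(1..5)                  # 1..5 so keys match the lookup CSV
  when :double  then (rng.rand * 100.0).round(2)
  when :string  then "sensor-#{rng.rand(1..3)}"
  else raise ArgumentError, "unsupported type: #{type}"
  end
end

# Yield `count` events conforming to the shape, pausing `interval` seconds between them.
def generate_events(shape, count:, interval: 0.0, seed: 42)
  rng = Random.new(seed)
  count.times do
    yield shape.to_h { |field, type| [field, random_value(type, rng)] }
    sleep(interval) if interval.positive?
  end
end

generate_events(SHAPE, count: 3) { |event| p event }
```

Seeding the random generator keeps the dummy stream reproducible between runs, which is handy when comparing transformations.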

    The Exploration for this stream shows the dummy data:


    The second stream is going to be sourced from a very simple key/value CSV file:

    attr_id,attr_value
    1,never
    2,gonna
    3,give
    4,you
    5,up

    The stream type is CSV, and I can configure how often OSA reads from it, as well as telling OSA to loop back to the beginning when it's read to the end, thus simulating a proper stream. The ‘shape’ is picked up automatically from the file, based on the first row (headers), with the data types inferred from the values:


    The Exploration for the stream shows the five values repeatedly streamed through (since I ticked the box to ‘loop’ the CSV file in the stream):

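    The looping behaviour and the header-based shape inference can be sketched in a few lines of Ruby. This is an illustration under simple assumptions (no quoted fields, only integer, decimal, and string types), not OSA's implementation; `looping_csv_stream` is a hypothetical helper.

```ruby
# Sketch of a looping CSV "stream": the header row names the fields, simple types are
# inferred from the values, and rows replay endlessly. Plain comma splitting is enough
# for this unquoted key/value file; real-world CSV would need a proper parser.
CSV_TEXT = <<~CSV
  attr_id,attr_value
  1,never
  2,gonna
  3,give
  4,you
  5,up
CSV

# Infer Integer, Float, or String from a raw value.
def infer(value)
  case value
  when /\A-?\d+\z/      then Integer(value, 10)
  when /\A-?\d+\.\d+\z/ then Float(value)
  else value
  end
end

# Build an endless Enumerator of hashes that loops back to the first data row.
def looping_csv_stream(text)
  header, *rows = text.lines.map { |line| line.chomp.split(",", -1) }
  records = rows.map { |cols| header.zip(cols.map { |c| infer(c) }).to_h }
  records.cycle
end

looping_csv_stream(CSV_TEXT).take(7).each { |event| p event }  # events 6 and 7 wrap around
```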

    Back on the Catalog page I’m going to create a new Exploration, but this time based on a Pattern. Patterns are pre-built templates for stream manipulation and processing. Here we’ll use the pattern for a “left outer join” between streams.


    The Pattern has a set of pre-defined fields that need to be supplied, including the stream names and the common field on which to join them. Note also that I’ve increased the Window Range. This is necessary so that a greater range of CSV stream events is used for the lookup. If the Range is left at the default of 1 second, then only events from both streams that occur in the same second and match on attr_id would be joined. Unless both streams happen to be in sync on the same attr_id from the outset, this isn’t going to happen very often, and certainly wouldn’t in a real-life data stream.
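    To see why the Window Range matters, here is a simplified Ruby sketch of a time-windowed left outer join. It assumes each event carries a timestamp in seconds and, unlike OSA's pattern, applies the window only to the lookup (CSV) stream; `windowed_left_join` is a hypothetical name.

```ruby
# Sketch of a time-windowed LEFT OUTER JOIN. For each main-stream event, find the most
# recent lookup event with the same attr_id inside the last `range` seconds; if none
# falls inside the window, the right-hand side is nil.
Event = Struct.new(:ts, :attr_id, :payload, keyword_init: true)

def windowed_left_join(left_events, right_events, range:)
  left_events.map do |l|
    candidates = right_events.select { |r| r.attr_id == l.attr_id && r.ts <= l.ts && r.ts > l.ts - range }
    match = candidates.max_by(&:ts)
    { ts: l.ts, attr_id: l.attr_id, left: l.payload, right: match&.payload }
  end
end

# Lookup stream: the five CSV rows replayed one per second for t = 0..9.
words = %w[never gonna give you up]
lookup = (0...10).map { |t| Event.new(ts: t, attr_id: (t % 5) + 1, payload: words[t % 5]) }

# Main stream: two events at t = 9, for attr_id 3 and attr_id 5.
main = [Event.new(ts: 9, attr_id: 3, payload: "a"), Event.new(ts: 9, attr_id: 5, payload: "b")]

p windowed_left_join(main, lookup, range: 1)  # only attr_id 5 has a lookup event at t = 9
p windowed_left_join(main, lookup, range: 5)  # attr_id 3 now matches its lookup event at t = 7
```

With a one-second window only the event whose key happens to be emitted in the same second finds a match; widening the window lets every key in the five-row cycle be found.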

     

    So now we have the two joined streams:


    Within an Exploration it is possible to do light transformation work. By right-clicking on a column you can rename or remove it, which I’ve done here for the duplicated attr_id (duplicated since it appears in both streams), as well as renaming attr_value:


     

    Daisy-Chaining, Targets, and Topology

    Once an Exploration is Published it can be used as the Source for subsequent Explorations, enabling you to map out a pipeline based on multiple source streams and transformations. Here we're taking the exploration created just above that joined the two streams together, and using the output as the source for a new Exploration:


    Since the Exploration is based on a previous one, the same stream data is available, but with the joins and transformations already applied:


    From here another transformation could be applied, such as replacing the value of one column conditionally based on that of another:


    Whilst OSA enables rapid analysis and transformation of inbound streams, it also lets you stream the transformed results outside of OSA to a Target, as the Kafka examples later in this article show. As well as Kafka, other technologies are supported as targets, including a REST endpoint or a simple CSV file.


    With a target configured, as well as an Exploration based on the output of another, the Topology comes in handy for visualising the flow of data. You can access this from the Topology icon in an Exploration page, or from the dropdown menu on the Catalog page against a given object:


     


     

    So that's some background to Oracle Stream Analytics and its basic components. Let's now see how OSA can be used with Kafka.

     

    Kafka is one of the foremost streaming technologies nowadays, for very good reasons: it is highly scalable and flexible, supporting multiple concurrent consumers. Oracle Stream Analytics supports Kafka as both a source and a target. To set up an inbound stream from Kafka, first we define the Connection:


    Once the Connection is defined, we can create a Stream for a given Kafka topic:


    If you get the error Unable to deploy OEP application at this point, check the OSA log: it could be a problem connecting to ZooKeeper, such as:

    Exception in thread "SpringOsgiExtenderThread-286" org.springframework.beans.FatalBeanException:
      Error in context lifecycle initialization; nested exception is com.bea.wlevs.ede.api.EventProcessingException:
      org.I0Itec.zkclient.exception.ZkTimeoutException:
      Unable to connect to zookeeper server within timeout: 6000

    Assuming that the Stream is saved with no errors, you can then create an Exploration based on it and, all being well, the live tweets are soon shown. Unlike the example at the top of this article, these tweets are coming in via Kafka rather than the built-in OSA Twitter Stream. This is partly to demonstrate the Kafka capabilities, but also because the built-in OSA Twitter Stream includes only a subset of the available Twitter data fields.


    Avro? Nope.

    Data in Kafka can be serialized in many formats, including Avro, which OSA doesn’t seem to like. No error is shown in the GUI, but the Exploration remains blank.


    Looking in the OSA log file, there are a whole lot of errors recorded, similar to these:

       line 1:0 no viable alternative at character '?'
       line 1:1 no viable alternative at character '?'
       line 1:2 no viable alternative at character '?'
       line 1:3 no viable alternative at character '?'
       line 1:4 no viable alternative at character '?'
       line 1:5 no viable alternative at character '?'
       line 1:6 no viable alternative at character '?'
       line 1:7 no viable alternative at character '?'
       line 1:8 no viable alternative at character ''
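    My guess, unconfirmed, is that OSA is trying to parse binary Avro as text, hence the '?' characters. If the topic was written with Confluent's Avro serializer, each message starts with a zero magic byte followed by a 4-byte big-endian schema ID, which makes it easy to check what a topic actually holds. The Ruby sketch below, with the hypothetical helper `classify_payload`, distinguishes that framing from JSON text; it cannot detect Avro written without the Schema Registry framing.

```ruby
# Sketch: classify a Kafka message value as Confluent-framed Avro
# (magic byte 0x00, 4-byte big-endian schema ID, then Avro binary) or as JSON text.
require "json"

def classify_payload(bytes)
  bytes = bytes.b                                      # work on a raw binary copy
  if bytes.bytesize >= 5 && bytes.getbyte(0).zero?
    schema_id = bytes.byteslice(1, 4).unpack1("N")     # "N" = 32-bit unsigned, big-endian
    return [:confluent_avro, schema_id]
  end
  JSON.parse(bytes.force_encoding(Encoding::UTF_8))
  [:json, nil]
rescue JSON::ParserError, EncodingError
  [:unknown, nil]
end

avro_like = [0, 0, 0, 0, 42].pack("C*") + "\x02\x06foo".b
p classify_payload(avro_like)            # => [:confluent_avro, 42]
p classify_payload('{"text":"hello"}')   # => [:json, nil]
```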

    JSON? Kinda.

     

    One of the challenges that I found working with OSA was defining the “Shape” (data model) of the inbound stream data. JSON is widely used as a technology-agnostic data interchange format, including for the Twitter data that I was working with. You can see a sample record here. One of the powerful features of JSON is its ability to nest objects in a record, as well as create arrays of them. You can read more about this detail in an article here. Unfortunately it seems that OSA does not support flattening out JSON, meaning that only elements in the root of the model are accessible. For Twitter, that means we can see the text, and who it was in reply to, but not the user who tweeted it, since the latter is a nested element (along with many other fields, including hashtags, which are also an array):

     

    root
    |-- created_at: string (nullable = true)
    |-- entities: struct (nullable = true)
    | |-- hashtags: array (nullable = true)
    | | |-- element: struct (containsNull = true)
    | | | |-- indices: array (nullable = true)
    | | | | |-- element: long (containsNull = true)
    | | | |-- text: string (nullable = true)
    | |-- user_mentions: array (nullable = true)
    | | |-- element: struct (containsNull = true)
    | | | |-- id: long (nullable = true)
    | | | |-- id_str: string (nullable = true)
    | | | |-- indices: array (nullable = true)
    | | | | |-- element: long (containsNull = true)
    | | | |-- name: string (nullable = true)
    | | | |-- screen_name: string (nullable = true)
    |-- source: string (nullable = true)
    |-- text: string (nullable = true)
    |-- timestamp_ms: string (nullable = true)
    |-- truncated: boolean (nullable = true)
    |-- user: struct (nullable = true)
    | |-- followers_count: long (nullable = true)
    | |-- following: string (nullable = true)
    | |-- friends_count: long (nullable = true)
    | |-- name: string (nullable = true)
    | |-- screen_name: string (nullable = true)
    
    So what to do if the inbound streaming data is in nested JSON format? It seems to me the only option is to pre-process it to flatten it. There are a variety of tools that could be used here; in the first instance I’d generally reach for Logstash, since it's the one I’m most familiar with. To get an idea of the schema of a JSON record you can use jsonschema.net. Funnily enough, when I was researching this blog post I came across the exact same problem on a forum, posted by … me! Early last year I was working with the same dataset, and had the same issue with embedded arrays. The way to do it in Logstash is with a bit of Ruby code to flatten the arrays, and a standard mutate to bring nested objects up to the root level. Sample code:
    mutate {
        # Copy nested user fields up to the root level
        add_field => {
            "user_name"        => "%{[user][name]}"
            "user_screen_name" => "%{[user][screen_name]}"
        }
    }
    ruby {
        # Flatten [entities][hashtags] into an array of strings and a comma-separated list
        # (event.get/event.set is the event API in Logstash 5.0 and later)
        code => 'tags = event.get("[entities][hashtags]")
                 unless tags.nil?
                   event.set("hashtags_array", tags.collect { |m| m["text"] })
                   event.set("hashtags_list", tags.collect { |m| m["text"] }.join(","))
                 end'
    }
    

    You can find the full Logstash code on gist here. With this Logstash code running, I set up a new OSA Stream pointing to the new Kafka topic that Logstash was writing to, and added the flattened fields to the Shape:


    We can then see in the Exploration the fields that we wanted to get at - user name, hashtags, and so on:

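    For reference, the same flattening can be sketched in plain Ruby outside Logstash: nested objects become underscore-joined root-level keys, and the hashtags array becomes a comma-separated list. The sample tweet is made up for illustration, and `flatten_hash` and `flatten_tweet` are hypothetical helpers, not part of Logstash.

```ruby
# Sketch of flattening a nested tweet: nested objects -> underscore-joined root keys,
# hashtags array of objects -> comma-separated string.
require "json"

def flatten_hash(hash, prefix = nil, out = {})
  hash.each do |key, value|
    name = prefix ? "#{prefix}_#{key}" : key.to_s
    if value.is_a?(Hash)
      flatten_hash(value, name, out)   # recurse into nested objects
    else
      out[name] = value                # scalars and arrays are kept as-is
    end
  end
  out
end

def flatten_tweet(tweet)
  flat = flatten_hash(tweet)
  tags = tweet.dig("entities", "hashtags")
  flat["hashtags_list"] = tags.map { |t| t["text"] }.join(",") if tags
  flat.delete("entities_hashtags")     # superseded by hashtags_list
  flat
end

tweet = JSON.parse(<<~JSON)
  {"text": "Streaming with #Kafka and #OSA",
   "user": {"name": "Example User", "screen_name": "example"},
   "entities": {"hashtags": [{"text": "Kafka", "indices": [15, 21]},
                             {"text": "OSA", "indices": [26, 30]}]}}
JSON

puts JSON.generate(flatten_tweet(tweet))
```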

    Other Shape Gotchas

    One of the fields in Twitter data is ‘source’, which unfortunately is a reserved identifier in CQL (Continuous Query Language), the language that OSA uses behind the scenes.

     

    Caused By: org.springframework.beans.FatalBeanException: Exception initializing channel; nested exception is com.bea.wlevs.ede.api.ConfigurationException: Event type [sx-10-16-Kafka_Technology_Tweets_JSON-1] of channel [channel] uses invalid or reserved CQL identifier = , source
    

     

    It’s not clear how to define a shape in which the source data field is named after a reserved identifier.
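    One workaround to try, which I haven't verified in OSA, is to rename such fields during pre-processing, before they reach OSA. The Ruby sketch below assumes a hypothetical, deliberately incomplete `RESERVED_CQL` set containing only `source` (the identifier from the error above) and renames any match with a `tweet_` prefix.

```ruby
# Sketch: rename fields whose names clash with reserved CQL identifiers.
# RESERVED_CQL is hypothetical and incomplete: it holds only "source",
# the identifier reported in the OSA error.
require "set"

RESERVED_CQL = Set["source"].freeze

def rename_reserved(event, reserved: RESERVED_CQL, prefix: "tweet_")
  event.each_with_object({}) do |(key, value), out|
    new_key = reserved.include?(key.downcase) ? "#{prefix}#{key}" : key
    # Refuse to silently overwrite an existing field with the renamed one
    raise KeyError, "rename would clobber #{new_key}" if new_key != key && event.key?(new_key)
    out[new_key] = value
  end
end

p rename_reserved({ "text" => "hi", "source" => "web" })
```

The Shape in OSA would then declare `tweet_source` instead of `source`.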

    Further Exploration of Twitter Streams with OSA

    Using the flattened Twitter stream coming via Kafka that I demonstrated above, let’s now look at more OSA functionality.

    Depending on the source of your data stream, and your purpose for analysing it, you may well want to filter out certain content. This can be done from the Exploration screen:


    The Business Rules section of the Exploration enables you to define rules about the data and set field values based on them. These can be static values, or expressions based on data in the stream. There doesn’t seem to be a way to add arbitrary fields this way, so I amended the Stream Shape to include a ‘spare’ field that I then populated:

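    Conceptually, the filter and Business Rules steps amount to a predicate followed by conditional field assignments. The Ruby sketch below illustrates that idea only; the `Rule` structure, the thresholds, and every field name other than `spare` are hypothetical, and OSA's rules are defined in the GUI rather than in code.

```ruby
# Sketch: filter events with a predicate, then apply rules that set a field to either
# a static value or an expression computed from the event. Rules run in order.
Rule = Struct.new(:condition, :field, :value)

def apply(events, filter:, rules:)
  events.select(&filter).map do |event|
    rules.each_with_object(event.dup) do |rule, out|
      next unless rule.condition.call(out)
      out[rule.field] = rule.value.respond_to?(:call) ? rule.value.call(out) : rule.value
    end
  end
end

rules = [
  Rule.new(->(e) { e["followers_count"].to_i >= 10_000 }, "spare", "influencer"),  # static value
  Rule.new(->(e) { e["spare"].nil? }, "spare", ->(e) { "#{e['lang']}-user" })      # expression
]
tweets = [
  { "lang" => "en", "followers_count" => 25_000, "text" => "OSA demo" },
  { "lang" => "fr", "followers_count" => 120,    "text" => "bonjour" },
  { "lang" => "en", "followers_count" => 5,      "text" => "RT spam" }
]
english_only = ->(e) { e["lang"] == "en" && !e["text"].start_with?("RT") }
p apply(tweets, filter: english_only, rules: rules)
```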

    Kafka Stream Transformation with OSA

     

    Here we’ll see how OSA can be used to ingest one Kafka topic, apply a transformation, and stream it to another Kafka topic.

    The OSA exploration screen offers a basic aggregation (‘summary’) function, here showing the number of tweets per language:


    Using the Windows icon to the right of the Sources box, you can define the time window along with the refresh frequency:


    This means that the count of tweets per language will be calculated over the data for the past 30 seconds, and re-evaluated every five seconds. More complex functionality, such as pivoting on the group-by column (so as to be able to chart the number of tweets per language as separate metrics), doesn’t seem to be present in this release; arguably this is moving into pure analytics territory, such as would be found in Oracle’s Big Data Discovery.
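    The semantics of a 30-second range evaluated every five seconds can be sketched in Ruby as follows. It is a simplified illustration, assuming integer timestamps in seconds and re-scanning all events at each evaluation, rather than how OSA computes it; `sliding_counts` is a hypothetical helper.

```ruby
# Sketch of a sliding-window aggregation: at each evaluation time t (every `slide`
# seconds), count events per language with timestamps in the window (t - range, t].
def sliding_counts(events, range:, slide:, from:, to:)
  (from..to).step(slide).map { |t|
    window = events.select { |e| e[:ts] > t - range && e[:ts] <= t }
    [t, window.group_by { |e| e[:lang] }.transform_values(&:size)]
  }.to_h
end

events = [
  { ts: 1,  lang: "en" }, { ts: 3,  lang: "es" }, { ts: 12, lang: "en" },
  { ts: 28, lang: "ja" }, { ts: 33, lang: "en" }, { ts: 41, lang: "en" }
]

sliding_counts(events, range: 30, slide: 5, from: 30, to: 45).each do |t, counts|
  puts "t=#{t}s #{counts}"
end
```

Because consecutive windows overlap by 25 seconds, each event contributes to up to six successive evaluations.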

     

    Taking the summarised stream (count of tweets, by language), I first Publish the exploration, making it available for use as the input to a subsequent exploration. Then from the Catalog page I select a Pattern, which I’m going to use to build a stream showing the most common languages in the past five seconds. With the Top N pattern you specify the event stream (in this case, the summarised stream that I built above), and the metric by which to order the events, which here is the count of tweets per language.

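    The core of the Top N pattern can be sketched in Ruby as a sort over one evaluation of the summary. The tie-break on language code and the `top_n` helper are assumptions for the illustration; the output field name is borrowed from the renamed COUNT column (tweetcount5_sec) described in this section.

```ruby
# Sketch of Top N over one evaluation of a per-language summary: order by count
# (descending), break ties by language code so the result is deterministic.
def top_n(counts, n)
  counts.sort_by { |lang, count| [-count, lang] }
        .first(n)
        .map { |lang, count| { "lang" => lang, "tweetcount5_sec" => count } }
end

summary = { "en" => 42, "es" => 7, "ja" => 19, "fr" => 7, "pt" => 3 }
p top_n(summary, 3)
```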

    For completeness, I’m going to stream the output of this pattern exploration back to a Kafka topic:


    Note that I’ve defined a new Shape here based on the columns in the pattern. In the pattern itself I renamed the COUNT column to something clearer (tweetcount5_sec). Renaming it wasn’t strictly necessary, since it’s possible to define the field/shape mapping when you configure the Target:


    For the target to take effect, I publish the pattern exploration, and then, using kafka-console-consumer, I can see the topic being populated in realtime by OSA:


    Being able to apply transformations to streams in realtime like this and stream the results is pretty useful. There are some limitations to the capabilities of OSA through the front-end GUI. For example, support for nested JSON, and integration with the Kafka Schema Registry to automatically derive Shapes for inbound topics, would both be great. At a lower level, the option to specify the consumer group ID, as well as the start point for consumption (beginning of topic, or streaming from the end), are both things that would probably be necessary sooner or later when using OSA for full-blown development.

     

    OSA and Spatial

     

    One of the Patterns that OSA provides is a Spatial one, which can be used to analyse source data that includes geo-location data. This could be simply to plot the occurrence of each data point on a map (as we'll see shortly). It can also be used in a more sophisticated manner, to track a given entity's movements on a map. An example of this could be a fleet of trucks reporting their position back at regular intervals. Areas on a map can be defined and conditions triggered as the entity enters or leaves the area. For now though, we'll keep it simple. Using the flattened Twitter stream from Kafka that I produced from Logstash above, I'm going to plot Tweets in realtime on a map, along with a very simplistic tagging of the broad area from which they came.

     

    In my source Kafka topic I have two fields, latitude and longitude. I expose these as part of the 'flattening' of the JSON in this Logstash script, since by default they're nested within the coordinates field, and as an array too. When defining the Stream's Shape make sure you define the datatype correctly (Double). OSA is not very forgiving of stupidity, and I spent a frustrating time trying to work out why "-80.1422195" was coming through as zero; obviously, defined as an Integer this was never going to work!

    Although not strictly necessary, it's useful for debugging to set up an Exploration based on the flattened Twitter stream, with a filter to only include tweets that have geo-location data in them. That way I knew which tweets I should expect to see in the next step. One of the things that I have found with OSA is that it has a tendency to fail silently; instead of throwing errors you'll just not get any data. By setting up the filter exploration I could at least debug things a bit more easily.

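    The coordinate extraction and the geo-location filter can be sketched in Ruby. This assumes the tweet's coordinates field is in GeoJSON form, as in Twitter's API, so the array is ordered longitude first; the sample tweets and the `extract_lat_long` helper are hypothetical. Parsing with `Float` keeps the fractional degrees that an integer type would lose.

```ruby
# Sketch: expose latitude/longitude from the nested GeoJSON "coordinates" field
# ([longitude, latitude]) as Floats, and drop tweets without a usable location.
require "json"

def extract_lat_long(tweet)
  point = tweet.dig("coordinates", "coordinates")
  return nil unless point.is_a?(Array) && point.size == 2
  longitude, latitude = point.map { |v| Float(v) }
  tweet.merge("latitude" => latitude, "longitude" => longitude)
rescue ArgumentError, TypeError
  nil                                   # malformed coordinates: treat as not geotagged
end

tweets = [
  JSON.parse('{"text":"example geotagged tweet","coordinates":{"type":"Point","coordinates":[-80.1422195,25.7906500]}}'),
  JSON.parse('{"text":"no location","coordinates":null}')
]
geotagged = tweets.filter_map { |t| extract_lat_long(t) }   # the debug "filter" step
geotagged.each { |t| puts "#{t['text']}: lat=#{t['latitude']} long=#{t['longitude']}" }
```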

     

    After this I created a new object, a Map. A Map object defines a set of named areas, which could be sourced from a database table, or drawn manually - which is what I did here by setting the Map Type to 'None (Create Manually)'. One thing to note about the maps is that they're sourced online (openstreetmap.org) so you'll need an internet connection to do this. Once the Map is open, click the Polygon Tool icon and click-drag a shape around the area that you want to "geo-fence". Each area is given a name, and this is what is used in the streaming data to label the event's geographical area.

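    Behind geo-fencing is a point-in-polygon test. The Ruby sketch below uses the standard ray-casting algorithm to label a point with the first named area containing it; the rectangular areas and the `place_for` helper are hypothetical, and this is not OSA's own spatial implementation.

```ruby
# Sketch of geo-fence labelling with the ray-casting point-in-polygon test.
# Points and polygon vertices are [longitude, latitude].
def inside?(point, polygon)
  x, y = point
  inside = false
  j = polygon.size - 1
  polygon.each_with_index do |(xi, yi), i|
    xj, yj = polygon[j]
    # Toggle whenever a horizontal ray from the point crosses the edge j -> i
    if (yi > y) != (yj > y) && x < (xj - xi) * (y - yi) / (yj - yi).to_f + xi
      inside = !inside
    end
    j = i
  end
  inside
end

# Name of the first area containing the point, or nil if it is outside every area.
def place_for(point, areas)
  areas.find { |_name, polygon| inside?(point, polygon) }&.first
end

AREAS = {
  "Europe"        => [[-10.0, 35.0], [30.0, 35.0], [30.0, 60.0], [-10.0, 60.0]],
  "North America" => [[-125.0, 25.0], [-65.0, 25.0], [-65.0, 50.0], [-125.0, 50.0]]
}.freeze

p place_for([-0.1276, 51.5072], AREAS)    # London
p place_for([-80.1422, 25.7906], AREAS)   # inside the North America box
p place_for([151.2093, -33.8688], AREAS)  # outside every area
```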

     

    Having got our source data stream with geo-data in, and a Map on which to plot it and analyse the location of each event, we now use the Spatial General pattern to create an Exploration. The topology looks like this:


    The fields in the Spatial General pattern are all pretty obvious. Object key is the field to use to track the same entity across multiple events, if you want to use the enter/exit/stay statuses. For tweets we just use 'Enter', but for people or vehicles you might get multiple status reports and want to track them on a map: for example, when a person is near a point of interest that you're tracking, or when a vehicle has remained in a set area for too long.

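    The enter/exit/stay statuses can be sketched as a small state machine keyed on the object key. In this Ruby illustration, `GeofenceTracker` and its `area_of` callable (mapping a position to an area name or nil) are hypothetical; a move directly from one area into another is reported only as entering the new area.

```ruby
# Sketch of enter/exit/stay tracking per object key (e.g. a truck ID).
class GeofenceTracker
  def initialize(area_of)
    @area_of = area_of
    @last_area = {}                    # object key -> area of its previous report
  end

  # Returns [status, area]; status is :enter, :stay, :exit, or nil when the object
  # is outside every area and was outside before too.
  def update(object_key, position)
    previous = @last_area[object_key]
    current = @area_of.call(position)
    @last_area[object_key] = current
    if current && current == previous then [:stay, current]
    elsif current then [:enter, current]
    elsif previous then [:exit, previous]
    else [nil, nil]
    end
  end
end

tracker = GeofenceTracker.new(->(pos) { pos[0].between?(0, 10) ? "Depot" : nil })
reports = [["truck-1", [-5, 0]], ["truck-1", [2, 0]], ["truck-1", [3, 1]], ["truck-1", [15, 0]]]
reports.each { |key, pos| p [key, *tracker.update(key, pos)] }
```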

    If you let the Exploration now run, depending on the rate of event ingest, you'll sooner or later see points appearing on the map and event details underneath. The "status" column is populated (blank if the event is outside of the defined geo-fences), as is the "Place", based on the geo-fence names that you defined.


    Summary

    I can see OSA being used in two ways. The first is as an ‘endpoint’ for streams, with users taking actions based on the data (some of the use cases are listed here). The second is for prototyping transformations and analyses on streams prior to productionising them. The visual interface and the immediate feedback on applied transformations mean that users can quickly understand what further processing they may want to apply to the stream, using actual streaming data to inform this.

     

    This latter concept - that of prototyping - is similar to that which we see with another of Oracle’s products, Big Data Discovery. With BDD users can analyse data in the organisation’s data reservoir, as well as apply transformations to it (read more). Just as BDD doesn’t replace OBIEE or Visual Analyzer but enables users to understand how they do want to model the data in these tools, OSA wouldn’t replace “production grade” integration done by Oracle Data Integrator. What it would do is allow users to get a clearer idea of the transformations they would want performed in it.

     

    OSA's user interface is easy to use and intuitive, and this is definitely a tool that you would put in front of technically minded business users. There are limitations to what can be achieved technically through the web GUI alone and something like Oracle Data Integrator (ODI) would still be a more appropriate fit for complex streaming work. At Oracle Open World last year it was announced (slides) that a beta would be starting for ODI using Spark Streaming for ETL and stream processing, so it'll be interesting to see this when it comes out.

     


     

    About the Author

    Robin Moffatt (rmoff) is Head of R&D (Europe) at Rittman Mead, and an Oracle ACE. He specializes in OBIEE and Linux as well as ODI, and has more recently been delving into the worlds of Hadoop and Elasticsearch. His particular interests are data and analytics, systems architecture, performance testing and optimization. He blogs at http://ritt.md/rmoff and http://rmoff.net/ and can be found tweeting grumpy geek thoughts as @rmoff.

     

    This article originally appeared in two parts on the Rittman Mead blog.

     


    This article represents the expertise, findings, and opinion of the author.  It has been published by Oracle in this space as part of a larger effort to encourage the exchange of such information within this Community, and to promote evaluation and commentary by peers. This article has not been reviewed by the relevant Oracle product team for compliance with Oracle's standards and practices, and its publication should not be interpreted as an endorsement by Oracle of the statements expressed therein.