Part I of this blog series demonstrated the development and deployment of individual microservices (on Oracle Application Container Cloud) and how they are loosely coupled using the Apache Kafka message hub (set up on Oracle Compute Cloud). This (second) part builds on the previous one and, with the help of a sample application, explores microservice-based stream processing, diving into the following areas

 

  • Kafka Streams: A stream processing library
  • Scalability: enable your application to handle increased demands
  • Handling state: this is a hard problem to solve when the application needs to be horizontally scalable

 

 

Technical Components

 

Open source technologies

The following open source components were used to build the sample application

 

  • Apache Kafka: a scalable, distributed pub-sub message hub
  • Kafka Streams: a library for building stream processing applications on top of Apache Kafka
  • Jersey: used to implement REST and SSE services. Uses Grizzly as a (pluggable) runtime/container
  • Maven: used as the standard Java build tool (along with its assembly plugin)

 

Oracle Cloud

The following Oracle Cloud services have been leveraged

 

  • Application Container Cloud: serves as a scalable platform for running our stream processing microservices
  • Compute Cloud: hosts the Kafka cluster (broker)

 

Note: In addition to compute-based (IaaS) Kafka hosting, Oracle Cloud now offers Event Hub Cloud, a compelling offering which provides Apache Kafka as a fully managed service along with other value-added capabilities.

 

Hello Kafka Streams!

In simple words, Kafka Streams is a library which you can include in your Java based applications to build stream processing applications on top of Apache Kafka. Other distributed computing platforms such as Apache Spark and Apache Storm are widely used in the big data stream processing world, but Kafka Streams brings some unique propositions to this area.
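
To make this concrete, here is a minimal, hedged sketch of a Kafka Streams application written against the 0.10.1.x API (the version used later in this post). The topic names and the broker/Zookeeper addresses are placeholders for illustration, not the ones used by the sample application.

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class HelloKafkaStreams {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "hello-streams");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); //placeholder broker
        props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181"); //placeholder Zookeeper
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        KStreamBuilder builder = new KStreamBuilder();

        //read from one topic, apply a trivial per-record transformation, write to another
        KStream<String, String> input = builder.stream("input-topic");
        input.mapValues(value -> "processed: " + value)
             .to("output-topic");

        KafkaStreams streams = new KafkaStreams(builder, props);
        streams.start();
    }
}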

 

Kafka Streams: what & why

 

  • Built on top of Kafka – leverages its scalable and fault tolerant capabilities: if you use Kafka in your ecosystem, it makes perfect sense to leverage Kafka Streams to churn streaming data to/from the Kafka topics
  • Microservices friendly: it’s a lightweight library which you use within your Java application. This means that you can use it to build microservices style stream processing applications
  • Flexible deployment & elastic in nature: you’re not restricted to a specific deployment model (e.g. cluster-based). The application can be packaged and deployed in a flexible manner and scaled up and down easily
  • For fast data: harness the power of Kafka Streams to crunch high volume data in real time systems – it does not need to be at big data scale
  • Support for stateful processing: helps manage local application state in a fault tolerant & scalable manner

 

 

Sample application: what’s new

 

In part I, the setup was as follows

  • A Kafka broker serving as the messaging hub
  • Producer application (on Application Container Cloud) pushing CPU usage metrics to Kafka
  • Consumer application (on Application Container Cloud) consuming those metrics from Kafka and exposing them as a real-time feed (using Server-Sent Events)

 

Some parts of the sample have been modified to demonstrate some of the key concepts. Here is the gist

 

  • Consumer API: the new consumer application leverages the Kafka Streams API on Application Container Cloud, as compared to the traditional (polling based) Kafka Consumer client API used in part I
  • Consumer topology: we will deploy multiple instances of the consumer application to scale our processing logic
  • Nature of metrics feed: the cumulative moving average of the CPU metrics per machine is calculated, as opposed to the exact metric provided by the SSE feed in part I
  • Accessing the CPU metrics feed: the consumer application makes the CPU usage metrics available in the form of a REST API, as compared to the SSE based implementation in part I

 

High level architecture

The basic architecture still remains the same i.e. microservices decoupled using a messaging layer

 

 

 

As mentioned above, the consumer application has undergone changes and is now based on the Kafka Streams API. We could have continued to use the traditional poll based Kafka Consumer client API as in part I, but the Kafka Streams API was chosen for a few reasons. Let’s go through them in detail and see how it fits in the context of the overall solution. At this point, ask yourself the following questions

 

  • How would you scale your consumer application?
  • How would you handle intermediate state (required for moving average calculation) spread across individual instances of your scaled out application?

 

Scalability

With Application Container Cloud you can spawn multiple instances of your stream processing application with ease (for more details, refer to the documentation)

 

But how does it help?

The sample application models CPU metrics being continuously sent by the producer application to a Kafka broker – for demonstration purposes, the number of machines (whose CPU metrics are being sent) has been limited to ten. But how would you handle data at a larger scale

 

  • When the number of machines increases to the scale of thousands?
  • Perhaps you want to factor in additional attributes (in addition to just CPU usage)?
  • Maybe you want to execute all this at data-center scale?

 

The answer lies in distributing your computation across several processes and this is where horizontal scalability plays a key role.

When the CPU metrics are sent to a topic in Kafka, they are distributed to different partitions (using a default consistent hashing algorithm) – this is similar to sharding. This helps from multiple perspectives

  • When Kafka itself is scaled out (broker nodes are added) – individual partitions are replicated over these nodes for fault tolerance and high performance
  • From a consumer standpoint - multiple consumers (in the same group) automatically distribute the load among themselves
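
To illustrate the keyed partitioning just described, here is a hedged sketch of how a producer might key each record by machine ID so that Kafka’s default partitioner routes all metrics for a given machine to the same partition. The topic name, broker address and sample values are placeholders, not necessarily those used by the sample producer.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class KeyedMetricProducerSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); //placeholder broker
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            //the machine ID is the record key - the default partitioner hashes it,
            //so all metrics for "machine-1" consistently land on the same partition
            producer.send(new ProducerRecord<>("cpu-metrics", "machine-1", "14.5"));
        }
    }
}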

 

In the case of our example, each stream processing application instance is nothing but a (specialized) form of Kafka consumer and takes up a non-overlapping set of partitions in Kafka for processing. Here is a pictorial representation of a setup where 2 instances process data for 10 machines spread over 4 partitions in the Kafka broker

 

 

 

Managing application state (at scale)

The processing logic in the sample application is not stateless i.e. it depends on previous state to calculate its current state. In the context of this application, state is

 

  • the cumulative moving average of a continuous stream of CPU metrics,
  • being calculated in parallel across a distributed set of instances, and
  • constantly changing i.e. the cumulative moving average of the machines handled by each application instance is getting updated with the latest results

 

If you confine the processing logic to a single node, the problem of coordinating localized state would not exist i.e. local state = global state. But this luxury is not available in a distributed processing system. Here is how our application handles it (thanks to Kafka Streams)

 

  • The local state store (a KV store) containing the machine to CPU usage (cumulative moving average) mapping is sent to a dedicated topic in Kafka e.g. the in-memory-avg-store in our application (named cpu-streamz) will have a corresponding topic cpu-streamz-in-memory-avg-store-changelog in Kafka
  • This topic is called a changelog since it is compacted i.e. only the latest value for each key is retained by Kafka. This achieves the goal (distributed state management) in the cheapest possible manner
  • During scale up – Kafka Streams assigns some partitions to the new instance (see the above example) and the state for those partitions (which was previously stored in another instance) is replayed from the Kafka changelog topic to build the state store for the new instance
  • When an instance crashes or is stopped – the partitions being handled by that instance are handed off to another instance, and the state of those partitions (stored in the Kafka changelog topic) is replayed into the local state store of the node to which the work was allotted

 

All in all, this ensures scalable and fault tolerant state management
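
To tie this back to the code, here is a hedged sketch of how a changelog-backed state store might be declared with the 0.10.1.x Processor API. The store name (in-memory-avg-store) and the processor class come from this post; the source topic and node names are assumptions for illustration, only the average store is shown, and a no-arg constructor for the processor is assumed.

import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.state.Stores;

//declare an in-memory key-value store; Kafka Streams backs it with a compacted
//changelog topic (e.g. cpu-streamz-in-memory-avg-store-changelog) for fault tolerance
StateStoreSupplier avgStore = Stores.create("in-memory-avg-store")
        .withStringKeys()
        .withDoubleValues()
        .inMemory()
        .build();

TopologyBuilder builder = new TopologyBuilder();
builder.addSource("metrics-source", "cpu-metrics")                               //source topic (assumed name)
       .addProcessor("cma-processor", CPUCumulativeAverageProcessor::new, "metrics-source")
       .addStateStore(avgStore, "cma-processor");                                //attach the store to the processor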

 

Exposing application state

As mentioned above, the cumulative moving average of the CPU metrics of each machine is calculated across multiple nodes in parallel. In order to find out the global state of the system i.e. the current average for all (or specific) machines, the local state stores need to be queried. The application provides a REST API for this

 

 

 

 

More details in the Testing section on how to see this in action

 

It's important to take note of the following points with regard to the implementation of the REST API, which in turn gives us what we want - real-time insight into the moving averages of CPU usage

 

  • Topology agnostic: Use a single access URL provided by Application Container Cloud (as highlighted in the diagram above). As a client, you do not have to be aware of individual application instances
  • Robust & flexible: Instances can be added or removed on the fly but the overall business logic (in this case it is calculation of the cumulative moving average of a stream of CPU metrics) will remain fault tolerant and adjust to the elastic topology changes

 

This is made possible by a combination of the following

 

  • Automatic load balancing: Application Container Cloud load balances requests among the multiple instances of your application
  • Clustered setup: from an internal implementation perspective, your application instances can detect each other. For this to work, the isClustered attribute in manifest.json is set to true and custom logic is implemented within the solution so that instance specific information can be discovered and used appropriately. However, this is an internal implementation detail and the user is not affected by it (please look at the Code snippets section for some more details)
  • Interactive queries: this Kafka Streams capability enables external clients to introspect the state store of a stream processing application instance via a host-port configuration enabled within the app configuration (a sketch follows below)
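
Here is a hedged sketch of the interactive-queries plumbing this relies on: each instance advertises its host and port via the application.server configuration, and a local store can then be queried by name. The props, host, port and streams variables are assumed to exist in the surrounding code; the store name is the one used in this post and the machine ID is for illustration.

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

//advertise this instance's endpoint so that other instances can route
//store lookups to it (the host could come from getHostIPForDiscovery(), shown later)
props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, host + ":" + port);

//query the local state store for a machine's cumulative moving average;
//this returns data only for the partitions this instance currently owns
ReadOnlyKeyValueStore<String, Double> store =
        streams.store("in-memory-avg-store", QueryableStoreTypes.<String, Double>keyValueStore());
Double cma = store.get("machine-1"); //machine ID assumed for illustration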

 

An in-depth discussion of Kafka Streams is not possible in a single blog post. The sections above are meant to provide just enough background for the purposes of this post. Readers are encouraged to spend some time going through the official documentation and then come back to this blog to continue hacking on the sample

 

Setup

You can refer to part I of the blog for the Apache Kafka related setup. The only additional step which needs to be executed is exposing the port on which your Zookeeper process is listening (2181 by default), as this is required by the Kafka Streams library configuration. While executing the steps from the Open Kafka listener port section, ensure that you include the Oracle Compute Cloud configuration for port 2181 (in addition to the Kafka broker port 9092)
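
To illustrate why the Zookeeper port matters, here is a hedged sketch of how the stream processing application might build its Kafka Streams configuration from the environment variables (KAFKA_BROKER and ZOOKEEPER) that are set in deployment.json further below; the exact set of properties used by the sample may differ.

import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;

//the Kafka Streams (0.10.1.x) configuration needs both the broker and the
//Zookeeper endpoint - hence port 2181 must be reachable from the application
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "cpu-streamz");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, System.getenv("KAFKA_BROKER"));
props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, System.getenv("ZOOKEEPER"));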

 

Code

Maven dependencies

As mentioned earlier, from an application development standpoint, Kafka Streams is just a library. This is evident in the pom.xml

 

<dependency>
     <groupId>org.apache.kafka</groupId>
     <artifactId>kafka-streams</artifactId>
     <version>0.10.1.1</version>
</dependency>

 

The project also uses the appropriate Jersey libraries along with the Maven shade and assembly plugins to package the application  

Overview

The producer microservice remains the same and you can refer to part I for the details. Let’s look at the revamped consumer stream processing microservice

 

  • KafkaStreamsAppBootstrap: entry point for the application. Kicks off the Grizzly container and the Kafka Streams processing pipeline
  • CPUMetricStreamHandler: implements the processing pipeline logic and handles the K-Stream configuration and topology creation
  • MetricsResource: exposes multiple REST endpoints for fetching CPU moving average metrics
  • Metric, Metrics: POJOs (JAXB decorated) to represent metric data. They are exchanged as JSON/XML payloads
  • GlobalAppState, Utils: common utility classes

 

Now that you have a fair idea of what's going on within the application and an overview of the classes involved, it makes sense to peek at some of the relevant sections of the code

 

State store

 

    public static class CPUCumulativeAverageProcessor implements Processor<String, String> {
     ...................
        @Override
        public void init(ProcessorContext pc) {
            this.pc = pc;
            this.pc.schedule(12000); //invoke punctuate every 12 seconds
            this.machineToAvgCPUUsageStore = (KeyValueStore<String, Double>) pc.getStateStore(AVG_STORE_NAME);
            this.machineToNumOfRecordsReadStore = (KeyValueStore<String, Integer>) pc.getStateStore(NUM_RECORDS_STORE_NAME);
        }
     ...............

 

Cumulative Moving Average (CMA) calculation

 

..........
@Override
public void process(String machineID, String currentCPUUsage) {

            //turn each String value (cpu usage) to Double
            Double currentCPUUsageD = Double.parseDouble(currentCPUUsage);
            Integer recordsReadSoFar = machineToNumOfRecordsReadStore.get(machineID);
            Double latestCumulativeAvg = null;

            if (recordsReadSoFar == null) {
                PROC_LOGGER.log(Level.INFO, "First record for machine {0}", machineID);
                machineToNumOfRecordsReadStore.put(machineID, 1);
                latestCumulativeAvg = currentCPUUsageD;
            } else {
                Double cumulativeAvgSoFar = machineToAvgCPUUsageStore.get(machineID);
                PROC_LOGGER.log(Level.INFO, "CMA so far {0}", cumulativeAvgSoFar);

                //refer https://en.wikipedia.org/wiki/Moving_average#Cumulative_moving_average for details
                latestCumulativeAvg = (currentCPUUsageD + (recordsReadSoFar * cumulativeAvgSoFar)) / (recordsReadSoFar + 1);
                recordsReadSoFar = recordsReadSoFar + 1;
                machineToNumOfRecordsReadStore.put(machineID, recordsReadSoFar);
            }

            machineToAvgCPUUsageStore.put(machineID, latestCumulativeAvg); //store latest CMA in local state store
..........
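
To make the formula concrete: if a machine’s cumulative moving average after 3 readings is 40.0 and the next reading is 60.0, the new value is (60.0 + 3 × 40.0) / 4 = 45.0, which then becomes the cumulativeAvgSoFar for the next record.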

 

 

Metrics POJO

 

@XmlRootElement
@XmlAccessorType(XmlAccessType.FIELD)
public class Metrics {
    private final List<Metric> metrics;

    public Metrics() {
        metrics = new ArrayList<>();
    }

    public Metrics add(String source, String machine, String cpu) {
        metrics.add(new Metric(source, machine, cpu));
        return this;
    }

    public Metrics add(Metrics anotherMetrics) {
        anotherMetrics.metrics.forEach((metric) -> {
            metrics.add(metric);
        });
        return this;
    }

    @Override
    public String toString() {
        return "Metrics{" + "metrics=" + metrics + '}';
    }
    
    public static Metrics EMPTY(){
        return new Metrics();
    }
    
}
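
The Metric class itself is not listed in this post; here is a hedged sketch of what it might look like, inferred from the three-argument constructor used above and the cpu/machine/source attributes described in the Testing section (the field names and types are assumptions).

import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement;

@XmlRootElement
@XmlAccessorType(XmlAccessType.FIELD)
public class Metric {

    private String source;  //instance which computed this average (diagnostic info)
    private String machine; //machine ID
    private String cpu;     //cumulative moving average of the CPU usage

    public Metric() {
        //no-arg constructor required by JAXB
    }

    public Metric(String source, String machine, String cpu) {
        this.source = source;
        this.machine = machine;
        this.cpu = cpu;
    }
}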

 

 

Exposing REST API for state

 

@GET
@Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
public Response all_metrics() throws Exception {
        Response response = null;
        try {
            KafkaStreams ks = GlobalAppState.getInstance().getKafkaStreams();
            HostInfo thisInstance = GlobalAppState.getInstance().getHostPortInfo();
            
          Metrics metrics = getLocalMetrics();

            ks.allMetadataForStore(storeName)
                    .stream()
                    .filter(sm -> !(sm.host().equals(thisInstance.host()) && sm.port() == thisInstance.port())) //only query remote node stores
                    .forEach(new Consumer<StreamsMetadata>() {
                        @Override
                        public void accept(StreamsMetadata t) {
                            String url = "http://" + t.host() + ":" + t.port() + "/metrics/remote";
                            //LOGGER.log(Level.INFO, "Fetching remote store at {0}", url);
                            Metrics remoteMetrics = Utils.getRemoteStoreState(url, 2, TimeUnit.SECONDS);
                            metrics.add(remoteMetrics);
                            LOGGER.log(Level.INFO, "Metric from remote store at {0} == {1}", new Object[]{url, remoteMetrics});
                        }
                    });

            response = Response.ok(metrics).build();
        } catch (Exception e) {
            LOGGER.log(Level.SEVERE, "Error - {0}", e.getMessage());
        }
        return response;
}

 

Host discovery

 

    public static String getHostIPForDiscovery() {
        String host = null;
        try {

            String hostname = Optional.ofNullable(System.getenv("APP_NAME")).orElse("streams");

            //resolve the (clustered) application host name to its IP address
            InetAddress inetAddress = InetAddress.getByName(hostname);
            host = inetAddress.getHostAddress();

        } catch (UnknownHostException ex) {
            //fall back to localhost e.g. when running outside Application Container Cloud
            host = "localhost";
        }
        return host;
    }

Deployment to Application Container Cloud

 

Now that you have a fair idea of the application, it’s time to look at the build, packaging & deployment

 

Update deployment descriptors

 

The metadata files for the producer application are the same. Please refer to part I for details on how to update them. The steps below are relevant to the (new) stream processing consumer microservice.

manifest.json: You can use this file in its original state

 

{
    "runtime": {
        "majorVersion": "8"
    },
    "command": "java -jar acc-kafka-streams-1.0.jar",
  "isClustered": "true"
}

 

deployment.json

 

It contains the environment variables required by the application at runtime. The values are left as placeholders for you to fill in prior to deployment.

 

{
"instances": "2",
  "environment": {
  "APP_NAME":"kstreams",
  "KAFKA_BROKER":"<as-configured-in-kafka-server-properties>",
  "ZOOKEEPER":"<zookeeper-host:port>"
  }
}

 

Here is an example

 

{
"instances": "2",
  "environment": {
  "APP_NAME":"kstreams",
  "KAFKA_BROKER":"oc-140-44-88-200.compute.oraclecloud.com:9092",
  "ZOOKEEPER":"10.190.210.199:2181"
  }
}

 

You need to be careful about the following

 

  • The value of the KAFKA_BROKER attribute should be the same as the one you configured in the advertised.listeners attribute of the Kafka server.properties file (the Oracle Compute Cloud instance public DNS)
  • The APP_NAME attribute should be the same as the one you use while deploying your application using the Application Container Cloud REST API

Please refer to the documentation for more details on metadata files

 

 

Build

 

Initiate the build process to produce the deployable artifact (a ZIP file)

 

//Producer application

cd <code_dir>/producer //maven project location
mvn clean package

//Consumer application

cd <code_dir>/consumer //maven project location
mvn clean package

 

The output of the build process is a ZIP file for each microservice: accs-kafka-producer-1.0-dist.zip for the producer and acc-kafka-streams-1.0-dist.zip for the consumer

 

Upload & deploy

You will need to upload the ZIP files to Oracle Storage Cloud and then reference them in the subsequent steps. Here are the required cURL commands

 

Create a container in Oracle Storage cloud (if it doesn't already exist)  
  
curl -i -X PUT -u <USER_ID>:<USER_PASSWORD> <STORAGE_CLOUD_CONTAINER_URL>  
e.g. curl -X PUT -u jdoe:foobar "https://domain007.storage.oraclecloud.com/v1/Storage-domain007/accs-kstreams-consumer/"  
  
Upload your zip file into the container (zip file is nothing but a Storage Cloud object)  
  
curl -X PUT -u <USER_ID>:<USER_PASSWORD> -T <zip_file> "<storage_cloud_object_URL>" //template  
e.g. curl -X PUT -u jdoe:foobar -T acc-kafka-streams-1.0-dist.zip "https://domain007.storage.oraclecloud.com/v1/Storage-domain007/accs-kstreams-consumer/accs-kafka-consumer.zip"

 

 

Repeat the same for the producer microservice

 

You can now deploy your application to Application Container Cloud using its REST API. The Oracle Storage Cloud path (used above) will be referenced while using the Application Container Cloud REST API (used for deployment). Here is a sample cURL command which makes use of the REST API

 

curl -X POST -u joe@example.com:password \    
-H "X-ID-TENANT-NAME:domain007" \    
-H "Content-Type: multipart/form-data" -F "name=kstreams" \    
-F "runtime=java" -F "subscription=Monthly" \    
-F "deployment=@deployment.json" \    
-F "archiveURL=accs-kstreams-consumer/accs-kafka-consumer.zip" \    
-F "notes=notes for deployment" \    
https://apaas.oraclecloud.com/paas/service/apaas/api/v1.1/apps/domain007  

 

Note

  • the name attribute used in the cURL command should be the same as the APP_NAME attribute used in deployment.json
  • Repeat the same for the producer microservice

 

Post deployment

(the consumer application has been highlighted below)

 

The Applications console

 

 

The Overview sub-section

 

 

 

The Deployments sub-section

 

 

 

Testing

Assuming your Kafka broker is up and running and you have deployed the application successfully, execute the steps below to test drive your application

 

Start the producer

Trigger your producer application by issuing an HTTP GET to https://my-producer-app-url/producer e.g. https://accs-kafka-producer-domain007.apaas.us.oraclecloud.com/producer. This will start producing (random) CPU metrics for a bunch of (10) machines

 

 

You can stop the producer by issuing an HTTP DELETE on the same URL

 

 

Check the statistics

 

Cumulative moving average of all machines

Allow the producer to run for 30-40 seconds and then check the current statistics. Issue an HTTP GET request to your consumer application e.g. https://acc-kafka-streams-domain007.apaas.us.oraclecloud.com/metrics. You’ll see a response payload similar to what’s depicted below

 

 

 

The information in the payload is as follows

  • cpu: the cumulative average of the CPU usage of a machine
  • machine: the machine ID
  • source: this has been purposefully added as diagnostic information, to show which node (instance in Application Container Cloud) is handling the calculation for a specific machine (this is subject to change as your application scales up/down)

 

Cumulative moving average of a specific machine

 

 

 

Scale your application

Increase the number of instances of your application (from 2 to 3)

 

 

 

Check the stats again and you’ll notice that the computation task is now being shared among three nodes.

 

That’s all for this blog series!

 

**The views expressed in this post are my own and do not necessarily reflect the views of Oracle