Part I of this blog demonstrated the development and deployment of individual microservices (on Oracle Application Container Cloud) and how they are loosely coupled using the Apache Kafka message hub (set up on Oracle Compute Cloud). This (second) part builds on the previous one and, with the help of a sample application, explores microservice-based stream processing, diving into the following areas


  • Kafka Streams: A stream processing library
  • Scalability: enable your application to handle increased demands
  • Handling state: this is a hard problem to solve when the application needs to be horizontally scalable



Technical Components


Open source technologies

The following open source components were used to build the sample application






Apache Kafka

A scalable, distributed pub-sub message hub

Kafka Streams

A library for building stream processing applications on top of Apache Kafka

Jersey

Used to implement REST and SSE services. Uses Grizzly as a (pluggable) runtime/container

Maven

Used as the standard Java build tool (along with its assembly plugin)


Oracle Cloud

The following Oracle Cloud services have been leveraged


Application Container Cloud

Serves as a scalable platform for running our stream processing microservices

Compute Cloud

Hosts the Kafka cluster (broker)


Note: In addition to compute based (IaaS) Kafka hosting, Oracle Cloud now offers Event Hub Cloud. This is a compelling offering which provides Apache Kafka as a fully managed service along with other value added capabilities.


Hello Kafka Streams!

In simple words, Kafka Streams is a library which you can include in your Java based applications to build stream processing applications on top of Apache Kafka. Other distributed computing platforms such as Apache Spark and Apache Storm are widely used in the big data stream processing world, but Kafka Streams brings some unique propositions to this area.


Kafka Streams: what & why






Built on top of Kafka – leverages its scalable and fault tolerant capabilities

If you use Kafka in your ecosystem, it makes perfect sense to leverage Kafka Streams to churn streaming data to/from the Kafka topics



Microservices friendly

It’s a lightweight library which you use within your Java application. This means that you can use it to build microservices-style stream processing applications



Flexible deployment & elastic in nature

You’re not restricted to a specific deployment model (e.g. cluster-based). The application can be packaged and deployed in a flexible manner and scaled up and down easily



For fast data

Harness the power of Kafka Streams to crunch high volume data in real time systems – it does not need to be at big data scale



Support for stateful processing

Helps manage local application state in a fault tolerant & scalable manner



Sample application: what’s new


In part I, the setup was as follows

  • A Kafka broker serving as the messaging hub
  • Producer application (on Application Container Cloud) pushing CPU usage metrics to Kafka
  • Consumer application (on Application Container Cloud) consuming those metrics from Kafka and exposing them as a real time feed (using Server-Sent Events)


Some parts of the sample have been modified to demonstrate some of the key concepts. Here is the gist






Consumer API

The new consumer application leverages the Kafka Streams API on Application Container Cloud as compared to the traditional (polling based) Kafka Consumer client API (used in part I)



Consumer topology

We will deploy multiple instances of the Consumer application to scale our processing logic



Nature of metrics feed

The cumulative moving average of the CPU metrics per machine is calculated as opposed to the exact metric provided by the SSE feed in part I



Accessing the CPU metrics feed

The consumer application makes the CPU usage metrics available in the form of a REST API, as compared to the SSE based implementation in part I


High level architecture

The basic architecture still remains the same i.e. microservices decoupled using a messaging layer




As mentioned above, the consumer application has undergone changes and is now based on the Kafka Streams API. We could have continued to use the traditional poll based Kafka Consumer client API as in part I, but the Kafka Streams API was chosen for a few reasons. Let’s go through them in detail and see how it fits in the context of the overall solution. At this point, ask yourself the following questions


  • How would you scale your consumer application?
  • How would you handle intermediate state (required for moving average calculation) spread across individual instances of your scaled out application?



With Application Container Cloud you can spawn multiple instances of your stream processing application with ease (for more details, refer to the documentation)


But how does it help?

The sample application models CPU metrics being continuously sent by the producer application to a Kafka broker – for demonstration purposes, the number of machines (whose CPU metrics are being sent) has been limited to ten. But how would you handle data at a larger scale?


  • When the number of machines increases to the scale of thousands?
  • Perhaps you want to factor in additional attributes (in addition to just the cpu usage)?
  • Maybe you want to execute all this at data-center scale?


The answer lies in distributing your computation across several processes and this is where horizontal scalability plays a key role.

When the CPU metrics are sent to a topic in Kafka, they are distributed to different partitions (using a default consistent hashing algorithm) – this is similar to sharding. This helps from multiple perspectives

  • When Kafka itself is scaled out (broker nodes are added) – individual partitions are replicated over these nodes for fault tolerance and high performance
  • From a consumer standpoint - multiple consumers (in the same group) automatically distribute the load among themselves
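As a rough illustration of this sharding behaviour, here is a plain-Java sketch (not Kafka's actual partitioner, which hashes the key bytes with murmur2) of how machine IDs could map to partitions, and how those partitions would then split across two consumer instances in the same group:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class PartitionSketch {

    // simplified stand-in for Kafka's default partitioner
    // (the real one hashes the serialized key with murmur2)
    static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int partitions = 4; // partitions of the CPU metrics topic
        int instances = 2;  // consumer application instances in the same group

        // machine-1 .. machine-10, as in the sample application
        Map<Integer, List<String>> byPartition = new TreeMap<>();
        for (int m = 1; m <= 10; m++) {
            String machineID = "machine-" + m;
            byPartition.computeIfAbsent(partitionFor(machineID, partitions),
                    p -> new ArrayList<>()).add(machineID);
        }

        // consumers in one group take non-overlapping sets of partitions
        byPartition.forEach((partition, machines) ->
                System.out.println("partition " + partition
                        + " -> instance " + (partition % instances)
                        + " handles " + machines));
    }
}
```

The key property to notice: metrics for a given machine always land on the same partition, so the same instance always sees the full history for that machine.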


In the case of our example, each stream processing application instance is nothing but a (specialized) form of Kafka consumer and takes up a non-overlapping set of partitions in Kafka for processing. Consider a setup where 2 instances process data for 10 machines spread over 4 partitions in the Kafka broker. Here is a pictorial representation




Managing application state (at scale)

The processing logic in the sample application is not stateless, i.e. it depends on previous state to calculate its current state. In the context of this application, state is


  • the cumulative moving average of a continuous stream of CPU metrics,
  • being calculated in parallel across a distributed set of instances, and
  • constantly changing i.e. the cumulative moving average of the machines handled by each application instance is getting updated with the latest results
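The incremental update itself is simple arithmetic: given the average of the first n readings, the (n+1)-th reading folds in without re-reading any history. A minimal, self-contained sketch of the calculation used throughout this post:

```java
public class CmaSketch {

    // cumulative moving average: CMA(n+1) = (x + n * CMA(n)) / (n + 1)
    static double update(double cmaSoFar, long recordsSoFar, double newValue) {
        return (newValue + recordsSoFar * cmaSoFar) / (recordsSoFar + 1);
    }

    public static void main(String[] args) {
        double cma = 10.0; // first CPU reading becomes the initial average
        long n = 1;
        for (double reading : new double[]{20.0, 30.0}) {
            cma = update(cma, n++, reading);
        }
        System.out.println(cma); // average of 10, 20, 30 = 20.0
    }
}
```

Only two numbers per machine need to be stored (the running average and the record count), which is exactly what the two state stores in the consumer application hold.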


If you confine the processing logic to a single node, the problem of localized state co-ordination would not exist, i.e. local state = global state. But this luxury is not available in a distributed processing system. Here is how our application handles it (thanks to Kafka Streams)


  • The local state store (a KV store) containing the machine to (cumulative moving average) CPU usage metric mapping is sent to a dedicated topic in Kafka e.g. the in-memory-avg-store in our application (named cpu-streamz) will have a corresponding topic cpu-streamz-in-memory-avg-store-changelog in Kafka
  • This topic is called a changelog since it is a compacted one, i.e. only the latest value for a given key is retained by Kafka. This is meant to achieve the goal (distributed state management) in the cheapest possible manner
  • During scale up – Kafka assigns some partitions to the new instance (see above example) and the state for those partitions (which was previously stored in another instance) is replayed from the Kafka changelog topic to build the state store for this new instance
  • When an instance crashes or is stopped – the partitions handled by that instance are handed off to another node, and the state of those partitions (stored in the Kafka changelog topic) is written to the local state store of the node to which the work was allotted
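The changelog-backed recovery described above can be mimicked with plain collections: a compacted topic behaves like a map that keeps only the latest value per key, and a restoring instance rebuilds its local store by replaying that map. A toy simulation (no Kafka involved, just the idea):

```java
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ChangelogSketch {

    // compaction keeps only the latest value per key; replaying the
    // compacted log rebuilds the local state store on a new instance
    static Map<String, Double> restore(List<Map.Entry<String, Double>> changelog) {
        Map<String, Double> compacted = new LinkedHashMap<>();
        for (Map.Entry<String, Double> e : changelog) {
            compacted.put(e.getKey(), e.getValue()); // later record overwrites earlier one
        }
        return new HashMap<>(compacted); // the rebuilt local state store
    }

    public static void main(String[] args) {
        // every local-store update is also appended to the changelog topic
        List<Map.Entry<String, Double>> changelog = Arrays.asList(
                new AbstractMap.SimpleEntry<>("machine-1", 12.5),
                new AbstractMap.SimpleEntry<>("machine-2", 40.0),
                new AbstractMap.SimpleEntry<>("machine-1", 14.0)); // newer CMA for machine-1

        // machine-1 is restored to its latest value (14.0), not 12.5
        System.out.println(restore(changelog));
    }
}
```

Kafka Streams does all of this for you behind the scenes; the sketch only shows why a compacted changelog is the "cheapest possible" representation of recoverable state.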


All in all, this ensures scalable and fault tolerant state management


Exposing application state

As mentioned above, the cumulative moving averages of the CPU metrics of each machine are calculated across multiple nodes in parallel. In order to find out the global state of the system, i.e. the current average for all (or specific) machines, the local state stores need to be queried. The application provides a REST API for this





More details in the Testing section on how to see this in action


It's important to note the following points with regard to the implementation of the REST API, which in turn gives us what we want - real time insight into the moving averages of CPU usage


  • Topology agnostic: Use a single access URL provided by Application Container Cloud (as highlighted in the diagram above). As a client, you do not have to be aware of individual application instances
  • Robust & flexible: Instances can be added or removed on the fly but the overall business logic (in this case it is calculation of the cumulative moving average of a stream of CPU metrics) will remain fault tolerant and adjust to the elastic topology changes


This is made possible by a combination of the following


  • Automatic load balancing: Application Container Cloud load balances requests among multiple instances of your application
  • Clustered setup: from an internal implementation perspective, your application instances can detect each other. For this to work, the isClustered attribute in manifest.json is set to true, and custom logic is implemented within the solution so that instance specific information can be discovered and used appropriately. This is an internal implementation detail, however, and the user is not affected by it

Please look at the Code snippets section for some more details

  • Interactive queries: this capability in Kafka Streams enables external clients to introspect the state store of a stream processing application instance via a host-port configuration enabled within the app configuration


An in-depth discussion of Kafka Streams is not possible in a single blog. The above sections are meant to provide just enough background which is (hopefully) sufficient from the point of view of this blog post. Readers are encouraged to spend some time going through the official documentation and come back to this blog to continue hacking on the sample



You can refer to part I of the blog for the Apache Kafka related setup. The only additional step which needs to be executed is exposing the port on which your Zookeeper process is listening (2181 by default), as this is required by the Kafka Streams library configuration. While executing the steps from the Open Kafka listener port section, ensure that you include the Oracle Compute Cloud configuration for port 2181 (in addition to the Kafka broker port 9092)
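For reference, this is roughly what the Kafka Streams configuration for this setup looks like. The host values are placeholders, and the exact property keys depend on your Kafka Streams version: older releases (like the one this sample targets) still required zookeeper.connect, which is why port 2181 must be reachable, while application.server is what later enables interactive queries:

```java
import java.util.Properties;

public class StreamsConfigSketch {

    static Properties streamsConfig() {
        Properties props = new Properties();
        props.put("", "cpu-streamz");           // also used in changelog topic names
        props.put("bootstrap.servers", "<kafka-broker-host>:9092"); // placeholder: Compute Cloud public DNS
        props.put("zookeeper.connect", "<kafka-broker-host>:2181"); // required by older Kafka Streams versions
        props.put("application.server", "<instance-host>:8080");    // placeholder: enables interactive queries
        return props;
    }

    public static void main(String[] args) {
        System.out.println(streamsConfig());
    }
}
```

These properties would be passed to the KafkaStreams constructor (via StreamsConfig) when the processing pipeline is started.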



Maven dependencies

As mentioned earlier, from an application development standpoint, Kafka Streams is just a library. This is evident in the pom.xml




The project also uses the appropriate Jersey libraries along with the Maven shade and assembly plugins to package the application  


The producer microservice remains the same and you can refer to part I for the details. Let’s look at the revamped consumer stream processing microservice







Entry point for the application. Kicks off Grizzly container, Kafka Stream processing pipeline


Implements the processing pipeline logic and handles K-Stream configuration and the topology creation as well


Exposes multiple REST endpoints for fetching CPU moving average metrics

Metric, Metrics

POJOs (JAXB decorated) to represent metric data. They are exchanged as JSON/XML payloads

GlobalAppState, Utils

Common utility classes


Now that you have a fair idea of what's going on within the application and an overview of the classes involved, it makes sense to peek at some of the relevant sections of the code


State store


    public static class CPUCumulativeAverageProcessor implements Processor<String, String> {

        @Override
        public void init(ProcessorContext pc) {
            this.pc = pc;
            this.pc.schedule(12000); //invoke punctuate every 12 seconds
            this.machineToAvgCPUUsageStore = (KeyValueStore<String, Double>) pc.getStateStore(AVG_STORE_NAME);
            this.machineToNumOfRecordsReadStore = (KeyValueStore<String, Integer>) pc.getStateStore(NUM_RECORDS_STORE_NAME);
        }
        //...


Cumulative Moving Average (CMA) calculation


        @Override
        public void process(String machineID, String currentCPUUsage) {

            //turn each String value (cpu usage) into a Double
            Double currentCPUUsageD = Double.parseDouble(currentCPUUsage);
            Integer recordsReadSoFar = machineToNumOfRecordsReadStore.get(machineID);
            Double latestCumulativeAvg = null;

            if (recordsReadSoFar == null) {
                PROC_LOGGER.log(Level.INFO, "First record for machine {0}", machineID);
                machineToNumOfRecordsReadStore.put(machineID, 1);
                latestCumulativeAvg = currentCPUUsageD;
            } else {
                Double cumulativeAvgSoFar = machineToAvgCPUUsageStore.get(machineID);
                PROC_LOGGER.log(Level.INFO, "CMA so far {0}", cumulativeAvgSoFar);

                //CMA(n+1) = (x + n * CMA(n)) / (n + 1)
                latestCumulativeAvg = (currentCPUUsageD + (recordsReadSoFar * cumulativeAvgSoFar)) / (recordsReadSoFar + 1);
                machineToNumOfRecordsReadStore.put(machineID, recordsReadSoFar + 1);
            }

            machineToAvgCPUUsageStore.put(machineID, latestCumulativeAvg); //store latest CMA in local state store
        }



Metrics POJO


public class Metrics {

    private final List<Metric> metrics;

    public Metrics() {
        metrics = new ArrayList<>();
    }

    public Metrics add(String source, String machine, String cpu) {
        metrics.add(new Metric(source, machine, cpu));
        return this;
    }

    public Metrics add(Metrics anotherMetrics) {
        anotherMetrics.metrics.forEach((metric) -> metrics.add(metric));
        return this;
    }

    @Override
    public String toString() {
        return "Metrics{" + "metrics=" + metrics + '}';
    }

    public static Metrics EMPTY() {
        return new Metrics();
    }
}


Exposing REST API for state


    public Response all_metrics() throws Exception {
        Response response = null;
        try {
            KafkaStreams ks = GlobalAppState.getInstance().getKafkaStreams();
            HostInfo thisInstance = GlobalAppState.getInstance().getHostPortInfo();
            Metrics metrics = getLocalMetrics();

            ks.allMetadataForStore(AVG_STORE_NAME)
                    .stream()
                    .filter(sm -> !( && sm.port() == thisInstance.port())) //only query remote node stores
                    .forEach(new Consumer<StreamsMetadata>() {
                        @Override
                        public void accept(StreamsMetadata t) {
                            String url = "http://" + + ":" + t.port() + "/metrics/remote";
                            //LOGGER.log(Level.INFO, "Fetching remote store at {0}", url);
                            Metrics remoteMetrics = Utils.getRemoteStoreState(url, 2, TimeUnit.SECONDS);
                            metrics.add(remoteMetrics); //merge metrics from the remote store
                            LOGGER.log(Level.INFO, "Metric from remote store at {0} == {1}", new Object[]{url, remoteMetrics});
                        }
                    });

            response = Response.ok(metrics).build();
        } catch (Exception e) {
            LOGGER.log(Level.SEVERE, "Error - {0}", e.getMessage());
        }
        return response;
    }


Host discovery


    public static String getHostIPForDiscovery() {
        String host = null;
        try {
            //APP_NAME is an environment variable available within Application Container Cloud
            String hostname = Optional.ofNullable(System.getenv("APP_NAME")).orElse("streams");

            InetAddress inetAddress = InetAddress.getByName(hostname);
            host = inetAddress.getHostAddress();
        } catch (UnknownHostException ex) {
            host = "localhost";
        }
        return host;
    }

Deployment to Application Container Cloud


Now that you have a fair idea of the application, it’s time to look at the build, packaging & deployment


Update deployment descriptors


The metadata files for the producer application are the same. Please refer to part I for details on how to update them. The steps below are relevant to the (new) stream processing consumer microservice.

manifest.json: You can use this file in its original state


    "runtime": {
        "majorVersion": "8"
    "command": "java -jar acc-kafka-streams-1.0.jar",
  "isClustered": "true"




deployment.json: It contains the environment variables required by the application at runtime. The values are left as placeholders for you to fill in prior to deployment.


{
  "instances": "2",
  "environment": {
    "KAFKA_BROKER": "<public DNS of your Compute Cloud instance>:9092",
    "APP_NAME": "kstreams"
  }
}


You need to be careful about the following


  • The value of the KAFKA_BROKER attribute should be the same as the one (the Oracle Compute Cloud instance public DNS) you configured in the advertised.listeners attribute of the Kafka broker's file
  • The APP_NAME attribute should be the same as the one you use while deploying your application using the Application Container Cloud REST API

Please refer to the following documentation for more details on metadata files





Initiate the build process to produce the deployable artifact (a ZIP file)


//Producer application

cd <code_dir>/producer //maven project location
mvn clean package

//Consumer application

cd <code_dir>/consumer //maven project location
mvn clean package


The output of the build process is a ZIP file for each of the producer and consumer microservices respectively


Upload & deploy

You would need to upload the ZIP file to Oracle Storage Cloud and then reference it in the subsequent steps. Here are the required cURL commands


Create a container in Oracle Storage Cloud (if it doesn't already exist)  
e.g. curl -X PUT -u jdoe:foobar ""  

Upload your ZIP file into the container (the ZIP file is nothing but a Storage Cloud object)  
curl -X PUT -u <USER_ID>:<USER_PASSWORD> -T <zip_file> "<storage_cloud_object_URL>" //template  
e.g. curl -X PUT -u jdoe:foobar -T ""



Repeat the same for the producer microservice


You can now deploy your application to Application Container Cloud using its REST API. The Oracle Storage cloud path (used above) will be referenced while using the Application Container Cloud REST API (used for deployment). Here is a sample cURL command which makes use of the REST API


curl -X POST -u <USER_ID>:<USER_PASSWORD> \
-H "X-ID-TENANT-NAME:domain007" \
-H "Content-Type: multipart/form-data" -F "name=kstreams" \
-F "runtime=java" -F "subscription=Monthly" \
-F "deployment=@deployment.json" \
-F "archiveURL=accs-kstreams-consumer/" \
-F "notes=notes for deployment" \
<Application Container Cloud REST API endpoint>



  • the name attribute used in the curl command should be the same as the APP_NAME attribute used in the manifest.json
  • Repeat the same for the producer microservice


Post deployment

(the consumer application has been highlighted below)


The Applications console



The Overview sub-section




The Deployments sub-section





Assuming your Kafka broker is up and running and you have deployed the application successfully, execute the steps below to test drive your application


Start the producer

Trigger your producer application by issuing an HTTP GET to https://my-producer-app-url/producer. This will start producing (random) CPU metrics for a bunch of (10) machines



You can stop the producer by issuing an HTTP DELETE on the same URL



Check the statistics


Cumulative moving average of all machines

Allow the producer to run for 30-40 seconds and then check the current statistics. Issue an HTTP GET request to your consumer application. You’ll see a response payload similar to what’s depicted below




The information in the payload is as follows

  • cpu: the cumulative moving average of the CPU usage of a machine
  • machine: the machine ID
  • source: purposefully added as diagnostic information to see which node (instance in Application Container Cloud) is handling the calculation for a specific machine (this is subject to change as your application scales up/down)


Cumulative moving average of a specific machine




Scale your application

Increase the number of instances of your application (from 2 to 3)




Check the stats again and you’ll notice that the computation task is now being shared among three nodes.


That’s all for this blog series!


**The views expressed in this post are my own and do not necessarily reflect the views of Oracle