The underlying theme of this blog remains the same as that of one of my previous blogs, i.e. scalable stream processing microservices on Oracle Cloud. But there are significant changes and additions:

 

  • Docker: we will package the Kafka Streams based consumer application as a Docker image
  • Oracle Container Cloud: our containerized application will run and scale on Oracle Container Cloud
  • Service discovery: the application has been revamped to leverage the service discovery capabilities within Oracle Container Cloud

 

 

Technical Components

 

Here is a quick summary of the technologies used

 

  • Oracle Container Cloud: An enterprise grade platform to compose, deploy and orchestrate Docker containers
  • Docker needs no introduction
  • Apache Kafka: A scalable, distributed pub-sub message hub
  • Kafka Streams: A library for building stream processing applications on top of Apache Kafka
  • Jersey: Used to implement REST and SSE services. Uses Grizzly as a (pluggable) runtime/container
  • Maven: Used as the standard Java build tool (along with its assembly plugin)

 

Sample application

 

By and large, the sample application remains the same, and you can refer to its details here. Here is a quick summary:

 

  • The components: a Kafka broker, a producer application and a consumer (Kafka Streams based) stream processing application
  • Changes (as compared to the setup here): the consumer application will now run on Oracle Container Cloud and the application instance discovery semantics (which were earlier Oracle Application Container Cloud specific) have now been implemented on top of Oracle Container Cloud service discovery capability

 

Architecture

 

To get an idea of the key concepts, I would recommend going through the High level architecture section of one of the previous blogs. Here is a diagram representing the overall runtime view of the system:

 

 

Its key takeaways are as follows:

 

  • Oracle Container Cloud will host our containerized stream processing (Kafka consumer) applications
  • We will use its elastic scalability features to spin up additional containers on demand to distribute the processing load
  • The contents of the topic partitions in Kafka broker (marked as P1, P2, P3) will be distributed among the application instances

 

Please note that having more application instances than topic partitions means that some of your instances will be idle (no processing), since each partition is processed by only one instance at a time. It is generally recommended to set the number of topic partitions to a relatively high number (e.g. 50) in order to reap maximum benefit from Kafka.
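To make the idle-instance point concrete, here is a small stdlib-only sketch. It assumes a simple round-robin spread of partitions over instances, which is a deliberate simplification: Kafka Streams uses its own, more sophisticated assignor. The class name PartitionSpread is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration: round-robin spread of topic partitions over app instances.
// This is NOT Kafka Streams' real assignor; it only shows why extra instances sit idle.
final class PartitionSpread {

    // Returns, for each instance index, the list of partition numbers it would process
    static List<List<Integer>> assign(int partitions, int instances) {
        List<List<Integer>> result = new ArrayList<>();
        for (int i = 0; i < instances; i++) {
            result.add(new ArrayList<>());
        }
        // Partition p goes to instance p % instances
        for (int p = 0; p < partitions; p++) {
            result.get(p % instances).add(p);
        }
        return result;
    }

    public static void main(String[] args) {
        // 3 partitions (P1..P3 in the diagram) and 4 instances: one instance gets nothing
        List<List<Integer>> plan = assign(3, 4);
        for (int i = 0; i < plan.size(); i++) {
            System.out.println("instance-" + i + " -> " + plan.get(i));
        }
    }
}
```

With 3 partitions and 4 instances, the last instance ends up with an empty list; with 50 partitions, the same 4 instances (or many more) all get work.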

 

Code

 

You can refer to this section in the previous blog for code-related details (since the bulk of the logic is the same). The major difference is the service discovery logic (covered in depth below), which relies on the Oracle Container Cloud KV store for runtime information. Here is the relevant snippet:

 

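// Excerpt from a class that defines LOGGER (java.util.logging) and a Jersey getRESTClient() helper;
// also needs java.util.Optional and java.util.logging.Level
/**
 * Find yourself in the cloud: look up this container's auto-bound host port
 * in the Oracle Container Cloud service discovery (KV) store.
 *
 * @return the host port mapped to the container's port 8080
 */
public static String getSelfPortForDiscovery() {
    // Container ID is passed in by init.sh as a system property (-DCONTAINER_ID=...)
    String containerID = System.getProperty("CONTAINER_ID", "container_id_not_found");
    //String containerID = Optional.ofNullable(System.getenv("CONTAINER_ID")).orElse("container_id_not_found");
    LOGGER.log(Level.INFO, " containerID {0}", containerID);

    // SELF_KEY is populated by the sd_deployment_containers_path template function
    String sd_key_part = Optional.ofNullable(System.getenv("SELF_KEY")).orElse("sd_key_not_found");
    LOGGER.log(Level.INFO, " sd_key_part {0}", sd_key_part);

    // Full key: <SELF_KEY>/<CONTAINER_ID>
    String sd_key = sd_key_part + "/" + containerID;
    LOGGER.log(Level.INFO, " SD Key {0}", sd_key);

    // Service discovery store, reachable by applications on 172.17.0.1:9109
    String sd_base_url = "172.17.0.1:9109/api/kv";
    String fullSDUrl = "http://" + sd_base_url + "/" + sd_key + "?raw=true";
    LOGGER.log(Level.INFO, " fullSDUrl {0}", fullSDUrl);

    // The value stored under the key is "host:port"
    String hostPort = getRESTClient().target(fullSDUrl)
            .request()
            .get(String.class);
    LOGGER.log(Level.INFO, " hostPort {0}", hostPort);

    String port = hostPort.split(":")[1];
    LOGGER.log(Level.INFO, " Auto port {0}", port);

    return port;
}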

Kafka setup

 

On Oracle Compute Cloud

 

You can refer to part I of the blog for the Apache Kafka-related setup on Oracle Compute. The only additional step is to open the port on which your Zookeeper process is listening (2181 by default), as this is required by the Kafka Streams library configuration. While executing the steps from the Open Kafka listener port section, make sure you include the Oracle Compute Cloud configuration for port 2181 (in addition to the Kafka broker port 9092).
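Once the firewall rules are in place, a quick way to confirm that both ports are reachable from outside is a plain TCP connect. The sketch below uses only java.net.Socket; the class name PortCheck is hypothetical, and the host:port arguments are placeholders for your broker and Zookeeper addresses. A successful connect only shows that the port is open, not that Kafka or Zookeeper is healthy.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical connectivity check for the Kafka broker (9092) and Zookeeper (2181) ports
final class PortCheck {

    // Splits "host:port" on the last colon without resolving the host
    static InetSocketAddress parse(String hostPort) {
        int idx = hostPort.lastIndexOf(':');
        if (idx <= 0 || idx == hostPort.length() - 1) {
            throw new IllegalArgumentException("expected host:port but got " + hostPort);
        }
        String host = hostPort.substring(0, idx);
        int port = Integer.parseInt(hostPort.substring(idx + 1));
        return InetSocketAddress.createUnresolved(host, port);
    }

    // True if a TCP connection can be opened within the timeout
    static boolean isReachable(String hostPort, int timeoutMillis) {
        InetSocketAddress target = parse(hostPort);
        try (Socket socket = new Socket()) {
            // Resolve here; an unknown host surfaces as an IOException on connect
            socket.connect(new InetSocketAddress(target.getHostString(), target.getPort()), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // e.g. java PortCheck <broker-host>:9092 <zookeeper-host>:2181
        for (String hostPort : args) {
            System.out.println(hostPort + " reachable: " + isReachable(hostPort, 3000));
        }
    }
}
```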

 

On Oracle Container Cloud!

 

You can also run a Kafka cluster on Oracle Container Cloud; check out this cool blog post!

 

The Event Hub Cloud is a new offering which provides Apache Kafka as a managed service in Oracle Cloud

 

Configuring our application to run on Oracle Container Cloud

 

Build the application

 

Execute mvn clean package to build the application JAR

 

Push to Docker Hub

 

Create a Docker Hub account if you don't have one already. To build and push the Docker image, execute the commands below:

 

Please ensure that the Docker engine is up and running.

 

docker login
docker build -t <registry>/<image_name>:<tag> .   # e.g. docker build -t abhirockzz/kafka-streams:latest .
docker push <registry>/<image_name>:<tag>          # e.g. docker push abhirockzz/kafka-streams:latest

 

Check your Docker Hub account to confirm that the image exists there

 

 

Create the Service

 

To create a new Service, click on New Service in the Services menu

 

 

There are multiple ways in which you can configure your service. One is the traditional way of filling in each of the attributes in the Service Editor. You can also directly enter a docker run command or a YAML configuration (similar to docker-compose), and Oracle Container Cloud will automatically populate the Service details. Let’s see the YAML-based method in action.

 

 

Populate the YAML editor (highlighted above) with the required configuration

 

version: 2
services:
  kstreams:
    image: "<docker hub image e.g. abhirockzz/kafka-streams>"
    environment:
      - "KAFKA_BROKER=<kafka broker host:port e.g. 149.007.42.007:9092>"
      - "ZOOKEEPER=<zookeeper host:port e.g. 149.007.42.007:2181>"
      - "SELF_KEY={{ sd_deployment_containers_path .ServiceID 8080 }}"
      - "OCCS_HOST={{hostip_for_interface .HostIPs \"public_ip\"}}"
      - "occs:scheduler=random"
    ports:
      - 8080/tcp

Please make sure that you substitute the host:port values for your Kafka broker and Zookeeper server in the YAML configuration.

 

 

If you switch to the Builder view, notice that all the values have already been populated

 

 

All you need to do is fill out the Service Name and (optionally) choose the Scheduler and Availability properties and click Save to finish the Service creation

 

 

You should see your newly created service in the list of services in the Services menu.

 

 

YAML configuration details

 

Here is an overview of the configuration parameters

 

  • Image: Name of the application image on Docker Hub
  • Environment variables
    • KAFKA_BROKER: the host and port information to connect to the Kafka broker

    • ZOOKEEPER: the host and port information to connect to the Zookeeper server (for the Kafka broker)
    • SELF_KEY & OCCS_HOST: these are defined as template functions (more details on this in a moment) and help with dynamic container discovery
  • Ports: our application listens on port 8080, which is hard-coded in the application itself. This is not a problem, since we have configured a random (auto-generated) port on the host (an Oracle Container Cloud worker node) to map to 8080, so multiple instances can run on the same host without port clashes

 

This is equivalent to using the -P option of the docker run command, which publishes exposed container ports to random host ports.

 

Template functions and Service discovery

 

We used the following template functions within the environment variables of our YAML file (environment variable: template function):

  • SELF_KEY: {{ sd_deployment_containers_path .ServiceID 8080 }}
  • OCCS_HOST: {{hostip_for_interface .HostIPs \"public_ip\"}}

 

What are templates?

Template arguments provide access to deployment properties related to your services (or stacks), and template functions allow you to utilize them at runtime (in a programmatic fashion). More details are available in the documentation.

 

Why do we need them?

Within our application, each Kafka Streams consumer application instance needs to register its coordinates (host and port) in the Streams configuration, using the application.server parameter. Kafka Streams in turn stores this as metadata, which can then be used at runtime to find out which instance hosts the data for a given key. Here are some excerpts from the code:

 

Seeding discovery info

 

// Register this instance's "host:port" so other instances can locate it
Map<String, Object> configurations = new HashMap<>();
String streamsAppServerConfig = GlobalAppState.getInstance().getHostPortInfo().host() + ":"
        + GlobalAppState.getInstance().getHostPortInfo().port();
configurations.put(StreamsConfig.APPLICATION_SERVER_CONFIG, streamsAppServerConfig);
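If you want to see the application.server value assembled without the author's GlobalAppState helper, here is a minimal sketch that combines OCCS_HOST with the port discovered from the KV store. The class and method names are hypothetical. The only fixed fact is the property key "application.server", which is the value of StreamsConfig.APPLICATION_SERVER_CONFIG. Using the literal string keeps the sketch free of the Kafka dependency.

```java
import java.util.Properties;

// Hypothetical helper: builds the Kafka Streams "application.server" entry
// from the container's public host IP and its auto-bound host port
final class AppServerConfig {

    // Literal value of StreamsConfig.APPLICATION_SERVER_CONFIG
    static final String APPLICATION_SERVER = "application.server";

    static Properties withAppServer(Properties props, String host, String port) {
        if (host == null || host.isEmpty()) {
            throw new IllegalArgumentException("OCCS_HOST is not set");
        }
        // Validate the port before registering it as metadata
        int p = Integer.parseInt(port.trim());
        if (p < 1 || p > 65535) {
            throw new IllegalArgumentException("invalid port " + p);
        }
        props.put(APPLICATION_SERVER, host + ":" + p);
        return props;
    }

    public static void main(String[] args) {
        // OCCS_HOST is populated by the hostip_for_interface template function
        String host = System.getenv().getOrDefault("OCCS_HOST", "localhost");
        // In the application, the port comes from the service discovery lookup
        String port = args.length > 0 ? args[0] : "8080";
        System.out.println(withAppServer(new Properties(), host, port));
    }
}
```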

 

Using the info

 

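// All instances that host a part of the state store
Collection<StreamsMetadata> storeMetadata = ks.allMetadataForStore(storeName);
// The instance that hosts the data for this particular machine (key)
StreamsMetadata metadataForMachine = ks.metadataForKey(storeName, machine, new StringSerializer());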
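The metadata returned by metadataForKey tells an instance which host:port owns a given machine's key. One plausible way to use it (not necessarily how the sample application does it) is to answer locally when the owner is this instance, and otherwise forward the request to the owner's /metrics/<machine-ID> endpoint. InstanceInfo and forwardUrl are hypothetical stand-ins for the host()/port() pair of StreamsMetadata, so the sketch runs without Kafka.

```java
import java.util.Objects;
import java.util.Optional;

// Hypothetical routing sketch: decide whether a /metrics/<machine> request
// can be served locally or must be forwarded to the instance owning the key
final class MetricsRouter {

    // Stand-in for the host()/port() pair exposed by Kafka Streams' StreamsMetadata
    static final class InstanceInfo {
        final String host;
        final int port;

        InstanceInfo(String host, int port) {
            this.host = host;
            this.port = port;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof InstanceInfo)) {
                return false;
            }
            InstanceInfo other = (InstanceInfo) o;
            return host.equals(other.host) && port == other.port;
        }

        @Override
        public int hashCode() {
            return Objects.hash(host, port);
        }
    }

    // Empty if this instance owns the key; otherwise the URL to forward the request to
    static Optional<String> forwardUrl(InstanceInfo self, InstanceInfo owner, String machine) {
        if (self.equals(owner)) {
            return Optional.empty();
        }
        return Optional.of("http://" + owner.host + ":" + owner.port + "/metrics/" + machine);
    }

    public static void main(String[] args) {
        InstanceInfo self = new InstanceInfo("120.33.42.7", 37155);
        InstanceInfo owner = new InstanceInfo("120.33.42.7", 37160);
        System.out.println(forwardUrl(self, owner, "machine-1").orElse("serve locally"));
    }
}
```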

 

How is this achieved?

 

For the application.server parameter, we need the host and port of the container instance in Oracle Container Cloud. The OCCS_HOST environment variable is populated automatically by the evaluation of the template function {{hostip_for_interface .HostIPs \"public_ip\"}}. This is the public IP of the Oracle Container Cloud host and takes care of the ‘host’ part of the application.server configuration. Determining the port needs more work, since we have mapped port 8080 to a random port on the Oracle Container Cloud host (worker node). The built-in service discovery mechanism within Oracle Container Cloud makes this possible.

 

The internal service discovery database is exposed via a REST API for external clients, but it can also be accessed internally (by applications) at 172.17.0.1:9109. It exposes the host and port information (of each Docker container) in a key-value format.

 

 

Key points to note in the image above:

  • The part highlighted in red is the value which is the host and port information
  • The part highlighted in green is a portion of the key, which is the (dynamic) Docker container ID
  • The remaining portion of the key is also dynamic, but can be evaluated with the help of a template function

 

The trick is to build this key and then use it to query the discovery service for the value (host and port details). This is where the SELF_KEY environment variable comes into play. It uses the {{ sd_deployment_containers_path .ServiceID 8080 }} template function (where 8080 is the exposed and mapped application port), which gets evaluated at runtime. This gives us part of the key, i.e. (as per the example above) apps/kstreams-kstreams-20170315-080407-8080/containers

 

The SELF_KEY environment variable is concatenated with the Docker container ID (a randomly generated 64-character hexadecimal identifier, not a UUID), which is evaluated during container startup within the init.sh script, i.e. (in the above example) 3a52….. This completes the key with which we can query the service discovery store.

 

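#!/bin/sh

# Extract this container's Docker ID from the cgroup info (cgroup v1 layout, e.g. "3:cpu:/docker/<id>")
export CONTAINER_ID=$(cat /proc/self/cgroup | grep 'cpu:/' | sed -r 's/[0-9]+:cpu:.docker.//g')
echo "$CONTAINER_ID"
# Pass the ID to the application as a system property (read via System.getProperty("CONTAINER_ID"))
java -DCONTAINER_ID="$CONTAINER_ID" -jar occ-kafka-streams-1.0.jar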
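As an alternative to the shell pipeline, the container ID could also be read from Java directly. The sketch below assumes the same cgroup v1 layout the init.sh script relies on, where a line in /proc/self/cgroup ends with /docker/<container-id>. Hosts using cgroup v2 or Docker's systemd cgroup driver may format this file differently (or not expose the ID at all), so treat ContainerIdReader as a hypothetical helper rather than a drop-in replacement.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: extract the Docker container ID from /proc/self/cgroup lines.
// Assumes the cgroup v1 layout "<n>:<controllers>:/docker/<64-hex-char container ID>".
final class ContainerIdReader {

    private static final Pattern DOCKER_ID = Pattern.compile("/docker/([0-9a-f]{64})$");

    static Optional<String> fromCgroupLines(List<String> lines) {
        for (String line : lines) {
            Matcher m = DOCKER_ID.matcher(line.trim());
            if (m.find()) {
                return Optional.of(m.group(1));
            }
        }
        return Optional.empty();
    }

    static Optional<String> fromProc() {
        try {
            return fromCgroupLines(Files.readAllLines(Paths.get("/proc/self/cgroup")));
        } catch (IOException e) {
            // Not on Linux, or the file is unreadable
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        // Same fallback value as the application's System.getProperty default
        System.out.println(fromProc().orElse("container_id_not_found"));
    }
}
```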

 

 

Both the SELF_KEY and OCCS_HOST environment variables are used within the internal logic of the Kafka consumer application. At container startup, the Oracle Container Cloud service discovery store is invoked (using its REST API) with the complete URL http://172.17.0.1:9109/api/kv/<SELF_KEY>/<CONTAINER_ID> (the code also appends ?raw=true).

 

Here it is in action in this code snippet:

 

String containerID = System.getProperty("CONTAINER_ID", "container_id_not_found");
String sd_key_part = Optional.ofNullable(System.getenv("SELF_KEY")).orElse("sd_key_not_found");
String sd_key = sd_key_part + "/" + containerID;
String sd_base_url = "172.17.0.1:9109/api/kv";
String fullSDUrl = "http://" + sd_base_url + "/" + sd_key + "?raw=true";
String hostPort = getRESTClient().target(fullSDUrl).request().get(String.class);        
String port = hostPort.split(":")[1];
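One fragile spot in this snippet is hostPort.split(":")[1]. If the KV store returns an unexpected value (for example an empty body), it throws ArrayIndexOutOfBoundsException, and a trailing newline would be carried into the port string. A slightly more defensive variant, shown as a hypothetical HostPortParser helper, could look like this:

```java
import java.util.OptionalInt;

// Hypothetical defensive replacement for hostPort.split(":")[1]:
// parses the "host:port" value returned by the discovery KV store
final class HostPortParser {

    static OptionalInt port(String hostPort) {
        if (hostPort == null) {
            return OptionalInt.empty();
        }
        String value = hostPort.trim();
        int idx = value.lastIndexOf(':'); // the last colon separates the port
        if (idx < 0 || idx == value.length() - 1) {
            return OptionalInt.empty();
        }
        try {
            int port = Integer.parseInt(value.substring(idx + 1));
            return (port > 0 && port <= 65535) ? OptionalInt.of(port) : OptionalInt.empty();
        } catch (NumberFormatException e) {
            return OptionalInt.empty();
        }
    }

    public static void main(String[] args) {
        System.out.println(port("120.33.42.7:37155\n"));
    }
}
```

Returning an empty OptionalInt lets the caller decide whether to retry the lookup or fail fast with a clear message, instead of crashing on an index error.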

 

Initiate Deployment

 

Start the Kafka broker first.

 

 

Click on the Deploy button to start the deployment. Accept the defaults (for now) and click Deploy.

 

 

 

You will be led to the Deployments screen. Wait a few seconds for the process to finish.

 

 

 

Dive into the container details

 

Click on the Container Name (highlighted). You will be led to the container-specific details page.

 

 

Make a note of the following:

 

Auto bound port

 

 

Environment variables (important ones have been highlighted)

 

Test

 

Assuming your Kafka broker is up and running and you have deployed the application successfully, execute the steps below to test drive your application.

 

Build & start the producer application

 

 

mvn clean package   # initiate the Maven build
cd target           # browse to the build directory
java -DKAFKA_CLUSTER=<kafka broker host:port> -jar kafka-cpu-metrics-producer.jar   # start the application

 

The producer application will start sending data to the Kafka broker.

 

Check the statistics

 

Cumulative moving average of all machines

 

Allow the producer to run for 30-40 seconds and then check the current statistics. Issue an HTTP GET request to your consumer application at http://OCCS_HOST:PORT/metrics, e.g. http://120.33.42.007:37155/metrics. You’ll see a response payload similar to the one depicted below.

 

The output below has been truncated for brevity.

 

 

The information in the payload is as follows:

  • cpu: the cumulative average of the CPU usage of a machine
  • machine: the machine ID
  • source: this has been purposefully added as diagnostic information, to see which node (Docker container in Oracle Container Cloud) is handling the calculation for a specific machine (this is subject to change as your application scales up/down)
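The cpu value is a cumulative moving average, which can be maintained incrementally without storing every sample: avg_new = avg_old + (x - avg_old) / n, where n is the number of samples including the new one x. The sketch below shows only that update rule in isolation; the actual aggregation in the sample application runs inside Kafka Streams, and the class name CumulativeAverage is hypothetical.

```java
// Hypothetical illustration of an incrementally updated cumulative moving average,
// the kind of value reported in the "cpu" field
final class CumulativeAverage {
    private long count;
    private double average;

    // Fold in one new sample: avg_n = avg_(n-1) + (x - avg_(n-1)) / n
    void add(double sample) {
        count++;
        average += (sample - average) / count;
    }

    double value() {
        return average;
    }

    long count() {
        return count;
    }

    // Convenience factory: build an average from a series of samples
    static CumulativeAverage of(double... samples) {
        CumulativeAverage avg = new CumulativeAverage();
        for (double s : samples) {
            avg.add(s);
        }
        return avg;
    }

    public static void main(String[] args) {
        // Three CPU readings for one machine
        System.out.println(of(10.0, 20.0, 30.0).value());
    }
}
```

Keeping only a count and the running average means the state per machine stays constant in size, no matter how long the producer runs.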

 

Cumulative moving average of a specific machine

 

Issue an HTTP GET request to your consumer application at http://OCCS_HOST:PORT/metrics/<machine-ID>, e.g. http://120.33.42.007:37155/metrics/machine-1
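If you prefer to script these checks rather than use a browser or curl, here is a minimal sketch using the JDK's java.net.http.HttpClient (Java 11+). OCCS_HOST and the port are whatever you noted on the container details page; the class name MetricsClient is hypothetical.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;

// Hypothetical client for the consumer application's /metrics endpoints
final class MetricsClient {

    // http://OCCS_HOST:PORT/metrics
    static URI allMetrics(String host, int port) {
        return URI.create("http://" + host + ":" + port + "/metrics");
    }

    // http://OCCS_HOST:PORT/metrics/<machine-ID>
    static URI machineMetrics(String host, int port, String machineId) {
        return URI.create("http://" + host + ":" + port + "/metrics/" + machineId);
    }

    // Issues the GET request and returns "<status> <body>"
    static String get(URI uri) throws IOException, InterruptedException {
        HttpClient client = HttpClient.newBuilder()
                .connectTimeout(Duration.ofSeconds(5))
                .build();
        HttpRequest request = HttpRequest.newBuilder(uri).GET().build();
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        return response.statusCode() + " " + response.body();
    }

    public static void main(String[] args) throws Exception {
        // Usage: java MetricsClient <OCCS_HOST> <PORT> [machine-ID]
        String host = args[0];
        int port = Integer.parseInt(args[1]);
        URI uri = args.length > 2 ? machineMetrics(host, port, args[2]) : allMetrics(host, port);
        System.out.println(get(uri));
    }
}
```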

 

 

 

Scale up… and down

 

Oracle Container Cloud enables your application to remain elastic, i.e. to scale out or scale in on demand. The process is simple; let’s see how it works for this application. Choose your deployment from the Deployments menu and click Change Scaling. We are bumping it up to 3 instances.

 

 

After some time, you’ll have three containers running separate instances of your Kafka Streams application.

 

 

 

The CPU metrics computation task will now be shared among the three nodes. You can check the logs of the old and new containers to confirm this.

 

 

In the old container, Kafka Streams will close the existing processing tasks in order to redistribute them to the new nodes. In its logs, you will see output similar to the following:

 

 

 

In the new containers, you will see Processor Initialized output as tasks are handed to these nodes. You can now check the metrics using any of the three instances (check the auto-bound port for the new containers) and spot the exact node which calculated each metric (notice the different port numbers). See the snippet below:

 

 

 

Scale down: you can scale down the number of instances using the same steps, and Kafka Streams will take care of rebalancing the tasks among the remaining nodes.

 

Note on Dynamic load balancing

 

In a production setup, one would want to load balance the consumer microservices using HAProxy, nginx, etc. (in this example, we had to inspect each application instance using its auto-bound port information). This might be covered in a future blog post. Oracle Container Cloud makes it easy to build such a coordinated set of services using Stacks, and ships with some example stacks for reference.

 

That’s all for this blog post... Cheers!

 

The views expressed in this post are my own and do not necessarily reflect the views of Oracle.