
This blog introduces you to building an event-driven microservices application using the CQRS and Event Sourcing patterns. Below is a brief definition of the concepts discussed over the course of this blog; more details about these can be obtained from the resources provided at the end of this blog.

 

What is a Microservice?

While there is no single definition for this architectural style, Adrian Cockcroft defines microservices architecture as a service-oriented architecture composed of loosely coupled elements that have bounded contexts.

 

What is a Bounded Context?

A Bounded Context is a concept that encapsulates the details of a single domain, such as domain model, data model, application services, etc., and defines the integration points with other bounded contexts/domains.

 

What is CQRS?

Command Query Responsibility Segregation (CQRS) is an architectural pattern that segregates the domain operations into two categories – Queries and Commands. While queries just return some results without making any state changes, commands are the operations which change the state of the domain model.

 

Why CQRS?

During the lifecycle of an application, it is common for the logical model to grow more complicated and structured, which can impact the user experience; the user experience must remain independent of the core system.
In order to have a scalable and easy-to-maintain application, we need to reduce the constraints between the read model and the write model. Some reasons for keeping reads and writes apart are:

  • Scalability (reads typically far exceed writes, so the scaling requirements for each differ and can be addressed separately)
  • Flexibility (separate read / write models)
  • Reduced complexity (shifting complexity into separate concerns)

 

What is Event sourcing?

Event sourcing achieves atomicity by using a different, event-centric approach to persisting business entities. Instead of storing the current state of an entity, the application stores a sequence of ‘events’ that changed the entity’s state. The application can reconstruct an entity’s current state by replaying the events. Since saving an event is a single operation, it is inherently atomic and does not require 2PC (2-phase commit) which is typically associated with distributed transactions.
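To make the replay idea concrete, here is a minimal, framework-free sketch. The event classes and the tiny Cart entity below are illustrative only and are not the Axon-based sample built later in this blog:

import java.util.Arrays;
import java.util.List;

public class ReplayExample {

    // Illustrative events; in a real system these would be persisted in an event store
    static class ItemAdded   { final int count; ItemAdded(int count)   { this.count = count; } }
    static class ItemRemoved { final int count; ItemRemoved(int count) { this.count = count; } }

    static class Cart {
        private int items;

        // Rebuild the current state by replaying the stored event sequence
        static Cart replay(List<Object> history) {
            Cart cart = new Cart();
            for (Object event : history) {
                cart.apply(event);
            }
            return cart;
        }

        private void apply(Object event) {
            if (event instanceof ItemAdded) {
                items += ((ItemAdded) event).count;
            } else if (event instanceof ItemRemoved) {
                items -= ((ItemRemoved) event).count;
            }
        }
    }

    public static void main(String[] args) {
        // Three persisted events, replayed in order, yield the current state: 2 items
        Cart cart = Cart.replay(Arrays.asList(new ItemAdded(3), new ItemRemoved(2), new ItemAdded(1)));
        System.out.println(cart.items); // prints 2
    }
}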

 
Overview

This blog explains how CQRS and Event Sourcing patterns can be applied to develop a simple microservice application that consists of a single bounded context called “Cart” with add, remove and read operations. The sample does not have any functional significance but should be good enough to understand the underlying patterns and their implementations.  The following diagram depicts a high level flow of activities when using CQRS and Event sourcing patterns to build applications:

 

Figure 1: CQRS and Event sourcing

 

The sample referred in this blog uses the following technology stack:

  • Spring Boot for building and packaging the application
  • Axon framework with Spring for CQRS and Event Sourcing. Axon is an open source CQRS framework for Java which provides implementations of the most important building blocks, such as aggregates, repositories and event buses that help us build applications using CQRS and Event sourcing architectural patterns. It also allows you to provide your own implementation of the above mentioned building blocks.
  • Oracle Application Container cloud for application deployment

With this background, let us start building the sample.

 

Identify Aggregate Root

The first step is to identify the bounded context and the domain entities within it. This helps us define the Aggregate Root (for example, an ‘account’ or an ‘order’). An aggregate is an entity or group of entities that is always kept in a consistent state. The aggregate root is the object at the top of the aggregate tree that is responsible for maintaining this consistent state.

To keep things simple for this blog, we consider ‘Cart’ as the only Aggregate Root in the domain model. Just like the usual shopping cart, the items in the cart are adjusted based on the additions or removals happening on that cart.

 

Define Commands

This aggregate root has 2 commands associated with it:

  • Add to Cart Command – Modeled by AddToCartCommand class
  • Remove from Cart Command – Modeled by RemoveFromCartCommand class
public class AddToCartCommand {

    private final String cartId;
    private final int item;

    public AddToCartCommand(String cartId, int item) {
        this.cartId = cartId;
        this.item = item;
    }

    public String getCartId() {
        return cartId;
    }

    public int getItem() {
        return item;
    }
}

public class RemoveFromCartCommand {

    private final String cartId;
    private final int item;

    public RemoveFromCartCommand(String cartId, int item) {
        this.cartId = cartId;
        this.item = item;
    }

    public String getCartId() {
        return cartId;
    }

    public int getItem() {
        return item;
    }
}

 

As you can see, these commands are plain POJOs used to capture the intent of what needs to happen within the system, along with the information required to do so. Axon Framework does not require commands to implement any interface or extend any class.

 

Define Command Handlers

A command is intended to have only one handler. The following classes represent the handlers for the Add to Cart and Remove from Cart commands:

 

@Component
public class AddToCartCommandHandler {

    private Repository repository;

    @Autowired
    public AddToCartCommandHandler(Repository repository) {
        this.repository = repository;
    }

    @CommandHandler
    public void handle(AddToCartCommand addToCartCommand) {
        Cart cartToBeAdded = (Cart) repository.load(addToCartCommand.getCartId());
        cartToBeAdded.addCart(addToCartCommand.getItem());
    }
}

@Component
public class RemoveFromCartHandler {

    private Repository repository;

    @Autowired
    public RemoveFromCartHandler(Repository repository) {
        this.repository = repository;
    }

    @CommandHandler
    public void handle(RemoveFromCartCommand removeFromCartCommand) {
        Cart cartToBeRemoved = (Cart) repository.load(removeFromCartCommand.getCartId());
        cartToBeRemoved.removeCart(removeFromCartCommand.getItem());
    }
}

 

We use Axon with the Spring framework, so the Spring beans defined above have methods annotated with @CommandHandler, which marks them as command handlers. The @Component annotation ensures that these beans are scanned during application startup and that any autowired resources are injected into them. Instead of accessing the aggregates directly, the command handlers use Axon's Repository abstraction, which takes care of retrieving and persisting aggregates.

 

Application Startup

Following is the AppConfiguration class, a Spring configuration class that is initialized upon application deployment and creates the components required for implementing the patterns.

 

@Configuration
@AnnotationDriven
public class AppConfiguration {

    @Bean
    public DataSource dataSource() {
        return DataSourceBuilder
                .create()
                .username("sa")
                .password("")
                .url("jdbc:h2:mem:axonappdb")
                .driverClassName("org.h2.Driver")
                .build();
    }

    /**
     * Event store used to store events.
     */
    @Bean
    public EventStore jdbcEventStore() {
        return new JdbcEventStore(dataSource());
    }

    @Bean
    public SimpleCommandBus commandBus() {
        SimpleCommandBus simpleCommandBus = new SimpleCommandBus();
        return simpleCommandBus;
    }

    /**
     * Cluster of event handlers that listens to events raised in the application.
     */
    @Bean
    public Cluster normalCluster() {
        SimpleCluster simpleCluster = new SimpleCluster("simpleCluster");
        return simpleCluster;
    }

    /**
     * This configuration registers event handlers with the defined clusters.
     */
    @Bean
    public ClusterSelector clusterSelector() {
        Map<String, Cluster> clusterMap = new HashMap<>();
        clusterMap.put("msacqrses.eventhandler", normalCluster());
        return new ClassNamePrefixClusterSelector(clusterMap);
    }

    /**
     * The clustering event bus is needed to route events to event handlers in the clusters.
     */
    @Bean
    public EventBus clusteringEventBus() {
        ClusteringEventBus clusteringEventBus = new ClusteringEventBus(clusterSelector(), terminal());
        return clusteringEventBus;
    }

    /**
     * Event bus terminal that publishes domain events to the cluster.
     */
    @Bean
    public EventBusTerminal terminal() {
        return new EventBusTerminal() {
            @Override
            public void publish(EventMessage... events) {
                normalCluster().publish(events);
            }

            @Override
            public void onClusterCreated(Cluster cluster) {
            }
        };
    }

    /**
     * Command gateway through which all commands in the application are submitted.
     */
    @Bean
    public DefaultCommandGateway commandGateway() {
        return new DefaultCommandGateway(commandBus());
    }

    /**
     * Event sourcing repository that handles retrieving an entity from its stream of events.
     */
    @Bean
    public Repository<Cart> eventSourcingRepository() {
        EventSourcingRepository eventSourcingRepository = new EventSourcingRepository(Cart.class, jdbcEventStore());
        eventSourcingRepository.setEventBus(clusteringEventBus());
        return eventSourcingRepository;
    }
}

 

Let us take a look at the key Axon provided infrastructure components that are initialized in this class:

 

Command bus

As represented in “Figure 1” above, command bus is the component that routes commands to their respective command handlers.  Axon Framework comes with different types of Command Bus out of the box that can be used to dispatch commands to command handlers. Please refer here for more details on Axon’s Command Bus implementations. In our example, we use SimpleCommandBus which is configured as a bean in Spring's application context.

 

Command Gateway

The command bus can dispatch commands directly, but it is usually recommended to use a command gateway. A command gateway allows developers to add behavior such as intercepting commands or retrying in failure scenarios. In our example, instead of using the command bus directly, we use the Axon-provided default, DefaultCommandGateway, configured as a Spring bean, to send commands.

 

Event Bus

As depicted in “Figure 1”, the events resulting from commands executed on an Aggregate Root are sent to the event store, where they get persisted. The Event Bus is the infrastructure that routes events to event handlers. The Event Bus may look similar to the Command Bus from a message-dispatching perspective, but the two differ fundamentally.

The Command Bus works with commands, which describe something that should happen in the near future, and each command has exactly one handler that interprets it. The Event Bus routes events, which describe something that happened in the past and may have zero or more event handlers.

Axon provides multiple Event Bus implementations; in our example we use ClusteringEventBus, which is again wired up as a Spring bean. Please refer here for more details on Axon's Event Bus implementations.

 

Event Store

We need to configure an event store because our repository stores domain events instead of the current state of our domain objects. The Axon framework allows storing events in multiple persistence mechanisms such as JDBC, JPA and the file system. In this example we use a JDBC event store.

 

Event Sourcing Repository

In our example, the aggregate root is not loaded from a representation of its current state in a persistence mechanism; instead it is reconstructed from its stream of events, which is achieved through an event sourcing repository. We configure the repository with the event bus defined earlier since it will be publishing the domain events.

 

Database

We use an in-memory database (H2) as the data store in our example. Spring Boot's application.properties contains the data source configuration settings:

# Datasource configuration
spring.datasource.url=jdbc:h2:mem:axonappdb
spring.datasource.driverClassName=org.h2.Driver
spring.datasource.username=sa
spring.datasource.password=
spring.datasource.validation-query=SELECT 1;
spring.datasource.initial-size=2
spring.datasource.sql-script-encoding=UTF-8

spring.jpa.database=h2
spring.jpa.show-sql=true
spring.jpa.hibernate.ddl-auto=create

 

As mentioned above, this example uses a JDBC event store to store the domain events generated in the system. These events are stored in the default tables defined by the Axon framework's event store infrastructure. We use the following startup class to create the database table required by this example and seed some data:

 

@Component
public class Datastore {

    @Autowired
    @Qualifier("transactionManager")
    protected PlatformTransactionManager txManager;

    @Autowired
    private Repository repository;

    @Autowired
    private javax.sql.DataSource dataSource;

    // create two cart entries in the repository used for command processing
    @PostConstruct
    private void init() {

        TransactionTemplate transactionTmp = new TransactionTemplate(txManager);
        transactionTmp.execute(new TransactionCallbackWithoutResult() {
            @Override
            protected void doInTransactionWithoutResult(TransactionStatus status) {
                UnitOfWork uow = DefaultUnitOfWork.startAndGet();
                repository.add(new Cart("cart1"));
                repository.add(new Cart("cart2"));
                uow.commit();
            }
        });

        // create a database table for querying and add two cart entries
        JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
        jdbcTemplate.execute("create table cartview (cartid VARCHAR , items NUMBER )");
        jdbcTemplate.update("insert into cartview (cartid, items) values (?, ?)", new Object[]{"cart1", 0});
        jdbcTemplate.update("insert into cartview (cartid, items) values (?, ?)", new Object[]{"cart2", 0});
    }
}

 

This startup class creates two cart entries in the repository used for command processing and creates a database table called “cartview” which is used for processing the queries.

 

A quick recap on what we did so far:

  • We have identified “Cart” as our Aggregate root and have defined commands and command handlers for adding and removing items from the Cart.
  • We have defined a Spring configuration class which initializes the infrastructure components required for CQRS and Event Sourcing.
  • A startup class has also been defined to create the database tables and setup the data required by this sample.

 

Let us now look at our AggregateRoot - “Cart” which is defined as below:

 

Aggregate Root

 

public class Cart extends AbstractAnnotatedAggregateRoot {

    @AggregateIdentifier
    private String cartid;

    private int items;

    public Cart() {
    }

    public Cart(String cartId) {
        apply(new CartCreatedEvent(cartId));
    }

    @EventSourcingHandler
    public void applyCartCreation(CartCreatedEvent event) {
        this.cartid = event.getCartId();
        this.items = 0;
    }

    public void removeCart(int removeitem) {
        /**
         * State is not changed directly; we instead apply an event that specifies
         * what happened. Events applied are stored.
         */
        if (this.items > removeitem && removeitem > 0)
            apply(new RemoveFromCartEvent(this.cartid, removeitem, this.items));
    }

    @EventSourcingHandler
    private void applyCartRemove(RemoveFromCartEvent event) {
        /**
         * When events stored in the event store are applied on an entity, this method is
         * called. Once all the events in the event store are applied, the Cart is brought
         * to its most recent state.
         */
        this.items -= event.getItemsRemoved();
    }

    public void addCart(int item) {
        /**
         * State is not changed directly; we instead apply an event that specifies
         * what happened. Events applied are stored.
         */
        if (item > 0)
            apply(new AddToCartEvent(this.cartid, item, this.items));
    }

    @EventSourcingHandler
    private void applyCartAdd(AddToCartEvent event) {
        /**
         * When events stored in the event store are applied on an entity, this method is
         * called. Once all the events in the event store are applied, the Cart is brought
         * to its most recent state.
         */
        this.items += event.getItemAdded();
    }

    public int getItems() {
        return items;
    }

    public void setIdentifier(String id) {
        this.cartid = id;
    }

    @Override
    public Object getIdentifier() {
        return cartid;
    }
}

 

Following are some key aspects of the above Aggregate Root definition:

  1. The @AggregateIdentifier is similar to @Id in JPA which marks the field that represents the entity’s identity.
  2. Domain driven design recommends domain entities to contain relevant business logic, hence the business methods in the above definition. Please refer to the “References” section for more details.
  3. When a command gets triggered, the domain object is retrieved from the repository and the respective method (say addCart) is invoked on that domain object (in this case “Cart”).
    1. The domain object instead of changing the state directly, applies the appropriate event.
    2. The event is stored in the event store and the respective handler gets triggered which makes the change to the domain object.
  4. Note that the “Cart” aggregate root is only used for updates (i.e. state change via commands). All the query requests are handled by a different database entity (will be discussed in coming sections).

 

Let us also look at the events and the event handlers that manage the domain events triggered from “Cart” entity:

 

Events

 

As mentioned in the previous sections, there are two commands that get triggered on the “Cart” entity – Add to Cart and Remove from Cart. These commands when executed on the Aggregate root will generate two events – AddToCartEvent and RemoveFromCartEvent which are listed below:

 

public class AddToCartEvent {

 private final String cartId;
 private final int itemAdded;
 private final int items;
 private final long timeStamp;

 public AddToCartEvent(String cartId, int itemAdded, int items) {
      this.cartId = cartId;
      this.itemAdded = itemAdded;
      this.items = items;
      ZoneId zoneId = ZoneId.systemDefault();
      this.timeStamp = LocalDateTime.now().atZone(zoneId).toEpochSecond();
 }

 public String getCartId() {
      return cartId;
 }

 public int getItemAdded() {
      return itemAdded;
 }

 public int getItems() {
      return items;
 }

 public long getTimeStamp() {
      return timeStamp;
 }
}

public class RemoveFromCartEvent {
 private final String cartId;
 private final int itemsRemoved;
 private final int items;
 private final long timeStamp;

 public RemoveFromCartEvent(String cartId, int itemsRemoved, int items) {
      this.cartId = cartId;
      this.itemsRemoved = itemsRemoved;
      this.items = items;
      ZoneId zoneId = ZoneId.systemDefault();
      this.timeStamp = LocalDateTime.now().atZone(zoneId).toEpochSecond();

 }

 public String getCartId() {
      return cartId;
 }

 public int getItemsRemoved() {
      return itemsRemoved;
 }

 public int getItems() {
      return items;
 }

 public long getTimeStamp() {
      return timeStamp;
 }
}

 

Event Handlers

 

The events described above would be handled by the following event handlers:

 

@Component
public class AddToCartEventHandler {

 @Autowired
 DataSource dataSource;

 @EventHandler
 public void handleAddToCartEvent(AddToCartEvent event, Message msg) {
      JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);

      // Get current state from event
      String cartId = event.getCartId();
      int items = event.getItems();
      int itemToBeAdded = event.getItemAdded();
      int newItems = items + itemToBeAdded;


      //  Update cartview
      String updateQuery = "UPDATE cartview SET items = ? WHERE cartid = ?";
      jdbcTemplate.update(updateQuery, new Object[]{newItems, cartId});

 }
}

@Component
public class RemoveFromCartEventHandler {

 @Autowired
 DataSource dataSource;

 @EventHandler
 public void handleRemoveFromCartEvent(RemoveFromCartEvent event) {

      JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);

      // Get current state from event
      String cartId = event.getCartId();
      int items = event.getItems();
      int itemsToBeRemoved = event.getItemsRemoved();
      int newItems = items - itemsToBeRemoved;

      // Update cartview
      String update = "UPDATE cartview SET items = ? WHERE cartid = ?";
      jdbcTemplate.update(update, new Object[]{newItems, cartId});

 }
}

 

As you can see, the event handlers update the “cartview” database table, which is used for querying the “Cart” entity. While the commands are executed against one model, query requests are serviced by a different model, thereby achieving CQRS with Event Sourcing.

 

Controllers

 

This example defines two Spring controller classes, one each for updating and querying the Cart domain. These are REST endpoints that can be invoked from a browser as shown below:

 

http://<host>:<port>/add/<cartId>/<noOfItems>

http://<host>:<port>/remove/<cartId>/<noOfItems>

http://<host>:<port>/view
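For instance, assuming the application runs locally on the default port 8128 set in the Spring Boot class shown later, and using the cart IDs seeded by the startup class ("cart1" and "cart2"), the calls could look like this (illustrative only):

curl http://localhost:8128/add/cart1/2
curl http://localhost:8128/remove/cart1/1
curl http://localhost:8128/view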

 

@RestController
public class CommandController {

 @Autowired
 private CommandGateway commandGateway;

 @RequestMapping("/remove/{cartId}/{item}")
 @Transactional
 public ResponseEntity doRemove(@PathVariable String cartId, @PathVariable int item) {
       RemoveFromCartCommand removeCartCommand = new RemoveFromCartCommand(cartId, item);
      commandGateway.send(removeCartCommand);

      return new ResponseEntity<>("Remove event generated. Status: "+ HttpStatus.OK, HttpStatus.OK);
 }

 @RequestMapping("/add/{cartId}/{item}")
 @Transactional
 public ResponseEntity doAdd(@PathVariable String cartId, @PathVariable int item) {

      AddToCartCommand addCartCommand = new AddToCartCommand(cartId, item);
      commandGateway.send(addCartCommand);

     return new ResponseEntity<>("Add event generated. Status: "+ HttpStatus.OK, HttpStatus.OK);
 }


}

@RestController
public class ViewController {

 @Autowired
 private DataSource dataSource;

@RequestMapping(value = "/view", method = RequestMethod.GET, produces = MediaType.APPLICATION_JSON_VALUE)
 public ResponseEntity getItems() {

      JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
      List<Map<String, Integer>> queryResult = jdbcTemplate.query("SELECT * from cartview ORDER BY cartid", (rs, rowNum) -> {

      return new HashMap<String, Integer>() {{
                put(rs.getString("CARTID"), rs.getInt("ITEMS"));
            }};
 });

      if (queryResult.size() > 0) {
        return new ResponseEntity<>(queryResult, HttpStatus.OK);
      } else {
        return new ResponseEntity<>(null, HttpStatus.NOT_FOUND);
      }

 }

}

 
Deployment

 

We use Spring Boot to package and deploy the application as a runnable jar into Oracle Application Container cloud. The following Spring Boot class initializes the application:

 

@SpringBootApplication

public class AxonApp {

 // Get PORT and HOST from Environment or set default
 public static final Optional<String> host;
 public static final Optional<String> port;
 public static final Properties myProps = new Properties();

 static {
      host = Optional.ofNullable(System.getenv("HOSTNAME"));
      port = Optional.ofNullable(System.getenv("PORT"));
 }

 public static void main(String[] args) {
      // Set properties
      myProps.setProperty("server.address", host.orElse("localhost"));
      myProps.setProperty("server.port", port.orElse("8128"));

      SpringApplication app = new SpringApplication(AxonApp.class);
      app.setDefaultProperties(myProps);
      app.run(args);

 }
}

Create an XML file (the Maven assembly descriptor) with the following content and place it in the same directory as the pom.xml. This specifies the deployment assembly of the application being deployed to Oracle Application Container Cloud.

 

<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.3"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.3 http://maven.apache.org/xsd/assembly-1.1.3.xsd">
 <id>dist</id>
 <formats>
 <format>zip</format>
 </formats>
 <includeBaseDirectory>false</includeBaseDirectory>
 <files>
 <file>
            <source>manifest.json</source>
            <outputDirectory></outputDirectory>
 </file>
 </files>
 <fileSets>
 <fileSet>
            <directory>${project.build.directory}</directory>
            <outputDirectory></outputDirectory>
            <includes>
                <include>${project.artifactId}-${project.version}.jar</include>
            </includes>
 </fileSet>
 </fileSets>
</assembly>

 

To let Application Container Cloud know which jar to run once the application is deployed, you need to create a “manifest.json” file specifying the jar name, as shown below:

{
 "runtime": {
 "majorVersion": "8"
    },
 "command": "java -jar AxonApp-0.0.1-SNAPSHOT.jar",
 "release": {},
 "notes": "Axon Spring Boot App"
}

 

The following diagram depicts the project structure of this sample:

Figure 2: Project Structure

 

The application jar file along with the above manifest file should be archived into a zip file and uploaded to Application Container Cloud for deployment. Please refer here for more details on deploying a Spring Boot application in Application Container Cloud.

 

Once the application is successfully deployed, you would be able to access the following URLs to trigger the services on the Cart:

http://<host>:<port>/view

http://<host>:<port>/add/<cartId>/<noOfItems>

http://<host>:<port>/remove/<cartId>/<noOfItems>

 

When you first hit the “view” REST endpoint, you will see the two carts added by our startup class along with the number of items in each. You can add or remove items from a cart using the other two REST calls and retrieve the updated item count using the “view” call again. The result of these REST invocations is a simple JSON structure displaying the carts and the number of items in each cart at a given point in time.
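Based on the ViewController shown earlier, which returns a list of cartid-to-items maps, a hypothetical /view response after adding two items to cart1 would look roughly like this (illustrative, not captured from a real run):

[{"cart1":2},{"cart2":0}]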

 

Conclusion

 

This blog has been limited to introducing you to developing microservice applications using the CQRS and Event Sourcing patterns. You can refer to the following resources to learn more about other advanced concepts and recent updates in this space.

 

References

 

The views expressed in this post are my own and do not necessarily reflect the views of Oracle.

The underlying theme of this blog remains the same as one of my previous blogs, i.e. scalable stream processing microservices on Oracle Cloud, but there are significant changes and additions:

 

  • Docker: we will package the Kafka Streams based consumer application as a Docker image
  • Oracle Container Cloud: our containerized application will run and scale on Oracle Container Cloud
  • Service discovery: application is revamped to leverage the service discovery capabilities within Oracle Container Cloud

 

 

Technical Components

 

Here is a quick summary of the technologies used

 

  • Oracle Container Cloud: An enterprise grade platform to compose, deploy and orchestrate Docker containers
  • Docker needs no introduction
  • Apache Kafka: A scalable, distributed pub-sub message hub
  • Kafka Streams: A library for building stream processing applications on top of Apache Kafka
  • Jersey: Used to implement REST and SSE services. Uses Grizzly as a (pluggable) runtime/container
  • Maven: Used as the standard Java build tool (along with its assembly plugin)

 

Sample application

 

By and large, the sample application remains the same and its details can be found here. Here is a quick summary:

 

  • The components: a Kafka broker, a producer application and a consumer (Kafka Streams based) stream processing application
  • Changes (as compared to the setup here): the consumer application will now run on Oracle Container Cloud and the application instance discovery semantics (which were earlier Oracle Application Container Cloud specific) have now been implemented on top of Oracle Container Cloud service discovery capability

 

Architecture

 

To get an idea of the key concepts, I would recommend going through the high level architecture section of one of the previous blogs. Here is a diagram representing the overall runtime view of the system:

 

 

Its key takeaways are as follows:

 

  • Oracle Container Cloud will host our containerized stream processing (Kafka consumer) applications
  • We will use its elastic scalability features to spin up additional containers on-demand to distribute the processing load
  • The contents of the topic partitions in Kafka broker (marked as P1, P2, P3) will be distributed among the application instances

 

Please note that having more application instances than topic partitions will mean that some of your instances will be idle (no processing). It is generally recommended to set the number of topic partitions to a relatively high number (e.g. 50) in order to reap maximum benefit from Kafka
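For reference, the partition count is chosen when the topic is created (it can also be increased later). With the Kafka command line tools of that era, creating a topic with a higher partition count would look something like the sketch below; the topic name and Zookeeper address are placeholders:

bin/kafka-topics.sh --create --zookeeper <zookeeper_host>:2181 --replication-factor 1 --partitions 50 --topic <topic_name>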

 

Code

 

You can refer to this section in the previous blog for code related details (since the bulk of the logic is the same). The service discovery logic (which is covered in depth below) is the major difference, since it relies on the Oracle Container Cloud KV store for runtime information. Here is the relevant snippet:

 

/**
     * find yourself in the cloud!
     *
     * @return my port
     */
    public static String getSelfPortForDiscovery() {
        String containerID = System.getProperty("CONTAINER_ID", "container_id_not_found");
        //String containerID = Optional.ofNullable(System.getenv("CONTAINER_ID")).orElse("container_id_not_found");
        LOGGER.log(Level.INFO, " containerID {0}", containerID);


        String sd_key_part = Optional.ofNullable(System.getenv("SELF_KEY")).orElse("sd_key_not_found");
        LOGGER.log(Level.INFO, " sd_key_part {0}", sd_key_part);


        String sd_key = sd_key_part + "/" + containerID;
        LOGGER.log(Level.INFO, " SD Key {0}", sd_key);


        String sd_base_url = "172.17.0.1:9109/api/kv";


        String fullSDUrl = "http://" + sd_base_url + "/" + sd_key + "?raw=true";
        LOGGER.log(Level.INFO, " fullSDUrl {0}", fullSDUrl);


        String hostPort = getRESTClient().target(fullSDUrl)
                .request()
                .get(String.class);


        LOGGER.log(Level.INFO, " hostPort {0}", hostPort);
        
        String port = hostPort.split(":")[1];
        LOGGER.log(Level.INFO, " Auto port {0}", port);


        return port;
    }

Kafka setup

 

On Oracle Compute Cloud

 

You can refer to part I of the blog for the Apache Kafka related setup on Oracle Compute. The only additional step which needs to be executed is opening the port on which your Zookeeper process is listening (2181 by default), as this is required by the Kafka Streams library configuration. While executing the steps from the Open Kafka listener port section, ensure that you include the Oracle Compute Cloud configuration for 2181 (in addition to the Kafka broker port 9092).

 

On Oracle Container Cloud!

 

You can run a Kafka cluster on Oracle Container Cloud – check out this cool blog post!

 

The Event Hub Cloud is a new offering which provides Apache Kafka as a managed service in Oracle Cloud

 

Configuring our application to run on Oracle Container Cloud

 

Build the application

 

Execute mvn clean package to build the application JAR

 

Push to Docker Hub

 

Create a Docker Hub account if you don't have one already. To build and push the Docker image, execute the below commands

 

Please ensure that Docker engine is up and running

 

docker login
docker build -t <registry>/<image_name>:<tag> .   e.g. docker build -t abhirockzz/kafka-streams:latest .
docker push <registry>/<image_name>:<tag>   e.g. docker push abhirockzz/kafka-streams:latest

 

Check your Docker Hub account to confirm that the image exists there

 

 

Create the Service

 

To create a new Service, click on New Service in the Services menu

 

 

There are multiple ways in which you can configure your service – one of which is the traditional way of filling in each of the attributes in the Service Editor. You can also directly enter the Docker run command or a YAML configuration (similar to docker-compose) and Oracle Container Cloud will automatically populate the Service details. Let’s see the YAML based method in action

 

 

Populate the YAML editor (highlighted above) with the required configuration

 

version: 2
services:
  kstreams:
    image: "<docker hub image e.g. abhirockzz/kafka-streams>"
    environment:
      - "KAFKA_BROKER=<kafka broker host:port e.g. 149.007.42.007:9092>"
      - "ZOOKEEPER=<zookeeper host:port e.g. 149.007.42.007:2181>"
      - "SELF_KEY={{ sd_deployment_containers_path .ServiceID 8080 }}"
      - "OCCS_HOST={{hostip_for_interface .HostIPs \"public_ip\"}}"
      - "occs:scheduler=random"
    ports:
      - 8080/tcp

Please make sure that you substitute the host:port for your Kafka broker and Zookeeper server in the yaml configuration file

 

 

If you switch to the Builder view, notice that all the values have already been populated

 

 

All you need to do is fill out the Service Name and (optionally) choose the Scheduler and Availability properties and click Save to finish the Service creation

 

 

You should see your newly created service in the list of services in the Services menu

 

 

YAML configuration details

 

Here is an overview of the configuration parameters

 

  • Image: Name of the application image on Docker Hub
  • Environment variables
    • KAFKA_BROKER: the host and port information to connect to the Kafka broker

    • ZOOKEEPER: the host and port information to connect to the Zookeeper server (for the Kafka broker)
    • SELF_KEY & OCCS_HOST: these are defined as templates functions (more details on this in a moment) and help with dynamic container discovery
  • Ports: Our application is configured to run on port 8080 i.e. this is specified within the code itself. This is not a problem since we have configured a random (auto generated) port on the host (worker node of Oracle Container Cloud) to map to 8080

 

This is equivalent to using the -P option in the docker run command
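In plain Docker terms, an equivalent run command would look roughly like the sketch below. The image name and the broker/Zookeeper addresses are placeholders; note that SELF_KEY and OCCS_HOST have no direct docker run equivalent, since they are populated by Oracle Container Cloud template functions:

docker run -d -P \
  -e KAFKA_BROKER=<kafka_host>:9092 \
  -e ZOOKEEPER=<zookeeper_host>:2181 \
  <registry>/kafka-streams:latest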

 

Template functions and Service discovery

 

We used the following template functions within the environment variables of our YAML file

 

  • SELF_KEY: {{ sd_deployment_containers_path .ServiceID 8080 }}
  • OCCS_HOST: {{hostip_for_interface .HostIPs \"public_ip\"}}

 

What are templates?

Template arguments provide access to deployment properties related to your services (or stacks) and template functions allow you to utilize them at runtime (in a programmatic fashion). More details in the documentation

 

Why do we need them?

Within our application, each Kafka Streams consumer application instance needs to register its coordinates in the Streams configuration (using the application.server parameter). This in turn allows Kafka Streams to store this as metadata which can then be used at runtime. Here are some excerpts from the code

 

Seeding discovery info

 

Map<String, Object> configurations = new HashMap<>();
String streamsAppServerConfig = GlobalAppState.getInstance().getHostPortInfo().host() + ":"
                + GlobalAppState.getInstance().getHostPortInfo().port();
 configurations.put(StreamsConfig.APPLICATION_SERVER_CONFIG, streamsAppServerConfig);

 

Using the info

 

Collection<StreamsMetadata> storeMetadata = ks.allMetadataForStore(storeName);
StreamsMetadata metadataForMachine = ks.metadataForKey(storeName, machine, new StringSerializer());

 

How is this achieved?

 

For the application.server parameter, we need the host and port of the container instance in Oracle Container Cloud. The OCCS_HOST environment variable is populated automatically by the evaluation of the template function {{hostip_for_interface .HostIPs \"public_ip\"}} – this is the public IP of the Oracle Container Cloud host and takes care of ‘host’ part of the application.server configuration. The port determination needs more work since we have configured port 8080 to be mapped with a random port on Oracle Container Cloud host/worker node. The inbuilt discovery service mechanism within Oracle Container cloud made it possible to implement this.

 

The internal service discovery database is exposed via a REST API for external clients. But it can be accessed internally (by applications) on 172.17.0.1:9109. It exposes the host and port (of a Docker container) information in a key-value format

 

 

Key points to be noted in the above image

  • The part highlighted in red is the value which is the host and port information
  • The part highlighted in green is a portion of the key, which is the (dynamic) Docker container ID
  • The remaining portion of the key is also dynamic, but can be evaluated with the help of a template function

 

The trick is to build the above key and then use that to query the discovery service to get the value (host and port details). This is where the SELF_KEY environment variable comes into play. It uses the {{ sd_deployment_containers_path .ServiceID 8080 }} (where 8080 is the exposed and mapped application port) template function which gets evaluated at runtime. This gives us a part of the key i.e. (as per above example) apps/kstreams-kstreams-20170315-080407-8080/containers

 

The SELF_KEY environment variable is concatenated with the Docker container ID (which is a random UUID) evaluated during container startup within the init.sh script i.e. (in the above example) 3a52….. This completes our key using which we can query the service discovery store.

 

#!/bin/sh

export CONTAINER_ID=$(cat /proc/self/cgroup | grep 'cpu:/' | sed -r 's/[0-9]+:cpu:.docker.//g')
echo $CONTAINER_ID
java -jar -DCONTAINER_ID=$CONTAINER_ID occ-kafka-streams-1.0.jar

 

 

Both SELF_KEY and OCCS_HOST environment variables are used within the internal logic of the Kafka consumer application. The Oracle Container Cloud service discovery store is invoked (using its REST API) at container startup using the complete URL – http://172.17.0.1:9109/api/kv/<SELF_KEY>/<CONTAINER_ID>

 

See it in action via this code snippet

 

String containerID = System.getProperty("CONTAINER_ID", "container_id_not_found");
String sd_key_part = Optional.ofNullable(System.getenv("SELF_KEY")).orElse("sd_key_not_found");
String sd_key = sd_key_part + "/" + containerID;
String sd_base_url = "172.17.0.1:9109/api/kv";
String fullSDUrl = "http://" + sd_base_url + "/" + sd_key + "?raw=true";
String hostPort = getRESTClient().target(fullSDUrl).request().get(String.class);        
String port = hostPort.split(":")[1];

 

Initiate Deployment

 

Start Kafka broker first

 

 

Click on the Deploy button to start the deployment. Accept the defaults (for this time) and click Deploy

 

 

 

You will be led to the Deployments screen. Wait a few seconds for the process to finish

 

 

 

Dive into the container details

 

Click on the Container Name (highlighted). You will be led to the container-specific details page

 

 

Make a note of the following

 

Auto bound port

 

 

Environment variables (important ones have been highlighted)

 

Test

 

Assuming your Kafka broker is up and running and you have deployed the application successfully, execute the steps below to test drive your application

 

Build & start the producer application

 

 

mvn clean package    //Initiate the Maven build
cd target    //Browse to the build directory
java -jar -DKAFKA_CLUSTER=<kafka broker host:port> kafka-cpu-metrics-producer.jar    //Start the application

 

The producer application will start sending data to the Kafka broker

 

Check the statistics

 

Cumulative moving average of all machines

 

Allow the producer to run for 30-40 seconds and then check the current statistics. Issue an HTTP GET request to your consumer application at http://OCCS_HOST:PORT/metrics, e.g. http://120.33.42.007:37155/metrics. You’ll see a response payload similar to what’s depicted below

 

the output below has been truncated for the sake of brevity

 

 

The information in the payload is as follows

  • cpu: the cumulative average of the CPU usage of a machine
  • machine: the machine ID
  • source: this has been purposefully added as a diagnostic information to see which node (Docker container in Oracle Container Cloud) is handling the calculation for a specific machine (this is subject to change as your application scales up/down)

 

Cumulative moving average of a specific machine

 

Issue an HTTP GET request to your consumer application at http://OCCS_HOST:PORT/metrics/<machine-ID>, e.g. http://120.33.42.007:37155/metrics/machine-1

 

 

 

Scale up… and down

 

Oracle Container Cloud enables your application to remain elastic i.e. scale out or scale in on-demand. The process is simple – let’s see how it works for this application. Choose your deployment from the Deployments menu and click Change Scaling. We are bumping up to 3 instances now

 

 

After some time, you’ll have three containers running separate instances of your Kafka Streams application

 

 

 

The CPU metrics computation task will now be shared amongst three nodes. You can check the logs of the old and new containers to confirm this.

 

 

In the old container, Kafka streams will close the existing processing tasks in order to re-distribute them to the new nodes. On checking the logs, you will see something similar to the below output

 

 

 

In the new containers, you will see Processor Initialized output, as a result of tasks being handed to these nodes. Now you can check the metrics using any of the three instances (check the auto bound port for the new containers). You can spot the exact node which has calculated the metric (notice the different port number). See snippet below

 

 

 

Scale down: You can scale down the number of instances using the same set of steps, and Kafka Streams will take care of re-balancing the tasks among the remaining nodes

 

Note on Dynamic load balancing

 

In a production setup, one would want to load balance the consumer microservices by using haproxy, nginx, etc. (in this example one had to inspect each application instance by using the auto bound port information). This might be covered in a future blog post. Oracle Container Cloud provides you the ability to easily build such a coordinated set of services using Stacks and ships with some example stacks for reference purposes

 

That’s all for this blog post.... Cheers!

 

The views expressed in this post are my own and do not necessarily reflect the views of Oracle.

This blog shows you how you can use Payara Micro to build a Java EE based microservice. It will leverage the following services from the Oracle Cloud (PaaS) stack

 

  • Developer Cloud service: to host the code (Git repo), provide Continuous Integration & Continuous Deployment capabilities (thanks to its integration with other Oracle PaaS services)
  • Application Container Cloud service: scalable aPaaS for running our Java EE microservice

 

 

Overview

 

Payara Micro?

Payara Micro is a Java EE based solution for building microservice style applications. Let’s expand on this a little bit

 

  • Java EE: Payara Micro supports the Java EE Web Profile standard along with additional support for other specifications which are not a part of the Web Profile (e.g. Batch, Concurrency Utilities etc.)
  • It’s a library: Available as a JAR file which encapsulates all these features

 

Development model

Payara Micro offers you the choice of multiple development styles…

 

  • WAR: package your Java EE application as a WAR file and launch it with Payara Micro using java -jar payara-micro-<version>.jar --deploy mystocks.war
  • Embedded mode: because it’s a library, it can be embedded within your Java applications using its APIs
  • Uber JAR: Use the Payara Micro Maven support along with the exec plugin to package your WAR along with the Payara Micro library as a fat JAR

 

We will use the fat JAR technique in the sample application presented in the blog

 

Benefits

 

Some of the potential benefits are as follows

 

  • Microservices friendly: gives you the power of Java EE as a library, which can be easily used within applications, packaged in a flexible manner (WAR + JAR or just a fat JAR) and run in multiple environments such as PaaS or container based platforms
  • Leverage Java EE skill set: continue using your expertise on Java EE specifications like JAX-RS, JPA, EJB, CDI etc.

 

About the sample application

 

It is a vanilla Java EE application which uses the following APIs – JAX-RS, EJB, CDI and WebSocket. It helps keep track of the stock prices of NASDAQ scrips.

 

  • Users can check the stock price of a scrip (listed on NASDAQ) using a simple REST interface
  • Real time price tracking is also available – but this is only available for Oracle (ORCL)

 

Here is a high level diagram and some background context

 

  • an EJB scheduler periodically fetches the stock price (for ORCL), fires CDI events which are received by the WebSocket component (marked as a CDI event observer), and connected clients are updated with the latest price
  • the JAX-RS REST endpoint is used to fetch price for any stock on demand - this is a typical request-response based HTTP interaction as opposed to the bi-directional, full-duplex WebSocket interaction

 

 

 

 

Code

 

Let's briefly look at the relevant portions of the code (import statements omitted for brevity)

 

RealTimeStockTicker.java

 

@ServerEndpoint("/rt/stocks")
public class RealTimeStockTicker {


    //stores Session (s) a.k.a connected clients
    private static final List<Session> CLIENTS = new ArrayList<>();


    /**
     * Connection callback method. Stores connected client info
     *
     * @param s WebSocket session
     */
    @OnOpen
    public void open(Session s) {
        CLIENTS.add(s);
        Logger.getLogger(RealTimeStockTicker.class.getName()).log(Level.INFO, "Client connected -- {0}", s.getId());
    }


    /**
     * pushes stock prices asynchronously to ALL connected clients
     *
     * @param tickTock the stock price
     */
    public void broadcast(@Observes @StockDataEventQualifier String tickTock) {
        Logger.getLogger(RealTimeStockTicker.class.getName()).log(Level.INFO, "Event for Price {0}", tickTock);
        for (final Session s : CLIENTS) {
            if (s != null && s.isOpen()) {
                /**
                 * Asynchronous push
                 */
                s.getAsyncRemote().sendText(tickTock, new SendHandler() {
                    @Override
                    public void onResult(SendResult result) {
                        if (result.isOK()) {
                            Logger.getLogger(RealTimeStockTicker.class.getName()).log(Level.INFO, "Price sent to client {0}", s.getId());
                        } else {
                            Logger.getLogger(RealTimeStockTicker.class.getName()).log(Level.SEVERE, "Could not send price update to client " + s.getId(),
                                    result.getException());
                        }
                    }
                });
            }


        }


    }


    /**
     * Disconnection callback. Removes client (Session object) from internal
     * data store
     *
     * @param s WebSocket session
     */
    @OnClose
    public void close(Session s) {
        CLIENTS.remove(s);
        Logger.getLogger(RealTimeStockTicker.class.getName()).log(Level.INFO, "Client disconnected -- {0}", s.getId());
    }


}

 

 

StockDataEventQualifier.java

 

/**
 * Custom CDI qualifier to stamp stock price CDI events
 * 
 */
@Qualifier
@Retention(RUNTIME)
@Target({METHOD, FIELD, PARAMETER, TYPE})
public @interface StockDataEventQualifier {
}

 

 

StockPriceScheduler.java

 

/**
 * Periodically polls the Google Finance REST endpoint using the JAX-RS client
 * API to pull stock prices and pushes them to connected WebSocket clients using
 * CDI events
 *
 */
@Singleton
@Startup
public class StockPriceScheduler {


    @Resource
    private TimerService ts;
    private Timer timer;


    /**
     * Sets up the EJB timer (polling job)
     */
    @PostConstruct
    public void init() {
        /**
         * fires 5 secs after creation
         * interval = 5 secs
         * non-persistent
         * no-additional (custom) info
         */
        timer = ts.createIntervalTimer(5000, 5000, new TimerConfig(null, false)); //trigger every 5 seconds
        Logger.getLogger(StockPriceScheduler.class.getName()).log(Level.INFO, "Timer initiated");
    }


    @Inject
    @StockDataEventQualifier
    private Event<String> msgEvent;


    /**
     * Implements the logic. Invoked by the container as per scheduled
     *
     * @param timer the EJB Timer object
     */
    @Timeout
    public void timeout(Timer timer) {
        Logger.getLogger(StockPriceScheduler.class.getName()).log(Level.INFO, "Timer fired at {0}", new Date());
        /**
         * Invoked asynchronously
         */
        Future<String> tickFuture = ClientBuilder.newClient().
                target("https://www.google.com/finance/info?q=NASDAQ:ORCL").
                request().buildGet().submit(String.class);


        /**
         * Extracting result immediately with a timeout (3 seconds) limit. This
         * is a workaround since we cannot impose timeouts for synchronous
         * invocations
         */
        String tick = null;
        try {
            tick = tickFuture.get(3, TimeUnit.SECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException ex) {
            Logger.getLogger(StockPriceScheduler.class.getName()).log(Level.INFO, "GET timed out. Next iteration due on - {0}", timer.getNextTimeout());
            return;
        }
        
        if (tick != null) {
            /**
             * cleaning the JSON payload
             */
            tick = tick.replace("// [", "");
            tick = tick.replace("]", "");


            msgEvent.fire(StockDataParser.parse(tick));
        }


    }


    /**
     * purges the timer
     */
    @PreDestroy
    public void close() {
        timer.cancel();
        Logger.getLogger(StockPriceScheduler.class.getName()).log(Level.INFO, "Application shutting down. Timer will be purged");
    }
}

 

 

RESTConfig.java

 

/**
 * JAX-RS configuration class
 * 
 */
@ApplicationPath("api")
public class RESTConfig extends Application{
    
}
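The JAX-RS resource class that serves the on-demand price lookup is not listed in this post. A minimal hypothetical sketch is shown below; the class name, the path and the reuse of StockDataParser are my assumptions rather than the actual sample code, and imports are omitted as in the other listings:

@Path("stocks")
public class StockPriceResource {

    @GET
    @Path("{ticker}")
    @Produces(MediaType.TEXT_PLAIN)
    public String getQuote(@PathParam("ticker") String ticker) {
        // Plain request-response lookup, in contrast to the WebSocket push above
        String payload = ClientBuilder.newClient()
                .target("https://www.google.com/finance/info?q=NASDAQ:" + ticker)
                .request()
                .get(String.class);

        // Same JSON cleanup as in StockPriceScheduler before parsing
        payload = payload.replace("// [", "").replace("]", "");
        return StockDataParser.parse(payload);
    }
}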

 

 

StockDataParser.java

 

/**
 * A simple utility class which leverages the JSON Processing (JSON-P) API to filter the JSON 
 * payload obtained from the Google Finance REST endpoint and returns useful data in a custom format
 * 
 */
public class StockDataParser {
    
    public static String parse(String data){
        
        JsonReader reader = Json.createReader(new StringReader(data));
                JsonObject priceJsonObj = reader.readObject();
                String name = priceJsonObj.getJsonString("t").getString();
                String price = priceJsonObj.getJsonString("l_cur").getString();
                String time = priceJsonObj.getJsonString("lt_dts").getString();
        


        return (String.format("Price for %s on %s = %s USD", name, time, price));
    }
}

 

A note on packaging

As mentioned earlier, from a development perspective, it is a typical WAR based Java EE application which is packaged as a fat JAR along with the Payara Micro container

 

Notice how the container is being packaged with the application rather than the application being deployed into a container

The Java EE APIs are only needed for compilation (scope = provided) since they are present in the Payara Micro library

 

<dependency>
 <groupId>javax</groupId>
 <artifactId>javaee-api</artifactId>
 <version>7.0</version>
 <scope>provided</scope>
</dependency>

 

 

Using the Maven plugin to produce a fat JAR

 

<plugin>
    <groupId>org.codehaus.mojo</groupId>
    <artifactId>exec-maven-plugin</artifactId>
    <version>1.5.0</version>
    <dependencies>
        <dependency>
            <groupId>fish.payara.extras</groupId>
            <artifactId>payara-micro</artifactId>
            <version>4.1.1.164</version>
        </dependency>
    </dependencies>
    <executions>
        <execution>
            <id>payara-uber-jar</id>
            <phase>package</phase>
            <goals>
                <goal>java</goal>
            </goals>
            <configuration>
                <mainClass>fish.payara.micro.PayaraMicro</mainClass>
                <arguments>
                    <argument>--deploy</argument>
                    <argument>${basedir}/target/${project.build.finalName}.war</argument>
                    <argument>--outputUberJar</argument>                                                  
                    <argument>${basedir}/target/${project.build.finalName}.jar</argument>
                </arguments>
                <includeProjectDependencies>false</includeProjectDependencies>
                <includePluginDependencies>true</includePluginDependencies>
                <executableDependency>
                    <groupId>fish.payara.extras</groupId>
                    <artifactId>payara-micro</artifactId>
                </executableDependency>
            </configuration>
        </execution>
    </executions>
</plugin>

 

 

Setting up Continuous Integration & Deployment

The sections below deal with the configurations to be made within the Oracle Developer Cloud service

 

Project & code repository creation

Please refer to the Project & code repository creation section in the Tracking JUnit test results in Developer Cloud service blog or check the product documentation for more details

 

Configure source code in Git repository

Push the project from your local system to the Developer Cloud Git repo you just created. We will do this via the command line, and all you need is a Git client installed on your local machine. You can use the Git command line or any other tool of your choice

 

cd <project_folder> 
git init  
git remote add origin <developer_cloud_git_repo>  
//e.g. https://john.doe@developer.us.oraclecloud.com/developer007-foodomain/s/developer007-foodomain-project_2009/scm/sample.git
git add .  
git commit -m "first commit"  
git push -u origin master  //Please enter the password for your Oracle Developer Cloud account when prompted

 

Configure build

 

Create a New Job

 

 

Select JDK

 

 

 

Continuous Integration (CI)

 

Choose Git repository

 

 

 

Set the build trigger - this build job will be triggered in response to updates within the Git repository (e.g. via git push)

 

 

Add build steps

 

  • A Maven Build step – to produce the WAR and the fat JAR
  • An Execute Shell step – package up the application JAR along with the required deployment descriptor (manifest.json required by Application Container cloud)

 

 

 

 

Here is the command for your reference

 

zip -j accs-payara-micro.zip target/mystocks.jar manifest.json

 

The manifest.json is as follows

 

{
    "runtime": {
        "majorVersion": "8"
    },
    "command": "java -jar mystocks.jar --port $PORT --noCluster",
    "release": {
        "build": "23022017.1202",
        "commit": "007",
        "version": "0.0.1"
    },
    "notes": "Java EE on ACC with Payara Micro"
}

 

Activate a post build action to archive deployable zip file

 

 

 

Execute Build

Before configuring deployment, we need to trigger the build in order to produce the artifacts which can be referenced by the deployment configuration

 

 

 

After the build is complete, you can

  • Check the build logs
  • Confirm archived artifacts

 

Logs

 

 

Artifacts

 

 

 

Continuous Deployment (CD) to Application Container Cloud

 

Create a New Configuration for deployment

 

 

 

  • Enter the required details and configure the Deployment Target
  • Configure the Application Container Cloud instance
  • Configure Automatic deployment option on the final confirmation page

 

You’ll end up with the below configuration

 

 

Confirmation screen

 

 

 

Check your application in Application Container Cloud

 

 

 

Test the CI/CD flow

 

Make some code changes and push them to the Developer Cloud service Git repo. This should

 

  • Automatically trigger the build, which once successful will
  • Automatically trigger the deployment process, and
  • Redeploy the new application version to Application Container Cloud

 

Test the application

 

 

I would recommend using the Simple WebSocket Client, which can be installed into the Chrome browser as a plugin.

 

That's all for this blog post!

 

**The views expressed in this post are my own and do not necessarily reflect the views of Oracle

Part I of the blog demonstrated the development and deployment of individual microservices (on Oracle Application Container Cloud) and how they are loosely coupled using the Apache Kafka message hub (set up on Oracle Compute Cloud). This (second) part continues building on the previous one and, with the help of an application, explores microservice based stream processing, diving into the following areas

 

  • Kafka Streams: A stream processing library
  • Scalability: enable your application to handle increased demands
  • Handling state: this is a hard problem to solve when the application needs to be horizontally scalable

 

 

Technical Components

 

Open source technologies

The following open source components were used to build the sample application

 

  • Apache Kafka: a scalable, distributed pub-sub message hub
  • Kafka Streams: a library for building stream processing applications on top of Apache Kafka
  • Jersey: used to implement REST and SSE services. Uses Grizzly as a (pluggable) runtime/container
  • Maven: used as the standard Java build tool (along with its assembly plugin)

 

Oracle Cloud

The following Oracle Cloud services have been leveraged

 

  • Application Container Cloud: serves as a scalable platform for running our stream processing microservices
  • Compute Cloud: hosts the Kafka cluster (broker)

 

Note: In addition to compute based (IaaS) Kafka hosting, Oracle Cloud now offers Event Hub Cloud. This is a compelling offering which provides Apache Kafka as a fully managed service along with other value added capabilities.

 

Hello Kafka Streams!

In simple words, Kafka Streams is a library which you can include in your Java based applications to build stream processing applications on top of Apache Kafka. Other distributed computing platforms like Apache Spark, Apache Storm etc. are widely used in the big data stream processing world, but Kafka Streams brings some unique propositions in this area
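
To get a feel for how little ceremony is involved, here is a minimal, self-contained sketch (not part of the sample application; the broker address and topic names are placeholders) which reads from one topic, transforms each value and writes the result to another topic, using the 0.10.x API referred to later in this post:

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class HelloKafkaStreams {

    public static void main(String[] args) {
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "hello-streams");        //application (consumer group) id
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");    //Kafka broker (placeholder)
        config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, String> input = builder.stream("input-topic");           //placeholder topic names
        input.mapValues(value -> value.toUpperCase())                            //trivial per-record transformation
             .to("output-topic");

        KafkaStreams streams = new KafkaStreams(builder, config);
        streams.start();                                                         //runs until the JVM is stopped

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));        //clean shutdown
    }
}

There is no separate processing cluster to install or submit jobs to; the topology above is just another Java program packaged as a JAR.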

 

Kafka Streams: what & why

 

  • Built on top of Kafka – leverages its scalable and fault tolerant capabilities: if you use Kafka in your ecosystem, it makes perfect sense to leverage Kafka Streams to churn streaming data to/from the Kafka topics
  • Microservices friendly: it's a lightweight library which you use within your Java application. This means that you can use it to build microservices style stream processing applications
  • Flexible deployment & elastic in nature: you're not restricted to a specific deployment model (e.g. cluster-based). The application can be packaged and deployed in a flexible manner and scaled up and down easily
  • For fast data: harness the power of Kafka Streams to crunch high volume data in real time systems; it does not need to be at big data scale
  • Support for stateful processing: helps manage local application state in a fault tolerant & scalable manner

 

 

Sample application: what’s new

 

In part I, the setup was as follows

  • A Kafka broker serving as the messaging hub
  • Producer application (on Application Container Cloud) pushing CPU usage metrics to Kafka
  • Consumer application (on Application Container Cloud) consuming those metrics from Kafka and exposing them as a real time feed (using Server Sent Events)

 

Some parts of the sample have been modified to demonstrate some of the key concepts. Here is the gist

 

  • Consumer API: the new consumer application leverages the Kafka Streams API on Application Container Cloud, as compared to the traditional (polling based) Kafka Consumer client API used in part I
  • Consumer topology: we will deploy multiple instances of the Consumer application to scale our processing logic
  • Nature of metrics feed: the cumulative moving average of the CPU metrics per machine is calculated, as opposed to the exact metric provided by the SSE feed in part I
  • Accessing the CPU metrics feed: the consumer application makes the CPU usage metrics available in the form of a REST API, as compared to the SSE based implementation in part I

 

High level architecture

The basic architecture still remains the same i.e. microservices decoupled using a messaging layer

 

 

 

As mentioned above, the consumer application has undergone changes and is now based on the Kafka Streams API. We could have continued to use the traditional poll based Kafka Consumer client API as in part I, but the Kafka Streams API was chosen for a few reasons. Let's go through them in detail and see how they fit in the context of the overall solution. At this point, ask yourself the following questions

 

  • How would you scale your consumer application?
  • How would you handle intermediate state (required for moving average calculation) spread across individual instances of your scaled out application?

 

Scalability

With Application Container Cloud you can spawn multiple instances of your stream processing application with ease (for more details, refer to the documentation)

 

But how does it help?

The sample application models CPU metrics being continuously sent by the producer application to a Kafka broker – for demonstration purposes, the number of machines (whose CPU metrics are being sent) has been limited to ten. But how would you handle things at a larger scale, for example

 

  • When the number of machines increases to the scale of thousands?
  • Perhaps you want to factor in additional attributes (in addition to just the cpu usage)?
  • Maybe you want to execute all this at data-center scale?

 

The answer lies in distributing your computation across several processes and this is where horizontal scalability plays a key role.

When the CPU metrics are sent to a topic in Kafka, they are distributed to different partitions (using a default consistent hashing algorithm on the record key) – this is similar to sharding (a producer-side sketch of this keying follows the list below). This helps from multiple perspectives

  • When Kafka itself is scaled out (broker nodes are added) – individual partitions are replicated over these nodes for fault tolerance and high performance
  • From a consumer standpoint - multiple consumers (in the same group) automatically distribute the load among themselves
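
To make the sharding concrete, here is a hedged sketch of what the producer-side send could look like if the machine ID is used as the record key; this is illustrative only, and the broker address, topic name and values are assumptions rather than the actual producer code from part I:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class CpuMetricSenderSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");            //placeholder broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            String machineId = "machine-7";   //hypothetical machine ID
            String cpuUsage = "68.5";         //hypothetical CPU usage reading
            //the record KEY is the machine ID; Kafka's default partitioner hashes the key,
            //so every reading for machine-7 consistently lands on the same partition
            producer.send(new ProducerRecord<>("cpu-metrics", machineId, cpuUsage));
        }
    }
}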

 

In the case of our example, each stream processing application instance is nothing but a (specialized) form of Kafka consumer, and it takes up a non-overlapping set of partitions in Kafka for processing. Consider a setup where 2 instances process data for 10 machines spread over 4 partitions in the Kafka broker. Here is a pictorial representation

 

 

 

Managing application state (at scale)

The processing logic in the sample application is not stateless i.e. it depends on previous state to calculate its current state. In the context of this application, state is

 

  • the cumulative moving average of a continuous stream of CPU metrics,
  • being calculated in parallel across a distributed set of instances, and
  • constantly changing i.e. the cumulative moving average of the machines handled by each application instance is getting updated with the latest results

 

If you confined the processing logic to a single node, the problem of localized state co-ordination would not exist, i.e. local state = global state. But this luxury is not available in a distributed processing system. Here is how our application handles it (thanks to Kafka Streams)

 

  • The local state store (a KV store) containing the machine to (cumulative moving average) CPU usage metric is sent to a dedicated topic in Kafka e.g. the in-memory-avg-store in our application (named cpu-streamz) will have a corresponding topic cpu-streamz-in-memory-avg-store-changelog in Kafka
  • This topic is called a changelog since it is a compacted one i.e. only the latest key-value pair is retained by Kafka. This is meant to achieve the goal (distributed state management) in the cheapest possible manner
  • During scale up – Kafka assigns some partitions to the new instance (see above example) and the state for those partitions (which was previously stored in another instance) is replayed from the Kafka changelog topic to build the state store for this new instance
  • When an instance crashes or is stopped – the partitions being handled by that instance are handed off to some other node, and the state of those partitions (stored in the Kafka changelog topic) is written to the local state store of the existing node to which the work was allotted

 

All in all, this ensures scalable and fault tolerant state management
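
For reference, this is roughly how such a store can be declared with the 0.10.x Processor API. The store name and the resulting changelog topic name mirror the ones mentioned above, while the source topic and processor names are assumptions; the sample also maintains a second store for the per-machine record count, which is omitted here for brevity:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.state.Stores;

public class TopologySketch {

    public static TopologyBuilder buildTopology() {
        //state store holding machine ID -> cumulative moving average of CPU usage;
        //Kafka Streams backs it with a compacted changelog topic named
        //<application.id>-<store name>-changelog, e.g. cpu-streamz-in-memory-avg-store-changelog
        StateStoreSupplier avgStore = Stores.create("in-memory-avg-store")
                .withKeys(Serdes.String())
                .withValues(Serdes.Double())
                .inMemory()
                .build();

        return new TopologyBuilder()
                .addSource("metrics-source", "cpu-metrics")            //placeholder topic name
                .addProcessor("cma-processor",
                        CPUCumulativeAverageProcessor::new,            //the processor shown in the State store section; assumes a no-arg constructor
                        "metrics-source")
                .addStateStore(avgStore, "cma-processor");             //attach the store to the processor
    }
}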

 

Exposing application state

As mentioned above, the cumulative moving averages of the CPU metrics of each machine are calculated across multiple nodes in parallel. In order to find out the global state of the system, i.e. the current average for all (or specific) machines, the local state stores need to be queried. The application provides a REST API for this

 

 

 

 

More details in the Testing section on how to see this in action

 

It's important to make note of these points with regard to the implementation of the REST API, which in turn lets us get what we want - real time insight into the moving averages of the CPU usage

 

  • Topology agnostic: Use a single access URL provided by Application Container Cloud (as highlighted in the diagram above). As a client, you do not have to be aware of individual application instances
  • Robust & flexible: Instances can be added or removed on the fly but the overall business logic (in this case it is calculation of the cumulative moving average of a stream of CPU metrics) will remain fault tolerant and adjust to the elastic topology changes

 

This is made possible by a combination of the following

 

  • Automatic load balancing: Application Container cloud load balances requests among multiple instances of your applications
  • Clustered setup: from an internal implementation perspective, your application instances can detect each other. For this to work, the isClustered attribute in the manifest.json is set to true and custom logic is implemented within the solution in order for the instance specific information to be discovered and used appropriately. However, this is an internal implementation detail and the user is not affected by it

Please look at the Code snippets section for some more details

  • Interactive queries: this capability in Kafka Streams enables external clients to introspect the state store of a stream processing application instance via a host-port configuration enabled within the app configuration

 

An in-depth discussion of Kafka Streams is not possible in a single blog. The above sections are meant to provide just enough background which is (hopefully) sufficient from the point of view of this blog post. Readers are encouraged to spend some time going through the official documentation and come back to this blog to continue hacking on the sample

 

Setup

You can refer to part I of the blog for the Apache Kafka related setup. The only additional step which needs to be executed is exposing the port on which your Zookeeper process is listening (it's 2181 by default), as this is required by the Kafka Streams library configuration. While executing the steps from the Open Kafka listener port section, ensure that you include the Oracle Compute Cloud configuration for port 2181 (in addition to the Kafka broker port 9092)
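
For context, here is a hedged sketch of how the broker and Zookeeper endpoints could be fed to the library through the KAFKA_BROKER and ZOOKEEPER environment variables defined later in deployment.json. The property names are from the 0.10.x StreamsConfig; the exact wiring in the sample may differ:

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;

public class StreamsConfigSketch {

    public static Properties streamsConfig() {
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "cpu-streamz");                     //also prefixes the changelog topic
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, System.getenv("KAFKA_BROKER"));  //e.g. <host>:9092
        config.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, System.getenv("ZOOKEEPER"));     //e.g. <host>:2181, needed by the 0.10.1.x library
        config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        return config;
    }
}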

 

Code

Maven dependencies

As mentioned earlier, from an application development standpoint, Kafka Streams is just a library. This is evident in the pom.xml

 

<dependency>
     <groupId>org.apache.kafka</groupId>
     <artifactId>kafka-streams</artifactId>
     <version>0.10.1.1</version>
</dependency>

 

The project also uses the appropriate Jersey libraries along with the Maven shade and assembly plugins to package the application  

Overview

The producer microservice remains the same and you can refer to part I for the details. Let's look at the revamped Consumer stream processing microservice

 

  • KafkaStreamsAppBootstrap: entry point for the application. Kicks off the Grizzly container and the Kafka Streams processing pipeline
  • CPUMetricStreamHandler: implements the processing pipeline logic and handles the K-Stream configuration and the topology creation as well
  • MetricsResource: exposes multiple REST endpoints for fetching CPU moving average metrics
  • Metric, Metrics: POJOs (JAXB decorated) to represent metric data. They are exchanged as JSON/XML payloads
  • GlobalAppState, Utils: common utility classes

 

Now that you have a fair idea of what's going on within the application and an overview of the classes involved, it makes sense to peek at some of the relevant sections of the code

 

State store

 

    public static class CPUCumulativeAverageProcessor implements Processor<String, String> {
     ...................
        @Override
        public void init(ProcessorContext pc) {
            this.pc = pc;
            this.pc.schedule(12000); //invoke punctuate every 12 seconds
            this.machineToAvgCPUUsageStore = (KeyValueStore<String, Double>) pc.getStateStore(AVG_STORE_NAME);
            this.machineToNumOfRecordsReadStore = (KeyValueStore<String, Integer>) pc.getStateStore(NUM_RECORDS_STORE_NAME);
        }
     ...............

 

Cumulative Moving Average (CMA) calculation

 

..........
@Override
public void process(String machineID, String currentCPUUsage) {

            //turn each String value (cpu usage) to Double
            Double currentCPUUsageD = Double.parseDouble(currentCPUUsage);
            Integer recordsReadSoFar = machineToNumOfRecordsReadStore.get(machineID);
            Double latestCumulativeAvg = null;

            if (recordsReadSoFar == null) {
                PROC_LOGGER.log(Level.INFO, "First record for machine {0}", machineID);
                machineToNumOfRecordsReadStore.put(machineID, 1);
                latestCumulativeAvg = currentCPUUsageD;
            } else {
                Double cumulativeAvgSoFar = machineToAvgCPUUsageStore.get(machineID);
                PROC_LOGGER.log(Level.INFO, "CMA so far {0}", cumulativeAvgSoFar);

                //refer https://en.wikipedia.org/wiki/Moving_average#Cumulative_moving_average for details
                latestCumulativeAvg = (currentCPUUsageD + (recordsReadSoFar * cumulativeAvgSoFar)) / (recordsReadSoFar + 1);
                recordsReadSoFar = recordsReadSoFar + 1;
                machineToNumOfRecordsReadStore.put(machineID, recordsReadSoFar);
            }

            machineToAvgCPUUsageStore.put(machineID, latestCumulativeAvg); //store latest CMA in local state store
..........
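
As a quick sanity check of the cumulative moving average formula used above: if the average for a machine stands at 50.0 after two readings and a third reading of 80.0 arrives, the new value is (80.0 + 2 × 50.0) / 3 = 60.0, which matches recomputing the plain average over all three readings (the first two sum to 100.0, plus 80.0, divided by 3).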

 

 

Metrics POJO

 

@XmlRootElement
@XmlAccessorType(XmlAccessType.FIELD)
public class Metrics {
    private final List<Metric> metrics;

    public Metrics() {
        metrics = new ArrayList<>();
    }

    public Metrics add(String source, String machine, String cpu) {
        metrics.add(new Metric(source, machine, cpu));
        return this;
    }

    public Metrics add(Metrics anotherMetrics) {
        anotherMetrics.metrics.forEach((metric) -> {
            metrics.add(metric);
        });
        return this;
    }

    @Override
    public String toString() {
        return "Metrics{" + "metrics=" + metrics + '}';
    }
    
    public static Metrics EMPTY(){
        return new Metrics();
    }
    
}
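
The companion Metric class is not listed in this post; a minimal sketch that is consistent with the add(source, machine, cpu) call above and with the payload attributes described in the Testing section could look as follows (everything beyond the three field names is an assumption):

import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlRootElement;

@XmlRootElement
@XmlAccessorType(XmlAccessType.FIELD)
public class Metric {

    private String source;   //instance which produced this entry (diagnostic)
    private String machine;  //machine ID
    private String cpu;      //cumulative moving average of the CPU usage

    public Metric() {
        //no-arg constructor required by JAXB
    }

    public Metric(String source, String machine, String cpu) {
        this.source = source;
        this.machine = machine;
        this.cpu = cpu;
    }

    @Override
    public String toString() {
        return "Metric{" + "source=" + source + ", machine=" + machine + ", cpu=" + cpu + '}';
    }
}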

 

 

Exposing REST API for state

 

@GET
@Produces({MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML})
public Response all_metrics() throws Exception {
        Response response = null;
        try {
            KafkaStreams ks = GlobalAppState.getInstance().getKafkaStreams();
            HostInfo thisInstance = GlobalAppState.getInstance().getHostPortInfo();
            
          Metrics metrics = getLocalMetrics();

            ks.allMetadataForStore(storeName)
                    .stream()
                    .filter(sm -> !(sm.host().equals(thisInstance.host()) && sm.port() == thisInstance.port())) //only query remote node stores
                    .forEach(new Consumer<StreamsMetadata>() {
                        @Override
                        public void accept(StreamsMetadata t) {
                            String url = "http://" + t.host() + ":" + t.port() + "/metrics/remote";
                            //LOGGER.log(Level.INFO, "Fetching remote store at {0}", url);
                            Metrics remoteMetrics = Utils.getRemoteStoreState(url, 2, TimeUnit.SECONDS);
                            metrics.add(remoteMetrics);
                            LOGGER.log(Level.INFO, "Metric from remote store at {0} == {1}", new Object[]{url, remoteMetrics});
                        }
                    });

            response = Response.ok(metrics).build();
        } catch (Exception e) {
            LOGGER.log(Level.SEVERE, "Error - {0}", e.getMessage());
        }
        return response;
}
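
The getLocalMetrics() helper used above is not reproduced in this post; a hedged sketch of what it could look like with the interactive queries API is shown below. The store name and the String/Double types are assumed to match the state store described earlier, and the method would need org.apache.kafka.streams.KeyValue plus the org.apache.kafka.streams.state types in addition to the imports already used by the resource class:

private Metrics getLocalMetrics() {
    KafkaStreams ks = GlobalAppState.getInstance().getKafkaStreams();
    HostInfo thisInstance = GlobalAppState.getInstance().getHostPortInfo();

    //read-only view over the slice of the state store owned by THIS instance
    ReadOnlyKeyValueStore<String, Double> averages =
            ks.store(storeName, QueryableStoreTypes.<String, Double>keyValueStore());

    Metrics metrics = new Metrics();
    KeyValueIterator<String, Double> iterator = averages.all();
    while (iterator.hasNext()) {
        KeyValue<String, Double> entry = iterator.next();
        metrics.add(thisInstance.host() + ":" + thisInstance.port(),  //source (diagnostic)
                    entry.key,                                        //machine ID
                    String.valueOf(entry.value));                     //cumulative moving average
    }
    iterator.close();
    return metrics;
}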

 

Host discovery

 

    public static String getHostIPForDiscovery() {
    String host = null;
        try {

            String hostname = Optional.ofNullable(System.getenv("APP_NAME")).orElse("streams");

            InetAddress inetAddress = Address.getByName(hostname);
            host = inetAddress.getHostAddress();

        } catch (UnknownHostException ex) {
            host = "localhost";
        }
        return host;
    }
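
The discovered host is only useful if the Streams runtime advertises it: for interactive queries to work across instances, each instance typically sets the application.server property to its own host and port while building its configuration. A hedged sketch (the config object, the PORT environment variable and the default value are assumptions, not taken verbatim from the sample):

//while building the Streams configuration
String host = getHostIPForDiscovery();                                    //the helper shown above
String port = Optional.ofNullable(System.getenv("PORT")).orElse("8080");  //hypothetical port lookup
config.put(StreamsConfig.APPLICATION_SERVER_CONFIG, host + ":" + port);   //advertise host:port for interactive queries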

Deployment to Application Container Cloud

 

Now that you have a fair idea of the application, it’s time to look at the build, packaging & deployment

 

Update deployment descriptors

 

The metadata files for the producer application are the same. Please refer to part I for details on how to update them. The steps below are relevant to the (new) stream processing consumer microservice.

manifest.json: You can use this file in its original state

 

{
    "runtime": {
        "majorVersion": "8"
    },
    "command": "java -jar acc-kafka-streams-1.0.jar",
    "isClustered": "true"
}

 

deployment.json

 

It contains the environment variables required by the application at runtime. The values are left as placeholders for you to fill in prior to deployment.

 

{
"instances": "2",
  "environment": {
  "APP_NAME":"kstreams",
  "KAFKA_BROKER":"<as-configured-in-kafka-server-properties>",
  "ZOOKEEPER":"<zookeeper-host:port>"
  }
}

 

Here is an example

 

{
"instances": "2",
  "environment": {
  "APP_NAME":"kstreams",
  "KAFKA_BROKER":"oc-140-44-88-200.compute.oraclecloud.com:9092",
  "ZOOKEEPER":"10.190.210.199:2181"
  }
}

 

You need to be careful about the following

 

  • The value of the KAFKA_BROKER attribute should be the same as the one (the Oracle Compute Cloud instance public DNS) you configured in the advertised.listeners attribute of the Kafka server.properties file
  • The APP_NAME attribute should be the same as the one you use while deploying your application using the Application Container Cloud REST API

Please refer to the following documentation for more details on metadata files

 

 

Build

 

Initiate the build process to produce the deployable artifact (a ZIP file)

 

//Producer application

cd <code_dir>/producer //maven project location
mvn clean package

//Consumer application

cd <code_dir>/consumer //maven project location
mvn clean package

 

The output of the build process is a ZIP file for each microservice: accs-kafka-producer-1.0-dist.zip for the producer and acc-kafka-streams-1.0-dist.zip for the consumer

 

Upload & deploy

You would need to upload the ZIP file to Oracle Storage Cloud and then reference it in the subsequent steps. Here are the required cURL commands

 

Create a container in Oracle Storage cloud (if it doesn't already exist)  
  
curl -i -X PUT -u <USER_ID>:<USER_PASSWORD> <STORAGE_CLOUD_CONTAINER_URL>  
e.g. curl -X PUT -u jdoe:foobar "https://domain007.storage.oraclecloud.com/v1/Storage-domain007/accs-kstreams-consumer/"  
  
Upload your zip file into the container (zip file is nothing but a Storage Cloud object)  
  
curl -X PUT -u <USER_ID>:<USER_PASSWORD> -T <zip_file> "<storage_cloud_object_URL>" //template  
e.g. curl -X PUT -u jdoe:foobar -T acc-kafka-streams-1.0-dist.zip "https://domain007.storage.oraclecloud.com/v1/Storage-domain007/accs-kstreams-consumer/accs-kafka-consumer.zip"

 

 

Repeat the same for the producer microservice

 

You can now deploy your application to Application Container Cloud using its REST API. The Oracle Storage cloud path (used above) will be referenced while using the Application Container Cloud REST API (used for deployment). Here is a sample cURL command which makes use of the REST API

 

curl -X POST -u joe@example.com:password \
-H "X-ID-TENANT-NAME:domain007" \
-H "Content-Type: multipart/form-data" -F "name=kstreams" \
-F "runtime=java" -F "subscription=Monthly" \
-F "deployment=@deployment.json" \
-F "archiveURL=accs-kstreams-consumer/accs-kafka-consumer.zip" \
-F "notes=notes for deployment" \
https://apaas.oraclecloud.com/paas/service/apaas/api/v1.1/apps/domain007

 

Note

  • the name attribute used in the cURL command should be the same as the APP_NAME attribute used in deployment.json
  • Repeat the same for the producer microservice

 

Post deployment

(the consumer application has been highlighted below)

 

The Applications console

 

 

The Overview sub-section

 

 

 

The Deployments sub-section

 

 

 

Testing

Assuming your Kafka broker is up and running and you have deployed the application successfully, execute the below mentioned steps to test drive your application

 

Start the producer

Trigger your producer application by issuing an HTTP GET to https://my-producer-app-url/producer e.g. https://accs-kafka-producer-domain007.apaas.us.oraclecloud.com/producer. This will start producing (random) CPU metrics for a bunch of (10) machines

 

 

You can stop the producer by issuing an HTTP DELETE on the same URL

 

 

Check the statistics

 

Cumulative moving average of all machines

Allow the producer to run for 30-40 seconds and then check the current statistics. Issue an HTTP GET request to your consumer application, e.g. https://acc-kafka-streams-domain007.apaas.us.oraclecloud.com/metrics. You'll see a response payload similar to what's depicted below

 

 

 

The information in the payload is as follows

  • cpu: the cumulative average of the CPU usage of a machine
  • machine: the machine ID
  • source: this has been purposefully added as diagnostic information to see which node (instance in Application Container Cloud) is handling the calculation for a specific machine (this is subject to change as your application scales up/down)
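
For example, with two application instances running, a hypothetical entry could carry a cpu value of 21.1, a machine value of machine-3 and a source value identifying the instance that currently owns the calculation for machine-3; the exact values will of course differ in your environment.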

 

Cumulative moving average of a specific machine

 

 

 

Scale your application

Increase the number of instances of your application (from 2 to 3)

 

 

 

Check the stats again and you'll notice that the computation task is now being shared among three nodes.

 

That's all for this blog series!

 

**The views expressed in this post are my own and do not necessarily reflect the views of Oracle