Query by Slice, Parallel Execute, and Join: A Thread Pool Pattern in Java

Version 2


    Pagination is a technique by which you can present large data sets in small chunks with forward and backward navigability. Pagination can be done with custom code or with commercial, off-the-shelf (COTS) libraries. Nevertheless, many of these frameworks first bring the full dataset to the business, presentation, or client tier and then page them into small batches. This may not be the best possible solution; for one thing, such approaches consume huge amounts of memory.

    This article will first show you how to effectively utilize ROWNUM at the database level itself, so that we implement "true pagination": querying data in slices. Of course, you may also want to do some business processing on the fetched data. If you have millions of rows to be processed, you may want to process them in parallel to fully utilize the available processing power. In Java we use threads to do this, but with the advent of Java SE 5's java.util.concurrent.ThreadPoolExecutor, we also have a means to reuse the threads created. This means our paged data can be sent in batches to available threads in a thread pool. The JDK also provides us a mechanism to "join" back, or aggregate, the processed results from multiple threads.

    By effectively combining all the above concepts, it is possible to abstract out a Thread Pool pattern in the JDK, which can be reused across your daily parallel processing solutions. This article will showcase code that can be built and run using the JDK along with your favorite database.

    Query Data in Slices

    When you search for a product on an online site, you might get back a list of items instead of a single item. If the list is too big, you may also need some form of pagination. It is customary that in such scenarios you would also require a mechanism to navigate across pages, just like the one shown in Figure 1.

    Navigation with Pagination
    Figure 1. Navigation with pagination

    As I have already mentioned in the introduction, many pagination frameworks first bring the full dataset to the business, presentation, or client tier, and then page them into small batches. Deviating from this traditional approach, let us first look into a schema where we fetch pages from the datastore in batches.

    If you can somehow estimate the total fetch size for a query, you could then decide on the size of each page or batch to be queried. From this information, you can then work out the first index and the last index for each query. By limiting the query to return no more rows than required, we can conserve system resources. Each database vendor has its own unique way to do this, and most of these solutions revolve around the ROW_NUMBER() window function.

    Let us now look at one of the common ways this is implemented by enterprise databases like Oracle. In Oracle, we can use ROWNUM, which is covered in the SQL Reference, Basic Elements of Oracle SQL, Chapter 2 (PDF). ROWNUM in Oracle is a pseudo-column, meaning it is not a "real" column that will show up when you describe a table using the DESC command. It doesn't exist anywhere in the database, but it exists for a row when the row is retrieved by a query, and represents the sequential order in which Oracle has retrieved the row. The value of ROWNUM is just an integer assigned to each row of data fetched, and Oracle assigns the ROWNUM "on the fly," just after the data is retrieved but before the ORDER BY clause is processed.

    To explain this, and much more, let me introduce a single, simple table, customers. The customers table schema is described in Figure 2.

    DESC Customers
    Figure 2. DESC customers

    This table can be created and populated with data easily by executing the script data.sql.

    Let us now look at applying ROWNUM for pagination. To retrieve rows X through Y of a result set, the general form is as follows:

    select *
      from ( select /*+ FIRST_ROWS(n) */ a.*, ROWNUM rnum
               from ( select customerid, customername, age
                        from customers
                       order by customerid ) a
              where ROWNUM <= :LAST_INDEX_TO_FETCH )
     where rnum >= :FIRST_INDEX_TO_FETCH;

    To make the ROWNUM work, I have used an ORDER BY clause to order by customerid. Since customerid is the primary key in my table, this will work; but what if the column you are ordering by is not unique? In that case, you have to add something to the end of the ORDER BY to make it so. The following additional points are to be noted in the above query statement:

    • The FIRST_ROWS(n) hint chooses the cost-based approach to optimize a statement block in Oracle with the goal of best response time (minimum resource usage to return the first rows).
    • LAST_INDEX_TO_FETCH is set to the last row of the result set to fetch; i.e., if you wanted rows 71 to 80 of the result set, you would set LAST_INDEX_TO_FETCH to 80.
    • FIRST_INDEX_TO_FETCH is set to the first row of the result set to fetch; i.e., to get rows 71 to 80, you would set this to 71.
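    The two bind variables can be derived from a requested page number and page size. As a small illustrative helper (the class and method names here are hypothetical, not part of the sample source), the arithmetic might be sketched as:

```java
// Hypothetical helper: computes the 1-based bind values
// :FIRST_INDEX_TO_FETCH and :LAST_INDEX_TO_FETCH for a page.
public class PageBounds {

    // pageNumber is 1-based; returns { firstIndex, lastIndex }
    public static int[] bounds(int pageNumber, int pageSize) {
        int first = (pageNumber - 1) * pageSize + 1;
        int last = pageNumber * pageSize;
        return new int[] { first, last };
    }

    public static void main(String[] args) {
        int[] b = bounds(2, 40); // second page of 40 rows
        // These two values would then be bound to the query, e.g. with
        // PreparedStatement.setInt(...) for each placeholder.
        System.out.println(b[0] + ".." + b[1]); // prints 41..80
    }
}
```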

    Figure 3 shows typical query results.

    A Paginated Query
    Figure 3. A paginated query

    Did you like that? Better still, how many of you are already thinking of other possible optimizations you could build around this data pagination using your Java tools? I am going to show you at least one such possibility, which I will abstract out as a parallel execution pattern.

    Parallel Task Execution in Java: Threads--The Old Mantra

    Now assume that your data table is a very big table, with more than a million records, and each record is "fat," too--say, 100 or more bytes. You may want to apply some business rules to the data in these rows and then execute some time-consuming business processing. How can you do this in the least possible amount of time?

    To solve our problem of processing a million rows in Java, it is tempting to create a million threads and have them all run concurrently. But if you try to do that, your process is going to crash due to lack of resources. A better approach is to create a limited number of threads and execute the tasks with them. If we apply this approach to our million-row problem, we may have to batch or paginate the rows, and send each batch or page to a thread. A thread will work on one page at a time, and the size of the page can be adjusted taking into consideration many aspects, a few of which are listed below:

    • Total rows to be processed
    • Total system resources available (including memory, processors, I/O channels, etc.)
    • How resource-intensive the processing of individual rows is
    • How fast you want the processing to be completed

    Once a thread is started, the Java Virtual Machine calls the run() method of this thread. Any processing may be executed within the run() method, and when the processing is completed the run() method exits. It is never legal to start a thread more than once; in particular, a thread may not be restarted once it has completed execution. Still, threads are heavyweight objects, and if we can reuse them for further processing we can improve efficiency. Instead of letting a thread exit when its current task is done, we can have it block, typically waiting on a task queue, until more tasks are available for processing. Such idling threads can be placed in a pool, and when more tasks are available, we can take any idle thread from the pool and repeat the process execution.
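    To make the reuse idea concrete, here is a minimal, self-contained sketch (not the JDK's implementation) of a single worker thread that blocks on a task queue while idle and is reused for every task that arrives:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ReusableWorker {

    // Starts one worker thread, feeds it n tasks through a queue,
    // waits for completion, and returns how many tasks actually ran.
    public static int process(int n) throws InterruptedException {
        final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
        final AtomicInteger processed = new AtomicInteger();
        final CountDownLatch done = new CountDownLatch(n);

        Thread worker = new Thread(new Runnable() {
            public void run() {
                try {
                    while (true) {
                        queue.take().run(); // blocks while idle; same thread, many tasks
                    }
                } catch (InterruptedException e) {
                    // interruption is our shutdown signal
                }
            }
        });
        worker.start();

        for (int i = 0; i < n; i++) {
            queue.put(new Runnable() {
                public void run() {
                    processed.incrementAndGet();
                    done.countDown();
                }
            });
        }
        done.await(5, TimeUnit.SECONDS);
        worker.interrupt();
        return processed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        // One thread, started once, runs all three tasks.
        System.out.println("processed = " + process(3));
    }
}
```

    Note that the worker never calls Thread.start() twice; it simply loops, and the blocking take() is what parks it while there is nothing to do.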

    Pool Your Java Threads: ThreadPoolExecutor--The Smarter Way

    If we are to create a new thread for each task, we would be spending more time and consuming more system resources by creating and destroying more and more threads than by doing actual business processing. A Thread Pool helps us here by providing a solution to both the problem of thread lifecycle overhead and the problem of resource thrashing. A thread pool improves efficiency by following any or all of the following strategies:

    • It limits resource use: A maximum limit can be placed on the number of simultaneously executing threads.
    • It manages concurrency levels: A select number of threads can be allowed to execute simultaneously.
    • It minimizes overhead: Previously constructed thread objects are reused rather than creating new ones. This will improve performance when executing large numbers of asynchronous tasks, due to reduced per-task invocation overhead.

    The PooledExecutor class from Doug Lea's open source library of concurrency utilities, util.concurrent, is a widely used, efficient, and correct implementation of a thread pool. Now we have the concurrency utilities in the java.util.concurrent package in the core JDK, and java.util.concurrent.ThreadPoolExecutor is the main thread pool class, executing each submitted task using one of possibly several pooled threads. Let us look at a few methods of interest in the ThreadPoolExecutor class, which we will see in detail in the code sample.

    package java.util.concurrent;

    public class ThreadPoolExecutor extends AbstractExecutorService {

        public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
                long keepAliveTime, TimeUnit unit,
                BlockingQueue<Runnable> workQueue)
        public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
                long keepAliveTime, TimeUnit unit,
                BlockingQueue<Runnable> workQueue,
                RejectedExecutionHandler handler)
        public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
                long keepAliveTime, TimeUnit unit,
                BlockingQueue<Runnable> workQueue,
                ThreadFactory threadFactory)
        public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
                long keepAliveTime, TimeUnit unit,
                BlockingQueue<Runnable> workQueue,
                ThreadFactory threadFactory,
                RejectedExecutionHandler handler)

        public void execute(Runnable command);
        public int getCorePoolSize();
        public int getLargestPoolSize();
        public int getMaximumPoolSize();
        public int getPoolSize();
        public BlockingQueue<Runnable> getQueue();
        public long getTaskCount();
        public ThreadFactory getThreadFactory();
        public boolean remove(Runnable task);
        public void setCorePoolSize(int corePoolSize);
        public void setKeepAliveTime(long time, TimeUnit unit);
        public void setMaximumPoolSize(int maximumPoolSize);
        // other methods go here...
    }

    First, you need to instantiate an appropriately configured ThreadPoolExecutor. This class provides many adjustable parameters and extensibility hooks. We can either adjust these parameters to fine-tune the pool, or use the more convenient Executors factory methods, which are listed below:

    • Executors.newCachedThreadPool(): Unbounded thread pool, with automatic thread reclamation
    • Executors.newFixedThreadPool(int): Fixed size thread pool
    • Executors.newSingleThreadExecutor(): Single background thread

    The Executors factory methods preconfigure settings for common usage scenarios. To manually configure and fine-tune the pool, we need to know how to configure the thread pool size and the backing queue. This is explained next.

    • Configuring Thread Pool Size
      • corePoolSize: If you submit a new task when fewer than corePoolSize threads are running, a new thread is created to handle the request, even if other worker threads are idle.
      • maximumPoolSize: If you submit a new task when corePoolSize or more threads are running but fewer than maximumPoolSize, a new thread is created only if the queue is full.
    • Configuring Thread Pool Backing Queue
      • Direct handoffs Example: SynchronousQueue--Hands off tasks to threads without otherwise holding them. If you submit a new task when no threads are immediately available, a new thread will be constructed.
      • Unbounded queues Example: LinkedBlockingQueue--If you submit a new task when all corePoolSize threads are busy, the new tasks will wait. Here the number of threads will not exceed corePoolSize.
      • Bounded queues Example: ArrayBlockingQueue--By using a bounded queue together with a finite maximumPoolSize, we can prevent resource exhaustion.
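    Putting these knobs together, a manually configured pool with a bounded queue might be constructed as follows; the pool sizes and queue capacity here are illustrative values, not recommendations:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolConfigDemo {

    // Runs n trivial tasks through a bounded-queue pool and returns
    // how many of them completed.
    public static long runTasks(int n) throws InterruptedException {
        // 2 core threads, growing to at most 4 once the 8-slot queue fills.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 60L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(8));
        for (int i = 0; i < n; i++) {
            pool.execute(new Runnable() {
                public void run() {
                    // placeholder for real per-batch work
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return pool.getCompletedTaskCount();
    }

    public static void main(String[] args) throws InterruptedException {
        // 10 tasks fit exactly: 2 run in core threads, 8 wait in the queue.
        System.out.println(runTasks(10)); // prints 10
    }
}
```

    Submitting a burst of tasks beyond what the queue and maximumPoolSize can absorb would invoke the pool's RejectedExecutionHandler, which by default throws RejectedExecutionException.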

    Since there are numerous books available on threads (such as Java Threads, Third Edition), let us now jump into some practical usage patterns with code.

    A Sample Demonstration of Thread Pools

    In the sample table you created, you have about 450 customer entities, each identified with a different customerid value. Let us assume that you need to apply some business rules to, or otherwise process, these customer entities. Going by strict design principles, you will normally have these rules or business processing code in the middle tier. It is not wise to process a million rows one by one; nor can we attach a separate thread to each of these entities. This is where you can batch the business entities in small chunks, and use a thread pool with a predefined number of threads to process them in parallel. We will now look into that in code:

    private void testGetCustomersSortByIdInBatch() throws Exception {
        Date start = new Date();
        logger.info("TX-ID : " + ThreadLocalTxCounter.get() + " | Start ");
        int numBatch = 0;
        CustomerDao customerDao = new CustomerDaoJdbc();
        int count = customerDao.getCustomerCount();
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_TIME,
            TimeUnit.MINUTES,
            new LinkedBlockingQueue(BLOCKING_QUEUE_CAPACITY));
        List bucketToCollect =
            Collections.synchronizedList(new ArrayList());
        Callable callable = null;
        Customer customer = null;
        int batch = 0;
        Collection collection = new ArrayList();
        logger.info("Batch #");
        for (int i = 0; i < count; i += BATCH_SIZE) {
            if (logger.isInfoEnabled()) {
                System.out.print(" " + (++numBatch));
            }
            batch = (count - i) > BATCH_SIZE ? BATCH_SIZE : (count - i);
            callable = new ObjectRelationalQueryTask(
                bucketToCollect, (i + 1), (i + batch));
            collection.add(callable);
        }
        if (logger.isInfoEnabled()) {
            System.out.print("\n");
        }
        threadPoolExecutor.invokeAll(collection);
        logger.debug("-----------------------------------------");
        logger.debug("bucketToCollect.size() : " + bucketToCollect.size());
        for (Iterator iterator = bucketToCollect.iterator();
                iterator.hasNext();) {
            customer = (Customer) iterator.next();
            logger.debug("customer : " + customer);
        }
        logger.debug("-----------------------------------------");
        Date end = new Date();
        logger.info("TX-ID : " + ThreadLocalTxCounter.get()
            + " | End | TimeElapsed(ms) : "
            + (end.getTime() - start.getTime()));
    }

    This is what we are doing in the above code:

    1. As a first step, we try to get the count of the number of entities to be processed. This data will be later used to split the entities into multiple batches.
    2. We then instantiate a thread pool and configure it as follows:
      • CORE_POOL_SIZE = 4: The number of threads to keep in the pool, even if they are idle.
      • MAXIMUM_POOL_SIZE = 7: The maximum number of threads to allow in the pool.
      • KEEP_ALIVE_TIME = 1: If the number of threads is greater than the core, the excess idle threads will wait for new tasks before terminating up to these many time units.
      • BLOCKING_QUEUE_CAPACITY = 5: The fixed capacity of the LinkedBlockingQueue.
    3. Even though multiple threads are going to process the entities in batches, we need a mechanism to collect the processing results, when all the entities have been processed. For that, we now create a synchronized ArrayList that can act as a shared memory for all the threads to put results into.
    4. Next, we need to create tasks for the thread pool to work on. Normally such tasks are objects that implement the Runnable interface. But the java.util.concurrent.Callable interface provides more control by allowing you to return a result or throw a checked exception (even though we will not demonstrate this here, since we already use another mechanism, the shared ArrayList described above, for collecting the response).
    5. We will use a BATCH_SIZE of 40 here. This means all our 450 entities will be split into batches, with each batch containing 40 items maximum.
    6. Corresponding to each batch, we create an instance of ObjectRelationalQueryTask, which is an implementation of the Callable interface.
      public class ObjectRelationalQueryTask implements Callable {

          private List list;
          private int startIndex;
          private int endIndex;

          public ObjectRelationalQueryTask(List list,
                  int startIndex, int endIndex) {
              this.list = list;
              this.startIndex = startIndex;
              this.endIndex = endIndex;
          }

          public List call() {
              ThreadLocalTxCounter.resetTxId();
              CustomerDao customerDao = new CustomerDaoJdbc();
              List customers =
                  customerDao.getCustomersSortById(startIndex, endIndex);
              // We will do business processing here :)
              list.addAll(customers);
              // We have our own mechanism to return results.
              // Hence returning null.
              return null;
          }
      }
    7. All such tasks are then put in a collection and then supplied to the thread pool for execution.
    8. The invokeAll method executes the given tasks and, when they all complete, returns a list of Futures holding their status and results, in the same sequential order as produced by the iterator for the given task collection.
    9. Last but not least, the sample prints out the results of the processing to the console. You may need to enable logging at the DEBUG level to view the results in the console.
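    The steps above rely on a shared synchronized list to join results. As a variation, the Futures returned by invokeAll can carry each batch's results directly. The sketch below substitutes an in-memory slice of integers for the database query; the class and method names are invented for illustration:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class JoinWithFutures {

    // Each task "queries" one slice [start, end] and returns it as a list.
    static Callable<List<Integer>> sliceTask(final int start, final int end) {
        return new Callable<List<Integer>>() {
            public List<Integer> call() {
                List<Integer> slice = new ArrayList<Integer>();
                for (int i = start; i <= end; i++) {
                    slice.add(i); // stand-in for a fetched Customer row
                }
                return slice;
            }
        };
    }

    // Splits [1, total] into batches, runs them in parallel, joins results.
    public static List<Integer> processAll(int total, int batchSize)
            throws InterruptedException, ExecutionException {
        Collection<Callable<List<Integer>>> tasks =
                new ArrayList<Callable<List<Integer>>>();
        for (int i = 0; i < total; i += batchSize) {
            int batch = Math.min(batchSize, total - i);
            tasks.add(sliceTask(i + 1, i + batch));
        }
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<Integer> joined = new ArrayList<Integer>();
        try {
            // invokeAll blocks until every task is done; the Futures come
            // back in submission order, so the joined list stays ordered.
            for (Future<List<Integer>> f : pool.invokeAll(tasks)) {
                joined.addAll(f.get());
            }
        } finally {
            pool.shutdown();
        }
        return joined;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(processAll(450, 40).size()); // prints 450
    }
}
```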

    Build and Run the Sample

    To build and run the sample, first you need to download QueryBySliceParallelExecuteAndJoinSrc.zip, available in the Resources section, and follow these steps:

    1. Expand this archive file to a suitable location on your local hard drive, which will create a folder named QueryBySliceParallelExecuteAndJoinSrc.
    2. If you haven't created the entities already, execute the data.sql script available inside the conf subfolder at an SQL prompt to create the tables and entities in your Oracle database.
    3. Open the samples.PROPERTIES file available in the top-level folder and adjust the paths mentioned there to suit your environment.
    4. Change directory to QueryBySliceParallelExecuteAndJoinSrc and execute ant there, which will build the sample:
      • cd QueryBySliceParallelExecuteAndJoinSrc
      • ant
    5. Now you can run the sample by executing ant run.

    Keep watching the console. It will look like the one shown in Figure 4.

    Output from sample app
    Figure 4. Run the sample (click on thumbnail to view full-sized image)

    To understand the dynamics of the sample run, let us go over the major notable aspects with reference to Figure 4:

    • The entities, which are 450 in number, are split into batches. Each batch will contain a maximum of 40 items. Hence there are 12 batches.
    • There is a single thread pool configured. This thread pool hosts seven threads, named from pool-1-thread-1 to pool-1-thread-7.
    • Each batch is processed by any one thread fully, in a single transaction, correlated with TX ID.
    • In every transaction we log once every time we complete processing 15 items. Hence in processing 40 items in a batch, we log three times.
    • Once a thread processes a batch fully (40 items), the thread will be made available by returning it to the pool; the thread will be re-allocated if any more batches are waiting to be processed.
    • A single thread may be thus reused for many batches. This means a single thread may log more than one TX ID, in multiple processing or transaction contexts. (By "transactions" we don't mean ACID transactions, but instead just mean a processing unit of work.)
    • To mock some "heavy processing" happening on the tasks, each thread sleeps for some random amount of time after processing every 15 items. This helps us to visualize many threads working in parallel in the console, executing transactions.
    • If you want to make sure that all items are processed and the responses are available, enable logging at the DEBUG level and re-run the sample. This will demonstrate that we can in fact "join" the results of multiple threads.

    You may also re-run the sample by using the no-argument constructor of LinkedBlockingQueue. When you use this constructor, it creates a LinkedBlockingQueue with a capacity of Integer.MAX_VALUE (i.e., unlimited capacity for most practical cases). If so, the console will look like that shown in Figure 5 instead.

    Output from sample app with unlimited capacity queue
    Figure 5. Run the sample with unlimited capacity queue (click on thumbnail to view full-sized image)

    What has happened here? When you use the unlimited capacity queue, you are in fact using unbounded queues. In this case, if you submit a new task when all corePoolSize threads are busy, the new tasks will wait. Here the number of threads will not exceed corePoolSize.
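    You can observe this behavior directly from the pool's own statistics: with an unbounded queue, getLargestPoolSize() never climbs above corePoolSize. A small self-contained check (the pool sizes and task count are illustrative):

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class UnboundedQueueDemo {

    // Submits n trivial tasks and reports the largest pool size reached.
    public static int largestPoolSize(int core, int max, int n)
            throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                core, max, 1L, TimeUnit.MINUTES,
                new LinkedBlockingQueue<Runnable>()); // capacity Integer.MAX_VALUE
        for (int i = 0; i < n; i++) {
            pool.execute(new Runnable() {
                public void run() {
                    // trivial work
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return pool.getLargestPoolSize();
    }

    public static void main(String[] args) throws InterruptedException {
        // Even with max = 7 and 100 tasks, the unbounded queue means extra
        // tasks wait in line rather than trigger threads beyond the core size.
        System.out.println(largestPoolSize(4, 7, 100)); // never exceeds 4
    }
}
```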

    Again, as an exercise for the reader, you can change the various configuration parameters of the thread pool and understand the effect by re-running the sample.


    Conclusion

    Threads are powerful constructs for software programming, and we have been leveraging them since the earliest versions of Java. But many programs do not run efficiently due to non-optimized usage of system resources, including threads. Thread pools are yet another powerful tool, one that even a less-experienced programmer can use to write optimized parallel code. This article showed you how to apply parallelism in middle-tier Java programming while applying split-and-query mechanisms at the database level, so that optimization patterns extend across multiple tiers of the application.