Thursday, June 30, 2011

Concurrency: A Method for Reducing Contention and Overhead in Worker Queues for Multithreaded Java Applications

Introduction
Many server applications, such as Web servers, application servers, database servers, file servers, and mail servers, maintain worker queues and thread pools to handle large numbers of short tasks that arrive from remote sources. In general, a "worker queue" holds all the short tasks that need to be executed, and the threads in the thread pool retrieve the tasks from the worker queue and complete the tasks.

Since multiple threads act on the worker queue, adding tasks to and deleting tasks from the worker queue needs to be synchronized, which introduces contention in the worker queue. This article explains the contention involved with the traditional approach (using a common queue for the thread pool) and helps you reduce the contention by maintaining one queue per thread. This article also explains a work stealing technique that is important for utilizing the CPU effectively in multicore systems.

Note: The source code for the examples described in this article can be downloaded here: workerqueue.zip

Common Worker Queue: The Traditional Approach

Today, most server applications use a common worker queue and thread pool to exploit the concurrency provided by the underlying hardware. As shown in Figure 1, server applications use a common worker queue to hold short tasks that arrive from remote sources. A pool of threads acts on the worker queue by retrieving tasks from the worker queue and running the tasks to completion. Threads are blocked on the queue if there is no task in the worker queue.

This method of using a common worker queue resolves the issues created by earlier approaches, such as creating a thread per task (which spawns an unbounded number of threads) or using a single background thread (which serializes a huge number of short, independent tasks). However, the common worker queue itself becomes a bottleneck when the number of tasks is high and each task is very short.

Figure 1. Common Worker Queue.


Listing 1 shows how you can create a common worker queue with just a few lines of code.

Listing 1. Creating a Common Worker Queue


/*
 * Defines a common worker queue and a pool of
 * threads to execute tasks from remote sources.
 */
public class SimpleWorkQueue {
    private final PoolWorker[] threads;
    private final BlockingDeque<Runnable> queue;
    private volatile boolean stopNow = false;

    public SimpleWorkQueue(int nThreads) {
        queue = new LinkedBlockingDeque<Runnable>();
        threads = new PoolWorker[nThreads];
        for (int i = 0; i < nThreads; i++) {
            threads[i] = new PoolWorker();
            threads[i].start();
        }
    }

    /*
     * Worker thread to execute remote tasks.
     */
    private class PoolWorker extends Thread {
        /*
         * Retrieves a task from the worker queue and runs it.
         * The thread waits on takeLast() if the queue is empty.
         */
        public void run() {
            while (!stopNow) {
                try {
                    Runnable r = queue.takeLast();
                    r.run();
                } catch (java.lang.Throwable e) { }
            }
        }
    }
}

As shown in Listing 1, the SimpleWorkQueue class initializes a deque and starts a fixed number of threads at startup. Each thread then calls queue.takeLast() in a loop, which either retrieves a task from the worker queue or waits for a new task to arrive if the queue is empty.

Once a task is retrieved, the thread calls the task's run method, r.run().
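Listing 1 omits the producer side that adds tasks to the queue. The following self-contained sketch shows how the same pattern might be exercised end to end; the two-thread pool, the CountDownLatch, and the use of putFirst() for submission are our assumptions for illustration, not part of the article's source:

```java
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingDeque;

public class SimpleWorkQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        final BlockingDeque<Runnable> queue = new LinkedBlockingDeque<Runnable>();
        final CountDownLatch done = new CountDownLatch(4);

        // Pool of two workers, each blocking on takeLast() as in Listing 1
        for (int i = 0; i < 2; i++) {
            Thread worker = new Thread() {
                public void run() {
                    while (true) {
                        try {
                            Runnable r = queue.takeLast();
                            r.run();
                        } catch (InterruptedException e) {
                            return;  // stop when interrupted
                        }
                    }
                }
            };
            worker.setDaemon(true);
            worker.start();
        }

        // Producers add tasks at the head; workers consume from the tail
        for (int i = 0; i < 4; i++) {
            final int id = i;
            queue.putFirst(new Runnable() {
                public void run() {
                    System.out.println("task " + id + " done");
                    done.countDown();
                }
            });
        }
        done.await();  // wait until all four tasks have executed
    }
}
```

Because both workers take from the same deque, every putFirst() and takeLast() synchronizes on the shared queue, which is exactly the contention discussed next.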

Worker Queues per Thread

The approach above is simple and performs better than the traditional approach of creating a thread for each incoming task. However, as shown in Figure 2, it creates contention: multiple threads compete for a single worker queue to retrieve their tasks, and the contention worsens as the number of threads (and cores) grows.

Figure 2. Contention in a Common Worker Queue.


Today, with the advent of multicore processors (for example, IBM's POWER7, Oracle's UltraSPARC, and Intel's Nehalem, all of which can run many hardware threads), it is a challenge for software applications to utilize the underlying cores effectively.

There are various solutions available for overcoming the contention in the common worker queue approach:
  • Using lock-free data structures
  • Using concurrent data structures with multiple locks
  • Maintaining multiple queues to isolate the contention
In this article, we explain how to maintain multiple queues—a queue-per-thread approach—to isolate the contention, as shown in Figure 3.

Figure 3. Queue-per-Thread Queue.


In this approach, each thread has its own worker queue and retrieves tasks only from that queue, never from any other. This isolates the contention involved in retrieving tasks, because no other thread competes for the queue, and it ensures that a thread never sleeps while its own worker queue holds tasks, which utilizes the cores effectively.

Listing 2 shows how you can easily migrate from the common worker queue approach to the queue-per-thread approach by making just a few modifications to the code that was shown in Listing 1. In Listing 2, the constructor initializes multiple queues (equal to the number of threads) at startup and each thread maintains an ID called thread_id. Then, thread_id is used to isolate the contention by helping each thread retrieve tasks from its own queue.

Listing 2. Creating a Queue-per-Thread Queue


 /* Modification: initialize one worker queue per thread */
 queue = new BlockingDeque[nThreads];
 for (int i = 0; i < nThreads; i++) {
     queue[i] = new LinkedBlockingDeque<Runnable>();
 }

 /* Each worker thread retrieves tasks only from its own queue */
 Runnable r = (Runnable) queue[thread_id].takeLast();

Queue-per-Thread Queue with Work Stealing
Although the queue-per-thread approach greatly reduces contention, it does not guarantee that the underlying cores are used effectively all the time. For example, what happens if a couple of queues empty long before the others? This is a common situation: a few threads keep executing tasks while the threads whose queues are empty wait for new tasks to arrive. This can happen due to the following:
  • Unpredictable nature of the scheduling algorithm
  • Unpredictable nature of the incoming tasks (short versus long)
A solution to this problem is work stealing.
Work stealing lets one thread steal work from another queue when it finds that its own queue is empty. This ensures that all the threads (and, in turn, the cores) are busy all the time. Figure 4 shows a scenario where Thread 2 steals work from Thread 1's queue because its own queue is empty. Work stealing can be implemented with standard queues, but using a deque greatly reduces the contention involved in stealing the work:
  • Only the worker thread accesses the tail of its own deque (via pollLast() in Listing 3), so there is never contention for the tail.
  • The head of a deque is accessed only when a thread runs out of its own work and steals (via pollFirst()), so there is rarely contention for the head of any thread's deque either.
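This opposite-ends discipline (owner at one end, thieves at the other) is the same design used internally by the JDK's fork/join framework (java.util.concurrent.ForkJoinPool, introduced in Java 7): each worker pushes and pops subtasks on its own deque, and idle workers steal from the other end. A minimal sketch, with our own task and threshold choices:

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

/* Sums the range [from, to) by splitting it into subtasks. */
public class SumTask extends RecursiveTask<Long> {
    private final long from, to;

    SumTask(long from, long to) { this.from = from; this.to = to; }

    protected Long compute() {
        if (to - from <= 1000) {           // small enough: sum directly
            long s = 0;
            for (long i = from; i < to; i++) s += i;
            return s;
        }
        long mid = (from + to) / 2;
        SumTask left = new SumTask(from, mid);
        left.fork();                        // pushed onto this worker's own deque
        long right = new SumTask(mid, to).compute();
        return left.join() + right;         // idle workers may steal 'left' meanwhile
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool();
        long sum = pool.invoke(new SumTask(0, 1000000));
        System.out.println(sum);            // prints 499999500000
    }
}
```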

Figure 4. Work Stealing.


Listing 3 shows how you can steal work from other queues with just a few modifications to the queue-per-thread approach. As shown, each thread calls pollLast() instead of takeLast() so that it does not block on its own queue when that queue is empty. Once a thread finds that its own queue is empty, it steals work from another queue by calling pollFirst() on the other thread's queue.


Listing 3. Implementing Work Stealing

/* Do not block if there is no task in the current queue */
Runnable r = (Runnable) queue[thread_id].pollLast();

if (null == r) {
    /* The current queue is empty;
       steal a task from another thread's queue */
    r = stealWork(thread_id);
}

/*
 * Steals work from the other queues.
 */
Runnable stealWork(int index) {
    for (int i = 0; i < nThreads; i++) {
        if (i != index) {
            Object o = queue[i].pollFirst();
            if (o != null) {
                return (Runnable) o;
            }
        }
    }
    return null;
}
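Putting the fragments of Listings 2 and 3 together, a complete worker loop might look like the following self-contained sketch. The class name, constants, and the yield-based backoff are our own choices for illustration, not code from the article's download; all tasks are loaded into queue 0 to force the other workers to steal:

```java
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicInteger;

public class WorkStealingDemo {
    static final int N_THREADS = 4;
    static final int N_TASKS = 100;
    static final BlockingDeque<Runnable>[] queue = new BlockingDeque[N_THREADS];
    static final AtomicInteger executed = new AtomicInteger();
    static final CountDownLatch done = new CountDownLatch(N_TASKS);

    /* Steal from the head of some other thread's deque, as in Listing 3 */
    static Runnable stealWork(int index) {
        for (int i = 0; i < N_THREADS; i++) {
            if (i != index) {
                Runnable r = queue[i].pollFirst();
                if (r != null) return r;
            }
        }
        return null;
    }

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < N_THREADS; i++)
            queue[i] = new LinkedBlockingDeque<Runnable>();

        // Load every task into queue 0 so the other workers must steal
        for (int t = 0; t < N_TASKS; t++) {
            queue[0].putFirst(new Runnable() {
                public void run() {
                    executed.incrementAndGet();
                    done.countDown();
                }
            });
        }

        for (int i = 0; i < N_THREADS; i++) {
            final int thread_id = i;
            Thread worker = new Thread() {
                public void run() {
                    while (done.getCount() > 0) {
                        Runnable r = queue[thread_id].pollLast();  // own tail, non-blocking
                        if (r == null) r = stealWork(thread_id);   // then try to steal
                        if (r != null) r.run();
                        else Thread.yield();  // back off briefly when all queues are empty
                    }
                }
            };
            worker.setDaemon(true);
            worker.start();
        }

        done.await();
        System.out.println("executed " + executed.get() + " tasks");  // executed 100 tasks
    }
}
```

Note that because pollLast() does not block, idle workers spin between polls; a production implementation would park idle threads instead of yielding.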

Building the Benchmark

To compare these approaches, we developed a small benchmark that exercises all three and studied their behavior. The test creates a large number of 10 x 10 matrix multiplication tasks and executes them using each of the three approaches.
The test defines the following classes:
  • MainClass: A class that initiates, starts, and coordinates various elements of the benchmark.
  • WorkAssignerThread: A thread that creates a lot of 10 x 10 matrix multiplication tasks and queues them.
  • Task: A class that defines a 10 x 10 matrix multiplication.
  • WorkQueue: An interface that defines a set of methods any worker queue must implement.
  • WorkerQueueFactory: A factory class that returns a WorkQueue object based on the queue type.
  • SimpleWorkQueue: A class that defines a simple worker queue and initiates a set of threads. This depicts the first queue type mentioned in this article (common worker queue).
  • MultiWorkQueue: A class that isolates the contention by defining multiple worker queues (one per thread) and depicts the second queue type mentioned in this article.
  • WorkStealingQueue: A class that isolates the contention by defining multiple queues and steals work when it finds one of its thread's queues is empty. This depicts the third queue type mentioned in this article.
The test can be executed by specifying the queue type, number of threads, and number of tasks, as shown in Listing 4. Listing 4 also shows how to invoke a test with the first queue type (common worker queue), with the number of threads equal to 10 and the number of tasks equal to 10000.


Listing 4. Executing the Test
 java MainClass <queue type> <number of threads> <number of tasks>

 /* For example: */
 java MainClass 1 10 10000
Experimental Results
We evaluated the performance on different architectures, and the results are very positive. Initially, we evaluated the performance on an AMD Opteron box with eight processor cores running Linux, and we found that performance for queue type 3 improved by 12% to 18.4% over queue type 1, depending on the load, as shown in Figure 5.

Figure 5. Performance Comparison Between Type 1 and Type 3 Queues on Linux AMD Opteron System with Eight Core Processors. [Disclaimer: This is not to compare any products or claim performance for any product; it is just to showcase the advantage of the techniques proposed in this article, which are purely the authors' views.]
We also evaluated the performance on a Linux POWER system with four dual-core POWER4 processors, and we found that performance improved by 12% to 16% for the same load, as shown in Figure 6.

Figure 6. Performance Comparisons Between Type 1 and Type 3 Queues on Linux System with Four Dual-Core Power Processors. [Disclaimer: This is not to compare any products or claim performance for any product; it is just to showcase the advantage of the techniques proposed in this article, which are purely the authors' views.]


As shown in Figure 5 and Figure 6, we varied the number of tasks from 0.1 million to 0.5 million and measured the execution time in seconds. The outcome of our experiment clearly indicates that a large amount of contention is created in queue type 1 and that it can be eliminated by creating multiple queues and stealing work.

Summary

This article demonstrated the contention involved in the common worker queue approach and then isolated the contention by creating one queue per thread. This article also demonstrated, through a simple benchmark, why work stealing is important and how it improves the overall performance of an application.
