Monday, August 31, 2009

Implementing the end of the loop

After yesterday's blog entry, I may have left you thinking "this guy Walter, he sure can talk the talk, but can he code the code?" Well, yeah. After writing the entry, I felt the irresistible need to actually implement the (not new) idea of having a generic framework for running a compute kernel distributed over N threads on a system with M cores. It was not as easy as I thought it would be, and there was a catch interesting enough to write about here. Talking the talk is one thing, but coding the code still is another.

Description of the problem
You want to able to process an array using a generic "compute kernel", which is a subroutine that computes a value for a single cell in the array. The system should run as many threads as needed to complete this task, so it should virtually run one thread per cell. As this is a framework, the compute kernel routine must be passed as a function pointer so that it becomes a generic interface for processing arrays in parallel.

Designing a solution
A big problem is that it makes no sense to spawn a hundred threads on a (for example) dual core system, as the overhead would play a significant role in slowing the system down rather than speeding it up in comparison to its serial brother. The solution is to spawn only as many threads as there are processor cores. Each thread processes its part of the array (using an "old-fashioned" loop) and does this without locking, so it's very fast.
However, the compute kernel is written in such a way that each invocation still works on only one array element. This is a requirement, because otherwise it wouldn't be a real compute kernel. From the compute kernel's point of view, it should look like there are a hundred threads (or more) running in the system. So rather than passing a real thread id to the compute kernel, the framework cheats and passes the cell number, which now acts as a "virtual thread id".

Even though threads are lightweight, in a system like this, it is not maximum efficiency to spawn threads on an as-needed basis. I chose to pre-spawn (or pre-fork) the worker threads and to let them sleep until called for. This requires some basic thread scheduling mechanism. After finishing the parallel work, there should be another synchronization point as well. So the end result is that an array gets processed in parallel, after which the program can continue in serial.

Implementation details 1: Automated data partitioning
OK, suppose we have 100 array elements, that we want to process on only 2 cores. That's easy, each thread handles 50 array elements. Now suppose we have 115 array elements, and 8 cores. Easy too, each thread handles 115/8th part of the array, plus a little part that is the remainder. First, let's focus on the 115/8th part. The compute kernel routine expects a cell number (or "virtual thread id" as I like to call it), which is determined as follows:
    part = 115/8              # num_elem / n_threads
for i = 0 to part do
vtid = part * tid + i
call compute_kernel(vtid)
By the way, I implemented this in C, but it's easier explaining in pseudo-code. Also, you may have noticed you can optimize this pseudo-code by moving the multiplication for the "base vtid" outside the loop, but it's not really important for what I'm trying to convey here. The idea is that every thread acts on its own part of the array, and it selects its own part by using its thread id as base to index the array. Now, you should be aware that the thread id of a POSIX thread can be an arbitrary number, so it should be a sequential number that is known to this thread and has been set by you at initialization time of the thread. In other words, you give the first created thread a tid of 0, the second 1, the third 2, etc, and this number should be known to the thread itself (you can do this by passing it as argument to the thread's main function). This may seem very obvious, but as said, POSIX thread ids are arbitrary numbers, and otherwise this trick of selecting the correct region in the array would never work.

When you have an uneven distribution, as is the case with 115 array elements and 8 cores, there is a remainder of the array to be processed. The remainder is computed as the modulo; 115 % 8 equals 3. The way to handle this is the same as in the following case; suppose we had an array of only 3 elements and 8 cores, then we'd do it like this:
    if tid <= num_elem then
call compute_kernel(tid)
So, this is also how you handle the remainder part of the array in parallel, except that you'd make a simple calculation to get the "vtid" right.

This was all really simple, but we've actually accomplished a great thing here; we have created the ability to automatically process an arbitrarily sized array using any number of threads — meaning that this code will run good on a dual core system, and formidable on an 8-core system, with near linear performance improvement. Given the nature of scaling by data partitioning, this scales to 1000 cores just as easily, and further if you can find a machine that big (1).

Implementation details 2: Thread scheduling
Data partitioning was the easy part. The second part is all about thread synchronization, which is a schoolbook example, really, but very frustrating to get right if you didn't get it right the first time. I like to pre-fork the threads, and have them sleep until they are assigned work to do. Then they should be woken up, do their job, and go back to sleep. When all threads are sleeping again, that's our barrier point at which the parallel part is done, and the main application can continue in serial fashion.

This behavior can probably be implemented using mutexes only, but what's really fit for this job are condition variables. A condition variable enables a thread to sleep on a state variable, and then another thread (like in this case, the main thread) can signal it, to wake it up.

The way to do it is to use a state variable that says whether it's OK for the thread to run now, or not. This is a boolean that I named runnable. A thread is runnable when it can run, else it should go back to sleep.
thread code:
runnable = False
while not runnable do

main thread:
foreach thread do
mutex_lock(thread's mutex)
thread runnable = True
mutex_unlock(thread's mutex)
To understand this code, you should know that condition_wait unlocks the mutex when going to sleep, and automatically (and atomically) relocks the mutex when waking up again. Apparently (at least according to the pthread specifications), there may be spurious wake ups, so the waking thread must always check its state when it wakes up. That's why it's enclosed in a while loop.

The main thread isn't so exciting, it simply signals all threads to start work. A caveat here is that upon startup, not all threads may have gone to sleep yet. So there must be a barrier sync before starting, and after ending. The barrier sync simply checks whether all spawned threads are not runnable, ie. already sleeping.

Implementation details 3: The number of threads to use
I said it's best to spawn as many threads as there are processor cores. This is true for tight code, if this doesn't drive your system close to 100% CPU utilization, you may add more threads until it does. How do you find out how many cores are in the system? This is a tough question, as it is different on every system. See this great info on, that shows how to get the number of cores from the system for various operating systems.
In my code, I choose the easy way and simply read an environment variable that you can set yourself, which is kind of inspired by OMP_NUM_THREADS, really.

Whew, this blog entry ended up much more text and much less code than I anticipated, but I got to tell you that it's a joy to see a function call like
    run_parallel(process_array, arr, sizeof(arr))
work in parallel by the number of threads given in an environment variable. What's really great about this routine is its flexibility and its scalability. It really scales "for free", gaining more performance when you add more cores.

1. At NASA Ames Research Center, they actually have an SGI Altix supercomputer named Columbia that has a partition running a single system image of 2048 processors, which is incredibly large, even by supercomputer standards.