Problem With Michael-Scott Lock-Free Queue

As I stated in the previous post, the actual queue is built based on the lock-free queue implementation described by Maged Michael and Michael Scott in this paper, and that it is probably the best known and most widely used lock-free queue algorithm. Doug Lea used it as the basis for concurrent linke queues in java.util.concurrent libraries for Java. In the paper, the authors address a major problem with lock-free programming known as the ABA problem, where between the read and update another process changes the value from A to B and back to A so that the queue is not in the same state though the pointers in first process have not changed. The well-known solution is to add a generation counter and use a double-word compare_and_swap instruction, which is what my implementation does. The Java implementation does not have the same issue because of garbage collection since the first thread would hold a reference to A it could never be reused. The shared queue uses the counter and double-word CAS instruction to prevent ABA issues since it is implemented in shared memory with no possibility of garbage collection.

While stress testing the shared memory queue, I found another problem with the use of the Michael-Scott algorithm in addition to the ABA problem. I stumbled across the problem while stress testing mostly because I use multiple queues using the same memory in close proximity starting at the same initial state for the generational counters, which dramatically increased the probability of occurrence. The problematic algorithm below is from the paper:

enqueue(Q: pointer to queue t, value: data type)
E1: node = new node()
E2: node–>value = value
E3: node–>next.ptr = NULL
E4: loop
E5: tail = Q–>Tail
E6: next = tail.ptr–>next
E7: if tail == Q–>Tail
E8: if next.ptr == NULL
E9: if CAS(&tail.ptr–>next, next, <node, next.count+1>)
E10: break
E11: endif
E12: else
E13: CAS(&Q–>Tail, tail, <next.ptr, tail.count+1>)
E14: endif
E15: endif
E16: endloop
E17: CAS(&Q–>Tail, tail, <node, tail.count+1>)

The problem begins after line E7 where an executing process is suspended before the CAS at line E9, either before or after line E8. The following diagram illustrates the steps that can occur with concurrently executing processes:

Step 1 for process 1 begins after line E7 where it gets suspended while attempting to add an item at the end of queue 1. At that point, another process can remove the items on the queue including item B which is the tail item for process 1. Then if item B is added to the end of queue 2, it looks to be in the same state as it was in the end of queue 1. If the generation counters for the two queues contain the same value, for instance because the items are bouncing between the queues, process 1 will succeed when it resumes and performs the CAS at line E9. Line E17 will fail because the tail has changed, so the item is incompletely on queue 2 instead of queue 1 and the queue 2 tail is not advanced correctly. I believe the Java implementation is not protected from this issue either based on the code I downloaded from Doug Lea’s homepage if the next field is set to null for any reason after being removed from queue.

I came up with two alternative solutions to the problem. The simpler one for my shared memory queue implementation was to intialize the generation counters to separate ranges of integer values that would not likely overlap since it is based on a 64-bit architecture where the integers will not wrap. Technically one range could end up overlapping the other, in which case the problem would occur again, but I make the ranges so far apart that it would be almost impossible. The other alternative is to use a unique value as the end of queue value in the next field instead of null, which would also be needed for Java. That could be the parent queue structure address, for instance, or some empty node value allocated for each queue, but the only requirement is that it be unique for each queue instance.

Shared Memory Interprocess Queue

In attempting to build my own implementation of an interprocess queue, I was faced with two basic design issues related to the requirements I placed on myself. The first requirement was to be able to dynamically grow the queue based on demand for memory, and the second requirement was to allow arbitrary sized messages. I wanted no preallocation or explicit limitations. So the design had to allow the shared memory to grow, and for all processes to be able to detect the increase and adjust as needed concurrently. Secondly, I needed to track arbitrary allocations and deallocations within the shared memory object.

Initially, a single page of 4096 bytes is allocated in shared memory, which for Linux resides in /dev/shm on the file system. The initial allocation looks like the diagram below:

For all processes to be able to use the queue when accessing at different virtual addresses, the memory is treated as an array of signed 64-bit integers so that the embedded data structures are all updated based on calculated offsets instead of pointers. There is a 512-byte header that contains the base of all the basic embedded data structures. Those data structures include three separate semaphores, lock-free lists for the queues, and a critbit tree for tracking freed data allocations. Both the critbit tree and the lists use the same size internal nodes, which have to be double-word aligned. All node allocations are performed from the left to guarantee alignment, whereas, the data allocations can be an arbitrary number of array slots proceed from the right when looking at the diagram. The array is always grown in multiples of 4096-byte page size and the allocation schemes for each type is maintained by advancing the allocation counters to the left and right of the newly allocated pages.

Unfortunately, I could not figure out a way to manage a lock-free concurrent expansion across multiple processes so I had to use a dedicated semaphore in the header as a mutex such that only one process at a time can expand the shared memory. That way any accessing process is able to safely expand using ftruncate() through the use of a shared semaphore. Every access of the shared memory by other processes is guarded by a range check on the index to make sure it is in bounds, otherwise, the enlarged shared memory is remapped before proceeding. In this manner, I am able to have any given process expand the shared memory queue and have all other processes detect and adjust.

As previously mentioned, I use a critbit tree to track freed data allocations based on the implementation described in this paper. I chose that particular data structure because it minimizes the number of internal nodes which minimizes the memory references. I believe that in typical usage that messages on the queue will grouped within a typical range so that the tree in most cases will remain quite small. The key used is the number of 64-bit integer slots in the array the memory allocation occupies, which means that endianness of the native CPU architecture affects the implementation. Since I have access to only Intel CPUs, the code is written for little-endian integer keys.

The actual queue is built based on the lock-free queue implementation described by Maged Michael and Michael Scott in this paper. It is probably the best known and most widely used lock-free queue algorithm. Doug Lea used it as the basis for concurrent queues in java.util.concurrent libraries for Java. The book The Art of Multiprocessor Programming by Maurice Herlihy and Nir Shavit also describes the algorithms involved and the issues surrounding them in detail. There are two explicit queues embedded with one for messages and the other for events. Also, when internal nodes are freed, they are essentially queued on a free list.

The conceptual diagram below illustrates a queue with a couple of items enqueued:

In the diagram, I use the arrows as if they are pointers, where in the actual implementation they are actually integer index references rather than pointer addresses on heap as would be done in a typical single process implementation. The data for each item on queue is shown allocated to the right hand side, and queue nodes are allocated on the left. Only node allocations end up on the actual queue. Each node has a reference to the data location and the next node on queue. Lastly, there is both a head reference and a tail reference maintained in the header portion of the shared memory segment.

Finally, there are two more semaphores that are used to track the size of the queue and allow the calling processes to block either because the queue is empty or that the queue is full. The write semaphore is initialized to the maximum size and will allow writers to add items to queue only if it is not zero. Every add to the queue decrements the count and every remove increments the count. The read semaphore works just the opposite in that it is initialized to zero and reads from queue will block until something has been added. Every add increments the semaphore count and every remove decrements the count. By using two semaphores in such a manner, I am able to create a bounded queue that allows calling processes to block or not block as needed on both the adds and removes. An unbounded queue is approximated by setting the write semaphore to the maximum value for a semaphore, which on a 64-bit Linux system allows 2147483648 items to be added to a queue.