4 basic multithreading & lockfree programming exercises

Four exercises that touch on basic multithreading and lockfree programming concepts.

  1. Implement a program that uses two threads to increment a global counter to 10,000, with each thread incrementing it 5,000 times. Make it deliberately buggy so that interleaving problems leave the final count below 10,000.
  2. Fix the above with atomics.
  3. Implement a variant of the program: instead of simply incrementing the counter, make it wrap every 16 increments (as if stepping through the indices of an array of length 16). Have two threads each increment the counter (16 * 5000) times, so the counter should end up back at index zero. Implement it in a naive, buggy way that often leaves the counter nonzero, even though atomics are used.
  4. Fix the above using a CAS loop.
  5. (Bonus question for the above: Why isn’t std::atomic::compare_exchange_strong a good fit here?)

I especially like exercises three and four because they show that atomics don’t always magically solve all your problems.

In simple use cases where you just increment or decrement the atomic (even by values other than 1), this works out of the box, because std::atomic provides convenience operators (operator++, operator+=, fetch_add, and friends) that compile down to atomic read-modify-write operations. This works even with concurrent writers.
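
For example, a compound assignment on a std::atomic is one atomic read-modify-write even with two writers. A minimal sketch (the names total and worker are mine, not from the exercises):

#include <atomic>
#include <cassert>
#include <thread>
std::atomic<int> total = 0;
void worker()
{
    for (int i = 0; i < 5000; i++)
    {
        total += 3; // atomic read-modify-write, equivalent to total.fetch_add(3)
    }
}
int main()
{
    auto th1 = std::thread(worker);
    auto th2 = std::thread(worker);
    th1.join();
    th2.join();
    assert(total == 2 * 5000 * 3); // holds despite the concurrent writers
}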

But if you’re doing a more complex operation (e.g. the increment plus modulus), which doesn’t have that automatic sugar, and you have concurrent writers, you need to use a CAS loop (the solution to exercise four below shows the pattern).

These are especially good exercises because they are a prerequisite for implementing a lockfree multi-producer/multi-consumer (MPMC) queue, possibly the most classic lockfree data structure (it requires CAS, unlike a single-producer/single-consumer (SPSC) queue). The modulus calculation inside the CAS loop isn’t strictly necessary, though: it can be optimized into a normal atomic increment (which will eventually overflow), and the conversion to an array index with modulus can be done by the reader, as sketched below.
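
A rough sketch of that optimization (the names are hypothetical, not from any particular queue implementation; the capacity must be a power of two so the modulus stays consistent when the unsigned counter eventually wraps around):

#include <atomic>
#include <cstddef>
#include <cstdint>
constexpr std::size_t kCapacity = 16; // power of two
std::atomic<std::uint64_t> writeTicket = 0;
std::size_t claimSlot()
{
    // Plain atomic increment, no CAS loop; the ticket just keeps growing
    // (and eventually wraps on overflow).
    std::uint64_t ticket = writeTicket.fetch_add(1);
    // The modulus is applied locally when converting to an array index.
    return ticket % kCapacity;
}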

Unlike MPMC, SPSC doesn’t require a CAS loop because there are no concurrent writers of the read and write index variables; the reader is the only one incrementing the read index, and likewise for the writer, so it’s sufficient to use simple atomic loads and stores. With SPSC, the read and write functions only have to be thread-safe with respect to each other. They do not have to be thread-safe with respect to themselves.
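
To make that concrete, here is a minimal, illustrative SPSC ring buffer (my own sketch with made-up names, not code from the exercises; a production version would also pad the indices onto separate cache lines). The key point is that each index has exactly one writer, so plain atomic loads and stores are enough:

#include <atomic>
#include <cstddef>
#include <optional>
template <typename T, std::size_t N>
class SpscQueue
{
public:
    bool push(const T& value) // called only by the single producer
    {
        std::size_t w = writeIndex.load(std::memory_order_relaxed);
        std::size_t next = (w + 1) % N;
        if (next == readIndex.load(std::memory_order_acquire))
        {
            return false; // full (one slot is intentionally left unused)
        }
        buffer[w] = value;
        writeIndex.store(next, std::memory_order_release); // publish the element
        return true;
    }
    std::optional<T> pop() // called only by the single consumer
    {
        std::size_t r = readIndex.load(std::memory_order_relaxed);
        if (r == writeIndex.load(std::memory_order_acquire))
        {
            return std::nullopt; // empty
        }
        T value = buffer[r];
        readIndex.store((r + 1) % N, std::memory_order_release); // free the slot
        return value;
    }
private:
    T buffer[N];
    std::atomic<std::size_t> readIndex{0};
    std::atomic<std::size_t> writeIndex{0};
};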

A little cheat sheet:

  • No concurrent readers and writers: Simple variable
  • Concurrent readers and writers. Writers doing “simple” update (e.g. increment): Use atomic, and simple C++ increment operators
  • Concurrent readers and writers. Writers doing “complex” update (e.g. increment & modulus): Use atomic, and CAS loop

Solutions:

1.

#include <iostream>
#include <thread>
int counter = 0;
void threadFunc()
{
    for (int i = 0; i < 5000; i++)
    {
        counter++; // buggy: this read-modify-write is not atomic, so increments from the two threads can be lost
    }
}
int main()
{
    auto th1 = std::thread(threadFunc);
    auto th2 = std::thread(threadFunc);
    th1.join();
    th2.join();
    std::cout << "counter " << counter << "\n";
    std::cout << "expected " << 10000 << "\n";
}
~/c/l/c/lockfree ❯ env CXXFLAGS="-std=c++17" make 1basic
c++ -std=c++17    1basic.cpp   -o 1basic
~/c/l/c/lockfree ❯ ./1basic
counter 5623
expected 10000
~/c/l/c/lockfree ❯ ./1basic
counter 6067
expected 10000
~/c/l/c/lockfree ❯ ./1basic
counter 5000
expected 10000
~/c/l/c/lockfree ❯ ./1basic
counter 6190
expected 10000

2.

#include <atomic>
#include <iostream>
#include <thread>
std::atomic<int> counter = 0;
void threadFunc()
{
    for (int i = 0; i < 5000; i++)
    {
        counter++;
    }
}
int main()
{
    auto th1 = std::thread(threadFunc);
    auto th2 = std::thread(threadFunc);
    th1.join();
    th2.join();
    std::cout << "counter " << counter << "\n";
    std::cout << "expected " << 10000 << "\n";
}
~/c/l/c/lockfree ❯ env CXXFLAGS="-std=c++17" make 2fixed
c++ -std=c++17    2fixed.cpp   -o 2fixed
~/c/l/c/lockfree ❯ ./2fixed
counter 10000
expected 10000
~/c/l/c/lockfree ❯ ./2fixed
counter 10000
expected 10000
~/c/l/c/lockfree ❯ ./2fixed
counter 10000
expected 10000
~/c/l/c/lockfree ❯ ./2fixed
counter 10000
expected 10000
~/c/l/c/lockfree ❯ ./2fixed
counter 10000
expected 10000
~/c/l/c/lockfree ❯ ./2fixed
counter 10000
expected 10000

3.

#include <atomic>
#include <iostream>
#include <thread>
std::atomic<int> counter = 0;
void threadFunc()
{
    for (int i = 0; i < (16 * 5000); i++)
    {
        counter = (counter + 1) % 16; // buggy: the load, +1, %16, and store are separate steps, so the other thread can interleave between them
    }
}
int main()
{
    auto th1 = std::thread(threadFunc);
    auto th2 = std::thread(threadFunc);
    th1.join();
    th2.join();
    std::cout << "counter " << counter << "\n";
    std::cout << "expected " << 0 << "\n";
}
~/c/l/c/lockfree ❯ env CXXFLAGS="-std=c++17" make 3mod
c++ -std=c++17    3mod.cpp   -o 3mod
~/c/l/c/lockfree ❯ ./3mod
counter 15
expected 0
~/c/l/c/lockfree ❯ ./3mod
counter 12
expected 0
~/c/l/c/lockfree ❯ ./3mod
counter 6
expected 0
~/c/l/c/lockfree ❯ ./3mod
counter 3
expected 0
~/c/l/c/lockfree ❯ ./3mod
counter 9
expected 0
~/c/l/c/lockfree ❯ ./3mod
counter 13
expected 0

4. (Note: You might think this solution is wrong because the initial load is outside the do-while loop. It does actually work, though, because std::atomic::compare_exchange_weak() updates its first argument (expected) with the current value when the comparison fails. See the appendix and the comments below.)

#include <atomic>
#include <iostream>
#include <thread>
std::atomic<int> counter = 0;
void threadFunc()
{
    for (int i = 0; i < (16 * 5000); i++)
    {
        bool success = false;
        int counter_old = counter.load();
        do
        {
            int counter_new = (counter_old + 1) % 16;
            // On failure, compare_exchange_weak reloads the current value into
            // counter_old, so no explicit re-load is needed inside the loop.
            success = counter.compare_exchange_weak(counter_old, counter_new);
        } while (!success);
    }
}
int main()
{
    auto th1 = std::thread(threadFunc);
    auto th2 = std::thread(threadFunc);
    th1.join();
    th2.join();
    std::cout << "counter " << counter << "\n";
    std::cout << "expected " << 0 << "\n";
}
~/c/l/c/lockfree ❯ env CXXFLAGS="-std=c++17" make 4fixed
c++ -std=c++17    4fixed.cpp   -o 4fixed
~/c/l/c/lockfree ❯ ./4fixed
counter 0
expected 0
~/c/l/c/lockfree ❯ ./4fixed
counter 0
expected 0
~/c/l/c/lockfree ❯ ./4fixed
counter 0
expected 0
~/c/l/c/lockfree ❯ ./4fixed
counter 0
expected 0
~/c/l/c/lockfree ❯ ./4fixed
counter 0
expected 0
~/c/l/c/lockfree ❯ ./4fixed
counter 0
expected 0

5. std::atomic::compare_exchange_strong isn’t a good fit because we need a CAS loop anyway to retry when there is contention, so we might as well use _weak, which is allowed to fail spuriously and can therefore compile to cheaper code on some architectures (those that implement CAS with LL/SC instructions).

If for some reason retrying on contention were not necessary for the application, then compare_exchange_strong with no loop would be fine.

Rule of thumb: if you need to loop anyway, use _weak; if looping is not necessary, use _strong.
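
For instance, a one-shot “claim” is a case where you don’t want to loop at all, so _strong is the right choice. A hypothetical sketch (with _weak here, a spurious failure would make a caller wrongly conclude that somebody else already claimed the work):

#include <atomic>
std::atomic<int> state = 0; // 0 = unclaimed, 1 = claimed
bool tryClaim()
{
    int expected = 0;
    // Exactly one caller succeeds; everyone else gives up rather than retry,
    // so there is no loop and spurious failures would not be acceptable.
    return state.compare_exchange_strong(expected, 1);
}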


Appendix: CAS Loop: load before or inside loop?

Regarding exercise four: the code is correct whether the initial load happens before the CAS loop or inside it. This is because compare_exchange_weak reloads the current value into its first argument (expected) when the comparison fails. That semantic is really just a reflection of the underlying architecture: on x86, CMPXCHG loads the current value of the destination into the accumulator register (EAX/RAX) when the comparison fails.

Codegen – outside the loop (-O2)

#include <atomic>

std::atomic<int> counter = 0;

void threadFunc()
{
    for (int i = 0; i < (16 * 5000); i++)
    {
        bool success = false;
        int counter_old = counter.load();
        do
        {   
            int counter_new = (counter_old + 1);
            success = counter.compare_exchange_weak(counter_old, counter_new);
        } while (!success);
    }
}
threadFunc():
        mov     edx, 80000
.L3:
        mov     eax, DWORD PTR counter[rip]
.L2:
        lea     ecx, [rax+1]
        lock cmpxchg    DWORD PTR counter[rip], ecx
        jne     .L2
        sub     edx, 1
        jne     .L3
        ret
counter:
        .zero   4

Codegen – inside the loop (-O2)

#include <atomic>

std::atomic<int> counter = 0;

void threadFunc()
{
    for (int i = 0; i < (16 * 5000); i++)
    {
        bool success = false;

        do
        {   
            int counter_old = counter.load();
            int counter_new = (counter_old + 1);
            success = counter.compare_exchange_weak(counter_old, counter_new);
        } while (!success);
    }
}
threadFunc():
        mov     edx, 80000
.L2:
        mov     eax, DWORD PTR counter[rip]
        lea     ecx, [rax+1]
        lock cmpxchg    DWORD PTR counter[rip], ecx
        jne     .L2
        sub     edx, 1
        jne     .L2
        ret
counter:
        .zero   4

The former is actually slightly more efficient: the explicit load inside the loop is redundant, because whenever the CAS fails and the loop retries, cmpxchg has already reloaded the current value into the local (eax). There is no need to load it again explicitly.

3 thoughts on “4 basic multithreading & lockfree programming exercises”

  1. crysr

    I believe in problem 4, your CAS loop is wrong; you should load in the loop. You can pass the test case because this is modulo arithmetic: every 16 increments, old_value gets a chance to be equal with the atomic value, but this is not the intended behaviour.

    1. Mark (post author)

      I just re-tested and actually the code does work because of a subtle “feature” of C++ std::atomic::compare_exchange_weak:

      “If those are bitwise-equal, replaces the former with desired (performs read-modify-write operation). Otherwise, loads the actual value stored in *this into expected (performs load operation).”

      So if comparison fails, the local variable is actually updated!

      https://en.cppreference.com/w/cpp/atomic/atomic/compare_exchange
