Click here to Skip to main content
15,877,103 members
Please Sign up or sign in to vote.
0.00/5 (No votes)
I have this multithreading (concurrent queue) C++ code where a single consumer executes tasks submitted to the concurrent queue by multiple producers. Assuming that the consumer can handle events faster than that the producers can produce them, otherwise behaviour is undefined.

I can compile & run my program normally, but my teammate the queue said he got a segmentation fault (SEGFAULT) occurs while trying to execute one of the tasks in the queue. An example case has been implemented in the main function of the file that can reproduce the problem (at least on his PC, it could be dependent on how fast the tasks get executed and how the threads will be scheduled on other systems).

Please tell me:

1. Good ways to write testing code to find the bug (not only SEGFAULT but any possible bugs).

2. Ways to debug those bugs in multithreading program.

3. If there's no bug can be discovered, how should we write testing code for the thread safety of the class?

4. Should I use unique_lock and condition_variable instead of lock_guard? If so, how to implement it?

5. Can you give me suggestion/reviews for my code below to reduce overhead, cleaner code of multithreading even if there's no bugs?

I'd appreciate if you can show me some codes to improve my code performance.

C++
#include <cmath>
#include <functional>
#include <iostream>
#include <mutex>
#include <stdexcept>
#include <thread>

template<typename T, uint64_t SIZE = 4096, uint64_t MAX_SPIN_ON_BUSY = 40000000>
class ConcurrentQueue {
private:
    static constexpr unsigned Log2(unsigned n, unsigned p = 0) {
        return (n <= 1) ? p : Log2(n / 2, p + 1);
    }

    static constexpr uint64_t closestExponentOf2(uint64_t x) {
        return (1UL << ((uint64_t) (Log2(SIZE - 1)) + 1));
    }

    static constexpr uint64_t mRingModMask = closestExponentOf2(SIZE) - 1;
    static constexpr uint64_t mSize = closestExponentOf2(SIZE);

    static const T mEmpty;

    T mMem[mSize];
    std::mutex mLock;
    uint64_t mReadPtr = 0;
    uint64_t mWritePtr = 0;

public:
    const T& pop() {
        if (!peek()) {
            return mEmpty;
        }

        std::lock_guard<std::mutex> lock(mLock);

        if (!peek()) {
            return mEmpty;
        }

        T& ret = mMem[mReadPtr & mRingModMask];

        mReadPtr++;
        return ret;
    }

    bool peek() const {
        return (mWritePtr != mReadPtr);
    }

    uint64_t getCount() const {
        return mWritePtr > mReadPtr ? mWritePtr - mReadPtr : mReadPtr - mWritePtr;
    }

    bool busyWaitForPush() {
        uint64_t start = 0;
        while (getCount() == mSize) {
            if (start++ > MAX_SPIN_ON_BUSY) {
                return false;
            }
        }
        return true;
    }

    void push(const T& pItem) {
        if (!busyWaitForPush()) {
            throw std::runtime_error("Concurrent queue full cannot write to it!");
        }

        std::lock_guard<std::mutex> lock(mLock);
        mMem[mWritePtr & mRingModMask] = pItem;
        mWritePtr++;
    }

    void push(T&& pItem) {
        if (!busyWaitForPush()) {
            throw std::runtime_error("Concurrent queue full cannot write to it!");
        }

        std::lock_guard<std::mutex> lock(mLock);
        mMem[mWritePtr & mRingModMask] = std::move(pItem);
        mWritePtr++;
    }
};

template<typename T, uint64_t SIZE, uint64_t MAX_SPIN_ON_BUSY>
const T ConcurrentQueue<T, SIZE, MAX_SPIN_ON_BUSY>::mEmpty = T{ };

int main(int, char**) {
    using Functor = std::function<void()>;

    ConcurrentQueue<Functor*> queue;

    std::thread consumer([ & ] {
        while (true) {
            if (queue.peek()) {
                auto task = queue.pop();
                (*task)();
                delete task;
            }
        }
    });

    std::thread producer([ & ] {
        uint64_t counter = 0;
        while (true) {
            auto taskId = counter++;
            auto newTask = new Functor([ = ] {
                std::cout << "Running task " << taskId << std::endl << std::flush;
            });
            queue.push(newTask);
        }
    });

    consumer.join();
    producer.join();
    return 0;
}


What I have tried:

I've built the code for single consumer and multiple producer using queue, lock_guard.
Posted
Updated 11-Sep-22 4:53am
v2
Comments
Member 15627495 11-Sep-22 2:55am    
do you set your aside Thread in MTA ? a big amount of work leads to seg fault...

about the code to compute, are there any 'memory leaks' ?
as the 'main' thread is to keep for basic running ( often GUI ... ), can you identify at a moment, which thread is running ? you tell you have it in the queue ( it's a good start, a good place to )

do you use 'return value' for your thread and task ? If you want to follow their execution, token or return value are good tools to achieve.

you can write a class Thread for a better management.

I would start by debugging the locking code. One way you can do this using output messages and this usually called "printf debugging" since the implementations usually used printf. What I would do is add an output statement similar to what you have in pop() :
C++
const T& pop() {
    if (!peek()) {
        return mEmpty;
    }

    std::lock_guard<std::mutex> lock(mLock);

    if (!peek()) {
        return mEmpty;
    }

    auto masked = mReadPtr & mRingModMask;

    std::cout << "task " << taskId << " has acquired the lock and processing " << masked << std::endl << std::flush;

    T& ret = mMem[ masked ];

    mReadPtr++;

    std::cout << "task " << taskId << " is releasing the lock" << std::endl << std::flush;

    return ret;
}
then run your program a bit and see if you have interleaved messages. This would indicate the lock is not working correctly. What I mean by interleaved is you should see messages from the same task acquiring and then releasing the lock but if you see messages from other tasks acquiring the lock then you know you have a problem.

If the lock appears to be working correctly then have a look at your use of the mMem array's indexes. A segmentation fault can be caused by an invalid array index such as a negative value or one that exceeds SIZE. The reason I split out the array index to a variable called masked is so you can output its value. You might want to output this in other places too.

One thing that could be happening is your friends machine could be much slower or much faster than yours and it is exacerbating a flaw in your code.

I hope this is helpful for you.
 
Share this answer
 
Comments
Member 14969632 11-Sep-22 6:07am    
Thanks Rick. My code has exception thrown read access violation after producer thread printed Running task... about 4100 times. Currently, I can see messages printed in consumer thread pop() , then producer printed.

task 0 has acquired the lock
task 0 is releasing
Running task 0
...


The program exited with code 0 (0x0) and showed: "Exception thrown at 0x00406CE5 in Project1.exe: 0xC0000005: Access violation reading location 0xDDDDDDDD."

Besides, how about the way to write testing code for thread safety checking and other issues in threads?
Rick York 11-Sep-22 12:28pm    
I write lots and lots of multithreaded code and it is very tricky to debug. I (nearly) always do it in two phases. If possible, I first thoroughly debug the algorithm. That is not always possible like with a reduction algorithm, for example. Then I debug the locking, when it is used. When possible I avoid locks but your algorithm requires them so I make sure that works correctly. Then it's integration time where the whole is tried together. I would guess you have an array indexing problem. I would add output statements so you see every array index accessed and use a temporary variable for that as I did in the snippet above. Also - output the value returned from pop. That pointer is dereferenced to execute the function queued and it is likely that it is the errant pointer that causes the exception. I would add more output so you can see what values result in the exception. One more thing - you use a default of 4096 for the array size. If you can, try it with much smaller values like 8 or 16 and see if the same things happen. If the behavior is the same then you should see it happen much sooner which will make it easier to debug. Personally, I don't like how you handle the array index. I would never do it that way, but that's just me. I would limit the values and force them to wrap around instead of masking. Again, that's just my personal view.
Rick York 11-Sep-22 12:33pm    
One other thing that just occurred to me - when you access the value from the array and return it in pop, you could also null the slot. It is possible that you are trying to access an object that was already deleted and this is likely to happen when you reach the end of the array and wrap around. It might help to if you check for that condition (null pointer) and catch it when it happens.
Siddhant K.Singh 24-Apr-23 1:48am    
Was this resolved, did you find the root cause of this error?? If yes could you please help? Stuck on it for some time now.
Writing correct multithreaded code is hard, and debugging it is even harder, which is why I advocate a far less error-prone approach[^] in which each thread runs unpreemptably (locked) and yields in its thread loop after completing one or more units of work. You may not be able to use that approach here, but you should consider it whenever you can.

A condition variable allows a thread to sleep and be woken up when it has work to do. Right now, your consumer thread just loops furiously until it gets scheduled out if peek() returns false. When the queue is empty, it should sleep instead, and the producer should wake it up after adding a work item to an empty queue. See this interface[^] and this implementation[^].

By the way, the standard signal is SIGSEGV, not SIGFAULT. But you won't see it on Windows, which uses the "structured exception" value 0xC0000005 instead. The value 0xDDDDDDDD means that you are using a bad pointer. I can't guess how that occurs, because 0x00406CE5 is an address in your executable. You could set a breakpoint at that address so that you can look at the call stack when the problem occurs. This might help you to debug the problem.
 
Share this answer
 
v2

This content, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)



CodeProject, 20 Bay Street, 11th Floor Toronto, Ontario, Canada M5J 2N8 +1 (416) 849-8900