RWMutex: A Shared/Exclusive Recursive Mutex

13 Dec 2018 | CPOL | 3 min read
A mutex with shared/exclusive access with upgrade/downgrade capability

Introduction

My aim was to create a read/write locking mechanism: any number of threads can lock it for reading, but only one thread at a time can lock it for writing. While the writing thread holds the lock, all other threads wait. A writing thread does not acquire the mutex until every other thread has released it.

I could use Slim Reader/Writer locks, but:

  • They are not recursive: a call to AcquireSRWLockExclusive() will block if the same thread has already acquired the lock.
  • They are not upgradable: a thread that has locked the lock for read access can't upgrade it to write access.
  • They are not copyable handles.

I could try the C++14 shared_lock, but I still need C++11 support. Besides, I'm not yet sure whether it can actually fulfill my requirements.

Therefore, I had to implement it manually. The plain C++11 version was removed because it lacks an equivalent of WaitForMultipleObjects (not yet implemented). The class now also has upgrade/downgrade capabilities.

RWMUTEX

My class is rather simple.

C++
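class RWMUTEX
    {
    private:
        HANDLE hChangeMap;                  // mutex guarding changes to Threads
        std::map<DWORD, HANDLE> Threads;    // thread ID -> that thread's mutex
        RWMUTEX(const RWMUTEX&) = delete;
        RWMUTEX(RWMUTEX&&) = delete;
        // ... the remaining members are shown in the sections below
    };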

I need a std::map<DWORD,HANDLE> to store handles for all threads that try to access the shared resource, and I also need a mutex handle to make sure that all changes to this map are thread safe.

Constructor

C++
RWMUTEX(const RWMUTEX&) = delete;
void operator =(const RWMUTEX&) = delete;

RWMUTEX()
    {
    // Unnamed mutex, not initially owned, default security attributes
    hChangeMap = CreateMutex(0,0,0);
    }

The constructor simply creates the mutex that guards changes to the map. The object should not be copyable.
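Whether the copy operations are really unavailable can be checked at compile time. The following is a minimal, portable sketch: NonCopyable is a hypothetical stand-in for RWMUTEX (it holds no Win32 handle), and the checks rely only on the standard <type_traits> header.

```cpp
#include <type_traits>

// Hypothetical stand-in for RWMUTEX: same deleted copy operations,
// but without any Win32 handle.
class NonCopyable
{
public:
    NonCopyable() {}
    NonCopyable(const NonCopyable&) = delete;
    void operator=(const NonCopyable&) = delete;
};

// All three checks are evaluated by the compiler.
static_assert(!std::is_copy_constructible<NonCopyable>::value,
              "copy construction must be rejected");
static_assert(!std::is_copy_assignable<NonCopyable>::value,
              "copy assignment must be rejected");
// Declaring the copy constructor (even as deleted) suppresses the implicit
// move constructor, so moving is rejected as well.
static_assert(!std::is_move_constructible<NonCopyable>::value,
              "move construction must be rejected");
```

If any of the deleted declarations were removed, the corresponding static_assert would stop the build.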

CreateIf

C++
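HANDLE CreateIf(bool KeepReaderLocked = false)
    {
    // Serialize access to the map; this also blocks while a writer holds hChangeMap
    WaitForSingleObject(hChangeMap, INFINITE);
    DWORD id = GetCurrentThreadId();
    if (Threads[id] == 0)
        {
        // First call from this thread: create its own mutex
        HANDLE e0 = CreateMutex(0, 0, 0);
        Threads[id] = e0;
        }
    HANDLE e = Threads[id];
    // LockWrite() passes true to keep hChangeMap locked
    if (!KeepReaderLocked)
        ReleaseMutex(hChangeMap);
    return e;
    }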

This private member function is called from LockRead() and LockWrite(). If the current thread has not yet registered itself among the threads that might access this mutex, the function creates a mutex for that thread. If some other thread has locked the object for write access, the function blocks until the writing thread releases it. The function returns the mutex handle for the current thread.
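The "create on first use" idea can be tried out without Win32. The following is only a sketch under stated assumptions: it swaps the Win32 mutex handles for std::recursive_mutex objects and GetCurrentThreadId() for std::this_thread::get_id(), and the names ThreadRegistry, GetOrCreate and Count are hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <memory>
#include <mutex>
#include <thread>

// Hypothetical portable stand-in for the Threads map plus hChangeMap.
class ThreadRegistry
{
public:
    // Returns the calling thread's lock, creating it on the first call
    // from that thread (the same idea as CreateIf()).
    std::recursive_mutex* GetOrCreate()
    {
        std::lock_guard<std::mutex> guard(mapLock);   // plays the role of hChangeMap
        std::unique_ptr<std::recursive_mutex>& slot = locks[std::this_thread::get_id()];
        if (!slot)
            slot.reset(new std::recursive_mutex());
        assert(slot);
        return slot.get();
    }

    // Number of threads registered so far.
    std::size_t Count()
    {
        std::lock_guard<std::mutex> guard(mapLock);
        return locks.size();
    }

private:
    std::mutex mapLock;
    std::map<std::thread::id, std::unique_ptr<std::recursive_mutex>> locks;
};
```

Repeated calls from one thread return the same object, while a second thread gets its own entry, so Count() grows by one per distinct thread.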

LockRead/ReleaseRead

C++
HANDLE LockRead()
    {
    auto f = CreateIf();
    WaitForSingleObject(f,INFINITE);
    return f;
    }
void ReleaseRead(HANDLE f)
    {
    ReleaseMutex(f);
    }

These functions are called when you want to lock the object for read access and later release it.
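Read locks are recursive because a Win32 mutex can be acquired again by the thread that already owns it, as long as every acquisition is matched by one ReleaseMutex() call. The sketch below shows the same behaviour with std::recursive_mutex as a portable analogy; it is not the article's code, and NestedReadDepth is a hypothetical helper.

```cpp
#include <cassert>
#include <mutex>

// Acquires the same lock `depth` times on the calling thread, then releases
// it the same number of times. Returns how many acquisitions were held at once.
int NestedReadDepth(std::recursive_mutex& m, int depth)
{
    assert(depth >= 0);
    int held = 0;
    for (int i = 0; i < depth; ++i)
    {
        m.lock();      // does not block: this thread already owns the lock
        ++held;
    }
    const int peak = held;
    for (int i = 0; i < peak; ++i)
        m.unlock();    // one unlock per lock, like one ReleaseRead() per LockRead()
    return peak;
}
```

With a non-recursive lock, the second acquisition on the same thread would never return, which is the SRW lock limitation mentioned in the introduction.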

LockWrite/ReleaseWrite

C++
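void LockWrite()
    {
    // Keep hChangeMap locked so that no new thread can register
    CreateIf(true);

    // Collect the mutex of every registered thread
    std::vector<HANDLE> AllThreads;
    AllThreads.reserve(Threads.size());
    for (auto& a : Threads)
        {
        AllThreads.push_back(a.second);
        }

    // Wait until all of them are free (bWaitAll = TRUE)
    WaitForMultipleObjects((DWORD)AllThreads.size(), AllThreads.data(), TRUE, INFINITE);

    // hChangeMap stays locked, so readers are kept out until ReleaseWrite()
    }

void ReleaseWrite()
    {
    // Release All
    for (auto& a : Threads)
        ReleaseMutex(a.second);
    ReleaseMutex(hChangeMap);
    }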

These functions are called when you want to lock the object for write access and later release it. The LockWrite() function makes sure that:

  1. no new threads are registered during the lock, and
  2. every reading thread has released the lock
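These two guarantees can be illustrated portably. The following is a rough sketch, not the Win32 implementation: it swaps the Win32 mutexes for std::mutex and std::recursive_mutex, and it replaces the blocking WaitForMultipleObjects() call with a non-blocking attempt so that the outcome can be observed. The names PortableRW and TryLockWrite are hypothetical.

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>

class PortableRW
{
public:
    // Reader side: register the calling thread, then take its own lock.
    std::recursive_mutex* LockRead()
    {
        std::recursive_mutex* own = nullptr;
        {
            std::lock_guard<std::mutex> guard(mapLock);
            std::unique_ptr<std::recursive_mutex>& slot = locks[std::this_thread::get_id()];
            if (!slot)
                slot.reset(new std::recursive_mutex());
            own = slot.get();
        }
        own->lock();
        return own;
    }

    void ReleaseRead(std::recursive_mutex* own) { own->unlock(); }

    // Writer side, non-blocking variant: succeeds only if no reader holds its lock.
    bool TryLockWrite()
    {
        if (!mapLock.try_lock())              // (1) no new thread can register
            return false;
        std::vector<std::recursive_mutex*> taken;
        for (auto& entry : locks)
        {
            if (!entry.second->try_lock())    // (2) a reader is still active
            {
                for (std::recursive_mutex* m : taken)
                    m->unlock();
                mapLock.unlock();
                return false;
            }
            taken.push_back(entry.second.get());
        }
        held = taken;                         // kept until ReleaseWrite()
        return true;
    }

    // Must be called by the thread whose TryLockWrite() returned true.
    void ReleaseWrite()
    {
        for (std::recursive_mutex* m : held)
            m->unlock();
        held.clear();
        mapLock.unlock();
    }

private:
    std::mutex mapLock;                       // plays the role of hChangeMap
    std::map<std::thread::id, std::unique_ptr<std::recursive_mutex>> locks;
    std::vector<std::recursive_mutex*> held;  // locks currently owned by the writer
};
```

While one thread holds a read lock, TryLockWrite() called from another thread returns false; once ReleaseRead() has been called, the same attempt succeeds.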

Destructor

C++
~RWMUTEX()
    {
    CloseHandle(hChangeMap);
    hChangeMap = 0;
    for (auto& a : Threads)
        CloseHandle(a.second);
    Threads.clear();
    }

The destructor makes sure that all handles are closed.

Upgradable/Downgradable Locks

Sometimes, for efficiency, you want to upgrade a read lock to a write lock without unlocking first. Therefore, LockWrite is modified as follows:

C++
void LockWrite(DWORD updThread = 0)
{
    CreateIf(true);

    // Wait for all (wi is the wait timeout member, INFINITE unless debugging)
    std::vector<HANDLE> AllThreads;
    AllThreads.reserve(Threads.size());
    for (auto& a : Threads)
    {
        if (updThread == a.first) // except ourself if in upgrade operation
            continue;
        AllThreads.push_back(a.second);
    }
    DWORD tim = WAIT_OBJECT_0;
    if (!AllThreads.empty()) // WaitForMultipleObjects does not accept a count of zero
        tim = WaitForMultipleObjects((DWORD)AllThreads.size(), AllThreads.data(), TRUE, wi);

    if (tim == WAIT_TIMEOUT && wi != INFINITE)
        OutputDebugString(L"LockWrite debug timeout!");

    // We don't want to keep threads, the hChangeMap is enough
    // We also release the handle to the upgraded thread, if any
    for (auto& a : Threads)
        ReleaseMutex(a.second);

    // hChangeMap stays locked, so readers are kept out
}

void Upgrade()
{
    LockWrite(GetCurrentThreadId());
}

HANDLE Downgrade()
{
    DWORD id = GetCurrentThreadId();
    auto z = Threads[id];
    auto tim = WaitForSingleObject(z, wi);
    if (tim == WAIT_TIMEOUT && wi != INFINITE)
        OutputDebugString(L"Downgrade debug timeout!");
    ReleaseMutex(hChangeMap);
    return z;
}

Calling Upgrade() now results in:

  • Locking the change map
  • Waiting for all reading threads, except our own, to release their locks

We then release our own thread's mutex, since locking the change map is enough.

Calling Downgrade() results in:

  • Getting the handle from the map directly, with no need to relock the change map
  • Locking this handle as if we were in read mode
  • Releasing the change map

The entire code, with some debugging aids, is:

C++
// RWMUTEX
    class RWMUTEX
        {
        private:
            HANDLE hChangeMap = 0;
            std::map<DWORD, HANDLE> Threads;
            DWORD wi = INFINITE;
            RWMUTEX(const RWMUTEX&) = delete;
            RWMUTEX(RWMUTEX&&) = delete;
            void operator=(const RWMUTEX&) = delete;

        public:

            RWMUTEX(bool D = false)
                {
                if (D)
                    wi = 10000;
                else
                    wi = INFINITE;
                hChangeMap = CreateMutex(0, 0, 0);
                }

            ~RWMUTEX()
                {
                CloseHandle(hChangeMap);
                hChangeMap = 0;
                for (auto& a : Threads)
                    CloseHandle(a.second);
                Threads.clear();
                }

            HANDLE CreateIf(bool KeepReaderLocked = false)
                {
                auto tim = WaitForSingleObject(hChangeMap, INFINITE);
                if (tim == WAIT_TIMEOUT && wi != INFINITE)
                    OutputDebugString(L"LockRead debug timeout!");
                DWORD id = GetCurrentThreadId();
                if (Threads[id] == 0)
                    {
                    HANDLE e0 = CreateMutex(0, 0, 0);
                    Threads[id] = e0;
                    }
                HANDLE e = Threads[id];
                if (!KeepReaderLocked)    
                    ReleaseMutex(hChangeMap);
                return e;
                }

            HANDLE LockRead()
                {
                auto z = CreateIf();
                auto tim = WaitForSingleObject(z, wi);
                if (tim == WAIT_TIMEOUT && wi != INFINITE)
                    OutputDebugString(L"LockRead debug timeout!");
                return z;
                }

            void LockWrite(DWORD updThread = 0)
                {
                CreateIf(true);

                // Wait for all
                std::vector<HANDLE> AllThreads;
                AllThreads.reserve(Threads.size());
                for (auto& a : Threads)
                    {
                    if (updThread == a.first) // except ourself if in upgrade operation
                        continue;
                    AllThreads.push_back(a.second);
                    }
                DWORD tim = WAIT_OBJECT_0;
                if (!AllThreads.empty()) // WaitForMultipleObjects does not accept a count of zero
                    tim = WaitForMultipleObjects((DWORD)AllThreads.size(), AllThreads.data(), TRUE, wi);

                if (tim == WAIT_TIMEOUT && wi != INFINITE)
                    OutputDebugString(L"LockWrite debug timeout!");

                // We don't want to keep threads, the hChangeMap is enough
                // We also release the handle to the upgraded thread, if any
                for (auto& a : Threads)
                    ReleaseMutex(a.second);

                // hChangeMap stays locked, so readers are kept out
                }

            void ReleaseWrite()
                {
                ReleaseMutex(hChangeMap);
                }

            void ReleaseRead(HANDLE f)
                {
                ReleaseMutex(f);
                }

            void Upgrade()
                {
                LockWrite(GetCurrentThreadId());
                }

            HANDLE Downgrade()
                {
                DWORD id = GetCurrentThreadId();
                auto z = Threads[id];
                auto tim = WaitForSingleObject(z, wi);
                if (tim == WAIT_TIMEOUT && wi != INFINITE)
                    OutputDebugString(L"Downgrade debug timeout!");
                ReleaseMutex(hChangeMap);
                return z;
                }
        };

To use the RWMUTEX, you can simply create locking classes:

C++
class RWMUTEXLOCKREAD
    {
    private:
        RWMUTEX* mm = 0;
        HANDLE lm = 0;  // handle returned by LockRead(), needed by ReleaseRead()
    public:

        RWMUTEXLOCKREAD(const RWMUTEXLOCKREAD&) = delete;
        void operator =(const RWMUTEXLOCKREAD&) = delete;

        RWMUTEXLOCKREAD(RWMUTEX*m)
            {
            if (m)
                {
                mm = m;
                lm = mm->LockRead();
                }
            }
        ~RWMUTEXLOCKREAD()
            {
            if (mm)
                {
                mm->ReleaseRead(lm);
                lm = 0;
                mm = 0;
                }
            }
    };

class RWMUTEXLOCKWRITE
    {
    private:
        RWMUTEX* mm = 0;
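    public:

        RWMUTEXLOCKWRITE(const RWMUTEXLOCKWRITE&) = delete;
        void operator =(const RWMUTEXLOCKWRITE&) = delete;

        RWMUTEXLOCKWRITE(RWMUTEX*m)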
            {
            if (m)
                {
                mm = m;
                mm->LockWrite();
                }
            }
        ~RWMUTEXLOCKWRITE()
            {
            if (mm)
                {
                mm->ReleaseWrite();
                mm = 0;
                }
            }
    };

And a new class for the upgrade mechanism:

C++
class RWMUTEXLOCKREADWRITE
{
private:
    RWMUTEX* mm = 0;
    HANDLE lm = 0;
    bool U = false;
public:

    RWMUTEXLOCKREADWRITE(const RWMUTEXLOCKREADWRITE&) = delete;
    void operator =(const RWMUTEXLOCKREADWRITE&) = delete;

    RWMUTEXLOCKREADWRITE(RWMUTEX*m)
    {
        if (m)
        {
            mm = m;
            lm = mm->LockRead();
        }
    }

    void Upgrade()
    {
        if (mm && !U)
        {
            mm->Upgrade();
            lm = 0;
            U = 1;
        }
    }

    void Downgrade()
    {
        if (mm && U)
        {
            lm = mm->Downgrade();
            U = 0;
        }
    }

    ~RWMUTEXLOCKREADWRITE()
    {
        if (mm)
        {
            if (U)
                mm->ReleaseWrite();
            else
                mm->ReleaseRead(lm);
            lm = 0;
            mm = 0;
        }
    }
};

Sample usage:

C++
RWMUTEX m;

// ... other code
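void foo1()
    {
    RWMUTEXLOCKREAD lock(&m);   // read lock, released when lock goes out of scope
    }

void foo2()
    {
    RWMUTEXLOCKWRITE lock(&m);  // write lock, released when lock goes out of scope
    }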
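The sample above does not exercise the upgrade guard. To make its call sequence visible without Win32, the sketch below runs the same state machine against a stand-in that only records calls. TraceMutex and UpgradableGuard are hypothetical names: TraceMutex mimics the method names of RWMUTEX but takes no real lock, and UpgradableGuard mirrors RWMUTEXLOCKREADWRITE as a template.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical stand-in with the same method names as RWMUTEX.
// It takes no real lock; it only records which methods were called.
struct TraceMutex
{
    typedef int Handle;                 // plays the role of HANDLE
    std::vector<std::string> calls;

    Handle LockRead()          { calls.push_back("LockRead");     return 1; }
    void   ReleaseRead(Handle) { calls.push_back("ReleaseRead"); }
    void   ReleaseWrite()      { calls.push_back("ReleaseWrite"); }
    void   Upgrade()           { calls.push_back("Upgrade"); }
    Handle Downgrade()         { calls.push_back("Downgrade");    return 1; }
};

// Same state machine as RWMUTEXLOCKREADWRITE, templated on the mutex type
// so that it can run against the stand-in above.
template <typename M>
class UpgradableGuard
{
private:
    M* mm = nullptr;
    typename M::Handle lm = typename M::Handle();
    bool upgraded = false;

public:
    UpgradableGuard(const UpgradableGuard&) = delete;
    void operator=(const UpgradableGuard&) = delete;

    explicit UpgradableGuard(M* m) : mm(m)
    {
        if (mm)
            lm = mm->LockRead();        // starts out as a reader
    }

    void Upgrade()
    {
        if (mm && !upgraded) { mm->Upgrade(); upgraded = true; }
    }

    void Downgrade()
    {
        if (mm && upgraded) { lm = mm->Downgrade(); upgraded = false; }
    }

    ~UpgradableGuard()
    {
        if (!mm)
            return;
        if (upgraded)
            mm->ReleaseWrite();         // still a writer
        else
            mm->ReleaseRead(lm);        // reader again, or never upgraded
    }
};
```

A guard that is upgraded and then downgraded records LockRead, Upgrade, Downgrade, ReleaseRead. If it is destroyed while still upgraded, the last entry is ReleaseWrite instead. A second Upgrade() call on an already upgraded guard is ignored.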

History

  • 13-12-2018: Added upgrading mechanism
  • 10-12-2017: Added debugging aids and fixed a read/write deadlock by allowing ReleaseRead() not to call CreateIf()
  • 12-05-2017: Fixed rare deadlock in writers, and simplified class
  • 23-08-2016: Fixed deadlock in reader unlocking
  • 17-12-2015: Fixed race bug in reading (and removed C++ 11 implementation)
  • 12-12-2015: First release

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)