How to implement thread-safe container with natural syntactic sugar?

  • A+
Category:Languages

Preface

Below code results in undefined behaviour, if used as is:

vector<int> vi; ... vi.push_back(1);  // thread-1 ... vi.pop(); // thread-2 

Traditional approach is to fix it with std::mutex:

vi_mutex.lock(); vi.push_back(1); vi_mutex.unlock(); 

Encapsulating it in macro, it can be slightly simplified. However, as the code grows, such things start looking cumbersome, as for every object, we may have to maintain a mutex.

Objective

Without compromising in the syntax of accessing an object and declaring an explicit mutex, I would like to create a template such that, it does all the boilerplate work. e.g.

Concurrent<vector<int>> vi;  // specific `vi` mutex is auto declared in this wrapper ... vi.push_back(1); // thread-1: locks `vi` only until `push_back()` is performed ... vi.pop ()  // thread-2: locks `vi` only until `pop()` is performed 

In current C++, it's impossible to achieve this. However, I have attempted a code where if just change vi. to vi->, then the things work as expected in above code comments.

Code

It's rather long code; the name Concurrent is inspired from C#:

// An intermediate class, which hides various `begin()` methods and all the other methods are accessible using pointer template<class Class> class Protected : public Class {   public:    using Class::Class;   protected: using Class::begin;   protected: using Class::cbegin;   protected: using Class::rbegin;   protected: using Class::crbegin;    // To encapsulate an `Class::iterator`; It will hold the locked mutex, until it remains in the scope; Not useful for non-iterating classes   protected: template<class Base,                       class Mutex>              struct Safe : Base              {                Mutex& m_rMutex;                 Safe (Base&& object,                      Mutex& mutex) :                Base(std::move(object)),                m_rMutex(mutex)                { m_rMutex.lock(); PRINT(this); }                 ~Safe() { PRINT(this); m_rMutex.unlock(); }              };   protected: template<typename... Args>              auto              Iterator (Args&&... args)               {                 return Safe<Args...>(std::forward<Args>(args)...);               } };  // The primary class, which encapsulate all the aspects of the class; // The various `begin()`s are overloaded though; Using `operator->`, everything can be accessed inside the original class template<class Class,          class Mutex = std::mutex> class Concurrent : protected Protected<Class> {   public: using Protected<Class>::Protected;   public: using Class::end;   public: using Class::cend;   public: using Class::rend;   public: using Class::crend;    private: struct SafeThis            {              Concurrent* const m_This;              Mutex& m_rMutex;               SafeThis (Concurrent* const this_,                        Mutex& mutex) :              m_This(this_),              m_rMutex(mutex)              { m_rMutex.lock(); PRINT(this); }               ~SafeThis () { PRINT(this); m_rMutex.unlock(); }               Protected<Class>* operator-> () { PRINT(this); return m_This; }            };    public: SafeThis ScopeLocked () { return SafeThis(this, m_Mutex); }   public: SafeThis operator-> () { return ScopeLocked(); }    public: auto& operator[] (const uint64_t index) { return (*this)->at(index); }   public: const auto& operator[] (const uint64_t index) const { return (*this)->at(index); }  #define DEFINE(NAME) auto NAME () { return this->Iterator(Class::NAME(), m_Mutex); }   public: DEFINE(begin)   public: DEFINE(cbegin)   public: DEFINE(rbegin)   public: DEFINE(crbegin) #undef DEFINE    private: Mutex m_Mutex; }; 

Test

#include<iostream> #include<mutex> #include<thread> #include<vector>  std::mutex mutex; #define PRINT(VALUE) std::cout << __FUNCTION__ << "(" << std::this_thread::get_id() << "): " << VALUE << "/n"  /* above utility code here */  Concurrent<std::vector<int>> vi{8,9};  // Test: constructors  void Thread () {   PRINT("started...");   vi->push_back(2);  // Test: calling a method }  int main () {   std::thread run(Thread);   PRINT("started...");    vi->push_back(1);  // Test: calling a method   for(auto it = vi.begin(); it != vi.end(); ++it)  // Test: iterating     PRINT(*it);  // Test: accessing iteration value    PRINT(vi[0]);  // Test: `operator[]`    run.join(); //  for(const auto& i : vi)  <--- needs fixing //    PRINT(i);    auto it = std::begin(vi);  // Test: `std::begin()`   PRINT(*++it);  // Test: accessing with `operator++` } 

Demo

Questions

  • Is using the temporary object to call a function with overloaded operator->() leads to undefined behavior in C++
  • The range based for loop syntax is not working in this case. It gives compilation error. What is the correct way to fix it?
  • Does this small utility class serve the purpose of thread-safety for an encapsulated object in all the cases?

Clarifications

There are some productive comments here. However, I see that those are trivial issue, unless I am missing something badly.

First of all, all the iterator methods are overloaded. You may see Concurrent::begin() methods. The purpose is to lock the object for longer time. Hence following should be safe:

case-1:

// thread-1 // it remains locked till the `for` scope for(auto it = vi.begin(); it != vi.end; ++it)   *it = 0;  // thread-2 // runs either before or after, but NOT during above `for` loop vi->clear(); 

case-2:

Possible race condition when .size() is taken and .at() is called; but in between another thread clears the vector. This is more of a design issue and just by adding a simple named function, which is actually exactly the same as operator->():

// thread-1 gets a locked object within {} {   auto viLocked = vi.ScopeLocked();   if(viLocked->size() > 0)     viLocked.at(0); }  // here the scoped lock is gone, as the name suggests  // thread-2 clears before or after, but not during the scope {} of above vi->clear(); 

The Demo is updated. Please let me know if anything is missed.

 


Don't do this.

It's almost impossible to make a thread safe collection class in which every method takes a lock.

Consider the following instance of your proposed Concurrent class.

Concurrent<vector<int>> vi; 

A developer might come along and do this:

 int result = 0;  if (vi.size() > 0)  {      result = vi.at(0);  } 

And another thread might make this change in between the first threads call to size() and at(0).

vi.clear(); 

So now, the synchronized order of operations is:

vi.size()  // returns 1 vi.clear() // sets the vector's size back to zero vi.at(0)   // throws exception since size is zero 

So even though you have a thread safe vector class, two competing threads can result in an exception being thrown in unexpected places.

That's just the simplest example. There are other ways in which multiple threads attempting to read/write/iterate at the same time could inadvertently break your guarantee of thread safety.

You mentioned that the whole thing is motivated by this pattern being cumbersome:

vi_mutex.lock(); vi.push_back(1); vi_mutex.unlock(); 

In fact, there are helper classes that will make this cleaner, namely lock_guard that will take a mutex to lock in its constructor and unlock on the destructor

{     lock_guard<mutex> lck(vi_mutex);     vi.push_back(1); } 

Then other code in practice becomes thread safe ala:

{      lock_guard<mutex> lck(vi_mutex);      result = 0;      if (vi.size() > 0)      {          result = vi.at(0);      } } 

Comment

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen: