I'm trying to implement a producer/consumer model multithreaded program in C++ for a project I'm working on. The basic idea is that the main thread creates a second thread to watch a serial port for new data, process the data and put the result in a buffer that is periodically polled by the main thread. I've never written multi-threaded programs before. I've been reading lots of tutorials, but they're all in C. I think I've got a handle on the basic concepts, but I'm trying to c++ify it. For the buffer, I want to create a data class with mutex protection built in. This is what I came up with.
1) Am I going about this the wrong way? Is there a smarter way to implement a protected data class?
2) What will happen in the following code if two threads try to call ProtectedBuffer::add_back() at the same time?
#include <deque>
#include "pthread.h"
template <class T>
class ProtectedBuffer {
  std::deque<T> buffer;
  pthread_mutex_t mutex;
public:
  void add_back(T data) {
    pthread_mutex_lock(&mutex);
    buffer.push_back(data);
    pthread_mutex_unlock(&mutex);
  }
  void get_front(T &data) {
    pthread_mutex_lock(&mutex);
    data = buffer.front();
    buffer.pop_front();
    pthread_mutex_unlock(&mutex);
  }
};
Edit: Thanks for all the great suggestions. I've tried to implement them below. I also added some error checking so if a thread somehow manages to try to lock the same mutex twice it will fail gracefully. I think.
#include "pthread.h"
#include <deque>
class Lock {
    pthread_mutex_t &m;
    bool locked;
    int error;
public:
    explicit Lock(pthread_mutex_t & _m) : m(_m) {
        error = pthread_mutex_lock(&m);
        if (error == 0) {
            locked = true;
        } else {
            locked = false;
        }
    }
    ~Lock() {
        if (locked)
            pthread_mutex_unlock(&m);
    }
    bool is_locked() {
        return locked;
    }
};
class TryToLock {
    pthread_mutex_t &m;
    bool locked;
    int error;
public:
    explicit TryToLock(pthread_mutex_t & _m) : m(_m) {
        error = pthread_mutex_trylock(&m);
        if (error == 0) {
            locked = true;
        } else {
            locked = false;
        }
    }
    ~TryToLock() {
        if (locked)
            pthread_mutex_unlock(&m);
    }
    bool is_locked() {
        return locked;
    }
};
template <class T>
class ProtectedBuffer{
    pthread_mutex_t mutex;
    pthread_mutexattr_t mattr;
    std::deque<T> buffer;
    bool failbit;
    ProtectedBuffer(const ProtectedBuffer& x);
    ProtectedBuffer& operator= (const ProtectedBuffer& x);
public:
    ProtectedBuffer() {
        pthread_mutexattr_init(&mattr);
        pthread_mutexattr_settype(&mattr, PTHREAD_MUTEX_ERRORCHECK);
        pthread_mutex_init(&mutex, &mattr);
        failbit = false;
    }
    ~ProtectedBuffer() {
        pthread_mutex_destroy(&mutex);
        pthread_mutexattr_destroy(&mattr);
    }
    void add_back(T &data) {
        Lock lck(mutex);
        if (!lck.locked()) {
            failbit = true;
            return;
        }
        buffer.push_back(data);
        failbit = false;
    }
    void get_front(T &data) {
        Lock lck(mutex);
        if (!lck.locked()) {
            failbit = true;
            return;
        }
        if (buffer.empty()) {
            failbit = true;
            return;
        }
        data = buffer.front();
        buffer.pop_front();
        failbit = false;
    }
    void try_get_front(T &data) {
        TryToLock lck(mutex);
        if (!lck.locked()) {
            failbit = true;
            return;
        }
        if (buffer.empty()) {
            failbit = true;
            return;
        }
        data = buffer.front();
        buffer.pop_front();
        failbit = false;
    }
    void try_add_back(T &data) {
        TryToLock lck(mutex);
        if (!lck.locked()) {
            failbit = true;
            return;
        }
        buffer.push_back(data);
        failbit = false;
    }
};
Several things:
You need to initialize mutex with pthread_mutex_init in the constructor and free it with pthread_mutex_destroy in the destructor.
You must make your class non-copyable and non-assignable (or otherwise implement copy constructor and assignment operator correctly; see above).
It's worthwhile making a SBRM helper class for the lock:
class Lock
{
    pthread_mutex_t & m;
public:
    explicit Lock(pthread_mutex_t & _m) : m(_m) { pthread_mutex_lock(&m); }
    ~Lock() { pthread_mutex_unlock(&m); }
};
Now you can make a synchronized scope like { Lock lk(mutex); /* ... */ }.
As for Question 2: Concurrent access is serialized by means of locking the mutex. One of the competing threads will sleep on the acquisition of the mutex lock.
Am I going about this the wrong way? Is there a smarter way to implement a protected data class?
For the implementation you have, I think you have a good start. Since you asked about C++ifying, then if you have a compiler that supports C++11, you can use the new thread support.
You mentioned you wanted the main thread to poll this buffer, but I didn't see any mechanism that would allow it to do so. Either get_front should provide an error when there is nothing in the buffer, or get_buffer should block the caller until data is available.
#include <deque>
#include <mutex>
#include <condition_variable>
#include <stdexcept>
template <class T>
class ProtectedBuffer {
  std::deque<T> buffer;
  std::mutex mtx;
  std::condition_variable empty_cnd;
  void get_front_i(T &data) {
    data = buffer.front();
    buffer.pop_front();
  }
public:
  void add_back(T data) {
    std::lock_guard<std::mutex> g(mtx);
    bool was_empty = buffer.empty();
    buffer.push_back(data);
    if (was_empty) empty_cnd.notify_one();
  }
  void get_front_check(T &data) {
    std::lock_guard<std::mutex> g(mtx);
    if (buffer.empty()) throw std::underflow_error("no data");
    get_front_i(data);
  }
  void get_front_block(T &data) {
    std::lock_guard<std::mutex> g(mtx);
    std::unique_lock<std::mutex> u(mtx);
    while (buffer.empty()) empty_cnd.wait(u);
    get_front_i(data);
    if (!buffer.empty()) empty_cnd.notify_one();
  }
};
If you wanted to bound how much data you add to your buffer, you can add a similar full_cnd condition variable to check for the full condition on which the add_back call would wait on if it were true. Then, the get_front_i method could signal when the buffer wasn't full anymore.
What will happen in the following code if two threads try to call ProtectedBuffer::add_back() at the same time?
Since add_back is protected from mutual exclusion, if two threads call it at the same time, one thread will be blocked from calling push_back until the other thread is done.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With