|
#ifndef UTIL_PCQUEUE_H |
|
#define UTIL_PCQUEUE_H |
|
|
|
#include "exception.hh"

#include <boost/interprocess/sync/interprocess_semaphore.hpp>
#include <boost/scoped_array.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/utility.hpp>

#include <cerrno>
#include <cstdio>
#include <cstdlib>

#ifdef __APPLE__
#include <mach/semaphore.h>
#include <mach/task.h>
#include <mach/mach_traps.h>
#include <mach/mach.h>
#endif
|
|
|
namespace util { |
|
|
|
|
|
|
|
|
|
#ifdef __APPLE__ |
|
|
|
#define MACH_CALL(call) UTIL_THROW_IF(KERN_SUCCESS != (call), Exception, "Mach call failure") |
|
|
|
class Semaphore { |
|
public: |
|
explicit Semaphore(int value) : task_(mach_task_self()) { |
|
MACH_CALL(semaphore_create(task_, &back_, SYNC_POLICY_FIFO, value)); |
|
} |
|
|
|
~Semaphore() { |
|
MACH_CALL(semaphore_destroy(task_, back_)); |
|
} |
|
|
|
void wait() { |
|
MACH_CALL(semaphore_wait(back_)); |
|
} |
|
|
|
void post() { |
|
MACH_CALL(semaphore_signal(back_)); |
|
} |
|
|
|
private: |
|
semaphore_t back_; |
|
task_t task_; |
|
}; |
|
|
|
inline void WaitSemaphore(Semaphore &semaphore) { |
|
semaphore.wait(); |
|
} |
|
|
|
#else |
|
typedef boost::interprocess::interprocess_semaphore Semaphore; |
|
|
|
inline void WaitSemaphore (Semaphore &on) { |
|
while (1) { |
|
try { |
|
on.wait(); |
|
break; |
|
} |
|
catch (boost::interprocess::interprocess_exception &e) { |
|
if (e.get_native_error() != EINTR) { |
|
throw; |
|
} |
|
} |
|
} |
|
} |
|
|
|
#endif |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
 * Fixed-capacity producer/consumer queue stored in a circular array of T.
 *
 * Two semaphores count the free slots (empty_) and the filled slots (used_):
 * Produce() waits for a free slot and Consume() waits for a filled one.  The
 * write cursor and the read cursor are each guarded by their own mutex, so
 * writers only contend with writers and readers only with readers.
 *
 * T must be default-constructible (the storage is created with new T[size])
 * and assignable (elements are copied in and out with operator=).
 */
template <class T> class PCQueue : boost::noncopyable {
  public:
    /**
     * Allocate storage for `size` elements.  Every slot starts out free.
     */
    explicit PCQueue(size_t size)
      : empty_(size),
        used_(0),
        storage_(new T[size]),
        end_(storage_.get() + size),
        produce_at_(storage_.get()),
        consume_at_(storage_.get()) {}

    /**
     * Copy `val` into the queue, waiting first until a slot is free.
     *
     * If the assignment throws, the free slot is handed back to empty_ and
     * the exception is rethrown; the write cursor does not move.
     */
    void Produce(const T &val) {
      WaitSemaphore(empty_);
      {
        boost::unique_lock<boost::mutex> writer_guard(produce_at_mutex_);
        try {
          *produce_at_ = val;
        } catch (...) {
          // Nothing was stored: return the slot we reserved.
          empty_.post();
          throw;
        }
        Advance(produce_at_);
      }
      // Signal outside the lock that one more element is available.
      used_.post();
    }

    /**
     * Copy the oldest element into `out`, waiting first until one exists.
     *
     * If the assignment throws, the element is handed back to used_ and the
     * exception is rethrown; the read cursor does not move.
     *
     * @return reference to `out`.
     */
    T& Consume(T &out) {
      WaitSemaphore(used_);
      {
        boost::unique_lock<boost::mutex> reader_guard(consume_at_mutex_);
        try {
          out = *consume_at_;
        } catch (...) {
          // Nothing was taken: the element is still there for someone else.
          used_.post();
          throw;
        }
        Advance(consume_at_);
      }
      // Signal outside the lock that one more slot is free.
      empty_.post();
      return out;
    }

    /**
     * Convenience overload: consume into a default-constructed T and return
     * it by value.
     */
    T Consume() {
      T item;
      Consume(item);
      return item;
    }

  private:
    // Move a cursor one slot forward, wrapping from end_ back to the start of
    // storage_.  The caller must hold the mutex that guards that cursor.
    void Advance(T *&cursor) {
      ++cursor;
      if (cursor == end_) cursor = storage_.get();
    }

    // Number of slots that are currently free.
    Semaphore empty_;

    // Number of slots that currently hold an element.
    Semaphore used_;

    // Backing circular buffer.
    boost::scoped_array<T> storage_;

    // One past the last slot of storage_.
    T *const end_;

    // Next slot to write, and the mutex serialising writers.
    T *produce_at_;
    boost::mutex produce_at_mutex_;

    // Next slot to read, and the mutex serialising readers.
    T *consume_at_;
    boost::mutex consume_at_mutex_;
};
|
|
|
} |
|
|
|
#endif |
|
|