Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions include/rigtorp/MPMCQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ SOFTWARE.
#include <memory>
#include <new> // std::hardware_destructive_interference_size
#include <stdexcept>
#include <type_traits>

#ifndef __cpp_aligned_new
#ifdef _WIN32
Expand Down Expand Up @@ -253,6 +254,33 @@ class Queue {
}
}

template <typename Consumer> bool try_consume(Consumer &&c) noexcept {
#ifdef __cpp_lib_is_invocable
static_assert(std::is_nothrow_invocable_v<Consumer, T &&>,
"Consumer must be noexcept invocable with T&&");
#endif
auto tail = tail_.load(std::memory_order_acquire);
for (;;) {
auto &slot = slots_[idx(tail)];
if (turn(tail) * 2 + 1 == slot.turn.load(std::memory_order_acquire)) {
if (tail_.compare_exchange_strong(tail, tail + 1)) {
static_assert(noexcept(c(slot.move())),
"Consumer must be noexcept invocable with T&&");
c(slot.move());
slot.destroy();
slot.turn.store(turn(tail) * 2 + 2, std::memory_order_release);
return true;
}
} else {
auto const prevTail = tail;
tail = tail_.load(std::memory_order_acquire);
if (tail == prevTail) {
return false;
}
}
}
}

private:
constexpr size_t idx(size_t i) const noexcept { return i % capacity_; }

Expand Down
69 changes: 65 additions & 4 deletions src/MPMCQueueTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,16 +122,77 @@ int main(int argc, char *argv[]) {
{
MPMCQueue<std::unique_ptr<int>> q(16);
// lvalue
auto v = std::unique_ptr<int>(new int(1));
// auto v = std::unique_ptr<int>(new int(1));
// q.emplace(v);
// q.try_emplace(v);
// q.push(v);
// q.try_push(v);
// xvalue
q.emplace(std::unique_ptr<int>(new int(1)));
q.try_emplace(std::unique_ptr<int>(new int(1)));
q.push(std::unique_ptr<int>(new int(1)));
q.try_push(std::unique_ptr<int>(new int(1)));
q.try_emplace(std::unique_ptr<int>(new int(2)));
q.push(std::unique_ptr<int>(new int(3)));
q.try_push(std::unique_ptr<int>(new int(4)));

std::unique_ptr<int> v;
q.pop(v);
assert(*v == 1);
assert(q.try_pop(v) == true);
assert(*v == 2);
assert(q.try_consume([&v](auto &&w) noexcept { v = std::move(w); }) ==
true);
assert(*v == 3);
}

// Check that pointer have same address and value after moving through queue
{
MPMCQueue<std::unique_ptr<int>> q(1);
{
auto up = std::make_unique<int>(1);
auto *rp = up.get();
std::unique_ptr<int> up2;
assert(q.try_emplace(std::move(up)) == true);
assert(q.try_consume([&](std::unique_ptr<int> v) noexcept {
up2 = std::move(v);
}) == true);
assert(rp == up2.get());
assert(*up2 == 1);
}

{
auto up = std::make_unique<int>(1);
auto *rp = up.get();
std::unique_ptr<int> up2;
q.emplace(std::move(up));
assert(q.try_consume([&](std::unique_ptr<int> v) noexcept {
up2 = std::move(v);
}) == true);
assert(rp == up2.get());
assert(*up2 == 1);
}

{
auto up = std::make_unique<int>(1);
auto *rp = up.get();
std::unique_ptr<int> up2;
q.push(std::move(up));
assert(q.try_consume([&](std::unique_ptr<int> v) noexcept {
up2 = std::move(v);
}) == true);
assert(rp == up2.get());
assert(*up2 == 1);
}

{
auto up = std::make_unique<int>(1);
auto *rp = up.get();
std::unique_ptr<int> up2;
assert(q.try_push(std::move(up)) == true);
assert(q.try_consume([&](std::unique_ptr<int> v) noexcept {
up2 = std::move(v);
}) == true);
assert(rp == up2.get());
assert(*up2 == 1);
}
}

{
Expand Down