Commit 90d41bf

fix(search): Use thread lock for hnsw multi reader/multi writer mutex (#6205)
fix(hnsw): Use thread lock for hnsw multi reader/multi writer mutex

When we expire keys in the periodic heartbeat, we shouldn't take any fiber locks while removing documents from the HNSW index. Use a thread lock instead.

Signed-off-by: mkaruza <[email protected]>
1 parent 62a0328 commit 90d41bf
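For context on the fix: the periodic heartbeat runs on a fiber, and waiting on a fiber lock can suspend that fiber mid-expiration, whereas a thread-level lock only blocks the OS thread briefly and never yields the fiber. A minimal sketch of the idea, using hypothetical names (HeartbeatTick, RemoveExpiredDocumentsFromHnsw) and std::mutex as a stand-in for the thread lock; none of this is code from the commit itself:

#include <mutex>

// Illustration only: these names are hypothetical stand-ins, not code
// from this commit; std::mutex stands in for a thread-level lock.
std::mutex thread_lock;

void RemoveExpiredDocumentsFromHnsw() { /* stub for illustration */ }

void HeartbeatTick() {
  // A fiber mutex could suspend the heartbeat fiber while waiting; the
  // expiry path must not suspend, so a thread-level lock is taken instead.
  std::lock_guard<std::mutex> guard(thread_lock);
  RemoveExpiredDocumentsFromHnsw();
}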

File tree

2 files changed: +60 -26 lines changed


src/core/search/mrmw_mutex.h

Lines changed: 8 additions & 4 deletions
@@ -2,8 +2,10 @@
 // See LICENSE for licensing terms.
 //

+#include <condition_variable>
+
 #include "base/logging.h"
-#include "util/fibers/synchronization.h"
+#include "base/spinlock.h"

 namespace dfly::search {

@@ -50,16 +52,18 @@ class MRMWMutex {
     return target_mode == LockMode::kReadLock ? reader_waiters_ : writer_waiters_;
   };

-  inline util::fb2::CondVar& GetCondVar(LockMode target_mode) {
+  inline std::condition_variable_any& GetCondVar(LockMode target_mode) {
     return target_mode == LockMode::kReadLock ? reader_cond_var_ : writer_cond_var_;
   };

   static inline LockMode GetInverseMode(LockMode mode) {
     return mode == LockMode::kReadLock ? LockMode::kWriteLock : LockMode::kReadLock;
   }

-  util::fb2::Mutex mutex_;
-  util::fb2::CondVar reader_cond_var_, writer_cond_var_;
+  // TODO: use fiber sync primitives in future
+  base::SpinLock mutex_;
+  std::condition_variable_any reader_cond_var_, writer_cond_var_;
+
   size_t writer_waiters_ = 0, reader_waiters_ = 0;
   size_t active_runners_ = 0;
   LockMode lock_mode_;
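The hunk above swaps the synchronization primitives but does not show MRMWMutex's Lock/Unlock bodies. As a rough sketch of how the members it touches typically interact in a multi-reader/multi-writer mutex (assumptions: MrmwSketch and its method bodies are illustrative guesses, not Dragonfly's implementation; std::mutex stands in for base::SpinLock; the real code presumably uses the waiter counts for fairness decisions this sketch omits):

#include <condition_variable>
#include <cstddef>
#include <mutex>

enum class LockMode { kReadLock, kWriteLock };

class MrmwSketch {
 public:
  void Lock(LockMode mode) {
    std::unique_lock<std::mutex> lk(mutex_);
    auto& cv = mode == LockMode::kReadLock ? reader_cv_ : writer_cv_;
    std::size_t& waiters = mode == LockMode::kReadLock ? reader_waiters_ : writer_waiters_;
    ++waiters;
    // Admit when nobody holds the lock, or when all holders run in our mode.
    cv.wait(lk, [&] { return active_runners_ == 0 || lock_mode_ == mode; });
    --waiters;
    lock_mode_ = mode;
    ++active_runners_;
  }

  void Unlock(LockMode mode) {
    std::unique_lock<std::mutex> lk(mutex_);
    if (--active_runners_ == 0) {
      // Wake waiters of the inverse mode (cf. GetInverseMode in the diff)
      // so one mode cannot monopolize the lock forever.
      auto& cv = mode == LockMode::kReadLock ? writer_cv_ : reader_cv_;
      cv.notify_all();
    }
  }

 private:
  std::mutex mutex_;                                   // stand-in for base::SpinLock
  std::condition_variable_any reader_cv_, writer_cv_;  // as in the diff
  std::size_t writer_waiters_ = 0, reader_waiters_ = 0;
  std::size_t active_runners_ = 0;
  LockMode lock_mode_ = LockMode::kReadLock;
};

The property this commit relies on is that std::condition_variable_any waiting over a thread lock blocks the OS thread rather than suspending the calling fiber, so the heartbeat's expiry path never yields while the index is being modified.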

src/core/search/mrmw_mutex_test.cc

Lines changed: 52 additions & 22 deletions
@@ -176,49 +176,79 @@ TEST_F(MRMWMutexTest, WriterAfterReaders) {
   EXPECT_EQ(write_count.load(), 0);
 }

-// Test 5: Mix of readers and writes
-TEST_F(MRMWMutexTest, MixWritersReaders) {
+TEST_F(MRMWMutexTest, MixWritersReadersOnDifferentFibers) {
   std::atomic<size_t> read_count(0);
   std::atomic<size_t> write_count(0);

   // Start multiple readers and writers
   const int num_threads = 100;
   std::vector<util::fb2::Fiber> threads;
-  threads.reserve(num_threads + 1);
+  threads.reserve(num_threads);

-  // Add long read task that will block all write tasks
-  threads.emplace_back(
-      pp_->at(0)->LaunchFiber([&] { ReadTask(&mutex_, std::ref(read_count), 2000); }));
-
-  // Give long writer time to acquire the lock
-  util::ThisFiber::SleepFor(std::chrono::milliseconds(100));
-
-  size_t write_threads = 0;
   for (int i = 0; i < num_threads; ++i) {
-    size_t fiber_id = rand() % 2;
     if (rand() % 3) {
-      threads.emplace_back(pp_->at(fiber_id)->LaunchFiber(util::fb2::Launch::post, [&] {
+      threads.emplace_back(pp_->at(0)->LaunchFiber(util::fb2::Launch::post, [&] {
         ReadTask(&mutex_, std::ref(read_count), kReadTaskSleepTime);
       }));
     } else {
-      write_threads++;
-      threads.emplace_back(pp_->at(fiber_id)->LaunchFiber(util::fb2::Launch::post, [&] {
+      threads.emplace_back(pp_->at(1)->LaunchFiber(util::fb2::Launch::post, [&] {
         WriteTask(&mutex_, std::ref(write_count), kWriteTaskSleepTime);
       }));
     }
   }

-  // All shorter threads should be done and only long one remains
-  util::ThisFiber::SleepFor(std::chrono::milliseconds(500));
-
-  EXPECT_EQ(read_count.load(), 1);
-
-  EXPECT_EQ(write_count.load(), write_threads);
-
   // Wait for all readers to acquire and release the lock
   for (auto& t : threads) {
     t.Join();
   }
 }

+// TODO: Once we have fiber locking we can test scenario where we write/read on same fibers
+// current implementation block thread so it is not possible to test this for now.
+
+// Test 6: Mix of readers and writes on random fibers
+// TEST_F(MRMWMutexTest, MixWritersReadersOnFibers) {
+//   std::atomic<size_t> read_count(0);
+//   std::atomic<size_t> write_count(0);
+
+//   // Start multiple readers and writers
+//   const int num_threads = 100;
+//   std::vector<util::fb2::Fiber> threads;
+//   threads.reserve(num_threads + 1);
+
+//   // Add long read task that will block all write tasks
+//   threads.emplace_back(
+//       pp_->at(0)->LaunchFiber([&] { ReadTask(&mutex_, std::ref(read_count), 2000); }));
+
+//   // Give long writer time to acquire the lock
+//   util::ThisFiber::SleepFor(std::chrono::milliseconds(100));
+
+//   size_t write_threads = 0;
+//   for (int i = 0; i < num_threads; ++i) {
+//     size_t fiber_id = rand() % 2;
+//     if (rand() % 3) {
+//       threads.emplace_back(pp_->at(fiber_id)->LaunchFiber(util::fb2::Launch::post, [&] {
+//         ReadTask(&mutex_, std::ref(read_count), kReadTaskSleepTime);
+//       }));
+//     } else {
+//       write_threads++;
+//       threads.emplace_back(pp_->at(fiber_id)->LaunchFiber(util::fb2::Launch::post, [&] {
+//         WriteTask(&mutex_, std::ref(write_count), kWriteTaskSleepTime);
+//       }));
+//     }
+//   }
+
+//   // All shorter threads should be done and only long one remains
+//   util::ThisFiber::SleepFor(std::chrono::milliseconds(500));
+
+//   EXPECT_EQ(read_count.load(), 1);
+
+//   EXPECT_EQ(write_count.load(), write_threads);
+
+//   // Wait for all readers to acquire and release the lock
+//   for (auto& t : threads) {
+//     t.Join();
+//   }
+// }
+
 } // namespace dfly::search
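Note that the rewritten test pins all readers to proactor thread 0 and all writers to thread 1: since the lock now blocks the whole thread, fibers contending in opposite modes on the same thread could deadlock, which is exactly why the mixed-fiber variant is left commented out pending fiber locking. The ReadTask/WriteTask helpers are defined elsewhere in mrmw_mutex_test.cc; a plausible reconstruction, assuming a Lock(mode)/Unlock(mode) interface (an assumption, the real signatures are not shown in this diff):

// Hypothetical reconstruction of the helpers this test calls; the real
// definitions live elsewhere in mrmw_mutex_test.cc and may differ.
void ReadTask(MRMWMutex* mutex, std::atomic<size_t>& read_count, size_t sleep_ms) {
  mutex->Lock(LockMode::kReadLock);  // assumed interface
  util::ThisFiber::SleepFor(std::chrono::milliseconds(sleep_ms));
  read_count.fetch_add(1);
  mutex->Unlock(LockMode::kReadLock);
}

void WriteTask(MRMWMutex* mutex, std::atomic<size_t>& write_count, size_t sleep_ms) {
  mutex->Lock(LockMode::kWriteLock);  // assumed interface
  util::ThisFiber::SleepFor(std::chrono::milliseconds(sleep_ms));
  write_count.fetch_add(1);
  mutex->Unlock(LockMode::kWriteLock);
}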
