Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

## [Unreleased]

### Added
* Implement "safe" mode to prevent writing data to the wrong replicaset while a vshard rebalance is in progress.
* Auto switch to safe mode when rebalance process starts.
* Manual return to fast mode.

## [1.6.1] - 19-09-25

### Added
Expand Down
2 changes: 1 addition & 1 deletion crud-scm-1.rockspec
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ dependencies = {
'lua ~> 5.1',
'checks >= 3.3.0-1',
'errors >= 2.2.1-1',
'vshard >= 0.1.36-1',
'vshard >= 0.1.39-1',
}

build = {
Expand Down
17 changes: 16 additions & 1 deletion crud.lua
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ local readview = require('crud.readview')
local schema = require('crud.schema')

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Guys, the commits are absolutely chaotic; this is not the way we develop open-source modules. We should rebase that branch onto master to make the commit history linear, and not just merge it. Otherwise, all commits like Minor changes according to review comments will be visible to our users. You can check out how Georgy Moiseev did it: proper commits, proper commit messages, tests in every commit (e.g. 8d7cae0).

Now, I'm forced to review more than 1k lines in one step, which is very inconvenient and increases the chance of missed bugs. And our users won't be able to check the individual commits, if they want to. Of course, it's up to you, since I'm not the maintainer of that module, but it's just not nice to develop and merge code like that.

IMHO, the features should be properly split between commits, every commit must include the associated tests, proper commit messages and mentioning of the #448 ticket. Of course, refactoring or code moving should be in the separate commits. Between commits all of the test must pass

Copy link
Author

@ita-sammann ita-sammann Dec 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Guys, the commits are absolutely chaotic

Of course these commits will not go to master, I will re-split them before merging the PR.

Now, I'm forced to review more than 1k lines in one step

My bad. Never thought of this PR as of multiple features that can be reviewed separately.

local storage_info = require('crud.storage_info')
local storage = require('crud.storage')
local rebalance = require('crud.common.rebalance')

local crud = {}

Expand Down Expand Up @@ -158,8 +159,22 @@ crud.readview = readview.new
-- @function schema
crud.schema = schema.call

crud.rebalance = {}

-- @refer rebalance.router_cache_clear
-- @function router_cache_clear
crud.rebalance.router_cache_clear = rebalance.router_api.cache_clear

-- @refer rebalance.router_cache_length
-- @function router_cache_length
crud.rebalance.router_cache_length = rebalance.router_api.cache_length

-- @refer rebalance.router_cache_last_clear_ts
-- @function router_cache_last_clear_ts
crud.rebalance.router_cache_last_clear_ts = rebalance.router_api.cache_last_clear_ts

function crud.init_router()
rawset(_G, 'crud', crud)
rawset(_G, 'crud', crud)
end

function crud.stop_router()
Expand Down
86 changes: 73 additions & 13 deletions crud/common/call.lua
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
local errors = require('errors')
local log = require('log')

local call_cache = require('crud.common.call_cache')
local dev_checks = require('crud.common.dev_checks')
local utils = require('crud.common.utils')
local sharding_utils = require('crud.common.sharding.utils')
local fiber_clock = require('fiber').clock
local fiber = require('fiber')
local const = require('crud.common.const')
local rebalance = require('crud.common.rebalance')
local bucket_ref_unref = require('crud.common.sharding.bucket_ref_unref')

local BaseIterator = require('crud.common.map_call_cases.base_iter')
local BasePostprocessor = require('crud.common.map_call_cases.base_postprocessor')
Expand All @@ -15,13 +18,48 @@ local CallError = errors.new_class('CallError')
local CALL_FUNC_NAME = 'call_on_storage'
local CRUD_CALL_FUNC_NAME = utils.get_storage_call(CALL_FUNC_NAME)

-- Methods that can continue execution in fast mode when rebalance starts
local safe_mode_allowed_fast_methods = {
[CRUD_CALL_FUNC_NAME .. '/fast/' .. utils.get_storage_call('readview_open_on_storage')] = true,
[CRUD_CALL_FUNC_NAME .. '/fast/' .. utils.get_storage_call('readview_close_on_storage')] = true,
[CRUD_CALL_FUNC_NAME .. '/fast/' .. utils.get_storage_call('select_readview_on_storage')] = true,
[CRUD_CALL_FUNC_NAME .. '/fast/' .. utils.get_storage_call('truncate_on_storage')] = true,
[CRUD_CALL_FUNC_NAME .. '/fast/' .. utils.get_storage_call('len_on_storage')] = true,
[CRUD_CALL_FUNC_NAME .. '/fast/' .. utils.get_storage_call('count_on_storage')] = true,
[CRUD_CALL_FUNC_NAME .. '/fast/' .. utils.get_storage_call('get_border_on_storage')] = true,
}

local call = {}

local function call_on_storage(run_as_user, func_name, ...)
local function call_on_storage_safe(run_as_user, func_name, ...)
fiber.name(CRUD_CALL_FUNC_NAME .. '/safe/' .. func_name)
return box.session.su(run_as_user, call_cache.func_name_to_func(func_name), ...)
end

local function call_on_storage_fast(run_as_user, func_name, ...)
fiber.name(CRUD_CALL_FUNC_NAME .. '/fast/' .. func_name)
return box.session.su(run_as_user, call_cache.func_name_to_func(func_name), ...)
end

local call_on_storage = rebalance.safe_mode and call_on_storage_safe or call_on_storage_fast

rebalance.on_safe_mode_toggle(function(is_enabled)
if is_enabled then
call_on_storage = call_on_storage_safe

for fb_id, fb in pairs(fiber.info()) do
local fibers_killed = 0
if fb.name:startswith(CRUD_CALL_FUNC_NAME .. '/fast/') and not safe_mode_allowed_fast_methods[fb.name] then
fiber.kill(fb_id)
fibers_killed = fibers_killed + 1
end
log.debug('Killed %d fibers with fast-mode crud requests.', fibers_killed)
end
else
call_on_storage = call_on_storage_fast
end
end)

call.storage_api = {[CALL_FUNC_NAME] = call_on_storage}

function call.get_vshard_call_name(mode, prefer_replica, balance)
Expand Down Expand Up @@ -82,7 +120,10 @@ local function wrap_vshard_err(vshard_router, err, func_name, replicaset_id, buc
))
end

local function retry_call_with_master_discovery(replicaset, method, func_name, func_args, call_opts)
--- Executes a vshard call and retries once after performing recovery actions
--- like bucket cache reset, destination redirect (for single calls), or master discovery.
local function call_with_retry_and_recovery(vshard_router,
replicaset, method, func_name, func_args, call_opts, is_single_call)
local func_args_ext = utils.append_array({ box.session.effective_user(), func_name }, func_args)

-- In case cluster was just bootstrapped with auto master discovery,
Expand All @@ -93,7 +134,27 @@ local function retry_call_with_master_discovery(replicaset, method, func_name, f
return resp, err
end

if err.name == 'MISSING_MASTER' and replicaset.locate_master ~= nil then
-- This is a partial copy of error handling from vshard.router.router_call_impl()
-- It is much simpler mostly because bucket_set() can't be accessed from outside vshard.
if err.class_name == bucket_ref_unref.BucketRefError.name then
if is_single_call and #err.bucket_ref_errs == 1 then
local single_err = err.bucket_ref_errs[1]
local destination = single_err.vshard_err.destination
if destination and vshard_router.replicasets[destination] then
replicaset = vshard_router.replicasets[destination]
end
end

for _, bucket_ref_err in pairs(err.bucket_ref_errs) do
local bucket_id = bucket_ref_err.bucket_id
local vshard_err = bucket_ref_err.vshard_err
if vshard_err.name == 'WRONG_BUCKET' or
vshard_err.name == 'BUCKET_IS_LOCKED' or
vshard_err.name == 'TRANSFER_IS_IN_PROGRESS' then
vshard_router:_bucket_reset(bucket_id)
end
end
elseif err.name == 'MISSING_MASTER' and replicaset.locate_master ~= nil then
replicaset:locate_master()
end

Expand Down Expand Up @@ -147,8 +208,8 @@ function call.map(vshard_router, func_name, func_args, opts)
while iter:has_next() do
local args, replicaset, replicaset_id = iter:get()

local future, err = retry_call_with_master_discovery(replicaset, vshard_call_name,
func_name, args, call_opts)
local future, err = call_with_retry_and_recovery(vshard_router, replicaset, vshard_call_name,
func_name, args, call_opts, false)

if err ~= nil then
local result_info = {
Expand All @@ -170,9 +231,9 @@ function call.map(vshard_router, func_name, func_args, opts)
futures_by_replicasets[replicaset_id] = future
end

local deadline = fiber_clock() + timeout
local deadline = fiber.clock() + timeout
for replicaset_id, future in pairs(futures_by_replicasets) do
local wait_timeout = deadline - fiber_clock()
local wait_timeout = deadline - fiber.clock()
if wait_timeout < 0 then
wait_timeout = 0
end
Expand Down Expand Up @@ -221,9 +282,8 @@ function call.single(vshard_router, bucket_id, func_name, func_args, opts)
local timeout = opts.timeout or const.DEFAULT_VSHARD_CALL_TIMEOUT
local request_timeout = opts.mode == 'read' and opts.request_timeout or nil

local res, err = retry_call_with_master_discovery(replicaset, vshard_call_name,
func_name, func_args, {timeout = timeout,
request_timeout = request_timeout})
local res, err = call_with_retry_and_recovery(vshard_router, replicaset, vshard_call_name,
func_name, func_args, {timeout = timeout, request_timeout = request_timeout}, true)
if err ~= nil then
return nil, wrap_vshard_err(vshard_router, err, func_name, nil, bucket_id)
end
Expand All @@ -248,8 +308,8 @@ function call.any(vshard_router, func_name, func_args, opts)
end
local replicaset_id, replicaset = next(replicasets)

local res, err = retry_call_with_master_discovery(replicaset, 'call',
func_name, func_args, {timeout = timeout})
local res, err = call_with_retry_and_recovery(vshard_router, replicaset, 'call',
func_name, func_args, {timeout = timeout}, false)
if err ~= nil then
return nil, wrap_vshard_err(vshard_router, err, func_name, replicaset_id)
end
Expand Down
Loading
Loading