Skip to content
Open
18 changes: 17 additions & 1 deletion crud.lua
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ local readview = require('crud.readview')
local schema = require('crud.schema')

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Guys, the commits are absolutely chaotic, this is not the way, we develop open-source modules. We should rebase that branch in master to make the commit history linear, and not just merge it. So, all commits like Minor changes according to review comments will be visible to our users. You can check out, how Georgy Moiseev did it: proper commits, proper commit messages, tests in every commit (e.g. 8d7cae0).

Now, I'm forced to review more than 1k lines in one step, which is very inconvenient and increases the chanse of missed bugs. And our users won't be able to check the individual commits, if they want to. Of course, it's up to you, since I'm not the maintainer of that module, but it's just not nice to develop and merge code like that.

IMHO, the features should be properly split between commits, every commit must include the associated tests, proper commit messages and mentioning of the #448 ticket. Of course, refactoring or code moving should be in the separate commits. Between commits all of the test must pass

Copy link
Author

@ita-sammann ita-sammann Dec 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Guys, the commits are absolutely chaotic

Of course these commits will not go to master, I will re-split them before merging the PR.

Now, I'm forced to review more than 1k lines in one step

My bad. Never thought of this PR as of multiple features that can be reviewed separately.

local storage_info = require('crud.storage_info')
local storage = require('crud.storage')
local rebalance = require('crud.common.rebalance')

local crud = {}

Expand Down Expand Up @@ -158,8 +159,23 @@ crud.readview = readview.new
-- @function schema
crud.schema = schema.call

crud.rebalance = {}

-- @refer rebalance.router_cache_clear
-- @function router_cache_clear
crud.rebalance.router_cache_clear = rebalance.router.cache_clear

-- @refer rebalance.router_cache_length
-- @function router_cache_length
crud.rebalance.router_cache_length = rebalance.router.cache_length

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we export router_cache_length and router_cache_last_clear_ts to public API? I understand that for router_cache_clear, which is supposed to be called by the TCM/cartridge/ATE, but I don't understand, why getters are needed

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think they will be useful in TCM/Cartridge to indicate that cache has actually been cleared and when did it happen.


-- @refer rebalance.router_cache_last_clear_ts
-- @function router_cache_last_clear_ts
crud.rebalance.router_cache_last_clear_ts = rebalance.router.cache_last_clear_ts

function crud.init_router()
rawset(_G, 'crud', crud)
rawset(_G, 'crud', crud)
rebalance.metrics.enable_router_metrics()

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like a crutch. Will it be disabled, when user disables metrics? It seems the whole metrics part should be done in the metrics_registry

end

function crud.stop_router()
Expand Down
211 changes: 211 additions & 0 deletions crud/common/rebalance.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
local fiber = require('fiber')
local log = require('log')
local vshard_consts = require('vshard.consts')
local utils = require('crud.common.utils')

local has_metrics_module, metrics = pcall(require, 'metrics')

local SETTINGS_SPACE_NAME = '_crud_settings'
local SAFE_MOD_ENABLE_EVENT = '_crud.safe_mode_enable'

local M = {
safe_mode = false,
safe_mode_enable_hooks = {},

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can use ('internal.trigger')for things like that

safe_mode_disable_hooks = {},
_router_cache_last_clear_ts = fiber.time()

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we initialize the timestamp with current time on module load? It may lead to bugs, we didn't really cleared the router`s cache on module load

}

local function create_space()

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AFAICS, the space has general name _crud_settings, its schema is also general. So, I suppose, in the future, we may have new crud settings, which we want to persist. So, the space has nothing to do with rebalance module, which just uses the space. And it seems its creation should be done somewhere in more general place, e.g. crud.schema or smth like that.

local settings_space = box.schema.space.create(SETTINGS_SPACE_NAME, {
engine = 'memtx',
format = {
{ name = 'key', type = 'string' },
{ name = 'value', type = 'any' },
},
if_not_exists = true,
})

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, the space is not local, but before restart instances are in different states:

  1. Replicaset has s1, s2 and s3 instances.
  2. s1 is master, safe mode
  3. Master switches to s2, there box.status is broadcasted, we enable safe mode
  4. Now s1 and s2 are in safe mode, s3 is not
  5. If s3 is restarted it will also become safe

That all seems inconsistent and dangerous. Why can't we just enable safe mode on replicas, when we see the change in the _bucket space.

settings_space:create_index('primary', { parts = { 'key' }, if_not_exists = true })
end

local function safe_mode_trigger(_, new, space, op)
if space ~= '_bucket' then
return
end
if (op == 'INSERT' and new.status == vshard_consts.BUCKET.RECEIVING) or
(op == 'REPLACE' and new.status == vshard_consts.BUCKET.SENDING) then
box.broadcast(SAFE_MOD_ENABLE_EVENT, true)
end
end

local function register_enable_hook(func)
M.safe_mode_enable_hooks[func] = true

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need that variable and why cannot we just set on_replace trigger for _bucket on enable and remove it on disable?

end

local function remove_enable_hook(func)
M.safe_mode_enable_hooks[func] = nil
end

local function register_disable_hook(func)
M.safe_mode_disable_hooks[func] = true
end

local function remove_disable_hook(func)
M.safe_mode_disable_hooks[func] = nil
end

local function safe_mode_status()
return M.safe_mode
end

local function safe_mode_enable()
if not box.info.ro then
box.space[SETTINGS_SPACE_NAME]:replace{ 'safe_mode', true }
for _, trig in pairs(box.space._bucket:on_replace()) do
if trig == safe_mode_trigger then
box.space._bucket:on_replace(nil, safe_mode_trigger)
end
end
end
M.safe_mode = true

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alternative to the following:

Why can't we just enable safe mode on replicas, when we see the change in the _bucket space.


We can make the change of safe_mode Lua variable as on_replace trigger for _crud_settings. If user e.g. want forcefully make crud work in safe mod, he may just replace the value and we won't notice it. Currently user doesn't have the access to that function in public API. It seems the hooks should also be executed from on_replace trigger, as well as logging of the rebalance mode

The problem with this approach is that _crud_settings is global and in absense of Raft failover, it's not safe to rely on that:

  1. Master changes the mode to safe in space, the lua var is changed from the on_replace trigger
  2. Replica lags from master, it gets the change of bucket state, but doesn't get the change of _crud_settings.
  3. Replica becomes master, it's in non-safe mode

The preferred solution, from my point of view: _crud_settings becomes _crud_settings_local, it's local now. We set on replace triggers for _bucket and _crud_settings_local. The first one makes replace in _crud_settings_local, the second one changes the lua variable and executes hooks. This way we won't need all these difficult watchers and we'll be able to support versions below 2.10


for hook, _ in pairs(M.safe_mode_enable_hooks) do
hook()
end

log.info('Rebalance safe mode enabled')
end

local function safe_mode_disable()
if not box.info.ro then
box.space[SETTINGS_SPACE_NAME]:replace{ 'safe_mode', false }
box.space._bucket:on_replace(safe_mode_trigger)
end
M.safe_mode = false

for hook, _ in pairs(M.safe_mode_disable_hooks) do
hook()
end

log.info('Rebalance safe mode disabled')
end

local function rebalance_init()
M.metrics.enable_storage_metrics()

-- box.watch was introduced in tarantool 2.10.0
if not utils.tarantool_supports_box_watch() then
log.warn('This version of tarantool does not support autoswitch to safe mode during rebalance. '
.. 'Update to newer version or use `_crud.rebalance_safe_mode_enable()` to enable safe mode manually.')
return
end

box.watch('box.status', function()
if box.info.ro then
return
end

local stored_safe_mode
if box.space[SETTINGS_SPACE_NAME] == nil then
create_space()
box.space[SETTINGS_SPACE_NAME]:insert{ 'safe_mode', false }
else
stored_safe_mode = box.space[SETTINGS_SPACE_NAME]:get{ 'safe_mode' }
end
M.safe_mode = stored_safe_mode and stored_safe_mode.value or false

if M.safe_mode then
for hook, _ in pairs(M.safe_mode_enable_hooks) do
hook()
end
else
box.space._bucket:on_replace(safe_mode_trigger)
for hook, _ in pairs(M.safe_mode_disable_hooks) do
hook()
end
end
end)

box.watch(SAFE_MOD_ENABLE_EVENT, function(_, do_enable)
if box.info.ro or not do_enable then
return
end
safe_mode_enable()
end)
end

local function router_cache_clear()

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aren't we supposed to check, that rebalancing is ended and we have no buckets in forbidden states?

M._router_cache_last_clear_ts = fiber.time()
return utils.get_vshard_router_instance():_route_map_clear()
end

local function router_cache_length()
return utils.get_vshard_router_instance().known_bucket_count
end

local function router_cache_last_clear_ts()
return M._router_cache_last_clear_ts
end

-- Rebalance related metrics
local function enable_storage_metrics()
if not has_metrics_module then
return
end

local safe_mode_enabled_gauge = metrics.gauge(
'tnt_crud_storage_safe_mode_enabled',
"is safe mode enabled on this storage instance"
)

metrics.register_callback(function()
safe_mode_enabled_gauge:set(safe_mode_status() and 1 or 0)
end)
end

local function enable_router_metrics()
if not has_metrics_module then
return
end

local router_cache_length_gauge = metrics.gauge(
'tnt_crud_router_cache_length',
"number of bucket routes in vshard router cache"
)
local router_cache_last_clear_ts_gauge = metrics.gauge(
'tnt_crud_router_cache_last_clear_ts',
"when vshard router cache was cleared last time"
)

metrics.register_callback(function()
router_cache_length_gauge:set(router_cache_length())
router_cache_last_clear_ts_gauge:set(router_cache_last_clear_ts())
end)
end

M.init = rebalance_init

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: let's make the code consistent with all other modules, it seems we tend to make the variable with proper name and assign methods to it .

function registry.init(opts)

M.safe_mode_status = safe_mode_status
M.safe_mode_enable = safe_mode_enable
M.safe_mode_disable = safe_mode_disable
M.register_safe_mode_enable_hook = register_enable_hook
M.remove_safe_mode_enable_hook = remove_enable_hook
M.register_safe_mode_disable_hook = register_disable_hook
M.remove_safe_mode_disable_hook = remove_disable_hook

M.router = {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: api naming is incosistent, storage_api vs router

cache_clear = router_cache_clear,
cache_length = router_cache_length,
cache_last_clear_ts = router_cache_last_clear_ts,
}

M.storage_api = {
rebalance_safe_mode_status = safe_mode_status,
rebalance_safe_mode_enable = safe_mode_enable,
rebalance_safe_mode_disable = safe_mode_disable,
}

M.metrics = {
enable_storage_metrics = enable_storage_metrics,
enable_router_metrics = enable_router_metrics,
}

return M
2 changes: 2 additions & 0 deletions crud/schema.lua
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ schema.system_spaces = {
['_tt_migrations'] = true,
-- https://github.com/tarantool/cluster-federation/blob/01738cafa0dc7a3138e64f93c4e84cb323653257/src/internal/utils/utils.go#L17
['_cdc_state'] = true,
-- crud/common/rebalance.lua
['_crud_settings'] = true,
}

local function get_crud_schema(space)
Expand Down
5 changes: 4 additions & 1 deletion crud/storage.lua
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ local dev_checks = require('crud.common.dev_checks')
local stash = require('crud.common.stash')
local utils = require('crud.common.utils')

local rebalance = require('crud.common.rebalance')
local call = require('crud.common.call')
local sharding_metadata = require('crud.common.sharding.sharding_metadata')
local insert = require('crud.insert')
Expand Down Expand Up @@ -62,6 +63,7 @@ local function init_storage_call(user, storage_api)
end

local modules_with_storage_api = {
rebalance,
call,
sharding_metadata,
insert,
Expand Down Expand Up @@ -103,6 +105,8 @@ local function init_impl()
user = utils.get_this_replica_user() or 'guest'
end

rebalance.init()

for _, module in ipairs(modules_with_storage_api) do
init_storage_call(user, module.storage_api)
end
Expand Down Expand Up @@ -141,7 +145,6 @@ function storage.stop()
internal_stash.watcher:unregister()
internal_stash.watcher = nil
end

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: this change is not needed) We should minimize the changed lines in commits, when we can

rawset(_G, utils.STORAGE_NAMESPACE, nil)
end

Expand Down