From 67260d0c4ad41161209a7a3c7c63961f3016fcc9 Mon Sep 17 00:00:00 2001 From: Sergey Morozov Date: Wed, 15 Oct 2025 19:14:24 +0300 Subject: [PATCH 01/13] TNTP-2109: Implement slow mode for single calls --- crud/common/call.lua | 58 ++++++++++++++++++++++++++++++----- crud/select/merger.lua | 14 ++++++--- test/unit/privileges_test.lua | 6 ++-- 3 files changed, 63 insertions(+), 15 deletions(-) diff --git a/crud/common/call.lua b/crud/common/call.lua index 5887923f..bf2b5bb5 100644 --- a/crud/common/call.lua +++ b/crud/common/call.lua @@ -18,8 +18,49 @@ local CRUD_CALL_FUNC_NAME = utils.get_storage_call(CALL_FUNC_NAME) local call = {} -local function call_on_storage(run_as_user, func_name, ...) - return box.session.su(run_as_user, call_cache.func_name_to_func(func_name), ...) +local bucket_ref_many +local bucket_unref_many + +bucket_ref_many = function(bucket_ids, mode) + local reffed = {} + for _, bucket_id in pairs(bucket_ids) do + local ok, err = vshard.storage.bucket_ref(bucket_id, mode) + if not ok then + bucket_unref_many(reffed, mode) + return nil, err + end + table.insert(reffed, bucket_id) + end + return true, nil +end + +bucket_unref_many = function(bucket_ids, mode) + local all_ok = true + local last_err = nil + for _, bucket_id in pairs(bucket_ids) do + local ok, err = vshard.storage.bucket_unref(bucket_id, mode) + if not ok then + all_ok = nil + last_err = err + end + end + return all_ok, last_err +end + +local function call_on_storage(run_as_user, bucket_ids, mode, func_name, ...) + local ok, ref_err = bucket_ref_many(bucket_ids, mode) + if not ok then + return nil, ref_err + end + + local res = {box.session.su(run_as_user, call_cache.func_name_to_func(func_name), ...)} + + ok, ref_err = bucket_unref_many(bucket_ids, mode) + if not ok then + return nil, ref_err + end + + return unpack(res, 1, table.maxn(res)) end call.storage_api = {[CALL_FUNC_NAME] = call_on_storage} @@ -82,8 +123,8 @@ local function wrap_vshard_err(vshard_router, err, func_name, replicaset_id, buc )) end -local function retry_call_with_master_discovery(replicaset, method, func_name, func_args, call_opts) - local func_args_ext = utils.append_array({ box.session.effective_user(), func_name }, func_args) +local function retry_call_with_master_discovery(replicaset, method, func_name, func_args, call_opts, mode, bucket_ids) + local func_args_ext = utils.append_array({ box.session.effective_user(), bucket_ids, mode, func_name }, func_args) -- In case cluster was just bootstrapped with auto master discovery, -- replicaset may miss master. @@ -148,7 +189,7 @@ function call.map(vshard_router, func_name, func_args, opts) local args, replicaset, replicaset_id = iter:get() local future, err = retry_call_with_master_discovery(replicaset, vshard_call_name, - func_name, args, call_opts) + func_name, args, call_opts, opts.mode, {}) -- TODO: provide bucket_ids if err ~= nil then local result_info = { @@ -222,8 +263,8 @@ function call.single(vshard_router, bucket_id, func_name, func_args, opts) local request_timeout = opts.mode == 'read' and opts.request_timeout or nil local res, err = retry_call_with_master_discovery(replicaset, vshard_call_name, - func_name, func_args, {timeout = timeout, - request_timeout = request_timeout}) + func_name, func_args, {timeout = timeout, request_timeout = request_timeout}, + opts.mode, {bucket_id}) if err ~= nil then return nil, wrap_vshard_err(vshard_router, err, func_name, nil, bucket_id) end @@ -249,7 +290,8 @@ function call.any(vshard_router, func_name, func_args, opts) local replicaset_id, replicaset = next(replicasets) local res, err = retry_call_with_master_discovery(replicaset, 'call', - func_name, func_args, {timeout = timeout}) + func_name, func_args, {timeout = timeout}, + 'read', {}) if err ~= nil then return nil, wrap_vshard_err(vshard_router, err, func_name, replicaset_id) end diff --git a/crud/select/merger.lua b/crud/select/merger.lua index 1fd75320..a6cc23d2 100644 --- a/crud/select/merger.lua +++ b/crud/select/merger.lua @@ -171,7 +171,9 @@ local function fetch_chunk(context, state) -- change context.func_args too, but it does not matter next_func_args[4].after_tuple = cursor.after_tuple - local func_args_ext = utils.append_array({ box.session.effective_user(), func_name }, next_func_args) + local mode = "read" + local bucket_ids = {} + local func_args_ext = utils.append_array({ box.session.effective_user(), bucket_ids, mode, func_name }, next_func_args) if context.readview then next_state = {future = context.future_replica.conn:call("_crud.call_on_storage", @@ -203,7 +205,8 @@ local function new(vshard_router, replicasets, space, index_id, func_name, func_ local buf = buffer.ibuf() local net_box_opts = {is_async = true, buffer = buf, skip_header = utils.tarantool_supports_netbox_skip_header_option() or nil} - local func_args_ext = utils.append_array({ box.session.effective_user(), func_name }, func_args) + local bucket_ids = {} + local func_args_ext = utils.append_array({ box.session.effective_user(), bucket_ids, mode, func_name }, func_args) local future = replicaset[vshard_call_name](replicaset, "_crud.call_on_storage", func_args_ext, net_box_opts) @@ -279,8 +282,11 @@ local function new_readview(vshard_router, replicasets, readview_info, space, in local net_box_opts = {is_async = true, buffer = buf, skip_header = utils.tarantool_supports_netbox_skip_header_option() or nil} func_args[4].readview_id = replicaset_info.id - local func_args_ext = utils.append_array({ box.session.effective_user(), func_name }, func_args) - local future = replica.conn:call("_crud.call_on_storage", func_args_ext, net_box_opts) + local mode = "read" + local bucket_ids = {} + local func_args_ext = utils.append_array({ box.session.effective_user(), bucket_ids, mode, func_name }, func_args) + local future = replica.conn:call("_crud.call_on_storage", + func_args_ext, net_box_opts) -- Create a source. local context = { diff --git a/test/unit/privileges_test.lua b/test/unit/privileges_test.lua index 51cec2c5..cd19fd1c 100644 --- a/test/unit/privileges_test.lua +++ b/test/unit/privileges_test.lua @@ -17,13 +17,13 @@ g.before_all(function() end) g.test_prepend_current_user_smoke = function() - local res = call.storage_api.call_on_storage(box.session.effective_user(), "unittestfunc", {"too", "foo"}) + local res = call.storage_api.call_on_storage(box.session.effective_user(), {}, "read", "unittestfunc", {"too", "foo"}) t.assert_equals(res, {"too", "foo"}) end g.test_non_existent_user = function() t.assert_error_msg_contains("User 'non_existent_user' is not found", - call.storage_api.call_on_storage, "non_existent_user", "unittestfunc") + call.storage_api.call_on_storage, "non_existent_user", {}, "read", "unittestfunc") end g.test_that_the_session_switches_back = function() @@ -34,7 +34,7 @@ g.test_that_the_session_switches_back = function() local reference_user = box.session.effective_user() t.assert_not_equals(reference_user, "unittestuser") - local res = call.storage_api.call_on_storage("unittestuser", "unittestfunc2") + local res = call.storage_api.call_on_storage("unittestuser", {}, "read", "unittestfunc2") t.assert_equals(res, "unittestuser") t.assert_equals(box.session.effective_user(), reference_user) end From 9b79a68a33d46c96dc769cab7c2cdb900e162e26 Mon Sep 17 00:00:00 2001 From: Sergey Morozov Date: Wed, 15 Oct 2025 23:05:35 +0300 Subject: [PATCH 02/13] TNTP-2109: Add bucket_ref to map calls --- crud/common/call.lua | 4 ++-- crud/common/map_call_cases/base_iter.lua | 2 +- crud/common/map_call_cases/batch_insert_iter.lua | 3 ++- crud/common/map_call_cases/batch_upsert_iter.lua | 3 ++- crud/common/sharding/init.lua | 10 ++++++++++ 5 files changed, 17 insertions(+), 5 deletions(-) diff --git a/crud/common/call.lua b/crud/common/call.lua index bf2b5bb5..569106cb 100644 --- a/crud/common/call.lua +++ b/crud/common/call.lua @@ -186,10 +186,10 @@ function call.map(vshard_router, func_name, func_args, opts) request_timeout = opts.mode == 'read' and opts.request_timeout or nil, } while iter:has_next() do - local args, replicaset, replicaset_id = iter:get() + local args, replicaset, replicaset_id, bucket_ids = iter:get() local future, err = retry_call_with_master_discovery(replicaset, vshard_call_name, - func_name, args, call_opts, opts.mode, {}) -- TODO: provide bucket_ids + func_name, args, call_opts, opts.mode, bucket_ids) if err ~= nil then local result_info = { diff --git a/crud/common/map_call_cases/base_iter.lua b/crud/common/map_call_cases/base_iter.lua index 452e1599..b608dc3e 100644 --- a/crud/common/map_call_cases/base_iter.lua +++ b/crud/common/map_call_cases/base_iter.lua @@ -72,7 +72,7 @@ function BaseIterator:get() local replicaset = self.next_replicaset self.next_index, self.next_replicaset = next(self.replicasets, self.next_index) - return self.func_args, replicaset, replicaset_id + return self.func_args, replicaset, replicaset_id, {} end return BaseIterator diff --git a/crud/common/map_call_cases/batch_insert_iter.lua b/crud/common/map_call_cases/batch_insert_iter.lua index 37867f1b..824688f4 100644 --- a/crud/common/map_call_cases/batch_insert_iter.lua +++ b/crud/common/map_call_cases/batch_insert_iter.lua @@ -76,10 +76,11 @@ function BatchInsertIterator:get() self.next_batch.tuples, self.opts, } + local bucket_ids = self.next_batch.bucket_ids self.next_index, self.next_batch = next(self.batches_by_replicasets, self.next_index) - return func_args, replicaset, replicaset_id + return func_args, replicaset, replicaset_id, bucket_ids end return BatchInsertIterator diff --git a/crud/common/map_call_cases/batch_upsert_iter.lua b/crud/common/map_call_cases/batch_upsert_iter.lua index d25e3ee8..5249ea55 100644 --- a/crud/common/map_call_cases/batch_upsert_iter.lua +++ b/crud/common/map_call_cases/batch_upsert_iter.lua @@ -85,10 +85,11 @@ function BatchUpsertIterator:get() self.next_batch.operations, self.opts, } + local bucket_ids = self.next_batch.bucket_ids self.next_index, self.next_batch = next(self.batches_by_replicasets, self.next_index) - return func_args, replicaset, replicaset_id + return func_args, replicaset, replicaset_id, bucket_ids end return BatchUpsertIterator diff --git a/crud/common/sharding/init.lua b/crud/common/sharding/init.lua index 02bb6beb..83314391 100644 --- a/crud/common/sharding/init.lua +++ b/crud/common/sharding/init.lua @@ -324,8 +324,10 @@ function sharding.split_tuples_by_replicaset(vshard_router, tuples, space, opts) local record_by_replicaset = batches[replicaset_id] or { replicaset = replicaset, tuples = {}, + bucket_ids = {}, } table.insert(record_by_replicaset.tuples, tuple) + record_by_replicaset.bucket_ids[sharding_data.bucket_id] = true if opts.operations ~= nil then record_by_replicaset.operations = record_by_replicaset.operations or {} @@ -335,6 +337,14 @@ function sharding.split_tuples_by_replicaset(vshard_router, tuples, space, opts) batches[replicaset_id] = record_by_replicaset end + for _, rbr in pairs(batches) do + local bucket_ids = {} + for bid, _ in pairs(rbr.bucket_ids) do + table.insert(bucket_ids, bid) + end + rbr.bucket_ids = bucket_ids + end + return { batches = batches, sharding_func_hash = sharding_func_hash, From 7730a5c452e8f4c7f443e88da14f52b766e22bb1 Mon Sep 17 00:00:00 2001 From: Sergey Morozov Date: Wed, 15 Oct 2025 23:14:41 +0300 Subject: [PATCH 03/13] Fix luachecks warnings --- crud/common/call.lua | 1 + crud/select/merger.lua | 12 +++++++++--- test/unit/privileges_test.lua | 3 ++- 3 files changed, 12 insertions(+), 4 deletions(-) diff --git a/crud/common/call.lua b/crud/common/call.lua index 569106cb..ff2f66ae 100644 --- a/crud/common/call.lua +++ b/crud/common/call.lua @@ -1,4 +1,5 @@ local errors = require('errors') +local vshard = require('vshard') local call_cache = require('crud.common.call_cache') local dev_checks = require('crud.common.dev_checks') diff --git a/crud/select/merger.lua b/crud/select/merger.lua index a6cc23d2..48dfe1e4 100644 --- a/crud/select/merger.lua +++ b/crud/select/merger.lua @@ -173,7 +173,9 @@ local function fetch_chunk(context, state) next_func_args[4].after_tuple = cursor.after_tuple local mode = "read" local bucket_ids = {} - local func_args_ext = utils.append_array({ box.session.effective_user(), bucket_ids, mode, func_name }, next_func_args) + local func_args_ext = utils.append_array( + { box.session.effective_user(), bucket_ids, mode, func_name }, + next_func_args) if context.readview then next_state = {future = context.future_replica.conn:call("_crud.call_on_storage", @@ -206,7 +208,9 @@ local function new(vshard_router, replicasets, space, index_id, func_name, func_ local net_box_opts = {is_async = true, buffer = buf, skip_header = utils.tarantool_supports_netbox_skip_header_option() or nil} local bucket_ids = {} - local func_args_ext = utils.append_array({ box.session.effective_user(), bucket_ids, mode, func_name }, func_args) + local func_args_ext = utils.append_array( + { box.session.effective_user(), bucket_ids, mode, func_name }, + func_args) local future = replicaset[vshard_call_name](replicaset, "_crud.call_on_storage", func_args_ext, net_box_opts) @@ -284,7 +288,9 @@ local function new_readview(vshard_router, replicasets, readview_info, space, in func_args[4].readview_id = replicaset_info.id local mode = "read" local bucket_ids = {} - local func_args_ext = utils.append_array({ box.session.effective_user(), bucket_ids, mode, func_name }, func_args) + local func_args_ext = utils.append_array( + { box.session.effective_user(), bucket_ids, mode, func_name }, + func_args) local future = replica.conn:call("_crud.call_on_storage", func_args_ext, net_box_opts) diff --git a/test/unit/privileges_test.lua b/test/unit/privileges_test.lua index cd19fd1c..7a3b6036 100644 --- a/test/unit/privileges_test.lua +++ b/test/unit/privileges_test.lua @@ -17,7 +17,8 @@ g.before_all(function() end) g.test_prepend_current_user_smoke = function() - local res = call.storage_api.call_on_storage(box.session.effective_user(), {}, "read", "unittestfunc", {"too", "foo"}) + local res = call.storage_api.call_on_storage( + box.session.effective_user(), {}, "read", "unittestfunc", {"too", "foo"}) t.assert_equals(res, {"too", "foo"}) end From 8898804f182923ca5eb5eda828fc9e24717a4ae3 Mon Sep 17 00:00:00 2001 From: Sergey Morozov Date: Tue, 28 Oct 2025 19:22:20 +0300 Subject: [PATCH 04/13] TNTP-2109: 'Fast' mode without any fiber manipulations --- crud/common/call.lua | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/crud/common/call.lua b/crud/common/call.lua index ff2f66ae..f5c4d889 100644 --- a/crud/common/call.lua +++ b/crud/common/call.lua @@ -48,7 +48,7 @@ bucket_unref_many = function(bucket_ids, mode) return all_ok, last_err end -local function call_on_storage(run_as_user, bucket_ids, mode, func_name, ...) +local function call_on_storage_safe(run_as_user, bucket_ids, mode, func_name, ...) local ok, ref_err = bucket_ref_many(bucket_ids, mode) if not ok then return nil, ref_err @@ -64,6 +64,12 @@ local function call_on_storage(run_as_user, bucket_ids, mode, func_name, ...) return unpack(res, 1, table.maxn(res)) end +local function call_on_storage_fast(run_as_user, _, _, func_name, ...) + return box.session.su(run_as_user, call_cache.func_name_to_func(func_name), ...) +end + +local call_on_storage = call_on_storage_fast + call.storage_api = {[CALL_FUNC_NAME] = call_on_storage} function call.get_vshard_call_name(mode, prefer_replica, balance) From 358d0ac75d14387fb87c443376ae3d0ede7b16ef Mon Sep 17 00:00:00 2001 From: Sergey Morozov Date: Tue, 28 Oct 2025 20:31:33 +0300 Subject: [PATCH 05/13] TNTP-2109: 'Fast' mode with fiber renaming --- crud/common/call.lua | 39 +++++++++++++++++++-------------------- 1 file changed, 19 insertions(+), 20 deletions(-) diff --git a/crud/common/call.lua b/crud/common/call.lua index f5c4d889..d6cf733b 100644 --- a/crud/common/call.lua +++ b/crud/common/call.lua @@ -5,7 +5,7 @@ local call_cache = require('crud.common.call_cache') local dev_checks = require('crud.common.dev_checks') local utils = require('crud.common.utils') local sharding_utils = require('crud.common.sharding.utils') -local fiber_clock = require('fiber').clock +local fiber = require('fiber') local const = require('crud.common.const') local BaseIterator = require('crud.common.map_call_cases.base_iter') @@ -15,14 +15,24 @@ local CallError = errors.new_class('CallError') local CALL_FUNC_NAME = 'call_on_storage' local CRUD_CALL_FUNC_NAME = utils.get_storage_call(CALL_FUNC_NAME) - +local CRUD_CALL_FIBER_NAME = CRUD_CALL_FUNC_NAME .. '/' local call = {} -local bucket_ref_many -local bucket_unref_many +local function bucket_unref_many(bucket_ids, mode) + local all_ok = true + local last_err = nil + for _, bucket_id in pairs(bucket_ids) do + local ok, err = vshard.storage.bucket_unref(bucket_id, mode) + if not ok then + all_ok = nil + last_err = err + end + end + return all_ok, last_err +end -bucket_ref_many = function(bucket_ids, mode) +local function bucket_ref_many(bucket_ids, mode) local reffed = {} for _, bucket_id in pairs(bucket_ids) do local ok, err = vshard.storage.bucket_ref(bucket_id, mode) @@ -35,19 +45,6 @@ bucket_ref_many = function(bucket_ids, mode) return true, nil end -bucket_unref_many = function(bucket_ids, mode) - local all_ok = true - local last_err = nil - for _, bucket_id in pairs(bucket_ids) do - local ok, err = vshard.storage.bucket_unref(bucket_id, mode) - if not ok then - all_ok = nil - last_err = err - end - end - return all_ok, last_err -end - local function call_on_storage_safe(run_as_user, bucket_ids, mode, func_name, ...) local ok, ref_err = bucket_ref_many(bucket_ids, mode) if not ok then @@ -65,6 +62,8 @@ local function call_on_storage_safe(run_as_user, bucket_ids, mode, func_name, .. end local function call_on_storage_fast(run_as_user, _, _, func_name, ...) + fiber.name(CRUD_CALL_FIBER_NAME .. func_name) + return box.session.su(run_as_user, call_cache.func_name_to_func(func_name), ...) end @@ -218,9 +217,9 @@ function call.map(vshard_router, func_name, func_args, opts) futures_by_replicasets[replicaset_id] = future end - local deadline = fiber_clock() + timeout + local deadline = fiber.clock() + timeout for replicaset_id, future in pairs(futures_by_replicasets) do - local wait_timeout = deadline - fiber_clock() + local wait_timeout = deadline - fiber.clock() if wait_timeout < 0 then wait_timeout = 0 end From 475590aa660b8cbdd91ba2a8390ba708c6e055cb Mon Sep 17 00:00:00 2001 From: Sergey Morozov Date: Fri, 14 Nov 2025 22:15:31 +0300 Subject: [PATCH 06/13] TNTP-2109: Auto switch to safe mode on rebalance start --- crud.lua | 15 ++++ crud/common/call.lua | 22 ++++- crud/common/rebalance.lua | 181 ++++++++++++++++++++++++++++++++++++++ crud/schema.lua | 2 + crud/storage.lua | 6 ++ 5 files changed, 225 insertions(+), 1 deletion(-) create mode 100644 crud/common/rebalance.lua diff --git a/crud.lua b/crud.lua index 60301163..37ec7f7d 100644 --- a/crud.lua +++ b/crud.lua @@ -23,6 +23,7 @@ local readview = require('crud.readview') local schema = require('crud.schema') local storage_info = require('crud.storage_info') local storage = require('crud.storage') +local rebalance = require('crud.common.rebalance') local crud = {} @@ -158,6 +159,20 @@ crud.readview = readview.new -- @function schema crud.schema = schema.call +crud.rebalance = {} + +-- @refer rebalance.router_cache_clear +-- @function router_cache_clear +crud.rebalance.router_cache_clear = rebalance.router.cache_clear + +-- @refer rebalance.router_cache_length +-- @function router_cache_length +crud.rebalance.router_cache_length = rebalance.router.cache_length + +-- @refer rebalance.router_cache_last_clear_ts +-- @function router_cache_last_clear_ts +crud.rebalance.router_cache_last_clear_ts = rebalance.router.cache_last_clear_ts + function crud.init_router() rawset(_G, 'crud', crud) end diff --git a/crud/common/call.lua b/crud/common/call.lua index d6cf733b..8a8a5418 100644 --- a/crud/common/call.lua +++ b/crud/common/call.lua @@ -7,6 +7,7 @@ local utils = require('crud.common.utils') local sharding_utils = require('crud.common.sharding.utils') local fiber = require('fiber') local const = require('crud.common.const') +local rebalance = require('crud.common.rebalance') local BaseIterator = require('crud.common.map_call_cases.base_iter') local BasePostprocessor = require('crud.common.map_call_cases.base_postprocessor') @@ -46,6 +47,8 @@ local function bucket_ref_many(bucket_ids, mode) end local function call_on_storage_safe(run_as_user, bucket_ids, mode, func_name, ...) + fiber.name(CRUD_CALL_FIBER_NAME .. func_name) + local ok, ref_err = bucket_ref_many(bucket_ids, mode) if not ok then return nil, ref_err @@ -67,7 +70,24 @@ local function call_on_storage_fast(run_as_user, _, _, func_name, ...) return box.session.su(run_as_user, call_cache.func_name_to_func(func_name), ...) end -local call_on_storage = call_on_storage_fast +local call_on_storage = rebalance.safe_mode and call_on_storage_safe or call_on_storage_fast + +local function safe_mode_enable() + call_on_storage = call_on_storage_safe + + for fb_id, fb in pairs(fiber.info()) do + if string.find(fb.name, CRUD_CALL_FIBER_NAME) then + fiber.kill(fb_id) + end + end +end + +local function safe_mode_disable() + call_on_storage = call_on_storage_fast +end + +rebalance.register_safe_mode_enable_hook(safe_mode_enable) +rebalance.register_safe_mode_disable_hook(safe_mode_disable) call.storage_api = {[CALL_FUNC_NAME] = call_on_storage} diff --git a/crud/common/rebalance.lua b/crud/common/rebalance.lua new file mode 100644 index 00000000..0985ff42 --- /dev/null +++ b/crud/common/rebalance.lua @@ -0,0 +1,181 @@ +local fiber = require('fiber') +local vshard_consts = require('vshard.consts') +local utils = require('crud.common.utils') + +local MODULE_INTERNALS = '__module_crud_rebalance' +local SETTINGS_SPACE_NAME = '_crud_settings' + + +local M = rawget(_G, MODULE_INTERNALS) +if not M then + M = { + safe_mode = false, + safe_mode_enable_hooks = {}, + safe_mode_disable_hooks = {}, + _router_cache_last_clear_ts = fiber.time() + } +else + return M +end + +local function create_space() + local settings_space = box.schema.space.create(SETTINGS_SPACE_NAME, { + engine = 'memtx', + format = { + { name = 'key', type = 'string' }, + { name = 'value', type = 'any' }, + }, + if_not_exists = true, + }) + settings_space:create_index('primary', { parts = { 'key' }, if_not_exists = true }) +end + +local function safe_mode_trigger(_, new, space, op) + if space ~= '_bucket' then + return + end + if (op == 'INSERT' and new.status == vshard_consts.BUCKET.RECEIVING) or + (op == 'REPLACE' and new.status == vshard_consts.BUCKET.SENDING) then + box.broadcast('_crud.safe_mode_enable', true) + end +end + +local function register_enable_hook(func) + M.safe_mode_enable_hooks[func] = true +end + +local function remove_enable_hook(func) + M.safe_mode_enable_hooks[func] = nil +end + +local function register_disable_hook(func) + M.safe_mode_disable_hooks[func] = true +end + +local function remove_disable_hook(func) + M.safe_mode_disable_hooks[func] = nil +end + +local function safe_mode_status() + return M.safe_mode +end + +local function safe_mode_enable() + if not box.info.ro then + box.space[SETTINGS_SPACE_NAME]:replace{ 'safe_mode', true } + box.space._bucket:on_replace(nil, safe_mode_trigger) + end + M.safe_mode = true + + for hook, _ in pairs(M.safe_mode_enable_hooks) do + hook() + end +end + +local function safe_mode_disable() + if not box.info.ro then + box.space[SETTINGS_SPACE_NAME]:replace{ 'safe_mode', false } + box.space._bucket:on_replace(safe_mode_trigger) + end + M.safe_mode = false + + for hook, _ in pairs(M.safe_mode_disable_hooks) do + hook() + end +end + +local function rebalance_init() + box.watch('box.status', function() + if box.info.ro or box.space[SETTINGS_SPACE_NAME] == nil then + return + end + + local stored_safe_mode = box.space[SETTINGS_SPACE_NAME]:get{ 'safe_mode' } + M.safe_mode = stored_safe_mode.value + + if M.safe_mode then + for hook, _ in pairs(M.safe_mode_enable_hooks) do + hook() + end + else + box.space._bucket:on_replace(safe_mode_trigger) + for hook, _ in pairs(M.safe_mode_disable_hooks) do + hook() + end + end + end) + + box.watch('_crud.safe_mode_enable', function(_, do_enable) + if box.info.ro or not do_enable then + return + end + safe_mode_enable() + end) + + if box.info.ro then + return + end + + local stored_safe_mode + if box.space[SETTINGS_SPACE_NAME] == nil then + create_space() + box.space[SETTINGS_SPACE_NAME]:insert{ 'safe_mode', false } + else + stored_safe_mode = box.space[SETTINGS_SPACE_NAME]:get{ 'safe_mode' } + end + M.safe_mode = stored_safe_mode and stored_safe_mode.value or false + + if M.safe_mode then + for hook, _ in pairs(M.safe_mode_enable_hooks) do + hook() + end + else + box.space._bucket:on_replace(safe_mode_trigger) + for hook, _ in pairs(M.safe_mode_disable_hooks) do + hook() + end + end +end + +local function rebalance_stop() + M.safe_mode_disable() +end + +local function router_cache_clear() + local r = utils.get_vshard_router_instance() + M._router_cache_last_clear_ts = fiber.time() + return r:_route_map_clear() +end + +local function router_cache_length() + local r = utils.get_vshard_router_instance() + return r.known_bucket_count +end + +local function router_cache_last_clear_ts() + return M._router_cache_last_clear_ts +end + +M.init = rebalance_init +M.stop = rebalance_stop +M.safe_mode_status = safe_mode_status +M.safe_mode_enable = safe_mode_enable +M.safe_mode_disable = safe_mode_disable +M.register_safe_mode_enable_hook = register_enable_hook +M.remove_safe_mode_enable_hook = remove_enable_hook +M.register_safe_mode_disable_hook = register_disable_hook +M.remove_safe_mode_disable_hook = remove_disable_hook + +M.router = { + cache_clear = router_cache_clear, + cache_length = router_cache_length, + cache_last_clear_ts = router_cache_last_clear_ts, +} + +M.storage_api = { + rebalance_safe_mode_status = safe_mode_status, + rebalance_safe_mode_enable = safe_mode_enable, + rebalance_safe_mode_disable = safe_mode_disable, +} + +return M diff --git a/crud/schema.lua b/crud/schema.lua index 57743ba9..0a6d0473 100644 --- a/crud/schema.lua +++ b/crud/schema.lua @@ -46,6 +46,8 @@ schema.system_spaces = { ['_tt_migrations'] = true, -- https://github.com/tarantool/cluster-federation/blob/01738cafa0dc7a3138e64f93c4e84cb323653257/src/internal/utils/utils.go#L17 ['_cdc_state'] = true, + -- crud/common/rebalance.lua + ['_crud_settings'] = true, } local function get_crud_schema(space) diff --git a/crud/storage.lua b/crud/storage.lua index 0b8ef770..58acc4ef 100644 --- a/crud/storage.lua +++ b/crud/storage.lua @@ -4,6 +4,7 @@ local dev_checks = require('crud.common.dev_checks') local stash = require('crud.common.stash') local utils = require('crud.common.utils') +local rebalance = require('crud.common.rebalance') local call = require('crud.common.call') local sharding_metadata = require('crud.common.sharding.sharding_metadata') local insert = require('crud.insert') @@ -62,6 +63,7 @@ local function init_storage_call(user, storage_api) end local modules_with_storage_api = { + rebalance, call, sharding_metadata, insert, @@ -103,6 +105,8 @@ local function init_impl() user = utils.get_this_replica_user() or 'guest' end + rebalance.init() + for _, module in ipairs(modules_with_storage_api) do init_storage_call(user, module.storage_api) end @@ -142,6 +146,8 @@ function storage.stop() internal_stash.watcher = nil end + rebalance.stop() + rawset(_G, utils.STORAGE_NAMESPACE, nil) end From 90e1fa29594a0bd4ffb510164093a719aeec6620 Mon Sep 17 00:00:00 2001 From: Sergey Morozov Date: Tue, 18 Nov 2025 23:09:08 +0300 Subject: [PATCH 07/13] TNTP-2109: Process bucket_ref errors in crud.router --- crud/common/call.lua | 23 ++++++++++++++++++----- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/crud/common/call.lua b/crud/common/call.lua index 8a8a5418..8597b77b 100644 --- a/crud/common/call.lua +++ b/crud/common/call.lua @@ -149,7 +149,9 @@ local function wrap_vshard_err(vshard_router, err, func_name, replicaset_id, buc )) end -local function retry_call_with_master_discovery(replicaset, method, func_name, func_args, call_opts, mode, bucket_ids) +local function retry_call_with_master_discovery(vshard_router, replicaset, + method, func_name, func_args, + call_opts, mode, bucket_ids) local func_args_ext = utils.append_array({ box.session.effective_user(), bucket_ids, mode, func_name }, func_args) -- In case cluster was just bootstrapped with auto master discovery, @@ -160,7 +162,18 @@ local function retry_call_with_master_discovery(replicaset, method, func_name, f return resp, err end - if err.name == 'MISSING_MASTER' and replicaset.locate_master ~= nil then + if err.name == 'WRONG_BUCKET' or + err.name == 'BUCKET_IS_LOCKED' or + err.name == 'TRANSFER_IS_IN_PROGRESS' then + vshard_router:_bucket_reset(err.bucket_id) + + -- Substitute replicaset only for single bucket_id calls. + if err.destination and vshard_router.replicasets[err.destination] and #bucket_ids == 1 then + replicaset = vshard_router.replicasets[err.destination] + else + return nil, err + end + elseif err.name == 'MISSING_MASTER' and replicaset.locate_master ~= nil then replicaset:locate_master() end @@ -214,7 +227,7 @@ function call.map(vshard_router, func_name, func_args, opts) while iter:has_next() do local args, replicaset, replicaset_id, bucket_ids = iter:get() - local future, err = retry_call_with_master_discovery(replicaset, vshard_call_name, + local future, err = retry_call_with_master_discovery(vshard_router, replicaset, vshard_call_name, func_name, args, call_opts, opts.mode, bucket_ids) if err ~= nil then @@ -288,7 +301,7 @@ function call.single(vshard_router, bucket_id, func_name, func_args, opts) local timeout = opts.timeout or const.DEFAULT_VSHARD_CALL_TIMEOUT local request_timeout = opts.mode == 'read' and opts.request_timeout or nil - local res, err = retry_call_with_master_discovery(replicaset, vshard_call_name, + local res, err = retry_call_with_master_discovery(vshard_router, replicaset, vshard_call_name, func_name, func_args, {timeout = timeout, request_timeout = request_timeout}, opts.mode, {bucket_id}) if err ~= nil then @@ -315,7 +328,7 @@ function call.any(vshard_router, func_name, func_args, opts) end local replicaset_id, replicaset = next(replicasets) - local res, err = retry_call_with_master_discovery(replicaset, 'call', + local res, err = retry_call_with_master_discovery(vshard_router, replicaset, 'call', func_name, func_args, {timeout = timeout}, 'read', {}) if err ~= nil then From 1fa88c70701c075654a9410e33e6d5ccca8f16de Mon Sep 17 00:00:00 2001 From: Sergey Morozov Date: Wed, 19 Nov 2025 20:17:44 +0300 Subject: [PATCH 08/13] TNTP-2109: Multiple minor fixes --- crud/common/call.lua | 10 ++-- crud/common/map_call_cases/base_iter.lua | 1 + .../map_call_cases/batch_insert_iter.lua | 1 + .../map_call_cases/batch_upsert_iter.lua | 1 + crud/common/rebalance.lua | 57 ++++++------------- 5 files changed, 26 insertions(+), 44 deletions(-) diff --git a/crud/common/call.lua b/crud/common/call.lua index 8597b77b..66616bb9 100644 --- a/crud/common/call.lua +++ b/crud/common/call.lua @@ -47,7 +47,7 @@ local function bucket_ref_many(bucket_ids, mode) end local function call_on_storage_safe(run_as_user, bucket_ids, mode, func_name, ...) - fiber.name(CRUD_CALL_FIBER_NAME .. func_name) + fiber.name(CRUD_CALL_FIBER_NAME .. 'safe/' .. func_name) local ok, ref_err = bucket_ref_many(bucket_ids, mode) if not ok then @@ -65,7 +65,7 @@ local function call_on_storage_safe(run_as_user, bucket_ids, mode, func_name, .. end local function call_on_storage_fast(run_as_user, _, _, func_name, ...) - fiber.name(CRUD_CALL_FIBER_NAME .. func_name) + fiber.name(CRUD_CALL_FIBER_NAME .. 'fast/' .. func_name) return box.session.su(run_as_user, call_cache.func_name_to_func(func_name), ...) end @@ -162,9 +162,11 @@ local function retry_call_with_master_discovery(vshard_router, replicaset, return resp, err end + -- This is a partial copy of error handling from vshard.router.router_call_impl() + -- It is much simpler mostly because bucket_set() can't be accessed from outside vshard. if err.name == 'WRONG_BUCKET' or - err.name == 'BUCKET_IS_LOCKED' or - err.name == 'TRANSFER_IS_IN_PROGRESS' then + err.name == 'BUCKET_IS_LOCKED' or + err.name == 'TRANSFER_IS_IN_PROGRESS' then vshard_router:_bucket_reset(err.bucket_id) -- Substitute replicaset only for single bucket_id calls. diff --git a/crud/common/map_call_cases/base_iter.lua b/crud/common/map_call_cases/base_iter.lua index b608dc3e..e52c8eb4 100644 --- a/crud/common/map_call_cases/base_iter.lua +++ b/crud/common/map_call_cases/base_iter.lua @@ -67,6 +67,7 @@ end -- @return[1] table func_args -- @return[2] table replicaset -- @return[3] string replicaset_id +-- @return[4] table bucket_ids function BaseIterator:get() local replicaset_id = self.next_index local replicaset = self.next_replicaset diff --git a/crud/common/map_call_cases/batch_insert_iter.lua b/crud/common/map_call_cases/batch_insert_iter.lua index 824688f4..2c22a798 100644 --- a/crud/common/map_call_cases/batch_insert_iter.lua +++ b/crud/common/map_call_cases/batch_insert_iter.lua @@ -68,6 +68,7 @@ end -- @return[1] table func_args -- @return[2] table replicaset -- @return[3] string replicaset_id +-- @return[4] table bucket_ids function BatchInsertIterator:get() local replicaset_id = self.next_index local replicaset = self.next_batch.replicaset diff --git a/crud/common/map_call_cases/batch_upsert_iter.lua b/crud/common/map_call_cases/batch_upsert_iter.lua index 5249ea55..658db9d4 100644 --- a/crud/common/map_call_cases/batch_upsert_iter.lua +++ b/crud/common/map_call_cases/batch_upsert_iter.lua @@ -76,6 +76,7 @@ end -- @return[1] table func_args -- @return[2] table replicaset -- @return[3] string replicaset_id +-- @return[4] table bucket_ids function BatchUpsertIterator:get() local replicaset_id = self.next_index local replicaset = self.next_batch.replicaset diff --git a/crud/common/rebalance.lua b/crud/common/rebalance.lua index 0985ff42..3b2f8671 100644 --- a/crud/common/rebalance.lua +++ b/crud/common/rebalance.lua @@ -2,21 +2,16 @@ local fiber = require('fiber') local vshard_consts = require('vshard.consts') local utils = require('crud.common.utils') -local MODULE_INTERNALS = '__module_crud_rebalance' local SETTINGS_SPACE_NAME = '_crud_settings' +local SAFE_MOD_ENABLE_EVENT = '_crud.safe_mode_enable' -local M = rawget(_G, MODULE_INTERNALS) -if not M then - M = { - safe_mode = false, - safe_mode_enable_hooks = {}, - safe_mode_disable_hooks = {}, - _router_cache_last_clear_ts = fiber.time() - } -else - return M -end +local M = { + safe_mode = false, + safe_mode_enable_hooks = {}, + safe_mode_disable_hooks = {}, + _router_cache_last_clear_ts = fiber.time() +} local function create_space() local settings_space = box.schema.space.create(SETTINGS_SPACE_NAME, { @@ -36,7 +31,7 @@ local function safe_mode_trigger(_, new, space, op) end if (op == 'INSERT' and new.status == vshard_consts.BUCKET.RECEIVING) or (op == 'REPLACE' and new.status == vshard_consts.BUCKET.SENDING) then - box.broadcast('_crud.safe_mode_enable', true) + box.broadcast(SAFE_MOD_ENABLE_EVENT, true) end end @@ -86,11 +81,17 @@ end local function rebalance_init() box.watch('box.status', function() - if box.info.ro or box.space[SETTINGS_SPACE_NAME] == nil then + if box.info.ro then return end - local stored_safe_mode = box.space[SETTINGS_SPACE_NAME]:get{ 'safe_mode' } + local stored_safe_mode + if box.space[SETTINGS_SPACE_NAME] == nil then + create_space() + box.space[SETTINGS_SPACE_NAME]:insert{ 'safe_mode', false } + else + stored_safe_mode = box.space[SETTINGS_SPACE_NAME]:get{ 'safe_mode' } + end M.safe_mode = stored_safe_mode.value if M.safe_mode then @@ -105,36 +106,12 @@ local function rebalance_init() end end) - box.watch('_crud.safe_mode_enable', function(_, do_enable) + box.watch(SAFE_MOD_ENABLE_EVENT, function(_, do_enable) if box.info.ro or not do_enable then return end safe_mode_enable() end) - - if box.info.ro then - return - end - - local stored_safe_mode - if box.space[SETTINGS_SPACE_NAME] == nil then - create_space() - box.space[SETTINGS_SPACE_NAME]:insert{ 'safe_mode', false } - else - stored_safe_mode = box.space[SETTINGS_SPACE_NAME]:get{ 'safe_mode' } - end - M.safe_mode = stored_safe_mode and stored_safe_mode.value or false - - if M.safe_mode then - for hook, _ in pairs(M.safe_mode_enable_hooks) do - hook() - end - else - box.space._bucket:on_replace(safe_mode_trigger) - for hook, _ in pairs(M.safe_mode_disable_hooks) do - hook() - end - end end local function rebalance_stop() From eefef58ade16ec1d3a626d9f4dce5b5e423db10a Mon Sep 17 00:00:00 2001 From: Sergey Morozov Date: Thu, 20 Nov 2025 15:27:04 +0300 Subject: [PATCH 09/13] TNTP-2109: Add checks for safe mode trigger and stored value --- crud/common/rebalance.lua | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/crud/common/rebalance.lua b/crud/common/rebalance.lua index 3b2f8671..9140571f 100644 --- a/crud/common/rebalance.lua +++ b/crud/common/rebalance.lua @@ -58,7 +58,11 @@ end local function safe_mode_enable() if not box.info.ro then box.space[SETTINGS_SPACE_NAME]:replace{ 'safe_mode', true } - box.space._bucket:on_replace(nil, safe_mode_trigger) + for _, trig in pairs(box.space._bucket:on_replace()) do + if trig == safe_mode_trigger then + box.space._bucket:on_replace(nil, safe_mode_trigger) + end + end end M.safe_mode = true @@ -92,7 +96,7 @@ local function rebalance_init() else stored_safe_mode = box.space[SETTINGS_SPACE_NAME]:get{ 'safe_mode' } end - M.safe_mode = stored_safe_mode.value + M.safe_mode = stored_safe_mode and stored_safe_mode.value or false if M.safe_mode then for hook, _ in pairs(M.safe_mode_enable_hooks) do From 5f3f46f3974f669747a79c19aa0e22661e779d6b Mon Sep 17 00:00:00 2001 From: Sergey Morozov Date: Thu, 20 Nov 2025 17:16:09 +0300 Subject: [PATCH 10/13] TNTP-2109: Check that tarantool supports box.watch --- crud/common/rebalance.lua | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/crud/common/rebalance.lua b/crud/common/rebalance.lua index 9140571f..e17c6d08 100644 --- a/crud/common/rebalance.lua +++ b/crud/common/rebalance.lua @@ -1,4 +1,5 @@ local fiber = require('fiber') +local log = require('log') local vshard_consts = require('vshard.consts') local utils = require('crud.common.utils') @@ -84,6 +85,13 @@ local function safe_mode_disable() end local function rebalance_init() + -- box.watch was introduced in tarantool 2.10.0 + if not utils.tarantool_supports_box_watch() then + log.warn('This version of tarantool does not support autoswitch to safe mode during rebalance. ' + .. 'Update to newer version or use `_crud.rebalance_safe_mode_enable()` to enable safe mode manually.') + return + end + box.watch('box.status', function() if box.info.ro then return From 339ca8fe6eadb8368994fba102d7ae6dd10ef9be Mon Sep 17 00:00:00 2001 From: Sergey Morozov Date: Thu, 20 Nov 2025 20:33:24 +0300 Subject: [PATCH 11/13] TNTP-2109: Add rebalance related metrics --- crud.lua | 3 +- crud/common/rebalance.lua | 52 +++++++++++++++-- test/integration/metrics_test.lua | 94 +++++++++++++++++++++++++++++++ 3 files changed, 143 insertions(+), 6 deletions(-) create mode 100644 test/integration/metrics_test.lua diff --git a/crud.lua b/crud.lua index 37ec7f7d..2a5dcf95 100644 --- a/crud.lua +++ b/crud.lua @@ -174,7 +174,8 @@ crud.rebalance.router_cache_length = rebalance.router.cache_length crud.rebalance.router_cache_last_clear_ts = rebalance.router.cache_last_clear_ts function crud.init_router() - rawset(_G, 'crud', crud) + rawset(_G, 'crud', crud) + rebalance.metrics.enable_router_metrics() end function crud.stop_router() diff --git a/crud/common/rebalance.lua b/crud/common/rebalance.lua index e17c6d08..69c2506f 100644 --- a/crud/common/rebalance.lua +++ b/crud/common/rebalance.lua @@ -3,10 +3,11 @@ local log = require('log') local vshard_consts = require('vshard.consts') local utils = require('crud.common.utils') +local has_metrics_module, metrics = pcall(require, 'metrics') + local SETTINGS_SPACE_NAME = '_crud_settings' local SAFE_MOD_ENABLE_EVENT = '_crud.safe_mode_enable' - local M = { safe_mode = false, safe_mode_enable_hooks = {}, @@ -85,6 +86,8 @@ local function safe_mode_disable() end local function rebalance_init() + M.metrics.enable_storage_metrics() + -- box.watch was introduced in tarantool 2.10.0 if not utils.tarantool_supports_box_watch() then log.warn('This version of tarantool does not support autoswitch to safe mode during rebalance. ' @@ -131,20 +134,54 @@ local function rebalance_stop() end local function router_cache_clear() - local r = utils.get_vshard_router_instance() M._router_cache_last_clear_ts = fiber.time() - return r:_route_map_clear() + return utils.get_vshard_router_instance():_route_map_clear() end local function router_cache_length() - local r = utils.get_vshard_router_instance() - return r.known_bucket_count + return utils.get_vshard_router_instance().known_bucket_count end local function router_cache_last_clear_ts() return M._router_cache_last_clear_ts end +-- Rebalance related metrics +local function enable_storage_metrics() + if not has_metrics_module then + return + end + + local safe_mode_enabled_gauge = metrics.gauge( + 'tnt_crud_storage_safe_mode_enabled', + "is safe mode enabled on this storage instance" + ) + + metrics.register_callback(function() + safe_mode_enabled_gauge:set(safe_mode_status() and 1 or 0) + end) +end + +local function enable_router_metrics() + if not has_metrics_module then + return + end + + local router_cache_length_gauge = metrics.gauge( + 'tnt_crud_router_cache_length', + "number of bucket routes in vshard router cache" + ) + local router_cache_last_clear_ts_gauge = metrics.gauge( + 'tnt_crud_router_cache_last_clear_ts', + "when vshard router cache was cleared last time" + ) + + metrics.register_callback(function() + router_cache_length_gauge:set(router_cache_length()) + router_cache_last_clear_ts_gauge:set(router_cache_last_clear_ts()) + end) +end + M.init = rebalance_init M.stop = rebalance_stop M.safe_mode_status = safe_mode_status @@ -167,4 +204,9 @@ M.storage_api = { rebalance_safe_mode_disable = safe_mode_disable, } +M.metrics = { + enable_storage_metrics = enable_storage_metrics, + enable_router_metrics = enable_router_metrics, +} + return M diff --git a/test/integration/metrics_test.lua b/test/integration/metrics_test.lua new file mode 100644 index 00000000..e1dd3778 --- /dev/null +++ b/test/integration/metrics_test.lua @@ -0,0 +1,94 @@ +local helpers = require('test.helper') +local t = require('luatest') + +local pgroup = t.group('metrics_integration', helpers.backend_matrix({ + {engine = 'memtx'}, +})) + +local function before_all(g) + helpers.start_default_cluster(g, 'srv_stats') +end + +local function after_all(g) + helpers.stop_cluster(g.cluster, g.params.backend) +end + +local function before_each(g) + g.router:eval("crud = require('crud')") + helpers.call_on_storages(g.cluster, function(server) + server:call('_crud.rebalance_safe_mode_disable') + end) +end + +pgroup.before_all(before_all) + +pgroup.after_all(after_all) + +pgroup.before_each(before_each) + +pgroup.test_safe_mode_metrics = function(g) + local has_metrics_module = require('metrics') + t.skip_if(not has_metrics_module, 'No metrics module in current version') + + -- Check safe mode metric on storage + helpers.call_on_storages(g.cluster, function(server) + local observed = server:eval("return require('metrics').collect({ invoke_callbacks = true })") + local has_metric = false + for _, m in pairs(observed) do + if m.metric_name == 'tnt_crud_storage_safe_mode_enabled' then + t.assert_equals(m.value, 0, 'Metric shows safe mode disabled') + has_metric = true + break + end + end + if not has_metric then + t.fail('No tnt_crud_storage_safe_mode_enabled metric found') + end + end) + + -- Enable safe mode + helpers.call_on_storages(g.cluster, function(server) + server:call('_crud.rebalance_safe_mode_enable') + end) + + -- Check that metric value has changed + helpers.call_on_storages(g.cluster, function(server) + local observed = server:eval("return require('metrics').collect({ invoke_callbacks = true })") + local has_metric = false + for _, m in pairs(observed) do + if m.metric_name == 'tnt_crud_storage_safe_mode_enabled' then + t.assert_equals(m.value, 1, 'Metric shows safe mode enabled') + has_metric = true + break + end + end + if not has_metric then + t.fail('No tnt_crud_storage_safe_mode_enabled metric found') + end + end) + + -- Check router cache metric + local observed = g.router:eval("return require('metrics').collect({ invoke_callbacks = true })") + local first_ts = 0 + for _, m in pairs(observed) do + if m.metric_name == 'tnt_crud_router_cache_last_clear_ts' then + first_ts = m.value + break + end + end + t.assert_gt(first_ts, 0, 'Last cache clear TS is greater than zero') + + -- Clear router cache + g.router:eval("crud.rebalance.router_cache_clear()") + + -- Check that last_clear_ts has changed + observed = g.router:eval("return require('metrics').collect({ invoke_callbacks = true })") + local new_ts = 0 + for _, m in pairs(observed) do + if m.metric_name == 'tnt_crud_router_cache_last_clear_ts' then + new_ts = m.value + break + end + end + t.assert_gt(new_ts, first_ts, 'Last cache clear TS is greater than the first one') +end From 0c6370fd48b815925dbdaf42c02a72ee2990b906 Mon Sep 17 00:00:00 2001 From: Sergey Morozov Date: Tue, 25 Nov 2025 21:32:32 +0300 Subject: [PATCH 12/13] TNTP-2109: Add safe mode on/off to test matrix --- test/helper.lua | 38 +++++++++++++++++++++++++++++- test/unit/not_initialized_test.lua | 2 +- test/unit/stats_test.lua | 1 + 3 files changed, 39 insertions(+), 2 deletions(-) diff --git a/test/helper.lua b/test/helper.lua index 9aa127d1..eae17e8c 100644 --- a/test/helper.lua +++ b/test/helper.lua @@ -981,6 +981,16 @@ function helpers.start_cluster(g, cartridge_cfg, vshard_cfg, tarantool3_cluster_ error(err) end end + + if g.params and g.params.safe_mode ~= nil then + local safe_mod_func = '_crud.rebalance_safe_mode_disable' + if g.params.safe_mode then + safe_mod_func = '_crud.rebalance_safe_mode_enable' + end + helpers.call_on_storages(g.cluster, function(server) + server:call(safe_mod_func) + end) + end end local function count_storages_in_topology(g, backend, vshard_group, storage_roles) @@ -1178,8 +1188,34 @@ function helpers.is_cartridge_suite_supported() return is_module_provided and is_tarantool_supports end -function helpers.backend_matrix(base_matrix) +function helpers.safe_mode_matrix(base_matrix) + base_matrix = base_matrix or {{}} + + local safe_mode_params = { + { safe_mode = true }, + { safe_mode = false }, + } + + local matrix = {} + for _, params in ipairs(safe_mode_params) do + for _, base in ipairs(base_matrix) do + base = table.deepcopy(base) + base.safe_mode = params.safe_mode + table.insert(matrix, base) + end + end + + return matrix +end + +function helpers.backend_matrix(base_matrix, opts) base_matrix = base_matrix or {{}} + opts = opts or {} + + if not opts.skip_safe_mode then + base_matrix = helpers.safe_mode_matrix(base_matrix) + end + local backend_params = { { backend = helpers.backend.VSHARD, diff --git a/test/unit/not_initialized_test.lua b/test/unit/not_initialized_test.lua index 82f9b4ee..e2f6793a 100644 --- a/test/unit/not_initialized_test.lua +++ b/test/unit/not_initialized_test.lua @@ -5,7 +5,7 @@ local server = require('luatest.server') local pgroup = t.group('not-initialized', helpers.backend_matrix({ {}, -})) +}, { skip_safe_mode = true })) local vshard_cfg_template = { sharding = { diff --git a/test/unit/stats_test.lua b/test/unit/stats_test.lua index fdaf8c01..9ae6847d 100644 --- a/test/unit/stats_test.lua +++ b/test/unit/stats_test.lua @@ -45,6 +45,7 @@ local function enable_stats(g, params) params = table.deepcopy(params) params.backend = nil params.backend_cfg = nil + params.safe_mode = nil end g.router:eval("stats_module.enable(...)", { params }) end From 6683748b227102fb7ce9e70e92f3163131abd69e Mon Sep 17 00:00:00 2001 From: Sergey Morozov Date: Wed, 26 Nov 2025 20:03:06 +0300 Subject: [PATCH 13/13] TNTP-2109: Do not disable safe mode on server stop --- crud/common/rebalance.lua | 9 ++++----- crud/storage.lua | 3 --- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/crud/common/rebalance.lua b/crud/common/rebalance.lua index 69c2506f..2e6e0ccb 100644 --- a/crud/common/rebalance.lua +++ b/crud/common/rebalance.lua @@ -71,6 +71,8 @@ local function safe_mode_enable() for hook, _ in pairs(M.safe_mode_enable_hooks) do hook() end + + log.info('Rebalance safe mode enabled') end local function safe_mode_disable() @@ -83,6 +85,8 @@ local function safe_mode_disable() for hook, _ in pairs(M.safe_mode_disable_hooks) do hook() end + + log.info('Rebalance safe mode disabled') end local function rebalance_init() @@ -129,10 +133,6 @@ local function rebalance_init() end) end -local function rebalance_stop() - M.safe_mode_disable() -end - local function router_cache_clear() M._router_cache_last_clear_ts = fiber.time() return utils.get_vshard_router_instance():_route_map_clear() @@ -183,7 +183,6 @@ local function enable_router_metrics() end M.init = rebalance_init -M.stop = rebalance_stop M.safe_mode_status = safe_mode_status M.safe_mode_enable = safe_mode_enable M.safe_mode_disable = safe_mode_disable diff --git a/crud/storage.lua b/crud/storage.lua index 58acc4ef..c6356008 100644 --- a/crud/storage.lua +++ b/crud/storage.lua @@ -145,9 +145,6 @@ function storage.stop() internal_stash.watcher:unregister() internal_stash.watcher = nil end - - rebalance.stop() - rawset(_G, utils.STORAGE_NAMESPACE, nil) end