-
Notifications
You must be signed in to change notification settings - Fork 15
TNTP-2109: Switch to "safe" mode on vshard rebalance #462
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 7 commits
67260d0
9b79a68
7730a5c
8898804
358d0ac
475590a
90e1fa2
1fa88c7
eefef58
5f3f46f
339ca8f
0c6370f
6683748
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,11 +1,13 @@ | ||
| local errors = require('errors') | ||
| local vshard = require('vshard') | ||
|
|
||
| local call_cache = require('crud.common.call_cache') | ||
| local dev_checks = require('crud.common.dev_checks') | ||
| local utils = require('crud.common.utils') | ||
| local sharding_utils = require('crud.common.sharding.utils') | ||
| local fiber_clock = require('fiber').clock | ||
| local fiber = require('fiber') | ||
| local const = require('crud.common.const') | ||
| local rebalance = require('crud.common.rebalance') | ||
|
|
||
| local BaseIterator = require('crud.common.map_call_cases.base_iter') | ||
| local BasePostprocessor = require('crud.common.map_call_cases.base_postprocessor') | ||
|
|
@@ -14,14 +16,79 @@ local CallError = errors.new_class('CallError') | |
|
|
||
| local CALL_FUNC_NAME = 'call_on_storage' | ||
| local CRUD_CALL_FUNC_NAME = utils.get_storage_call(CALL_FUNC_NAME) | ||
|
|
||
| local CRUD_CALL_FIBER_NAME = CRUD_CALL_FUNC_NAME .. '/' | ||
|
|
||
| local call = {} | ||
|
|
||
| local function call_on_storage(run_as_user, func_name, ...) | ||
| local function bucket_unref_many(bucket_ids, mode) | ||
| local all_ok = true | ||
| local last_err = nil | ||
| for _, bucket_id in pairs(bucket_ids) do | ||
| local ok, err = vshard.storage.bucket_unref(bucket_id, mode) | ||
| if not ok then | ||
| all_ok = nil | ||
| last_err = err | ||
| end | ||
| end | ||
| return all_ok, last_err | ||
| end | ||
|
|
||
| local function bucket_ref_many(bucket_ids, mode) | ||
| local reffed = {} | ||
| for _, bucket_id in pairs(bucket_ids) do | ||
| local ok, err = vshard.storage.bucket_ref(bucket_id, mode) | ||
| if not ok then | ||
| bucket_unref_many(reffed, mode) | ||
| return nil, err | ||
| end | ||
| table.insert(reffed, bucket_id) | ||
| end | ||
| return true, nil | ||
| end | ||
|
|
||
| local function call_on_storage_safe(run_as_user, bucket_ids, mode, func_name, ...) | ||
| fiber.name(CRUD_CALL_FIBER_NAME .. func_name) | ||
|
|
||
| local ok, ref_err = bucket_ref_many(bucket_ids, mode) | ||
| if not ok then | ||
| return nil, ref_err | ||
| end | ||
|
|
||
| local res = {box.session.su(run_as_user, call_cache.func_name_to_func(func_name), ...)} | ||
|
|
||
| ok, ref_err = bucket_unref_many(bucket_ids, mode) | ||
| if not ok then | ||
| return nil, ref_err | ||
| end | ||
|
|
||
| return unpack(res, 1, table.maxn(res)) | ||
| end | ||
|
|
||
| local function call_on_storage_fast(run_as_user, _, _, func_name, ...) | ||
| fiber.name(CRUD_CALL_FIBER_NAME .. func_name) | ||
|
|
||
| return box.session.su(run_as_user, call_cache.func_name_to_func(func_name), ...) | ||
| end | ||
|
|
||
| local call_on_storage = rebalance.safe_mode and call_on_storage_safe or call_on_storage_fast | ||
|
|
||
| local function safe_mode_enable() | ||
| call_on_storage = call_on_storage_safe | ||
|
|
||
| for fb_id, fb in pairs(fiber.info()) do | ||
| if string.find(fb.name, CRUD_CALL_FIBER_NAME) then | ||
| fiber.kill(fb_id) | ||
| end | ||
| end | ||
| end | ||
|
|
||
| local function safe_mode_disable() | ||
| call_on_storage = call_on_storage_fast | ||
| end | ||
|
|
||
| rebalance.register_safe_mode_enable_hook(safe_mode_enable) | ||
| rebalance.register_safe_mode_disable_hook(safe_mode_disable) | ||
|
|
||
| call.storage_api = {[CALL_FUNC_NAME] = call_on_storage} | ||
|
|
||
| function call.get_vshard_call_name(mode, prefer_replica, balance) | ||
|
|
@@ -82,8 +149,10 @@ local function wrap_vshard_err(vshard_router, err, func_name, replicaset_id, buc | |
| )) | ||
| end | ||
|
|
||
| local function retry_call_with_master_discovery(replicaset, method, func_name, func_args, call_opts) | ||
| local func_args_ext = utils.append_array({ box.session.effective_user(), func_name }, func_args) | ||
| local function retry_call_with_master_discovery(vshard_router, replicaset, | ||
| method, func_name, func_args, | ||
| call_opts, mode, bucket_ids) | ||
| local func_args_ext = utils.append_array({ box.session.effective_user(), bucket_ids, mode, func_name }, func_args) | ||
|
|
||
| -- In case cluster was just bootstrapped with auto master discovery, | ||
| -- replicaset may miss master. | ||
|
|
@@ -93,7 +162,18 @@ local function retry_call_with_master_discovery(replicaset, method, func_name, f | |
| return resp, err | ||
| end | ||
|
|
||
| if err.name == 'MISSING_MASTER' and replicaset.locate_master ~= nil then | ||
| if err.name == 'WRONG_BUCKET' or | ||
| err.name == 'BUCKET_IS_LOCKED' or | ||
| err.name == 'TRANSFER_IS_IN_PROGRESS' then | ||
|
||
| vshard_router:_bucket_reset(err.bucket_id) | ||
|
|
||
| -- Substitute replicaset only for single bucket_id calls. | ||
| if err.destination and vshard_router.replicasets[err.destination] and #bucket_ids == 1 then | ||
| replicaset = vshard_router.replicasets[err.destination] | ||
| else | ||
| return nil, err | ||
| end | ||
|
Comment on lines
+170
to
+177
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, but very very simplified. Mostly because
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added a comment about vshard above |
||
| elseif err.name == 'MISSING_MASTER' and replicaset.locate_master ~= nil then | ||
| replicaset:locate_master() | ||
| end | ||
|
|
||
|
|
@@ -145,10 +225,10 @@ function call.map(vshard_router, func_name, func_args, opts) | |
| request_timeout = opts.mode == 'read' and opts.request_timeout or nil, | ||
| } | ||
| while iter:has_next() do | ||
| local args, replicaset, replicaset_id = iter:get() | ||
| local args, replicaset, replicaset_id, bucket_ids = iter:get() | ||
|
|
||
| local future, err = retry_call_with_master_discovery(replicaset, vshard_call_name, | ||
| func_name, args, call_opts) | ||
| local future, err = retry_call_with_master_discovery(vshard_router, replicaset, vshard_call_name, | ||
| func_name, args, call_opts, opts.mode, bucket_ids) | ||
|
|
||
| if err ~= nil then | ||
| local result_info = { | ||
|
|
@@ -170,9 +250,9 @@ function call.map(vshard_router, func_name, func_args, opts) | |
| futures_by_replicasets[replicaset_id] = future | ||
| end | ||
|
|
||
| local deadline = fiber_clock() + timeout | ||
| local deadline = fiber.clock() + timeout | ||
| for replicaset_id, future in pairs(futures_by_replicasets) do | ||
| local wait_timeout = deadline - fiber_clock() | ||
| local wait_timeout = deadline - fiber.clock() | ||
| if wait_timeout < 0 then | ||
| wait_timeout = 0 | ||
| end | ||
|
|
@@ -221,9 +301,9 @@ function call.single(vshard_router, bucket_id, func_name, func_args, opts) | |
| local timeout = opts.timeout or const.DEFAULT_VSHARD_CALL_TIMEOUT | ||
| local request_timeout = opts.mode == 'read' and opts.request_timeout or nil | ||
|
|
||
| local res, err = retry_call_with_master_discovery(replicaset, vshard_call_name, | ||
| func_name, func_args, {timeout = timeout, | ||
| request_timeout = request_timeout}) | ||
| local res, err = retry_call_with_master_discovery(vshard_router, replicaset, vshard_call_name, | ||
| func_name, func_args, {timeout = timeout, request_timeout = request_timeout}, | ||
| opts.mode, {bucket_id}) | ||
| if err ~= nil then | ||
| return nil, wrap_vshard_err(vshard_router, err, func_name, nil, bucket_id) | ||
| end | ||
|
|
@@ -248,8 +328,9 @@ function call.any(vshard_router, func_name, func_args, opts) | |
| end | ||
| local replicaset_id, replicaset = next(replicasets) | ||
|
|
||
| local res, err = retry_call_with_master_discovery(replicaset, 'call', | ||
| func_name, func_args, {timeout = timeout}) | ||
| local res, err = retry_call_with_master_discovery(vshard_router, replicaset, 'call', | ||
| func_name, func_args, {timeout = timeout}, | ||
| 'read', {}) | ||
| if err ~= nil then | ||
| return nil, wrap_vshard_err(vshard_router, err, func_name, replicaset_id) | ||
| end | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -72,7 +72,7 @@ function BaseIterator:get() | |
| local replicaset = self.next_replicaset | ||
| self.next_index, self.next_replicaset = next(self.replicasets, self.next_index) | ||
|
|
||
| return self.func_args, replicaset, replicaset_id | ||
| return self.func_args, replicaset, replicaset_id, {} | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. please write 4th return name in doc for method
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fixed |
||
| end | ||
|
|
||
| return BaseIterator | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -76,10 +76,11 @@ function BatchInsertIterator:get() | |
| self.next_batch.tuples, | ||
| self.opts, | ||
| } | ||
| local bucket_ids = self.next_batch.bucket_ids | ||
|
|
||
| self.next_index, self.next_batch = next(self.batches_by_replicasets, self.next_index) | ||
|
|
||
| return func_args, replicaset, replicaset_id | ||
| return func_args, replicaset, replicaset_id, bucket_ids | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. please write 4th return name in doc for method
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fixed |
||
| end | ||
|
|
||
| return BatchInsertIterator | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -85,10 +85,11 @@ function BatchUpsertIterator:get() | |
| self.next_batch.operations, | ||
| self.opts, | ||
| } | ||
| local bucket_ids = self.next_batch.bucket_ids | ||
|
|
||
| self.next_index, self.next_batch = next(self.batches_by_replicasets, self.next_index) | ||
|
|
||
| return func_args, replicaset, replicaset_id | ||
| return func_args, replicaset, replicaset_id, bucket_ids | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. please write 4th return name in doc for method
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fixed |
||
| end | ||
|
|
||
| return BatchUpsertIterator | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we mark safe functions?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is definitely not necessary but may be useful for debugging