Skip to content
Draft
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions crud.lua
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ local readview = require('crud.readview')
local schema = require('crud.schema')
local storage_info = require('crud.storage_info')
local storage = require('crud.storage')
local rebalance = require('crud.common.rebalance')

local crud = {}

Expand Down Expand Up @@ -158,6 +159,20 @@ crud.readview = readview.new
-- @function schema
crud.schema = schema.call

crud.rebalance = {}

-- @refer rebalance.router_cache_clear
-- @function router_cache_clear
crud.rebalance.router_cache_clear = rebalance.router.cache_clear

-- @refer rebalance.router_cache_length
-- @function router_cache_length
crud.rebalance.router_cache_length = rebalance.router.cache_length

-- @refer rebalance.router_cache_last_clear_ts
-- @function router_cache_last_clear_ts
crud.rebalance.router_cache_last_clear_ts = rebalance.router.cache_last_clear_ts

function crud.init_router()
rawset(_G, 'crud', crud)
end
Expand Down
113 changes: 97 additions & 16 deletions crud/common/call.lua
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
local errors = require('errors')
local vshard = require('vshard')

local call_cache = require('crud.common.call_cache')
local dev_checks = require('crud.common.dev_checks')
local utils = require('crud.common.utils')
local sharding_utils = require('crud.common.sharding.utils')
local fiber_clock = require('fiber').clock
local fiber = require('fiber')
local const = require('crud.common.const')
local rebalance = require('crud.common.rebalance')

local BaseIterator = require('crud.common.map_call_cases.base_iter')
local BasePostprocessor = require('crud.common.map_call_cases.base_postprocessor')
Expand All @@ -14,14 +16,79 @@ local CallError = errors.new_class('CallError')

local CALL_FUNC_NAME = 'call_on_storage'
local CRUD_CALL_FUNC_NAME = utils.get_storage_call(CALL_FUNC_NAME)

local CRUD_CALL_FIBER_NAME = CRUD_CALL_FUNC_NAME .. '/'

local call = {}

local function call_on_storage(run_as_user, func_name, ...)
local function bucket_unref_many(bucket_ids, mode)
local all_ok = true
local last_err = nil
for _, bucket_id in pairs(bucket_ids) do
local ok, err = vshard.storage.bucket_unref(bucket_id, mode)
if not ok then
all_ok = nil
last_err = err
end
end
return all_ok, last_err
end

local function bucket_ref_many(bucket_ids, mode)
local reffed = {}
for _, bucket_id in pairs(bucket_ids) do
local ok, err = vshard.storage.bucket_ref(bucket_id, mode)
if not ok then
bucket_unref_many(reffed, mode)
return nil, err
end
table.insert(reffed, bucket_id)
end
return true, nil
end

local function call_on_storage_safe(run_as_user, bucket_ids, mode, func_name, ...)
fiber.name(CRUD_CALL_FIBER_NAME .. func_name)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we mark safe functions?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is definitely not necessary but may be useful for debugging


local ok, ref_err = bucket_ref_many(bucket_ids, mode)
if not ok then
return nil, ref_err
end

local res = {box.session.su(run_as_user, call_cache.func_name_to_func(func_name), ...)}

ok, ref_err = bucket_unref_many(bucket_ids, mode)
if not ok then
return nil, ref_err
end

return unpack(res, 1, table.maxn(res))
end

local function call_on_storage_fast(run_as_user, _, _, func_name, ...)
fiber.name(CRUD_CALL_FIBER_NAME .. func_name)

return box.session.su(run_as_user, call_cache.func_name_to_func(func_name), ...)
end

local call_on_storage = rebalance.safe_mode and call_on_storage_safe or call_on_storage_fast

local function safe_mode_enable()
call_on_storage = call_on_storage_safe

for fb_id, fb in pairs(fiber.info()) do
if string.find(fb.name, CRUD_CALL_FIBER_NAME) then
fiber.kill(fb_id)
end
end
end

local function safe_mode_disable()
call_on_storage = call_on_storage_fast
end

rebalance.register_safe_mode_enable_hook(safe_mode_enable)
rebalance.register_safe_mode_disable_hook(safe_mode_disable)

call.storage_api = {[CALL_FUNC_NAME] = call_on_storage}

function call.get_vshard_call_name(mode, prefer_replica, balance)
Expand Down Expand Up @@ -82,8 +149,10 @@ local function wrap_vshard_err(vshard_router, err, func_name, replicaset_id, buc
))
end

local function retry_call_with_master_discovery(replicaset, method, func_name, func_args, call_opts)
local func_args_ext = utils.append_array({ box.session.effective_user(), func_name }, func_args)
local function retry_call_with_master_discovery(vshard_router, replicaset,
method, func_name, func_args,
call_opts, mode, bucket_ids)
local func_args_ext = utils.append_array({ box.session.effective_user(), bucket_ids, mode, func_name }, func_args)

-- In case cluster was just bootstrapped with auto master discovery,
-- replicaset may miss master.
Expand All @@ -93,7 +162,18 @@ local function retry_call_with_master_discovery(replicaset, method, func_name, f
return resp, err
end

if err.name == 'MISSING_MASTER' and replicaset.locate_master ~= nil then
if err.name == 'WRONG_BUCKET' or
err.name == 'BUCKET_IS_LOCKED' or
err.name == 'TRANSFER_IS_IN_PROGRESS' then
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indentations

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just tried to beautify it)

vshard_router:_bucket_reset(err.bucket_id)

-- Substitute replicaset only for single bucket_id calls.
if err.destination and vshard_router.replicasets[err.destination] and #bucket_ids == 1 then
replicaset = vshard_router.replicasets[err.destination]
else
return nil, err
end
Comment on lines +170 to +177
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but very very simplified. Mostly because bucket_set() can't be accessed from outside vshard.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a comment about vshard above

elseif err.name == 'MISSING_MASTER' and replicaset.locate_master ~= nil then
replicaset:locate_master()
end

Expand Down Expand Up @@ -145,10 +225,10 @@ function call.map(vshard_router, func_name, func_args, opts)
request_timeout = opts.mode == 'read' and opts.request_timeout or nil,
}
while iter:has_next() do
local args, replicaset, replicaset_id = iter:get()
local args, replicaset, replicaset_id, bucket_ids = iter:get()

local future, err = retry_call_with_master_discovery(replicaset, vshard_call_name,
func_name, args, call_opts)
local future, err = retry_call_with_master_discovery(vshard_router, replicaset, vshard_call_name,
func_name, args, call_opts, opts.mode, bucket_ids)

if err ~= nil then
local result_info = {
Expand All @@ -170,9 +250,9 @@ function call.map(vshard_router, func_name, func_args, opts)
futures_by_replicasets[replicaset_id] = future
end

local deadline = fiber_clock() + timeout
local deadline = fiber.clock() + timeout
for replicaset_id, future in pairs(futures_by_replicasets) do
local wait_timeout = deadline - fiber_clock()
local wait_timeout = deadline - fiber.clock()
if wait_timeout < 0 then
wait_timeout = 0
end
Expand Down Expand Up @@ -221,9 +301,9 @@ function call.single(vshard_router, bucket_id, func_name, func_args, opts)
local timeout = opts.timeout or const.DEFAULT_VSHARD_CALL_TIMEOUT
local request_timeout = opts.mode == 'read' and opts.request_timeout or nil

local res, err = retry_call_with_master_discovery(replicaset, vshard_call_name,
func_name, func_args, {timeout = timeout,
request_timeout = request_timeout})
local res, err = retry_call_with_master_discovery(vshard_router, replicaset, vshard_call_name,
func_name, func_args, {timeout = timeout, request_timeout = request_timeout},
opts.mode, {bucket_id})
if err ~= nil then
return nil, wrap_vshard_err(vshard_router, err, func_name, nil, bucket_id)
end
Expand All @@ -248,8 +328,9 @@ function call.any(vshard_router, func_name, func_args, opts)
end
local replicaset_id, replicaset = next(replicasets)

local res, err = retry_call_with_master_discovery(replicaset, 'call',
func_name, func_args, {timeout = timeout})
local res, err = retry_call_with_master_discovery(vshard_router, replicaset, 'call',
func_name, func_args, {timeout = timeout},
'read', {})
if err ~= nil then
return nil, wrap_vshard_err(vshard_router, err, func_name, replicaset_id)
end
Expand Down
2 changes: 1 addition & 1 deletion crud/common/map_call_cases/base_iter.lua
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ function BaseIterator:get()
local replicaset = self.next_replicaset
self.next_index, self.next_replicaset = next(self.replicasets, self.next_index)

return self.func_args, replicaset, replicaset_id
return self.func_args, replicaset, replicaset_id, {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please write 4th return name in doc for method

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

end

return BaseIterator
3 changes: 2 additions & 1 deletion crud/common/map_call_cases/batch_insert_iter.lua
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,11 @@ function BatchInsertIterator:get()
self.next_batch.tuples,
self.opts,
}
local bucket_ids = self.next_batch.bucket_ids

self.next_index, self.next_batch = next(self.batches_by_replicasets, self.next_index)

return func_args, replicaset, replicaset_id
return func_args, replicaset, replicaset_id, bucket_ids
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please write 4th return name in doc for method

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

end

return BatchInsertIterator
3 changes: 2 additions & 1 deletion crud/common/map_call_cases/batch_upsert_iter.lua
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,11 @@ function BatchUpsertIterator:get()
self.next_batch.operations,
self.opts,
}
local bucket_ids = self.next_batch.bucket_ids

self.next_index, self.next_batch = next(self.batches_by_replicasets, self.next_index)

return func_args, replicaset, replicaset_id
return func_args, replicaset, replicaset_id, bucket_ids
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please write 4th return name in doc for method

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

end

return BatchUpsertIterator
Loading
Loading