Skip to content

Commit 4449920

Browse files
committed
implmement RedisModule_ScanKey as foreach function with closure and Rust iterator using a KeyScanCursor type
add example todo integration tests
1 parent df56f94 commit 4449920

File tree

5 files changed

+360
-1
lines changed

5 files changed

+360
-1
lines changed

examples/scan_keys.rs

Lines changed: 107 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,58 @@
1+
use std::cell::RefCell;
2+
3+
// This example shows the usage of the scan functionality of the Rust Redis Module API Wrapper.
4+
//
5+
// The example implements three commands:
6+
//
7+
// 1. `scan_keys` - scans all keys in the database and returns their names as an array of RedisString.
8+
// 2. `scan_key_it <key>` - scans all fields and values in a hash key providing an iterator and return the field/value pairs as an array of RedisString.
9+
// 3. `scan_key_fe <key>` - scans all fields and values in a hash key using a closure that stores tthe field/value pairs as an array of RedisString.
10+
//
11+
// `scan_key_it` always copies the field and value strings, while `scan_key_fe` uses references to the field and value strings. In that example
12+
// both implementations need to clone the strings, because we want to return them as an array of RedisString.
13+
//
14+
// ## Usage examples for scan_key:
15+
//
16+
// First we need to setup a hash key with some fields and values:
17+
//
18+
// ```
19+
// HSET user:123 name Alice age 29 location Austin
20+
// ```
21+
//
22+
// We need to start redis-server with the module loaded (example for macOS, adjust path as needed):
23+
//
24+
// ```
25+
// redis-server --loadmodule ./target/debug/examples/libscan_keys.dylib
26+
// ```
27+
//
28+
// Afterwards we can call the commands via redis-cli:
29+
//
30+
// ```
31+
// > SCAN_KEYS
32+
// 1) "user:123"
33+
//
34+
// > SCAN_KEY_IT user:123
35+
// 1) "name"
36+
// 2) "Alice"
37+
// 3) "age"
38+
// 4) "29"
39+
// 5) "location"
40+
// 6) "Austin"
41+
//
42+
// > SCAN_KEY_FE user:123
43+
// 1) "name"
44+
// 2) "Alice"
45+
// 3) "age"
46+
// 4) "29"
47+
// 5) "location"
48+
// 6) "Austin"
49+
// ```
50+
151
use redis_module::{
2-
key::RedisKey, redis_module, Context, KeysCursor, RedisResult, RedisString, RedisValue,
52+
key::{KeyFlags, RedisKey}, redis_module, Context, KeysCursor, RedisError, RedisResult, RedisString, RedisValue, ScanKeyCursor
353
};
454

55+
/// Scans all keys in the database and returns their names as an array of RedisString.
556
fn scan_keys(ctx: &Context, _args: Vec<RedisString>) -> RedisResult {
657
let cursor = KeysCursor::new();
758
let mut res = Vec::new();
@@ -16,6 +67,59 @@ fn scan_keys(ctx: &Context, _args: Vec<RedisString>) -> RedisResult {
1667
Ok(RedisValue::Array(res))
1768
}
1869

70+
/// Scans all fields and values in a hash key and returns them as an array of RedisString.
71+
/// The command takes one argument: the name of the hash key to scan.
72+
fn scan_key_foreach(ctx: &Context, args: Vec<RedisString>) -> RedisResult {
73+
// only argument is the key name
74+
if args.len() != 2 {
75+
return Err(RedisError::WrongArity);
76+
}
77+
78+
let key_name = &args[1];
79+
let key = ctx.open_key_with_flags(key_name, KeyFlags::NOEFFECTS | KeyFlags::NOEXPIRE | KeyFlags::ACCESS_EXPIRED );
80+
let ty = key.key_type();
81+
let cursor = ScanKeyCursor::new(key);
82+
let out = format!("Cursor created for Scanning key: {}, type={:?}", key_name, ty);
83+
ctx.log_notice(&out);
84+
85+
let res = RefCell::new(Vec::new());
86+
cursor.foreach(|_key, field, value| {
87+
let out = format!("Field: {}, Value: {}", field, value);
88+
ctx.log_notice(&out);
89+
90+
let mut res = res.borrow_mut();
91+
res.push(RedisValue::BulkRedisString(field.clone()));
92+
res.push(RedisValue::BulkRedisString(value.clone()));
93+
});
94+
95+
Ok(RedisValue::Array(res.take()))
96+
}
97+
98+
/// Scans all fields and values in a hash key and returns them as an array of RedisString.
99+
/// The command takes one argument: the name of the hash key to scan.
100+
fn scan_key_iterator(ctx: &Context, args: Vec<RedisString>) -> RedisResult {
101+
// only argument is the key name
102+
if args.len() != 2 {
103+
return Err(RedisError::WrongArity);
104+
}
105+
106+
let key_name = &args[1];
107+
let mut res = Vec::new();
108+
let key = ctx.open_key_with_flags(key_name, KeyFlags::NOEFFECTS | KeyFlags::NOEXPIRE | KeyFlags::ACCESS_EXPIRED );
109+
let ty = key.key_type();
110+
let cursor = ScanKeyCursor::new(key);
111+
let out = format!("Cursor created for Scanning key: {}, type={:?}", key_name, ty);
112+
ctx.log_notice(&out);
113+
114+
for (no, (field, value)) in cursor.iter().enumerate() {
115+
let out = format!("It: {}, Field: {}, Value: {}", no, field, value);
116+
ctx.log_notice(&out);
117+
res.push(RedisValue::BulkRedisString(field));
118+
res.push(RedisValue::BulkRedisString(value));
119+
}
120+
Ok(RedisValue::Array(res))
121+
}
122+
19123
//////////////////////////////////////////////////////
20124

21125
redis_module! {
@@ -25,5 +129,7 @@ redis_module! {
25129
data_types: [],
26130
commands: [
27131
["scan_keys", scan_keys, "readonly", 0, 0, 0, ""],
132+
["scan_key_it", scan_key_iterator, "readonly", 0, 0, 0, ""],
133+
["scan_key_fe", scan_key_foreach, "readonly", 0, 0, 0, ""],
28134
],
29135
}

src/context/key_scan_cursor.rs

Lines changed: 247 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,247 @@
1+
use std::{ffi::c_void, ptr::{self, addr_of_mut}};
2+
3+
use crate::{key::RedisKey, raw, Context, RedisString};
4+
5+
/// A cursor to scan fields and values in a hash key.
6+
///
7+
/// This is a wrapper around the RedisModule_ScanKey function from the C API. It provides access via [`ScanKeyCursor::foreach] and provides
8+
/// a Rust iterator.
9+
///
10+
/// Example usage:
11+
/// ```no_run
12+
///
13+
/// ```
14+
///
15+
/// The iterator yields tuples of (field: RedisString, value: RedisString).
16+
///
17+
/// ## Implementation notes
18+
///
19+
/// The `RedisModule_ScanKey` function from the C API uses a callback to return the field and value strings. We
20+
/// distinguish two cases:
21+
///
22+
/// 1. Either the callback is called once,
23+
/// 2. or multiple times
24+
///
25+
/// and this depends if a rehash happens during the scan.
26+
pub struct ScanKeyCursor {
27+
key: RedisKey,
28+
inner_cursor: *mut raw::RedisModuleScanCursor,
29+
}
30+
31+
//type ScanKeyCallback<F> = F where F: FnMut(&RedisKey, &RedisString, &RedisString);
32+
33+
impl ScanKeyCursor {
34+
pub fn new(key: RedisKey) -> Self {
35+
let inner_cursor = unsafe { raw::RedisModule_ScanCursorCreate.unwrap()() };
36+
Self { key, inner_cursor }
37+
}
38+
39+
pub fn restart(&self) {
40+
unsafe { raw::RedisModule_ScanCursorRestart.unwrap()(self.inner_cursor) };
41+
}
42+
43+
/// Implements a callback based foreach loop over all fields and values in the hash key, use for optimal performance.
44+
pub fn foreach<F: Fn(&RedisKey, &RedisString, &RedisString)>(&self, f: F) {
45+
// Safety: Assumption: c-side initialized the function ptr and it is is never changed,
46+
// i.e. after module initialization the function pointers stay valid till the end of the program.
47+
let scan_key = unsafe { raw::RedisModule_ScanKey.unwrap() };
48+
49+
let mut res = 1;
50+
while res != 0 {
51+
res = unsafe {
52+
scan_key(
53+
self.key.key_inner,
54+
self.inner_cursor,
55+
Some(foreach_callback::<F>),
56+
&f as *const F as *mut c_void,
57+
)
58+
}
59+
}
60+
}
61+
62+
pub fn iter(&self) -> ScanKeyCursorIterator<'_> {
63+
let ctx = Context::new(self.key.ctx);
64+
ctx.log_notice("Starting ScanKeyCursor iteration");
65+
ScanKeyCursorIterator {
66+
cursor: self,
67+
buf: Vec::new(),
68+
last_call: false,
69+
}
70+
}
71+
}
72+
73+
impl Drop for ScanKeyCursor {
74+
fn drop(&mut self) {
75+
unsafe { raw::RedisModule_ScanCursorDestroy.unwrap()(self.inner_cursor) };
76+
}
77+
}
78+
79+
pub type ScanKeyIteratorItem = (RedisString, RedisString);
80+
81+
pub struct ScanKeyCursorIterator<'a> {
82+
/// The cursor that is used for the iteration
83+
cursor: &'a ScanKeyCursor,
84+
85+
/// Buffer to hold the uninitialized data if the C callback is called multiple times.
86+
buf: Vec<ScanKeyIteratorItem>,
87+
88+
/// Stores a flag that indicates if scan_key needs to be called again
89+
last_call: bool,
90+
}
91+
92+
enum IteratorState {
93+
NeedToCallScanKey,
94+
HasBufferedItems,
95+
Done,
96+
}
97+
98+
enum StackSlotState {
99+
Empty,
100+
Filled(ScanKeyIteratorItem),
101+
}
102+
103+
struct StackSlot<'a> {
104+
state: StackSlotState,
105+
ctx: Context,
106+
buf: &'a mut Vec<ScanKeyIteratorItem>,
107+
}
108+
109+
impl ScanKeyCursorIterator<'_> {
110+
fn current_state(&self) -> IteratorState {
111+
if !self.buf.is_empty() {
112+
IteratorState::HasBufferedItems
113+
} else if self.last_call {
114+
IteratorState::Done
115+
} else {
116+
IteratorState::NeedToCallScanKey
117+
}
118+
}
119+
120+
fn next_scan_call(&mut self) -> Option<ScanKeyIteratorItem> {
121+
let ctx_ptr = self.cursor.key.ctx;
122+
let ctx = Context::new(ctx_ptr);
123+
124+
let mut stack_slot = StackSlot {
125+
state: StackSlotState::Empty,
126+
ctx: Context::new(ctx_ptr),
127+
buf: &mut self.buf
128+
};
129+
130+
let data_ptr = addr_of_mut!(stack_slot).cast::<c_void>();
131+
132+
// Safety: Assumption: c-side initialized the function ptr and it is is never changed,
133+
// i.e. after module initialization the function pointers stay valid till the end of the program.
134+
let scan_key = unsafe { raw::RedisModule_ScanKey.unwrap() };
135+
136+
// Safety: All pointers we pass here are guaranteed to remain valid during the `scan_key` call.
137+
let ret = unsafe {
138+
scan_key(
139+
self.cursor.key.key_inner,
140+
self.cursor.inner_cursor,
141+
Some(iterator_callback),
142+
data_ptr,
143+
)
144+
};
145+
146+
// Check if more calls are needed
147+
if ret == 0 {
148+
self.last_call = true;
149+
// we may still have buffered items
150+
}
151+
152+
let StackSlotState::Filled(reval) = stack_slot.state else {
153+
// should not happen
154+
panic!("ScanKey callback did not fill the stack slot");
155+
};
156+
157+
ctx.log_notice(&format!("next Reval field: {}, value: {}", reval.0, reval.1));
158+
159+
Some(reval)
160+
}
161+
162+
fn next_buffered_item(&mut self) -> Option<ScanKeyIteratorItem> {
163+
// todo: use different datatype / access pattern for performance
164+
Some(self.buf.remove(0))
165+
}
166+
}
167+
168+
/// The callback that is called by `RedisModule_ScanKey` to return the field and value strings.
169+
///
170+
/// The `data` pointer is a stack slot of type `RawData` that is used to pass the data back to the iterator.
171+
unsafe extern "C" fn foreach_callback<F: Fn(&RedisKey, &RedisString, &RedisString)>(
172+
key: *mut raw::RedisModuleKey,
173+
field: *mut raw::RedisModuleString,
174+
value: *mut raw::RedisModuleString,
175+
data: *mut c_void,
176+
) {
177+
let ctx = ptr::null_mut();
178+
let key = RedisKey::from_raw_parts(ctx, key);
179+
180+
let field = RedisString::from_redis_module_string(ctx, field);
181+
let value = RedisString::from_redis_module_string(ctx, value);
182+
183+
let callback = unsafe { &mut *(data.cast::<F>()) };
184+
callback(&key, &field, &value);
185+
186+
// we're not the owner of field and value strings
187+
field.take();
188+
value.take();
189+
190+
key.take(); // we're not the owner of the key either
191+
}
192+
193+
/// The callback that is called by `RedisModule_ScanKey` to return the field and value strings.
194+
///
195+
/// The `data` pointer is a stack slot of type `RawData` that is used to pass the data back to the iterator.
196+
unsafe extern "C" fn iterator_callback(
197+
_key: *mut raw::RedisModuleKey,
198+
field: *mut raw::RedisModuleString,
199+
value: *mut raw::RedisModuleString,
200+
data: *mut c_void,
201+
) {
202+
// SAFETY: this is the responsibility of the caller, see only usage below in `next()`
203+
// `data` is a stack slot of type Data
204+
let slot = data.cast::<StackSlot>();
205+
let slot = &mut (*slot);
206+
207+
// todo: use new-type with refcount handling for better performance
208+
let field = raw::RedisModule_CreateStringFromString.unwrap()(slot.ctx.get_raw(), field);
209+
let value = raw::RedisModule_CreateStringFromString.unwrap()(slot.ctx.get_raw(), value);
210+
211+
let field = RedisString::from_redis_module_string(slot.ctx.get_raw(), field);
212+
let value = RedisString::from_redis_module_string(slot.ctx.get_raw(), value);
213+
214+
match slot.state {
215+
StackSlotState::Empty => {
216+
let out = format!("CB - Fill empty slot - Field: {}, Value: {}", field, value);
217+
slot.ctx.log_notice(&out);
218+
slot.state = StackSlotState::Filled((field, value));
219+
}
220+
StackSlotState::Filled(_) => {
221+
// This is the case where the callback is called multiple times.
222+
// We need to buffer the data in the iterator state.
223+
let out = format!("CB - Buffer for future use - Field: {}, Value: {}", field, value);
224+
slot.ctx.log_notice(&out);
225+
slot.buf.push((field, value));
226+
227+
}
228+
}
229+
230+
}
231+
232+
// Implements an iterator for `KeysCursor` that yields (RedisKey, *mut RedisModuleString, *mut RedisModuleString) in a Rust for loop.
233+
// This is a wrapper around the RedisModule_ScanKey function from the C API and uses a pattern to get the values from the callback that
234+
// is also used in stack unwinding scenarios. There is not common term for that but here we can think of it as a "stack slot" pattern.
235+
impl Iterator for ScanKeyCursorIterator<'_> {
236+
type Item = ScanKeyIteratorItem;
237+
238+
fn next(&mut self) -> Option<Self::Item> {
239+
let ctx = Context::new(self.cursor.key.ctx);
240+
ctx.log_notice("ScanKeyCursorIterator next() called");
241+
match self.current_state() {
242+
IteratorState::NeedToCallScanKey => self.next_scan_call(),
243+
IteratorState::HasBufferedItems => self.next_buffered_item(),
244+
IteratorState::Done => None,
245+
}
246+
}
247+
}

src/context/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ pub mod call_reply;
3232
pub mod commands;
3333
pub mod defrag;
3434
pub mod info;
35+
pub mod key_scan_cursor;
3536
pub mod keys_cursor;
3637
pub mod server_events;
3738
pub mod thread_safe;

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ pub use crate::context::call_reply::{CallReply, CallResult, ErrorReply, PromiseC
3232
pub use crate::context::commands;
3333
pub use crate::context::defrag;
3434
pub use crate::context::keys_cursor::KeysCursor;
35+
pub use crate::context::key_scan_cursor::ScanKeyCursor;
3536
pub use crate::context::server_events;
3637
pub use crate::context::AclCategory;
3738
pub use crate::context::AclPermissions;

0 commit comments

Comments
 (0)