Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 62 additions & 1 deletion examples/scan_keys.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,18 @@
// This example shows the usage of the scan functionality of the Rust Redis Module API Wrapper.
//
// The example implements three commands:
//
// 1. `scan_keys` - scans all keys in the database and returns their names as an array of RedisString.
// 2. `scan_key <key>` - scans all fields by using a closure and a while loop, thus allowing an early stop. Don't use the early stop but collects all the field/value pairs as an array of RedisString.
// 3. `scan_key_for_each <key>` - scans all fields and values in a hash key using a closure that stores the field/value pairs as an array of RedisString.

use redis_module::{
key::RedisKey, redis_module, Context, KeysCursor, RedisResult, RedisString, RedisValue,
key::{KeyFlags, RedisKey},
redis_module, Context, KeysCursor, RedisError, RedisResult, RedisString, RedisValue,
ScanKeyCursor,
};

/// Scans all keys in the database and returns their names as an array of RedisString.
fn scan_keys(ctx: &Context, _args: Vec<RedisString>) -> RedisResult {
let cursor = KeysCursor::new();
let mut res = Vec::new();
Expand All @@ -16,6 +27,54 @@ fn scan_keys(ctx: &Context, _args: Vec<RedisString>) -> RedisResult {
Ok(RedisValue::Array(res))
}

fn scan_key(ctx: &Context, args: Vec<RedisString>) -> RedisResult {
// only argument is the key name
if args.len() != 2 {
return Err(RedisError::WrongArity);
}

let key_name = &args[1];
let key = ctx.open_key_with_flags(
key_name,
KeyFlags::NOEFFECTS | KeyFlags::NOEXPIRE | KeyFlags::ACCESS_EXPIRED,
);
let cursor = ScanKeyCursor::new(key);

let mut res = Vec::new();
while cursor.scan(|_key, field, value| {
res.push(RedisValue::BulkRedisString(field.clone()));
res.push(RedisValue::BulkRedisString(value.clone()));
}) {
// here we could do something between scans if needed, like an early stop
}

Ok(RedisValue::Array(res))
}

/// Scans all fields and values in a hash key and returns them as an array of RedisString.
/// The command takes one argument: the name of the hash key to scan.
fn scan_key_for_each(ctx: &Context, args: Vec<RedisString>) -> RedisResult {
// only argument is the key name
if args.len() != 2 {
return Err(RedisError::WrongArity);
}

let key_name = &args[1];
let key = ctx.open_key_with_flags(
key_name,
KeyFlags::NOEFFECTS | KeyFlags::NOEXPIRE | KeyFlags::ACCESS_EXPIRED,
);
let cursor = ScanKeyCursor::new(key);

let mut res = Vec::new();
cursor.for_each(|_key, field, value| {
res.push(RedisValue::BulkRedisString(field.clone()));
res.push(RedisValue::BulkRedisString(value.clone()));
});

Ok(RedisValue::Array(res))
}

//////////////////////////////////////////////////////

redis_module! {
Expand All @@ -25,5 +84,7 @@ redis_module! {
data_types: [],
commands: [
["scan_keys", scan_keys, "readonly", 0, 0, 0, ""],
["scan_key", scan_key, "readonly", 0, 0, 0, ""],
["scan_key_for_each", scan_key_for_each, "readonly", 0, 0, 0, ""],
],
}
78 changes: 78 additions & 0 deletions src/context/call_reply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ impl<'root> StringCallReply<'root> {
};
unsafe { slice::from_raw_parts(reply_string, len) }
}

/// Return the raw pointer to the underlying [RedisModuleCallReply].
pub fn get_raw(&self) -> *mut RedisModuleCallReply {
self.reply.as_ptr()
}
}

impl<'root> Drop for StringCallReply<'root> {
Expand Down Expand Up @@ -77,6 +82,11 @@ impl<'root> ErrorCallReply<'root> {
};
unsafe { slice::from_raw_parts(reply_string, len) }
}

/// Return the raw pointer to the underlying [RedisModuleCallReply].
pub fn get_raw(&self) -> *mut RedisModuleCallReply {
self.reply.as_ptr()
}
}

impl<'root> Drop for ErrorCallReply<'root> {
Expand Down Expand Up @@ -150,6 +160,11 @@ impl<'root> I64CallReply<'root> {
pub fn to_i64(&self) -> i64 {
call_reply_integer(self.reply.as_ptr())
}

/// Return the raw pointer to the underlying [RedisModuleCallReply].
pub fn get_raw(&self) -> *mut RedisModuleCallReply {
self.reply.as_ptr()
}
}

impl<'root> Drop for I64CallReply<'root> {
Expand Down Expand Up @@ -204,6 +219,11 @@ impl<'root> ArrayCallReply<'root> {
pub fn len(&self) -> usize {
call_reply_length(self.reply.as_ptr())
}

/// Return the raw pointer to the underlying [RedisModuleCallReply].
pub fn get_raw(&self) -> *mut RedisModuleCallReply {
self.reply.as_ptr()
}
}

pub struct ArrayCallReplyIterator<'root, 'curr> {
Expand Down Expand Up @@ -254,6 +274,13 @@ pub struct NullCallReply<'root> {
_dummy: PhantomData<&'root ()>,
}

impl<'root> NullCallReply<'root> {
/// Return the raw pointer to the underlying [RedisModuleCallReply].
pub fn get_raw(&self) -> *mut RedisModuleCallReply {
self.reply.as_ptr()
}
}

impl<'root> Drop for NullCallReply<'root> {
fn drop(&mut self) {
free_call_reply(self.reply.as_ptr());
Expand Down Expand Up @@ -303,6 +330,11 @@ impl<'root> MapCallReply<'root> {
pub fn len(&self) -> usize {
call_reply_length(self.reply.as_ptr())
}

/// Return the raw pointer to the underlying [RedisModuleCallReply].
pub fn get_raw(&self) -> *mut RedisModuleCallReply {
self.reply.as_ptr()
}
}

pub struct MapCallReplyIterator<'root, 'curr> {
Expand Down Expand Up @@ -384,6 +416,11 @@ impl<'root> SetCallReply<'root> {
pub fn len(&self) -> usize {
call_reply_length(self.reply.as_ptr())
}

/// Return the raw pointer to the underlying [RedisModuleCallReply].
pub fn get_raw(&self) -> *mut RedisModuleCallReply {
self.reply.as_ptr()
}
}

pub struct SetCallReplyIterator<'root, 'curr> {
Expand Down Expand Up @@ -445,6 +482,11 @@ impl<'root> BoolCallReply<'root> {
pub fn to_bool(&self) -> bool {
call_reply_bool(self.reply.as_ptr())
}

/// Return the raw pointer to the underlying [RedisModuleCallReply].
pub fn get_raw(&self) -> *mut RedisModuleCallReply {
self.reply.as_ptr()
}
}

impl<'root> Drop for BoolCallReply<'root> {
Expand Down Expand Up @@ -478,6 +520,11 @@ impl<'root> DoubleCallReply<'root> {
pub fn to_double(&self) -> f64 {
call_reply_double(self.reply.as_ptr())
}

/// Return the raw pointer to the underlying [RedisModuleCallReply].
pub fn get_raw(&self) -> *mut RedisModuleCallReply {
self.reply.as_ptr()
}
}

impl<'root> Drop for DoubleCallReply<'root> {
Expand Down Expand Up @@ -512,6 +559,11 @@ impl<'root> BigNumberCallReply<'root> {
pub fn to_string(&self) -> Option<String> {
call_reply_big_number(self.reply.as_ptr())
}

/// Return the raw pointer to the underlying [RedisModuleCallReply].
pub fn get_raw(&self) -> *mut RedisModuleCallReply {
self.reply.as_ptr()
}
}

impl<'root> Drop for BigNumberCallReply<'root> {
Expand Down Expand Up @@ -540,6 +592,13 @@ pub struct VerbatimStringCallReply<'root> {
_dummy: PhantomData<&'root ()>,
}

impl<'root> VerbatimStringCallReply<'root> {
/// Return the raw pointer to the underlying [RedisModuleCallReply].
pub fn get_raw(&self) -> *mut RedisModuleCallReply {
self.reply.as_ptr()
}
}

/// RESP3 state that the verbatim string format must be of length 3.
const VERBATIM_FORMAT_LENGTH: usize = 3;
/// The string format of a verbatim string ([VerbatimStringCallReply]).
Expand Down Expand Up @@ -639,6 +698,25 @@ pub enum CallReply<'root> {
VerbatimString(VerbatimStringCallReply<'root>),
}

impl<'root> CallReply<'root> {
/// Return the raw pointer to the underlying [RedisModuleCallReply], or `None` if this is the `Unknown` variant.
pub fn get_raw(&self) -> Option<*mut RedisModuleCallReply> {
match self {
CallReply::Unknown => None,
CallReply::I64(inner) => Some(inner.get_raw()),
CallReply::String(inner) => Some(inner.get_raw()),
CallReply::Array(inner) => Some(inner.get_raw()),
CallReply::Null(inner) => Some(inner.get_raw()),
CallReply::Map(inner) => Some(inner.get_raw()),
CallReply::Set(inner) => Some(inner.get_raw()),
CallReply::Bool(inner) => Some(inner.get_raw()),
CallReply::Double(inner) => Some(inner.get_raw()),
CallReply::BigNumber(inner) => Some(inner.get_raw()),
CallReply::VerbatimString(inner) => Some(inner.get_raw()),
}
}
}

/// Send implementation to [CallReply].
/// We need to implements this trait because [CallReply] hold
/// raw pointers to C data which does not auto implement the [Send] trait.
Expand Down
131 changes: 131 additions & 0 deletions src/context/key_cursor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
use std::{
ffi::c_void,
ptr::{self},
};

use crate::{key::RedisKey, raw, RedisString};

/// A cursor to scan field/value pairs of a (hash) key.
///
/// It provides access via a closure given to [`ScanKeyCursor::for_each`] or if you need more control, you can use [`ScanKeyCursor::scan`]
/// and implement your own loop, e.g. to allow an early stop.
///
/// ## Example usage
///
/// Here we show how to extract values to communicate them back to the Redis client. We assume that the following hash key is setup before:
///
/// ```text
/// HSET user:123 name Alice age 29 location Austin
/// ```
///
/// The following example command implementation scans all fields and values in the hash key and returns them as an array of RedisString.
///
/// ```ignore
/// fn example_scan_key_for_each(ctx: &Context) -> RedisResult {
/// let key = ctx.open_key_with_flags("user:123", KeyFlags::NOEFFECTS | KeyFlags::NOEXPIRE | KeyFlags::ACCESS_EXPIRED );
/// let cursor = ScanKeyCursor::new(key);
///
/// let res = RefCell::new(Vec::new());
/// cursor.for_each(|_key, field, value| {
/// let mut res = res.borrow_mut();
/// res.push(RedisValue::BulkRedisString(field.clone()));
/// res.push(RedisValue::BulkRedisString(value.clone()));
/// });
///
/// Ok(RedisValue::Array(res.take()))
/// }
/// ```
///
/// The method will produce the following output:
///
/// ```text
/// 1) "name"
/// 2) "Alice"
/// 3) "age"
/// 4) "29"
/// 5) "location"
/// 6) "Austin"
/// ```
pub struct ScanKeyCursor {
key: RedisKey,
inner_cursor: *mut raw::RedisModuleScanCursor,
}

impl ScanKeyCursor {
/// Creates a new scan cursor for the given key.
pub fn new(key: RedisKey) -> Self {
let inner_cursor = unsafe { raw::RedisModule_ScanCursorCreate.unwrap()() };
Self { key, inner_cursor }
}

/// Restarts the cursor from the beginning.
pub fn restart(&self) {
unsafe { raw::RedisModule_ScanCursorRestart.unwrap()(self.inner_cursor) };
}

/// Implements a call to `RedisModule_ScanKey` and calls the given closure for each callback invocation by ScanKey.
/// Returns `true` if there are more fields to scan, `false` otherwise.
///
/// The callback may be called multiple times per `RedisModule_ScanKey` invocation.
///
/// ## Example
///
/// ```ignore
/// while cursor.scan(|_key, field, value| {
/// // do something with field and value
/// }) {
/// // do something between scans if needed, like an early stop
/// }
pub fn scan<F: FnMut(&RedisKey, &RedisString, &RedisString)>(&self, f: F) -> bool {
unsafe extern "C" fn scan_callback<F: FnMut(&RedisKey, &RedisString, &RedisString)>(
key: *mut raw::RedisModuleKey,
field: *mut raw::RedisModuleString,
value: *mut raw::RedisModuleString,
data: *mut c_void,
) {
let ctx = ptr::null_mut();
let key = RedisKey::from_raw_parts(ctx, key);

let field = RedisString::from_redis_module_string(ctx, field);
let value = RedisString::from_redis_module_string(ctx, value);

let callback = unsafe { &mut *(data.cast::<F>()) };
callback(&key, &field, &value);

// we're not the owner of field and value strings
field.take();
value.take();

key.take(); // we're not the owner of the key either
}

// Safety: The c-side initialized the function ptr and it is is never changed,
// i.e. after module initialization the function pointers stay valid till the end of the program.
let scan_key = unsafe { raw::RedisModule_ScanKey.unwrap() };

let res = unsafe {
scan_key(
self.key.key_inner,
self.inner_cursor,
Some(scan_callback::<F>),
&f as *const F as *mut c_void,
Copy link

Copilot AI Sep 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This creates a mutable pointer from an immutable reference to f. The callback function should receive f as mutable since the closure F implements FnMut. Change to &mut f as *mut F as *mut c_void.

Suggested change
&f as *const F as *mut c_void,
&mut f as *mut F as *mut c_void,

Copilot uses AI. Check for mistakes.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

f is not mutable, we're not allowed to change the function pointer, but we need to cast to *mut void to fit the function signature of the C callback.

FnMut allows changing the captured variables. Such that it can take a let mut array = Vec::new(); and add entries.

)
};

res != 0
}

/// Implements a callback based for_each loop over all fields and values in the hash key.
/// If you need more control, e.g. stopping after a scan invocation, then use [`ScanKeyCursor::scan`] directly.
pub fn for_each<F: FnMut(&RedisKey, &RedisString, &RedisString)>(&self, mut f: F) {
while self.scan(&mut f) {
// do nothing, the callback does the work
}
}
}

impl Drop for ScanKeyCursor {
fn drop(&mut self) {
unsafe { raw::RedisModule_ScanCursorDestroy.unwrap()(self.inner_cursor) };
}
}
1 change: 1 addition & 0 deletions src/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ pub mod call_reply;
pub mod commands;
pub mod defrag;
pub mod info;
pub mod key_cursor;
pub mod keys_cursor;
pub mod server_events;
pub mod thread_safe;
Expand Down
Loading