Skip to content

Commit 732b2bc

Browse files
author
Meir Shpilraien (Spielrein)
authored
Added cron server event. (#349)
* Added cron server event. The cron server event will be called whenever Redis runs its cron jobs (usually a few times per second). Example: ```rust #[cron_event_handler] fn cron_event_handler(ctx: &Context, hz: u64) { // run some code here that should run periodically. } ```
1 parent 191f218 commit 732b2bc

File tree

5 files changed

+75
-1
lines changed

5 files changed

+75
-1
lines changed

examples/server_events.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,10 @@ use std::sync::atomic::{AtomicI64, Ordering};
33
use redis_module::{
44
redis_module, server_events::FlushSubevent, Context, RedisResult, RedisString, RedisValue,
55
};
6-
use redis_module_macros::{config_changed_event_handler, flush_event_handler};
6+
use redis_module_macros::{config_changed_event_handler, cron_event_handler, flush_event_handler};
77

88
static NUM_FLUSHES: AtomicI64 = AtomicI64::new(0);
9+
static NUM_CRONS: AtomicI64 = AtomicI64::new(0);
910
static NUM_MAX_MEMORY_CONFIGURATION_CHANGES: AtomicI64 = AtomicI64::new(0);
1011

1112
#[flush_event_handler]
@@ -23,10 +24,19 @@ fn config_changed_event_handler(_ctx: &Context, changed_configs: &[&str]) {
2324
.map(|_| NUM_MAX_MEMORY_CONFIGURATION_CHANGES.fetch_add(1, Ordering::SeqCst));
2425
}
2526

27+
#[cron_event_handler]
28+
fn cron_event_handler(_ctx: &Context, _hz: u64) {
29+
NUM_CRONS.fetch_add(1, Ordering::SeqCst);
30+
}
31+
2632
fn num_flushed(_ctx: &Context, _args: Vec<RedisString>) -> RedisResult {
2733
Ok(RedisValue::Integer(NUM_FLUSHES.load(Ordering::SeqCst)))
2834
}
2935

36+
fn num_crons(_ctx: &Context, _args: Vec<RedisString>) -> RedisResult {
37+
Ok(RedisValue::Integer(NUM_CRONS.load(Ordering::SeqCst)))
38+
}
39+
3040
fn num_maxmemory_changes(_ctx: &Context, _args: Vec<RedisString>) -> RedisResult {
3141
Ok(RedisValue::Integer(
3242
NUM_MAX_MEMORY_CONFIGURATION_CHANGES.load(Ordering::SeqCst),
@@ -43,5 +53,6 @@ redis_module! {
4353
commands: [
4454
["num_flushed", num_flushed, "readonly", 0, 0, 0],
4555
["num_max_memory_changes", num_maxmemory_changes, "readonly", 0, 0, 0],
56+
["num_crons", num_crons, "readonly", 0, 0, 0],
4657
],
4758
}

redismodule-rs-macros/src/lib.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,28 @@ pub fn config_changed_event_handler(_attr: TokenStream, item: TokenStream) -> To
195195
gen.into()
196196
}
197197

198+
/// Proc macro which is set on a function that need to be called on Redis cron.
199+
/// The function must accept a [Context] and [u64] that represent the cron hz.
200+
///
201+
/// Example:
202+
///
203+
/// ```rust,no_run,ignore
204+
/// #[cron_event_handler]
205+
/// fn cron_event_handler(ctx: &Context, hz: u64) { ... }
206+
/// ```
207+
#[proc_macro_attribute]
208+
pub fn cron_event_handler(_attr: TokenStream, item: TokenStream) -> TokenStream {
209+
let ast: ItemFn = match syn::parse(item) {
210+
Ok(res) => res,
211+
Err(e) => return e.to_compile_error().into(),
212+
};
213+
let gen = quote! {
214+
#[linkme::distributed_slice(redis_module::server_events::CRON_SERVER_EVENTS_LIST)]
215+
#ast
216+
};
217+
gen.into()
218+
}
219+
198220
/// The macro auto generate a [From] implementation that can convert the struct into [RedisValue].
199221
///
200222
/// Example:

src/context/call_reply.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,13 @@ pub enum ErrorReply<'root> {
109109
RedisError(ErrorCallReply<'root>),
110110
}
111111

112+
/// Send implementation to [ErrorCallReply].
113+
/// We need to implements this trait because [ErrorCallReply] hold
114+
/// raw pointers to C data which does not auto implement the [Send] trait.
115+
/// By implementing [Send] on [ErrorCallReply] we basically tells the compiler
116+
/// that it is safe to send the underline C data between threads.
117+
unsafe impl<'root> Send for ErrorCallReply<'root> {}
118+
112119
impl<'root> ErrorReply<'root> {
113120
/// Convert [ErrorCallReply] to [String] or [None] if its not a valid utf8.
114121
pub fn to_utf8_string(&self) -> Option<String> {
@@ -632,6 +639,13 @@ pub enum CallReply<'root> {
632639
VerbatimString(VerbatimStringCallReply<'root>),
633640
}
634641

642+
/// Send implementation to [CallReply].
643+
/// We need to implements this trait because [CallReply] hold
644+
/// raw pointers to C data which does not auto implement the [Send] trait.
645+
/// By implementing [Send] on [CallReply] we basically tells the compiler
646+
/// that it is safe to send the underline C data between threads.
647+
unsafe impl<'root> Send for CallReply<'root> {}
648+
635649
impl<'root> Display for CallReply<'root> {
636650
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
637651
match self {

src/context/server_events.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,23 @@ pub static MODULE_CHANGED_SERVER_EVENTS_LIST: [fn(&Context, ModuleChangeSubevent
5454
#[distributed_slice()]
5555
pub static CONFIG_CHANGED_SERVER_EVENTS_LIST: [fn(&Context, &[&str])] = [..];
5656

57+
#[distributed_slice()]
58+
pub static CRON_SERVER_EVENTS_LIST: [fn(&Context, u64)] = [..];
59+
60+
extern "C" fn cron_callback(
61+
ctx: *mut raw::RedisModuleCtx,
62+
_eid: raw::RedisModuleEvent,
63+
_subevent: u64,
64+
data: *mut ::std::os::raw::c_void,
65+
) {
66+
let data: &raw::RedisModuleConfigChangeV1 =
67+
unsafe { &*(data as *mut raw::RedisModuleConfigChangeV1) };
68+
let ctx = Context::new(ctx);
69+
CRON_SERVER_EVENTS_LIST.iter().for_each(|callback| {
70+
callback(&ctx, data.version);
71+
});
72+
}
73+
5774
extern "C" fn role_changed_callback(
5875
ctx: *mut raw::RedisModuleCtx,
5976
_eid: raw::RedisModuleEvent,
@@ -212,5 +229,11 @@ pub fn register_server_events(ctx: &Context) -> Result<(), RedisError> {
212229
raw::REDISMODULE_EVENT_CONFIG,
213230
Some(config_change_event_callback),
214231
)?;
232+
register_single_server_event_type(
233+
ctx,
234+
&CRON_SERVER_EVENTS_LIST,
235+
raw::REDISMODULE_EVENT_CRON_LOOP,
236+
Some(cron_callback),
237+
)?;
215238
Ok(())
216239
}

tests/integration.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -411,6 +411,10 @@ fn test_server_event() -> Result<()> {
411411

412412
assert_eq!(res, 2);
413413

414+
let res: i64 = redis::cmd("num_crons").query(&mut con)?;
415+
416+
assert!(res > 0);
417+
414418
Ok(())
415419
}
416420

0 commit comments

Comments
 (0)