Skip to content

Commit b414b40

Browse files
committed
Add common handling for userdata drop/borrow semantics everywhere
1 parent f78a324 commit b414b40

File tree

10 files changed

+133
-115
lines changed

10 files changed

+133
-115
lines changed

capi/examples/client.c

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ int main(int argc, char *argv[]) {
184184
hyper_clientconn_options_exec(opts, exec);
185185

186186
hyper_task *handshake = hyper_clientconn_handshake(io, opts);
187-
hyper_task_set_userdata(handshake, (void *)EXAMPLE_HANDSHAKE);
187+
hyper_task_set_userdata(handshake, (void *)EXAMPLE_HANDSHAKE, NULL);
188188

189189
// Let's wait for the handshake to finish...
190190
hyper_executor_push(exec, handshake);
@@ -230,7 +230,7 @@ int main(int argc, char *argv[]) {
230230

231231
// Send it!
232232
hyper_task *send = hyper_clientconn_send(client, req);
233-
hyper_task_set_userdata(send, (void *)EXAMPLE_SEND);
233+
hyper_task_set_userdata(send, (void *)EXAMPLE_SEND, NULL);
234234
printf("sending ...\n");
235235
hyper_executor_push(exec, send);
236236

@@ -261,8 +261,8 @@ int main(int argc, char *argv[]) {
261261
printf("\n");
262262

263263
hyper_body *resp_body = hyper_response_body(resp);
264-
hyper_task *foreach = hyper_body_foreach(resp_body, print_each_chunk, NULL);
265-
hyper_task_set_userdata(foreach, (void *)EXAMPLE_RESP_BODY);
264+
hyper_task *foreach = hyper_body_foreach(resp_body, print_each_chunk, NULL, NULL);
265+
hyper_task_set_userdata(foreach, (void *)EXAMPLE_RESP_BODY, NULL);
266266
hyper_executor_push(exec, foreach);
267267

268268
// No longer need the response

capi/examples/upload.c

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,7 @@ int main(int argc, char *argv[]) {
224224
hyper_clientconn_options_exec(opts, exec);
225225

226226
hyper_task *handshake = hyper_clientconn_handshake(io, opts);
227-
hyper_task_set_userdata(handshake, (void *)EXAMPLE_HANDSHAKE);
227+
hyper_task_set_userdata(handshake, (void *)EXAMPLE_HANDSHAKE, NULL);
228228

229229
// Let's wait for the handshake to finish...
230230
hyper_executor_push(exec, handshake);
@@ -274,17 +274,17 @@ int main(int argc, char *argv[]) {
274274
// the body is sent immediately. This will just print if any
275275
// informational headers are received.
276276
printf(" with expect-continue ...\n");
277-
hyper_request_on_informational(req, print_informational, NULL);
277+
hyper_request_on_informational(req, print_informational, NULL, NULL);
278278

279279
// Prepare the req body
280280
hyper_body *body = hyper_body_new();
281-
hyper_body_set_userdata(body, &upload);
281+
hyper_body_set_userdata(body, &upload, NULL);
282282
hyper_body_set_data_func(body, poll_req_upload);
283283
hyper_request_set_body(req, body);
284284

285285
// Send it!
286286
hyper_task *send = hyper_clientconn_send(client, req);
287-
hyper_task_set_userdata(send, (void *)EXAMPLE_SEND);
287+
hyper_task_set_userdata(send, (void *)EXAMPLE_SEND, NULL);
288288
printf("sending ...\n");
289289
hyper_executor_push(exec, send);
290290

@@ -315,7 +315,7 @@ int main(int argc, char *argv[]) {
315315

316316
// Set us up to peel data from the body a chunk at a time
317317
hyper_task *body_data = hyper_body_data(resp_body);
318-
hyper_task_set_userdata(body_data, (void *)EXAMPLE_RESP_BODY);
318+
hyper_task_set_userdata(body_data, (void *)EXAMPLE_RESP_BODY, NULL);
319319
hyper_executor_push(exec, body_data);
320320

321321
// No longer need the response
@@ -335,7 +335,7 @@ int main(int argc, char *argv[]) {
335335
hyper_task_free(task);
336336

337337
hyper_task *body_data = hyper_body_data(resp_body);
338-
hyper_task_set_userdata(body_data, (void *)EXAMPLE_RESP_BODY);
338+
hyper_task_set_userdata(body_data, (void *)EXAMPLE_RESP_BODY, NULL);
339339
hyper_executor_push(exec, body_data);
340340

341341
break;

capi/include/hyper.h

Lines changed: 25 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,20 @@ typedef struct hyper_waker hyper_waker;
233233

234234
typedef int (*hyper_body_foreach_callback)(void*, const struct hyper_buf*);
235235

236+
/*
237+
Many hyper entites can be given userdata to allow user callbacks to corellate work together.
238+
Since much of hyper is asychronous it's often useful to treat these userdata objects as "owned"
239+
by the hyper entity (and hence to be cleaned up when that entity is dropped).
240+
241+
To acheive this a `hyepr_userdata_drop` callback is passed by calling code alongside the
242+
userdata to register a cleanup function.
243+
244+
This function may be provided as NULL if the calling code wants to manage memory lifetimes
245+
itself, in which case the hyper object will logically consider the userdata "borrowed" until
246+
the hyper entity is dropped.
247+
*/
248+
typedef void (*hyper_userdata_drop)(void*);
249+
236250
typedef int (*hyper_body_data_callback)(void*, struct hyper_context*, struct hyper_buf**);
237251

238252
/*
@@ -252,20 +266,10 @@ typedef int (*hyper_body_data_callback)(void*, struct hyper_context*, struct hyp
252266
*/
253267
typedef void (*hyper_service_callback)(void*, struct hyper_request*, struct hyper_response_channel*);
254268

255-
/*
256-
Since a hyper_service owns the userdata passed to it the calling code can register a cleanup
257-
function to be called when the service is dropped. This function may be omitted/a no-op if
258-
the calling code wants to manage memory lifetimes itself (e.g. if the userdata is a static
259-
global)
260-
*/
261-
typedef void (*hyper_service_userdata_drop_callback)(void*);
262-
263269
typedef void (*hyper_request_on_informational_callback)(void*, struct hyper_response*);
264270

265271
typedef int (*hyper_headers_foreach_callback)(void*, const uint8_t*, size_t, const uint8_t*, size_t);
266272

267-
typedef void (*hyper_io_userdata_drop_callback)(void*);
268-
269273
typedef size_t (*hyper_io_read_callback)(void*, struct hyper_context*, uint8_t*, size_t);
270274

271275
typedef size_t (*hyper_io_write_callback)(void*, struct hyper_context*, const uint8_t*, size_t);
@@ -319,12 +323,13 @@ struct hyper_task *hyper_body_data(struct hyper_body *body);
319323
*/
320324
struct hyper_task *hyper_body_foreach(struct hyper_body *body,
321325
hyper_body_foreach_callback func,
322-
void *userdata);
326+
void *userdata,
327+
hyper_userdata_drop drop);
323328

324329
/*
325330
Set userdata on this body, which will be passed to callback functions.
326331
*/
327-
void hyper_body_set_userdata(struct hyper_body *body, void *userdata);
332+
void hyper_body_set_userdata(struct hyper_body *body, void *userdata, hyper_userdata_drop drop);
328333

329334
/*
330335
Set the data callback for this body.
@@ -659,12 +664,13 @@ struct hyper_service *hyper_service_new(hyper_service_callback service_fn);
659664
660665
The service takes ownership of the userdata and will call the `drop_userdata` callback when
661666
the service task is complete. If the `drop_userdata` callback is `NULL` then the service
662-
will instead forget the userdata when the associated task is completed and the calling code
663-
is responsible for cleaning up the userdata through some other mechanism.
667+
will instead borrow the userdata and forget it when the associated task is completed and
668+
thus the calling code is responsible for cleaning up the userdata through some other
669+
mechanism.
664670
*/
665671
void hyper_service_set_userdata(struct hyper_service *service,
666672
void *userdata,
667-
hyper_service_userdata_drop_callback drop_func);
673+
hyper_userdata_drop drop);
668674

669675
/*
670676
Frees a hyper_service object if no longer needed
@@ -896,7 +902,8 @@ struct hyper_body *hyper_request_body(struct hyper_request *req);
896902
*/
897903
enum hyper_code hyper_request_on_informational(struct hyper_request *req,
898904
hyper_request_on_informational_callback callback,
899-
void *data);
905+
void *data,
906+
hyper_userdata_drop drop);
900907

901908
/*
902909
Construct a new HTTP 200 Ok response
@@ -1047,9 +1054,7 @@ void hyper_io_free(struct hyper_io *io);
10471054
`hyper_io` is destroyed (either explicitely by `hyper_io_free` or
10481055
implicitely by an associated hyper task completing).
10491056
*/
1050-
void hyper_io_set_userdata(struct hyper_io *io,
1051-
void *data,
1052-
hyper_io_userdata_drop_callback drop_func);
1057+
void hyper_io_set_userdata(struct hyper_io *io, void *data, hyper_userdata_drop drop_func);
10531058

10541059
/*
10551060
Get the user data pointer for this IO value.
@@ -1161,7 +1166,7 @@ enum hyper_task_return_type hyper_task_type(struct hyper_task *task);
11611166
This value will be passed to task callbacks, and can be checked later
11621167
with `hyper_task_userdata`.
11631168
*/
1164-
void hyper_task_set_userdata(struct hyper_task *task, void *userdata);
1169+
void hyper_task_set_userdata(struct hyper_task *task, void *userdata, hyper_userdata_drop drop);
11651170

11661171
/*
11671172
Retrieve the userdata that has been set via `hyper_task_set_userdata`.

src/ffi/body.rs

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@ use http_body_util::BodyExt as _;
77
use libc::{c_int, size_t};
88

99
use super::task::{hyper_context, hyper_task, hyper_task_return_type, AsTaskType};
10-
use super::{UserDataPointer, HYPER_ITER_CONTINUE};
10+
use super::HYPER_ITER_CONTINUE;
11+
use super::userdata::{Userdata, hyper_userdata_drop};
1112
use crate::body::{Bytes, Frame, Incoming as IncomingBody};
1213

1314
/// A streaming HTTP body.
@@ -18,7 +19,7 @@ pub struct hyper_buf(pub(super) Bytes);
1819

1920
pub(crate) struct UserBody {
2021
data_func: hyper_body_data_callback,
21-
userdata: *mut c_void,
22+
userdata: Userdata,
2223
}
2324

2425
// ===== Body =====
@@ -88,15 +89,15 @@ ffi_fn! {
8889
/// chunks as they are received, or `HYPER_ITER_BREAK` to cancel.
8990
///
9091
/// This will consume the `hyper_body *`, you shouldn't use it anymore or free it.
91-
fn hyper_body_foreach(body: *mut hyper_body, func: hyper_body_foreach_callback, userdata: *mut c_void) -> *mut hyper_task {
92+
fn hyper_body_foreach(body: *mut hyper_body, func: hyper_body_foreach_callback, userdata: *mut c_void, drop: hyper_userdata_drop) -> *mut hyper_task {
9293
let mut body = non_null!(Box::from_raw(body) ?= ptr::null_mut());
93-
let userdata = UserDataPointer(userdata);
94+
let userdata = Userdata::new(userdata, drop);
9495

9596
Box::into_raw(hyper_task::boxed(async move {
9697
while let Some(item) = body.0.frame().await {
9798
let frame = item?;
9899
if let Ok(chunk) = frame.into_data() {
99-
if HYPER_ITER_CONTINUE != func(userdata.0, &hyper_buf(chunk)) {
100+
if HYPER_ITER_CONTINUE != func(userdata.as_ptr(), &hyper_buf(chunk)) {
100101
return Err(crate::Error::new_user_aborted_by_callback());
101102
}
102103
}
@@ -108,9 +109,9 @@ ffi_fn! {
108109

109110
ffi_fn! {
110111
/// Set userdata on this body, which will be passed to callback functions.
111-
fn hyper_body_set_userdata(body: *mut hyper_body, userdata: *mut c_void) {
112+
fn hyper_body_set_userdata(body: *mut hyper_body, userdata: *mut c_void, drop: hyper_userdata_drop) {
112113
let b = non_null!(&mut *body ?= ());
113-
b.0.as_ffi_mut().userdata = userdata;
114+
b.0.as_ffi_mut().userdata = Userdata::new(userdata, drop);
114115
}
115116
}
116117

@@ -146,7 +147,7 @@ impl UserBody {
146147
pub(crate) fn new() -> UserBody {
147148
UserBody {
148149
data_func: data_noop,
149-
userdata: std::ptr::null_mut(),
150+
userdata: Userdata::default(),
150151
}
151152
}
152153

@@ -155,7 +156,7 @@ impl UserBody {
155156
cx: &mut Context<'_>,
156157
) -> Poll<Option<crate::Result<Frame<Bytes>>>> {
157158
let mut out = std::ptr::null_mut();
158-
match (self.data_func)(self.userdata, hyper_context::wrap(cx), &mut out) {
159+
match (self.data_func)(self.userdata.as_ptr(), hyper_context::wrap(cx), &mut out) {
159160
super::task::HYPER_POLL_READY => {
160161
if out.is_null() {
161162
Poll::Ready(None)

src/ffi/http_types.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@ use std::ffi::c_void;
55
use super::body::hyper_body;
66
use super::error::hyper_code;
77
use super::task::{hyper_task_return_type, AsTaskType};
8-
use super::{UserDataPointer, HYPER_ITER_CONTINUE};
8+
use super::HYPER_ITER_CONTINUE;
9+
use super::userdata::{Userdata, hyper_userdata_drop};
910
use crate::body::Incoming as IncomingBody;
1011
use crate::ext::{HeaderCaseMap, OriginalHeaderOrder, ReasonPhrase};
1112
use crate::header::{HeaderName, HeaderValue};
@@ -28,7 +29,7 @@ pub struct hyper_headers {
2829

2930
pub(crate) struct OnInformational {
3031
func: hyper_request_on_informational_callback,
31-
data: UserDataPointer,
32+
userdata: Userdata,
3233
}
3334

3435
type hyper_request_on_informational_callback = extern "C" fn(*mut c_void, *mut hyper_response);
@@ -337,10 +338,10 @@ ffi_fn! {
337338
/// NOTE: The `hyper_response *` is just borrowed data, and will not
338339
/// be valid after the callback finishes. You must copy any data you wish
339340
/// to persist.
340-
fn hyper_request_on_informational(req: *mut hyper_request, callback: hyper_request_on_informational_callback, data: *mut c_void) -> hyper_code {
341+
fn hyper_request_on_informational(req: *mut hyper_request, callback: hyper_request_on_informational_callback, data: *mut c_void, drop: hyper_userdata_drop) -> hyper_code {
341342
let ext = OnInformational {
342343
func: callback,
343-
data: UserDataPointer(data),
344+
userdata: Userdata::new(data, drop),
344345
};
345346
let req = non_null!(&mut *req ?= hyper_code::HYPERE_INVALID_ARG);
346347
req.0.extensions_mut().insert(ext);
@@ -724,7 +725,7 @@ unsafe fn raw_name_value(
724725
impl OnInformational {
725726
pub(crate) fn call(&mut self, resp: Response<IncomingBody>) {
726727
let mut resp = hyper_response::from(resp);
727-
(self.func)(self.data.0, &mut resp);
728+
(self.func)(self.userdata.as_ptr(), &mut resp);
728729
}
729730
}
730731

src/ffi/io.rs

Lines changed: 8 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use libc::size_t;
66
use tokio::io::{AsyncRead, AsyncWrite};
77

88
use super::task::hyper_context;
9+
use super::userdata::Userdata;
910

1011
/// Sentinel value to return from a read or write callback that the operation
1112
/// is pending.
@@ -18,14 +19,12 @@ type hyper_io_read_callback =
1819
extern "C" fn(*mut c_void, *mut hyper_context<'_>, *mut u8, size_t) -> size_t;
1920
type hyper_io_write_callback =
2021
extern "C" fn(*mut c_void, *mut hyper_context<'_>, *const u8, size_t) -> size_t;
21-
type hyper_io_userdata_drop_callback = extern "C" fn(*mut c_void);
2222

2323
/// An IO object used to represent a socket or similar concept.
2424
pub struct hyper_io {
2525
read: hyper_io_read_callback,
2626
write: hyper_io_write_callback,
27-
userdata: *mut c_void,
28-
userdata_drop: Option<hyper_io_userdata_drop_callback>,
27+
userdata: Userdata,
2928
}
3029

3130
ffi_fn! {
@@ -37,8 +36,7 @@ ffi_fn! {
3736
Box::into_raw(Box::new(hyper_io {
3837
read: read_noop,
3938
write: write_noop,
40-
userdata: std::ptr::null_mut(),
41-
userdata_drop: None,
39+
userdata: Userdata::default(),
4240
}))
4341
} ?= std::ptr::null_mut()
4442
}
@@ -64,15 +62,10 @@ ffi_fn! {
6462
fn hyper_io_set_userdata(
6563
io: *mut hyper_io,
6664
data: *mut c_void,
67-
drop_func: hyper_io_userdata_drop_callback,
65+
drop_func: super::userdata::hyper_userdata_drop,
6866
) {
6967
let io = non_null!(&mut *io? = ());
70-
io.userdata = data;
71-
io.userdata_drop = if (drop_func as *const c_void).is_null() {
72-
None
73-
} else {
74-
Some(drop_func)
75-
}
68+
io.userdata = Userdata::new(data, drop_func);
7669
}
7770
}
7871

@@ -83,7 +76,7 @@ ffi_fn! {
8376
///
8477
/// Returns NULL if no userdata has been set.
8578
fn hyper_io_get_userdata(io: *mut hyper_io) -> *mut c_void {
86-
non_null!(&mut *io ?= std::ptr::null_mut()).userdata
79+
non_null!(&mut *io ?= std::ptr::null_mut()).userdata.as_ptr()
8780
}
8881
}
8982

@@ -148,14 +141,6 @@ extern "C" fn write_noop(
148141
0
149142
}
150143

151-
impl Drop for hyper_io {
152-
fn drop(&mut self) {
153-
if let Some(drop_fn) = self.userdata_drop {
154-
drop_fn(self.userdata)
155-
}
156-
}
157-
}
158-
159144
impl AsyncRead for hyper_io {
160145
fn poll_read(
161146
self: Pin<&mut Self>,
@@ -165,7 +150,7 @@ impl AsyncRead for hyper_io {
165150
let buf_ptr = unsafe { buf.unfilled_mut() }.as_mut_ptr() as *mut u8;
166151
let buf_len = buf.remaining();
167152

168-
match (self.read)(self.userdata, hyper_context::wrap(cx), buf_ptr, buf_len) {
153+
match (self.read)(self.userdata.as_ptr(), hyper_context::wrap(cx), buf_ptr, buf_len) {
169154
HYPER_IO_PENDING => Poll::Pending,
170155
HYPER_IO_ERROR => Poll::Ready(Err(std::io::Error::new(
171156
std::io::ErrorKind::Other,
@@ -191,7 +176,7 @@ impl AsyncWrite for hyper_io {
191176
let buf_ptr = buf.as_ptr();
192177
let buf_len = buf.len();
193178

194-
match (self.write)(self.userdata, hyper_context::wrap(cx), buf_ptr, buf_len) {
179+
match (self.write)(self.userdata.as_ptr(), hyper_context::wrap(cx), buf_ptr, buf_len) {
195180
HYPER_IO_PENDING => Poll::Pending,
196181
HYPER_IO_ERROR => Poll::Ready(Err(std::io::Error::new(
197182
std::io::ErrorKind::Other,

0 commit comments

Comments
 (0)