Skip to content

Commit 85761ec

Browse files
authored
Merge pull request #771 from lann/fix-768
fix: Use `block_in_place` in outbound-pg
2 parents 4e52721 + 217f24c commit 85761ec

File tree

4 files changed

+48
-42
lines changed

4 files changed

+48
-42
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/outbound-pg/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ anyhow = "1.0"
1111
postgres = { version = "0.19.3" }
1212
spin-engine = { path = "../engine" }
1313
spin-manifest = { path = "../manifest" }
14+
tokio = { version = "1", features = [ "rt-multi-thread" ] }
1415
tracing = { version = "0.1", features = [ "log" ] }
1516

1617
[dependencies.wit-bindgen-wasmtime]

crates/outbound-pg/src/lib.rs

Lines changed: 45 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ impl HostComponent for OutboundPg {
3333
}
3434
}
3535

36-
// TODO: use spawn_blocking or an async Postgres client
3736
#[async_trait]
3837
impl outbound_pg::OutboundPg for OutboundPg {
3938
async fn execute(
@@ -42,20 +41,23 @@ impl outbound_pg::OutboundPg for OutboundPg {
4241
statement: &str,
4342
params: Vec<ParameterValue<'_>>,
4443
) -> Result<u64, PgError> {
45-
let mut client = Client::connect(address, NoTls)
46-
.map_err(|e| PgError::ConnectionFailed(format!("{:?}", e)))?;
47-
48-
let params: Vec<&(dyn ToSql + Sync)> = params
49-
.iter()
50-
.map(to_sql_parameter)
51-
.collect::<anyhow::Result<Vec<_>>>()
52-
.map_err(|e| PgError::QueryFailed(format!("{:?}", e)))?;
53-
54-
let nrow = client
55-
.execute(statement, params.as_slice())
56-
.map_err(|e| PgError::ValueConversionFailed(format!("{:?}", e)))?;
57-
58-
Ok(nrow)
44+
// TODO: consider using async tokio-postgres crate
45+
tokio::task::block_in_place(|| {
46+
let mut client = Client::connect(address, NoTls)
47+
.map_err(|e| PgError::ConnectionFailed(format!("{:?}", e)))?;
48+
49+
let params: Vec<&(dyn ToSql + Sync)> = params
50+
.iter()
51+
.map(to_sql_parameter)
52+
.collect::<anyhow::Result<Vec<_>>>()
53+
.map_err(|e| PgError::QueryFailed(format!("{:?}", e)))?;
54+
55+
let nrow = client
56+
.execute(statement, params.as_slice())
57+
.map_err(|e| PgError::ValueConversionFailed(format!("{:?}", e)))?;
58+
59+
Ok(nrow)
60+
})
5961
}
6062

6163
async fn query(
@@ -64,34 +66,36 @@ impl outbound_pg::OutboundPg for OutboundPg {
6466
statement: &str,
6567
params: Vec<ParameterValue<'_>>,
6668
) -> Result<RowSet, PgError> {
67-
let mut client = Client::connect(address, NoTls)
68-
.map_err(|e| PgError::ConnectionFailed(format!("{:?}", e)))?;
69-
70-
let params: Vec<&(dyn ToSql + Sync)> = params
71-
.iter()
72-
.map(to_sql_parameter)
73-
.collect::<anyhow::Result<Vec<_>>>()
74-
.map_err(|e| PgError::BadParameter(format!("{:?}", e)))?;
75-
76-
let results = client
77-
.query(statement, params.as_slice())
78-
.map_err(|e| PgError::QueryFailed(format!("{:?}", e)))?;
79-
80-
if results.is_empty() {
81-
return Ok(RowSet {
82-
columns: vec![],
83-
rows: vec![],
84-
});
85-
}
69+
tokio::task::block_in_place(|| {
70+
let mut client = Client::connect(address, NoTls)
71+
.map_err(|e| PgError::ConnectionFailed(format!("{:?}", e)))?;
72+
73+
let params: Vec<&(dyn ToSql + Sync)> = params
74+
.iter()
75+
.map(to_sql_parameter)
76+
.collect::<anyhow::Result<Vec<_>>>()
77+
.map_err(|e| PgError::BadParameter(format!("{:?}", e)))?;
78+
79+
let results = client
80+
.query(statement, params.as_slice())
81+
.map_err(|e| PgError::QueryFailed(format!("{:?}", e)))?;
82+
83+
if results.is_empty() {
84+
return Ok(RowSet {
85+
columns: vec![],
86+
rows: vec![],
87+
});
88+
}
8689

87-
let columns = infer_columns(&results[0]);
88-
let rows = results
89-
.iter()
90-
.map(convert_row)
91-
.collect::<Result<Vec<_>, _>>()
92-
.map_err(|e| PgError::QueryFailed(format!("{:?}", e)))?;
90+
let columns = infer_columns(&results[0]);
91+
let rows = results
92+
.iter()
93+
.map(convert_row)
94+
.collect::<Result<Vec<_>, _>>()
95+
.map_err(|e| PgError::QueryFailed(format!("{:?}", e)))?;
9396

94-
Ok(RowSet { columns, rows })
97+
Ok(RowSet { columns, rows })
98+
})
9599
}
96100
}
97101

examples/rust-outbound-pg/Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)