Skip to content

Commit 5ab9c4f

Browse files
authored
use tokio-postgres in outbound-pg (#779)
* use `tokio-postgres` in `outbound-pg` Since `postgres` just wraps the async `tokio-postgres` library in a synchronous wrapper, and `outbound-pg` exports async methods, it makes more sense to use `tokio-postgres` directly. Addresses #776. Signed-off-by: Joel Dice <joel.dice@fermyon.com> * fix PgError mapping Signed-off-by: Joel Dice <joel.dice@fermyon.com> Signed-off-by: Joel Dice <joel.dice@fermyon.com>
1 parent 71a28f9 commit 5ab9c4f

File tree

3 files changed

+69
-69
lines changed

3 files changed

+69
-69
lines changed

Cargo.lock

Lines changed: 1 addition & 15 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/outbound-pg/Cargo.toml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ doctest = false
88

99
[dependencies]
1010
anyhow = "1.0"
11-
postgres = { version = "0.19.3" }
11+
tokio-postgres = { version = "0.7.7" }
1212
spin-engine = { path = "../engine" }
1313
spin-manifest = { path = "../manifest" }
1414
tokio = { version = "1", features = [ "rt-multi-thread" ] }
@@ -18,4 +18,3 @@ tracing = { version = "0.1", features = [ "log" ] }
1818
git = "https://github.com/bytecodealliance/wit-bindgen"
1919
rev = "cb871cfa1ee460b51eb1d144b175b9aab9c50aba"
2020
features = ["async"]
21-

crates/outbound-pg/src/lib.rs

Lines changed: 67 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
use anyhow::anyhow;
22
use outbound_pg::*;
3-
use postgres::{types::ToSql, types::Type, Client, NoTls, Row};
3+
use tokio_postgres::{
4+
tls::NoTlsStream,
5+
types::{ToSql, Type},
6+
Connection, NoTls, Row, Socket,
7+
};
48

59
pub use outbound_pg::add_to_linker;
610
use spin_engine::{
@@ -41,23 +45,24 @@ impl outbound_pg::OutboundPg for OutboundPg {
4145
statement: &str,
4246
params: Vec<ParameterValue<'_>>,
4347
) -> Result<u64, PgError> {
44-
// TODO: consider using async tokio-postgres crate
45-
tokio::task::block_in_place(|| {
46-
let mut client = Client::connect(address, NoTls)
47-
.map_err(|e| PgError::ConnectionFailed(format!("{:?}", e)))?;
48-
49-
let params: Vec<&(dyn ToSql + Sync)> = params
50-
.iter()
51-
.map(to_sql_parameter)
52-
.collect::<anyhow::Result<Vec<_>>>()
53-
.map_err(|e| PgError::QueryFailed(format!("{:?}", e)))?;
54-
55-
let nrow = client
56-
.execute(statement, params.as_slice())
57-
.map_err(|e| PgError::ValueConversionFailed(format!("{:?}", e)))?;
58-
59-
Ok(nrow)
60-
})
48+
let (client, connection) = tokio_postgres::connect(address, NoTls)
49+
.await
50+
.map_err(|e| PgError::ConnectionFailed(format!("{:?}", e)))?;
51+
52+
spawn(connection);
53+
54+
let params: Vec<&(dyn ToSql + Sync)> = params
55+
.iter()
56+
.map(to_sql_parameter)
57+
.collect::<anyhow::Result<Vec<_>>>()
58+
.map_err(|e| PgError::ValueConversionFailed(format!("{:?}", e)))?;
59+
60+
let nrow = client
61+
.execute(statement, params.as_slice())
62+
.await
63+
.map_err(|e| PgError::QueryFailed(format!("{:?}", e)))?;
64+
65+
Ok(nrow)
6166
}
6267

6368
async fn query(
@@ -66,36 +71,38 @@ impl outbound_pg::OutboundPg for OutboundPg {
6671
statement: &str,
6772
params: Vec<ParameterValue<'_>>,
6873
) -> Result<RowSet, PgError> {
69-
tokio::task::block_in_place(|| {
70-
let mut client = Client::connect(address, NoTls)
71-
.map_err(|e| PgError::ConnectionFailed(format!("{:?}", e)))?;
72-
73-
let params: Vec<&(dyn ToSql + Sync)> = params
74-
.iter()
75-
.map(to_sql_parameter)
76-
.collect::<anyhow::Result<Vec<_>>>()
77-
.map_err(|e| PgError::BadParameter(format!("{:?}", e)))?;
78-
79-
let results = client
80-
.query(statement, params.as_slice())
81-
.map_err(|e| PgError::QueryFailed(format!("{:?}", e)))?;
82-
83-
if results.is_empty() {
84-
return Ok(RowSet {
85-
columns: vec![],
86-
rows: vec![],
87-
});
88-
}
74+
let (client, connection) = tokio_postgres::connect(address, NoTls)
75+
.await
76+
.map_err(|e| PgError::ConnectionFailed(format!("{:?}", e)))?;
77+
78+
spawn(connection);
79+
80+
let params: Vec<&(dyn ToSql + Sync)> = params
81+
.iter()
82+
.map(to_sql_parameter)
83+
.collect::<anyhow::Result<Vec<_>>>()
84+
.map_err(|e| PgError::BadParameter(format!("{:?}", e)))?;
85+
86+
let results = client
87+
.query(statement, params.as_slice())
88+
.await
89+
.map_err(|e| PgError::QueryFailed(format!("{:?}", e)))?;
90+
91+
if results.is_empty() {
92+
return Ok(RowSet {
93+
columns: vec![],
94+
rows: vec![],
95+
});
96+
}
8997

90-
let columns = infer_columns(&results[0]);
91-
let rows = results
92-
.iter()
93-
.map(convert_row)
94-
.collect::<Result<Vec<_>, _>>()
95-
.map_err(|e| PgError::QueryFailed(format!("{:?}", e)))?;
98+
let columns = infer_columns(&results[0]);
99+
let rows = results
100+
.iter()
101+
.map(convert_row)
102+
.collect::<Result<Vec<_>, _>>()
103+
.map_err(|e| PgError::QueryFailed(format!("{:?}", e)))?;
96104

97-
Ok(RowSet { columns, rows })
98-
})
105+
Ok(RowSet { columns, rows })
99106
}
100107
}
101108

@@ -110,10 +117,10 @@ fn to_sql_parameter<'a>(value: &'a ParameterValue) -> anyhow::Result<&'a (dyn To
110117
ParameterValue::Int16(v) => Ok(v),
111118
ParameterValue::Floating32(v) => Ok(v),
112119
ParameterValue::Floating64(v) => Ok(v),
113-
ParameterValue::Uint8(_) => Err(anyhow!("Postgres does not support unsigned integers")),
114-
ParameterValue::Uint16(_) => Err(anyhow!("Postgres does not support unsigned integers")),
115-
ParameterValue::Uint32(_) => Err(anyhow!("Postgres does not support unsigned integers")),
116-
ParameterValue::Uint64(_) => Err(anyhow!("Postgres does not support unsigned integers")),
120+
ParameterValue::Uint8(_)
121+
| ParameterValue::Uint16(_)
122+
| ParameterValue::Uint32(_)
123+
| ParameterValue::Uint64(_) => Err(anyhow!("Postgres does not support unsigned integers")),
117124
ParameterValue::Str(v) => Ok(v),
118125
ParameterValue::Binary(v) => Ok(v),
119126
ParameterValue::DbNull => Ok(&DB_NULL),
@@ -153,15 +160,15 @@ fn convert_data_type(pg_type: &Type) -> DbDataType {
153160
}
154161
}
155162

156-
fn convert_row(row: &Row) -> Result<Vec<DbValue>, postgres::Error> {
163+
fn convert_row(row: &Row) -> Result<Vec<DbValue>, tokio_postgres::Error> {
157164
let mut result = Vec::with_capacity(row.len());
158165
for index in 0..row.len() {
159166
result.push(convert_entry(row, index)?);
160167
}
161168
Ok(result)
162169
}
163170

164-
fn convert_entry(row: &Row, index: usize) -> Result<DbValue, postgres::Error> {
171+
fn convert_entry(row: &Row, index: usize) -> Result<DbValue, tokio_postgres::Error> {
165172
let column = &row.columns()[index];
166173
let value = match column.type_() {
167174
&Type::BOOL => {
@@ -238,3 +245,11 @@ fn convert_entry(row: &Row, index: usize) -> Result<DbValue, postgres::Error> {
238245
};
239246
Ok(value)
240247
}
248+
249+
fn spawn(connection: Connection<Socket, NoTlsStream>) {
250+
tokio::spawn(async move {
251+
if let Err(e) = connection.await {
252+
tracing::warn!("Postgres connection error: {}", e);
253+
}
254+
});
255+
}

0 commit comments

Comments
 (0)