Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 52 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ CREATE SERVER my_etcd_server foreign data wrapper etcd_fdw options (connstr '127
```

```sql
CREATE foreign table test (key text, value text) server my_etcd_server options(rowid 'key');
CREATE foreign table test (key text, value text) server my_etcd_server options(rowid_column 'key');
```

```sql
Expand Down Expand Up @@ -93,6 +93,57 @@ Usage

Timeout in seconds to each request after the connection has been established.


## CREATE FOREIGN TABLE options

`etcd_fdw` accepts the following table-level options via the
`CREATE FOREIGN TABLE` command.

- **rowid_column** as *string*, mandatory, no default

Specifies which column should be treated as the unique row identifier.
Usually set to key.

- **prefix** as *string*, optional, no default

Restrict the scan to keys beginning with this prefix.
If not provided, the FDW will fetch all keys from the etcd server

- **keys_only** as *string*, optional, default `false`

If set to true, only the keys are fetched, not the values.
Useful to reduce network overhead when values are not needed.

- **revision** as *string*, optional, default `0`

Read key-value data at a specific etcd revision.
If 0, the latest revision is used.

- **key** as *string*, optional, no default

The starting key to fetch from etcd.

This option defines the beginning of the range.
If neither `prefix` nor `key` is specified, the FDW will default to `\0` (the lowest possible key).

- **range_end** as *string*, optional, no default

The exclusive end of the key range. Restricts the scan to the half-open interval `[key, range_end)`.

All keys between key (inclusive) and range_end (exclusive) will be returned.
If range_end is omitted, only the single key defined by key will be returned (unless prefix is used).

- **consistency** as *string*, optional, default `l`

Specifies the read consistency level for etcd queries.


Linearizable(`l`), Ensures the result reflects the latest consensus state of the cluster.
Linearizable reads have higher latency but guarantee fresh data.

Serializable(`s`), Allows serving results from a local etcd member without cluster-wide consensus.
Serializable reads are faster and lighter on the cluster, but may return stale data in some cases

## What doesn't work
etcd_fdw supports almost all kinds of CRUD operations. What doesn't work is modifying the key (which is the rowid value) directly using `UPDATE` statements.
What does work is the following workflow:
Expand Down
118 changes: 98 additions & 20 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ pgrx::pg_module_magic!();
pub(crate) struct EtcdFdw {
client: Client,
rt: Runtime,
prefix: String,
fetch_results: Vec<KeyValue>,
fetch_key: bool,
fetch_value: bool,
Expand Down Expand Up @@ -75,11 +74,20 @@ pub enum EtcdFdwError {
#[error("Key {0} already exists in etcd. No duplicates allowed")]
KeyAlreadyExists(String),

#[error("Options 'prefix' and 'range_end' cannot be used together")]
ConflictingPrefixAndRange,

#[error("Options 'prefix' and 'key' should not be used together")]
ConflictingPrefixAndKey,

#[error("Key {0} doesn't exist in etcd")]
KeyDoesntExist(String),

#[error("Invalid option '{0}' with value '{1}'")]
InvalidOption(String, String),

#[error("{0}")]
OptionsError(#[from] OptionsError),
}

impl From<EtcdFdwError> for ErrorReport {
Expand All @@ -90,13 +98,13 @@ impl From<EtcdFdwError> for ErrorReport {

/// Check whether dependent options exits
/// i.e username & pass, cert & key
fn require_pair<T>(
a: &Option<T>,
b: &Option<T>,
fn require_pair(
a: bool,
b: bool,
err: EtcdFdwError,
) -> Result<(), EtcdFdwError> {
match (a, b) {
(Some(_), None) | (None, Some(_)) => Err(err),
(true, false) | (false, true) => Err(err),
_ => Ok(()),
}
}
Expand Down Expand Up @@ -194,8 +202,8 @@ impl ForeignDataWrapper<EtcdFdwError> for EtcdFdw {

// ssl_cert + ssl_key must be both present or both absent
// username + password must be both present or both absent
require_pair(&cert_path, &key_path, EtcdFdwError::CertKeyMismatch(()))?;
require_pair(&username, &password, EtcdFdwError::UserPassMismatch(()))?;
require_pair(cert_path.is_some(), key_path.is_some(), EtcdFdwError::CertKeyMismatch(()))?;
require_pair(username.is_some(), password.is_some(), EtcdFdwError::UserPassMismatch(()))?;

config = EtcdConfig {
endpoints: vec![connstr],
Expand All @@ -213,16 +221,12 @@ impl ForeignDataWrapper<EtcdFdwError> for EtcdFdw {
Ok(x) => x,
Err(e) => return Err(EtcdFdwError::ClientConnectionError(e.to_string())),
};
let prefix = match server.options.get("prefix") {
Some(x) => x.clone(),
None => String::from(""),
};

let fetch_results = vec![];

Ok(Self {
client,
rt,
prefix,
fetch_results,
fetch_key: false,
fetch_value: false,
Expand All @@ -235,23 +239,65 @@ impl ForeignDataWrapper<EtcdFdwError> for EtcdFdw {
columns: &[Column],
_sorts: &[Sort],
limit: &Option<Limit>,
_options: &std::collections::HashMap<String, String>,
options: &std::collections::HashMap<String, String>,
) -> Result<(), EtcdFdwError> {
// Select get all rows as a result into a field of the struct
// Build Query options from parameters
let mut get_options = GetOptions::new().with_all_keys();
match limit {
Some(x) => get_options = get_options.with_limit(x.count),
None => (),
// parse the options defined when `CREATE FOREIGN TABLE`
let prefix = options.get("prefix").cloned();
let range_end = options.get("range_end").cloned();
let key_start = options.get("key").cloned();
let keys_only = options.get("keys_only").map(|v| v == "true").unwrap_or(false);
let revision = options.get("revision").and_then(|v| v.parse::<i64>().ok()).unwrap_or(0);
let serializable = options.get("consistency").map(|v| v == "s").unwrap_or(false);
let mut get_options = GetOptions::new();

// prefix and range are mutually exclusive
match (prefix.as_ref(), range_end.as_ref()) {
(Some(_), Some(_)) => {
return Err(EtcdFdwError::ConflictingPrefixAndRange);
}
(Some(_), None) => {
get_options = get_options.with_prefix();
}
(None, Some(r)) => {
get_options = get_options.with_range(r.clone());
}
(None, None) => {
if key_start.is_none() {
get_options = get_options.with_all_keys();
}
}
}

if let Some(x) = limit {
get_options = get_options.with_limit(x.count);
}

if keys_only {
get_options = get_options.with_keys_only();
}

if revision > 0 {
get_options = get_options.with_revision(revision);
}

if serializable {
get_options = get_options.with_serializable();
}

// preference order : prefix > key_start > default "\0"
// samllest possible valid key '\0'
let key = prefix.clone()
.or_else(|| key_start.clone())
.unwrap_or_else(|| String::from("\0"));

// Check if columns contains key and value
let colnames: Vec<String> = columns.iter().map(|x| x.name.clone()).collect();
self.fetch_key = colnames.contains(&String::from("key"));
self.fetch_value = colnames.contains(&String::from("value"));

let result = self
.rt
.block_on(self.client.get(self.prefix.clone(), Some(get_options)));
.block_on(self.client.get(key, Some(get_options)));
let mut result_unwrapped = match result {
Ok(x) => x,
Err(e) => return Err(EtcdFdwError::FetchError(e.to_string())),
Expand Down Expand Up @@ -424,6 +470,38 @@ impl ForeignDataWrapper<EtcdFdwError> for EtcdFdw {
// This currently also does nothing
Ok(())
}

fn validator(options: Vec<Option<String>>, catalog: Option<pg_sys::Oid>) -> EtcdFdwResult<()> {
if let Some(oid) = catalog {
if oid == FOREIGN_SERVER_RELATION_ID {
check_options_contain(&options, "connstr")?;

let cacert_path_exists = check_options_contain(&options, "ssl_ca").is_ok();
let cert_path_exists = check_options_contain(&options, "ssl_cert").is_ok();
let username_exists = check_options_contain(&options, "username").is_ok();
let password_exists = check_options_contain(&options, "password").is_ok();

require_pair(cacert_path_exists, cert_path_exists, EtcdFdwError::CertKeyMismatch(()))?;
require_pair(username_exists, password_exists, EtcdFdwError::UserPassMismatch(()))?;
} else if oid == FOREIGN_TABLE_RELATION_ID {
check_options_contain(&options, "rowid_column")?;

let prefix_exists = check_options_contain(&options, "prefix").is_ok();
let rannge_exists = check_options_contain(&options, "range_end").is_ok();
let key_exists = check_options_contain(&options, "key").is_ok();

if prefix_exists && rannge_exists {
return Err(EtcdFdwError::ConflictingPrefixAndRange);
}

if prefix_exists && key_exists {
return Err(EtcdFdwError::ConflictingPrefixAndKey);
}
}
}

Ok(())
}
}

#[cfg(test)]
Expand Down
Loading