Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 23 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ CREATE SERVER my_etcd_server foreign data wrapper etcd_fdw options (connstr '127
```

```sql
CREATE foreign table test (key text, value text) server my_etcd_server options(rowid 'key');
CREATE foreign table test (key text, value text) server my_etcd_server options(rowid_column 'key');
```

```sql
Expand Down Expand Up @@ -99,7 +99,7 @@ Usage
`etcd_fdw` accepts the following table-level options via the
`CREATE FOREIGN TABLE` command.

- **rowid** as *string*, mandatory, no default
- **rowid_column** as *string*, mandatory, no default

Specifies which column should be treated as the unique row identifier.
Usually set to key.
Expand All @@ -119,13 +119,30 @@ Usage
Read key-value data at a specific etcd revision.
If 0, the latest revision is used.

- **range** as *string*, optional, no default
- **key** as *string*, optional, no default

Restricts the scan to the half-open interval `[key, range)`.
Example: with range `/gamma` and scan starting at `/`, the query will return keys strictly less than `/gamma`.
The starting key to fetch from etcd.

This option defines the beginning of the range.
If neither `prefix` nor `key` is specified, the FDW will default to `\0` (the lowest possible key).

Note: Cannot be used together with `prefix`.
- **range_end** as *string*, optional, no default

The exclusive end of the key range. Restricts the scan to the half-open interval `[key, range_end)`.

All keys between key (inclusive) and range_end (exclusive) will be returned.
If range_end is omitted, only the single key defined by key will be returned (unless prefix is used).

- **consistency** as *string*, optional, default `l`

Specifies the read consistency level for etcd queries.


Linearizable(`l`), Ensures the result reflects the latest consensus state of the cluster.
Linearizable reads have higher latency but guarantee fresh data.

Serializable(`s`), Allows serving results from a local etcd member without cluster-wide consensus.
Serializable reads are faster and lighter on the cluster, but may return stale data in some cases

## What doesn't work
etcd_fdw supports almost all kinds of CRUD operations. What doesn't work is modifying the key (which is the rowid value) directly using `UPDATE` statements.
Expand Down
74 changes: 62 additions & 12 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,20 @@ pub enum EtcdFdwError {
#[error("Key {0} already exists in etcd. No duplicates allowed")]
KeyAlreadyExists(String),

#[error("Options 'prefix' and 'range' cannot be used together")]
#[error("Options 'prefix' and 'range_end' cannot be used together")]
ConflictingPrefixAndRange,

#[error("Options 'prefix' and 'key' should not be used together")]
ConflictingPrefixAndKey,

#[error("Key {0} doesn't exist in etcd")]
KeyDoesntExist(String),

#[error("Invalid option '{0}' with value '{1}'")]
InvalidOption(String, String),

#[error("{0}")]
OptionsError(#[from] OptionsError),
}

impl From<EtcdFdwError> for ErrorReport {
Expand All @@ -92,13 +98,13 @@ impl From<EtcdFdwError> for ErrorReport {

/// Check whether dependent options exits
/// i.e username & pass, cert & key
fn require_pair<T>(
a: &Option<T>,
b: &Option<T>,
fn require_pair(
a: bool,
b: bool,
err: EtcdFdwError,
) -> Result<(), EtcdFdwError> {
match (a, b) {
(Some(_), None) | (None, Some(_)) => Err(err),
(true, false) | (false, true) => Err(err),
_ => Ok(()),
}
}
Expand Down Expand Up @@ -196,8 +202,8 @@ impl ForeignDataWrapper<EtcdFdwError> for EtcdFdw {

// ssl_cert + ssl_key must be both present or both absent
// username + password must be both present or both absent
require_pair(&cert_path, &key_path, EtcdFdwError::CertKeyMismatch(()))?;
require_pair(&username, &password, EtcdFdwError::UserPassMismatch(()))?;
require_pair(cert_path.is_some(), key_path.is_some(), EtcdFdwError::CertKeyMismatch(()))?;
require_pair(username.is_some(), password.is_some(), EtcdFdwError::UserPassMismatch(()))?;

config = EtcdConfig {
endpoints: vec![connstr],
Expand Down Expand Up @@ -237,13 +243,15 @@ impl ForeignDataWrapper<EtcdFdwError> for EtcdFdw {
) -> Result<(), EtcdFdwError> {
// parse the options defined when `CREATE FOREIGN TABLE`
let prefix = _options.get("prefix").cloned();
let range = _options.get("range").cloned();
let range_end = _options.get("range_end").cloned();
let key_start = _options.get("key").cloned();
let keys_only = _options.get("keys_only").map(|v| v == "true").unwrap_or(false);
let revision = _options.get("revision").and_then(|v| v.parse::<i64>().ok()).unwrap_or(0);
let serializable = _options.get("consistency").map(|v| v == "s").unwrap_or(false);
let mut get_options = GetOptions::new();

// prefix and range are mutually exclusive
match (prefix.as_ref(), range.as_ref()) {
match (prefix.as_ref(), range_end.as_ref()) {
(Some(_), Some(_)) => {
return Err(EtcdFdwError::ConflictingPrefixAndRange);
}
Expand All @@ -254,7 +262,9 @@ impl ForeignDataWrapper<EtcdFdwError> for EtcdFdw {
get_options = get_options.with_range(r.clone());
}
(None, None) => {
get_options = get_options.with_all_keys();
if key_start.is_none() {
get_options = get_options.with_all_keys();
}
}
}

Expand All @@ -270,13 +280,21 @@ impl ForeignDataWrapper<EtcdFdwError> for EtcdFdw {
get_options = get_options.with_revision(revision);
}

if serializable {
get_options = get_options.with_serializable();
}

// preference order : prefix > key_start > default "\0"
// samllest possible valid key '\0'
let key = prefix.clone()
.or_else(|| key_start.clone())
.unwrap_or_else(|| String::from("\0"));

// Check if columns contains key and value
let colnames: Vec<String> = columns.iter().map(|x| x.name.clone()).collect();
self.fetch_key = colnames.contains(&String::from("key"));
self.fetch_value = colnames.contains(&String::from("value"));

// samllest possible valid key '\0', empty string will not work with with_range
let key = prefix.clone().unwrap_or_else(|| String::from("\0"));
let result = self
.rt
.block_on(self.client.get(key, Some(get_options)));
Expand Down Expand Up @@ -452,6 +470,38 @@ impl ForeignDataWrapper<EtcdFdwError> for EtcdFdw {
// This currently also does nothing
Ok(())
}

fn validator(options: Vec<Option<String>>, catalog: Option<pg_sys::Oid>) -> EtcdFdwResult<()> {
if let Some(oid) = catalog {
if oid == FOREIGN_SERVER_RELATION_ID {
check_options_contain(&options, "connstr")?;

let cacert_path_exists = check_options_contain(&options, "ssl_ca").is_ok();
let cert_path_exists = check_options_contain(&options, "ssl_cert").is_ok();
let username_exists = check_options_contain(&options, "username").is_ok();
let password_exists = check_options_contain(&options, "password").is_ok();

require_pair(cacert_path_exists, cert_path_exists, EtcdFdwError::CertKeyMismatch(()))?;
require_pair(username_exists, password_exists, EtcdFdwError::UserPassMismatch(()))?;
} else if oid == FOREIGN_TABLE_RELATION_ID {
check_options_contain(&options, "rowid_column")?;

let prefix_exists = check_options_contain(&options, "prefix").is_ok();
let rannge_exists = check_options_contain(&options, "range_end").is_ok();
let key_exists = check_options_contain(&options, "key").is_ok();

if prefix_exists && rannge_exists {
return Err(EtcdFdwError::ConflictingPrefixAndRange);
}

if prefix_exists && key_exists {
return Err(EtcdFdwError::ConflictingPrefixAndKey);
}
}
}

Ok(())
}
}

#[cfg(test)]
Expand Down
Loading