diff --git a/twitter/Cargo.toml b/twitter/Cargo.toml
index 7beb28e..5aaa92b 100644
--- a/twitter/Cargo.toml
+++ b/twitter/Cargo.toml
@@ -8,12 +8,15 @@ edition = "2021"
[dependencies]
serde = { version = "1.0.140", features = ["derive"] }
serde_json = "1.0"
+serde_urlencoded = "0.7.1"
lazy_static = "1.4.0"
hex = "0.4.0"
reqwest = { version = "0.11.10", features = ["json", "multipart"] }
-axum = { version="0.5", features = ["multipart"] }
+oauth1 = { version = "1.0.0" }
+axum = { version="0.5.13", features = ["multipart"] }
tokio = { version = "1.20.0", features = ["full"] }
-urlencoding = "2.1.0"
-rsa = "0.6.0"
+rsa = "0.7.0"
rand = "0.8"
rand_chacha = "0.3"
+base64 = "0.13.0"
+hmac-sha256 = "1.1.4"
diff --git a/twitter/src/connect.html b/twitter/src/connect.html
deleted file mode 100644
index b089644..0000000
--- a/twitter/src/connect.html
+++ /dev/null
@@ -1,55 +0,0 @@
-
-
-
-
- Connect with SendGrid
-
-
-
-
-
-
-
\ No newline at end of file
diff --git a/twitter/src/main.rs b/twitter/src/main.rs
index c0e1bcb..18be8ba 100644
--- a/twitter/src/main.rs
+++ b/twitter/src/main.rs
@@ -6,58 +6,53 @@ use axum::{
Router,
};
use lazy_static::lazy_static;
+use oauth1::Token;
use rand::SeedableRng;
-use rand::{distributions::Alphanumeric, Rng};
use rand_chacha::ChaCha8Rng;
-use reqwest::{Client, ClientBuilder};
+use reqwest::{header, Client, ClientBuilder};
use rsa::{PaddingScheme, PublicKey, RsaPrivateKey, RsaPublicKey};
use serde::{Deserialize, Serialize};
-use serde_json::Value;
-use std::{
- collections::HashMap,
- env,
- net::SocketAddr,
- sync::{Arc, Mutex},
- time::{Duration, Instant},
-};
+use serde_json::{json, Value};
+use std::{borrow::Cow, collections::HashMap, env, net::SocketAddr, time::Duration};
const TIMEOUT: u64 = 120;
const RSA_BITS: usize = 2048;
-const STATE_MAP_MAX: usize = 100;
-const STATE_BLOCK_EXPIRE_SEC: u64 = 10 * 60;
-
lazy_static! {
- static ref REACTOR_API_PREFIX: String =
- env::var("REACTOR_API_PREFIX").expect("Env variable REACTOR_API_PREFIX not set");
- static ref REACTOR_AUTH_TOKEN: String =
- env::var("REACTOR_AUTH_TOKEN").expect("Env variable REACTOR_AUTH_TOKEN not set");
- static ref TWITTER_OAUTH_CLIENT_ID: String =
- env::var("TWITTER_OAUTH_CLIENT_ID").expect("Env variable TWITTER_OAUTH_CLIENT_ID not set");
- static ref TWITTER_OAUTH_CLIENT_SECRET: String = env::var("TWITTER_OAUTH_CLIENT_SECRET")
- .expect("Env variable TWITTER_OAUTH_CLIENT_SECRET not set");
- static ref TWITTER_OAUTH_REDIRECT_URL: String = env::var("TWITTER_OAUTH_REDIRECT_URL")
- .expect("Env variable TWITTER_OAUTH_REDIRECT_URL not set");
+ static ref HAIKU_API_PREFIX: String =
+ env::var("HAIKU_API_PREFIX").expect("Env variable HAIKU_API_PREFIX not set");
+ static ref HAIKU_AUTH_TOKEN: String =
+ env::var("HAIKU_AUTH_TOKEN").expect("Env variable HAIKU_AUTH_TOKEN not set");
+ static ref SERVICE_API_PREFIX: String =
+ env::var("SERVICE_API_PREFIX").expect("Env variable SERVICE_API_PREFIX not set");
+ static ref TWITTER_CONSUMER: Token<'static> = Token::new(
+ env::var("TWITTER_CONSUMER_KEY").expect("Env variable TWITTER_CONSUMER_KEY not set"),
+ env::var("TWITTER_CONSUMER_SECRET").expect("Env variable TWITTER_CONSUMER_SECRET not set")
+ );
+ static ref TWITTER_BEARER_TOKEN: String =
+ env::var("TWITTER_BEARER_TOKEN").expect("Env variable TWITTER_BEARER_TOKEN not set");
+ static ref ACCESS_TOKEN: String =
+ env::var("ACCESS_TOKEN").expect("Env variable ACCESS_TOKEN not set");
+ static ref ACCESS_TOKEN_SECRET: String =
+ env::var("ACCESS_TOKEN_SECRET").expect("Env variable ACCESS_TOKEN_SECRET not set");
+ static ref ENV_NAME: String = env::var("ENV_NAME").expect("Env variable ENV_NAME not set");
static ref RSA_RAND_SEED: [u8; 32] = env::var("RSA_RAND_SEED")
.expect("Env variable RSA_RAND_SEED not set")
.as_bytes()
.try_into()
.unwrap();
+ static ref DEV_ACCESS_TOKEN: Token<'static> = Token::new(&*ACCESS_TOKEN, &*ACCESS_TOKEN_SECRET);
static ref CHACHA8RNG: ChaCha8Rng = ChaCha8Rng::from_seed(*RSA_RAND_SEED);
static ref PRIVATE_KEY: RsaPrivateKey =
RsaPrivateKey::new(&mut CHACHA8RNG.clone(), RSA_BITS).expect("failed to generate a key");
static ref PUBLIC_KEY: RsaPublicKey = RsaPublicKey::from(&*PRIVATE_KEY);
- static ref STATE_MAP: Arc>> =
- Arc::new(Mutex::new(HashMap::new()));
static ref HTTP_CLIENT: Client = ClientBuilder::new()
.timeout(Duration::from_secs(TIMEOUT))
.build()
.expect("Can't build the reqwest client");
}
-static CONNECT_HTML: &str = include_str!("./connect.html");
-
fn encrypt(data: &str) -> String {
hex::encode(
PUBLIC_KEY
@@ -82,188 +77,168 @@ fn decrypt(data: &str) -> String {
.unwrap()
}
-struct StateBlock {
- time_instant: Instant,
- block_content: Option,
-}
-
-fn clear_state_map(state_map: &mut HashMap) {
- state_map.retain(|_, v| {
- let elapsed_time = v.time_instant.elapsed();
- elapsed_time.as_secs() < STATE_BLOCK_EXPIRE_SEC
- });
+#[derive(Deserialize)]
+struct Session {
+ flows_auth_session: String,
}
-async fn connect() -> impl IntoResponse {
- match STATE_MAP.lock() {
- Ok(mut state_map) => {
- if state_map.len() > STATE_MAP_MAX {
- clear_state_map(&mut state_map);
- }
- let s: String = rand::thread_rng()
- .sample_iter(&Alphanumeric)
- .take(10)
- .map(char::from)
- .collect();
-
- let html = CONNECT_HTML;
- // let html = std::fs::read("/home/darumadocker/github/second-state/reactor-connector-rs/twitter/src/connect.html").unwrap();
- // let html = String::from_utf8_lossy(&html);
-
- let html = html
- .replace("%STATE%", &s)
- .replace(
- "%TWITTER_OAUTH_CLIENT_ID%",
- TWITTER_OAUTH_CLIENT_ID.as_str(),
- )
- .replace(
- "%TWITTER_OAUTH_REDIRECT_URL%",
- TWITTER_OAUTH_REDIRECT_URL.as_str(),
- );
-
- state_map.insert(
- s,
- StateBlock {
- time_instant: Instant::now(),
- block_content: None,
- },
- );
-
- Ok((StatusCode::OK, [("Content-Type", "text/html")], html))
- }
- Err(_) => Err((
+async fn connect(query: Query) -> impl IntoResponse {
+ let params = [(
+ "oauth_callback",
+ format!(
+ "{}/auth?session_id={}",
+ *SERVICE_API_PREFIX, query.flows_auth_session
+ ),
+ )];
+
+ let tokens = HTTP_CLIENT
+ .post("https://api.twitter.com/oauth/request_token")
+ .query(¶ms)
+ .header(header::CONTENT_TYPE, "application/x-www-form-urlencoded")
+ .header(
+ header::AUTHORIZATION,
+ oauth1::authorize(
+ "POST",
+ "https://api.twitter.com/oauth/request_token",
+ &*TWITTER_CONSUMER,
+ None,
+ Some(
+ params
+ .into_iter()
+ .map(|item| (item.0, Cow::from(item.1)))
+ .collect(),
+ ),
+ ),
+ )
+ .send()
+ .await
+ .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?
+ .bytes()
+ .await
+ .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))
+ .and_then(|b| {
+ serde_urlencoded::from_bytes::(&b)
+ .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))
+ })?;
+
+ if tokens.oauth_token.is_empty() {
+ return Err((
StatusCode::INTERNAL_SERVER_ERROR,
- "Unexpected error".to_string(),
- )),
+ "Parse OAuth body failed".to_string(),
+ ));
}
-}
-#[derive(Deserialize)]
-struct PollQuery {
- state: String,
-}
+ let location = format!(
+ "https://api.twitter.com/oauth/authorize?oauth_token={}",
+ tokens.oauth_token
+ );
-async fn poll_block(Query(pq): Query) -> impl IntoResponse {
- match STATE_MAP.lock() {
- Ok(state_map) => match state_map.get(&pq.state) {
- Some(block) => match &block.block_content {
- Some(location) => Ok((StatusCode::FOUND, Ok([("Location", location.clone())]))),
- None => Ok((StatusCode::OK, Err(()))),
- },
- None => Err((StatusCode::NOT_FOUND, "State not found")),
- },
- Err(_) => Err((StatusCode::INTERNAL_SERVER_ERROR, "Unexpected error")),
- }
+ Ok((StatusCode::FOUND, [("Location", location)]))
}
-#[derive(Deserialize, Serialize)]
+#[derive(Serialize, Deserialize)]
struct AuthBody {
- code: Option,
- state: String,
+ oauth_token: String,
+ oauth_token_secret: Option,
+ oauth_verifier: Option,
+ session_id: Option,
}
-async fn auth(Query(auth_body): Query) -> impl IntoResponse {
- let code = auth_body.code;
- if code.is_none() {
- Err((StatusCode::BAD_REQUEST, "No code".to_string()))
- } else {
- match get_access_token(&code.unwrap()).await {
- Ok(at) => match get_authed_user(&at.access_token).await {
- Ok(person) => {
- let location = format!(
- "{}/api/connected?authorId={}&authorName={}&authorState={}&refreshState={}",
- REACTOR_API_PREFIX.as_str(),
- person.id,
- urlencoding::encode(&person.name),
- encrypt(&at.access_token),
- encrypt(&at.refresh_token)
- );
- match STATE_MAP.lock() {
- Ok(mut state_map) => {
- if let Some(block) = state_map.get_mut(&auth_body.state) {
- block.block_content = Some(location);
- }
- Ok((
- StatusCode::OK,
- [("Content-Type", "text/html")],
- "",
- ))
- }
- Err(_) => Err((
- StatusCode::INTERNAL_SERVER_ERROR,
- "Unexpected error".to_string(),
- )),
- }
- }
- Err(err_msg) => Err((StatusCode::INTERNAL_SERVER_ERROR, err_msg)),
- },
- Err(err_msg) => Err((StatusCode::INTERNAL_SERVER_ERROR, err_msg)),
- }
+async fn auth(req: Query) -> impl IntoResponse {
+ if req.oauth_token.is_empty() || req.oauth_verifier.is_none() {
+ return Err((
+ StatusCode::BAD_REQUEST,
+ "Missing oauth_token or oauth_verifier".to_string(),
+ ));
}
-}
-#[derive(Serialize, Deserialize)]
-struct TokenBody {
- access_token: String,
- refresh_token: String,
+ let at = get_access_token(&req)
+ .await
+ .map_err(|e| (StatusCode::UNAUTHORIZED, e))?;
+
+ let person = get_authed_user(&at)
+ .await
+ .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?;
+
+ post_authorized_state(
+ req.session_id
+ .as_ref()
+ .ok_or((StatusCode::BAD_REQUEST, "Missing session_id".to_string()))?,
+ &person.id,
+ &person.name,
+ encrypt(&serde_json::to_string(&at).unwrap()),
+ )
+ .await
+ .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?;
+
+ Ok((
+ StatusCode::OK,
+ [(header::CONTENT_TYPE, "text/html")],
+ "",
+ ))
}
-async fn get_access_token(code: &str) -> Result {
+async fn get_access_token(auth: &AuthBody) -> Result {
let params = [
- ("grant_type", "authorization_code"),
- ("code", &code),
- ("redirect_uri", TWITTER_OAUTH_REDIRECT_URL.as_str()),
- ("code_verifier", "challenge"),
+ ("oauth_token", auth.oauth_token.clone()),
+ ("oauth_verifier", auth.oauth_verifier.clone().unwrap()),
];
- let response = HTTP_CLIENT
- .post("https://api.twitter.com/2/oauth2/token")
- .basic_auth(
- &*TWITTER_OAUTH_CLIENT_ID,
- Some(&*TWITTER_OAUTH_CLIENT_SECRET),
- )
- .form(¶ms)
+ let body = HTTP_CLIENT
+ .post("https://api.twitter.com/oauth/access_token")
+ .query(¶ms)
.send()
- .await;
- match response {
- Ok(r) => {
- let oauth_body = r.json::().await;
- match oauth_body {
- Ok(at) => Ok(at),
- Err(_) => Err("Failed to get access token".to_string()),
- }
- }
- Err(_) => Err("Failed to get access token".to_string()),
+ .await
+ .map_err(|e| e.to_string())?
+ .bytes()
+ .await
+ .map_err(|e| e.to_string())?;
+
+ let auth = serde_urlencoded::from_bytes::(&body).map_err(|e| e.to_string())?;
+
+ if auth.oauth_token.is_empty() || auth.oauth_token_secret.is_none() {
+ return Err("Failed to get access token".to_string());
}
+
+ Ok(auth)
+}
+
+#[derive(Deserialize)]
+struct User {
+ data: UserData,
}
-struct Person {
+#[derive(Deserialize)]
+struct UserData {
id: String,
name: String,
}
-async fn get_authed_user(access_token: &str) -> Result {
- let response = HTTP_CLIENT
+async fn get_authed_user(access_token: &AuthBody) -> Result {
+ HTTP_CLIENT
.get("https://api.twitter.com/2/users/me")
- .bearer_auth(access_token)
+ .header(
+ header::AUTHORIZATION,
+ oauth1::authorize(
+ "GET",
+ "https://api.twitter.com/2/users/me",
+ &*TWITTER_CONSUMER,
+ Some(&Token::new(
+ access_token.oauth_token.clone(),
+ access_token.oauth_token_secret.clone().unwrap(),
+ )),
+ None,
+ ),
+ )
.send()
- .await;
-
- match response {
- Ok(res) => match res.text().await {
- Ok(body) => {
- if let Ok(v) = serde_json::from_str::(&body) {
- let id = v["data"]["id"].as_str().unwrap().to_string();
- let name = v["data"]["name"].as_str().unwrap().to_string();
- Ok(Person { id, name })
- } else {
- Err("Failed to get user's name".to_string())
- }
- }
- Err(_) => Err("Failed to get user's profile".to_string()),
- },
- Err(_) => Err("Failed to get user's profile".to_string()),
- }
+ .await
+ .map_err(|_| "Failed to get user's profile".to_string())?
+ .json::()
+ .await
+ .map(|u| u.data)
+ .map_err(|e| e.to_string())
}
async fn actions() -> impl IntoResponse {
@@ -278,112 +253,448 @@ async fn actions() -> impl IntoResponse {
Json(events)
}
-#[derive(Debug, Deserialize, Serialize)]
-struct ForwardRoute {
- route: String,
+#[derive(Default, Deserialize)]
+struct RouteItem {
+ // field: String,
value: String,
}
+#[derive(Default, Deserialize)]
+struct Routes {
+ #[serde(default)]
+ action: Vec,
+}
+
#[derive(Deserialize)]
-struct PostBody {
- // user: String,
- text: String,
+struct HaikuReqBody {
+ user: String,
state: String,
- forwards: Vec,
+ text: Option,
+ #[serde(default)]
+ forwards: Routes,
}
-async fn post_msg(Json(msg_body): Json) -> impl IntoResponse {
- let routes = msg_body
+async fn post_msg(req: Json) -> impl IntoResponse {
+ let action = &req
.forwards
- .iter()
- .map(|route| (route.route.clone(), route.value.clone()))
- .collect::>();
-
- let action = if let Some(a) = routes.get("action") {
- a
- } else {
- return Err((StatusCode::BAD_REQUEST, "Missing action"));
- };
+ .action
+ .first()
+ .ok_or((StatusCode::BAD_REQUEST, "Missing action item".to_string()))?
+ .value;
+
+ let text = req
+ .text
+ .clone()
+ .ok_or((StatusCode::BAD_REQUEST, "Missing text".to_string()))?;
+
+ let auth = serde_json::from_str::(&decrypt(&req.state))
+ .map_err(|e| (StatusCode::BAD_REQUEST, e.to_string()))?;
+
+ if auth.oauth_token.is_empty() || auth.oauth_token_secret.is_none() {
+ return Err((
+ StatusCode::BAD_REQUEST,
+ "Missing oauth_token or oauth_token_secret".to_string(),
+ ));
+ }
match action.as_str() {
- "create-tweet" => {
- let resp = HTTP_CLIENT
- .post("https://api.twitter.com/2/tweets")
- .bearer_auth(decrypt(&msg_body.state))
- .json(&serde_json::json!({
- "text": msg_body.text
- }))
- .send()
- .await;
-
- if resp.is_ok() {
- Ok((StatusCode::FOUND, "Ok"))
- } else {
- Err((StatusCode::INTERNAL_SERVER_ERROR, "Create tweet failed"))
- }
- }
- _ => Err((StatusCode::BAD_REQUEST, "Unsupport action")),
+ "create-tweet" => HTTP_CLIENT
+ .post("https://api.twitter.com/2/tweets")
+ .header(
+ header::AUTHORIZATION,
+ oauth1::authorize(
+ "POST",
+ "https://api.twitter.com/2/tweets",
+ &*TWITTER_CONSUMER,
+ Some(&Token::new(
+ auth.oauth_token,
+ auth.oauth_token_secret.unwrap(),
+ )),
+ None,
+ ),
+ )
+ .json(&json!({ "text": text }))
+ .send()
+ .await
+ .map(|_| ())
+ .map_err(|_| {
+ (
+ StatusCode::INTERNAL_SERVER_ERROR,
+ "Create tweet failed".to_string(),
+ )
+ }),
+ _ => Err((StatusCode::BAD_REQUEST, "Unsupport action".to_string())),
}
}
#[derive(Deserialize)]
-struct RefreshState {
- refresh_state: String,
+struct CrcRequest {
+ crc_token: String,
}
-async fn refresh_token(Json(msg_body): Json) -> impl IntoResponse {
- let params = [
- ("grant_type", "refresh_token"),
- ("refresh_token", &decrypt(&msg_body.refresh_state)),
- ];
+async fn webhook_challenge(req: Query) -> impl IntoResponse {
+ let sha256_hash =
+ hmac_sha256::HMAC::mac(TWITTER_CONSUMER.secret.as_bytes(), req.crc_token.as_bytes());
- let response = HTTP_CLIENT
- .post("https://api.twitter.com/2/oauth2/token")
- .basic_auth(
- &*TWITTER_OAUTH_CLIENT_ID,
- Some(&*TWITTER_OAUTH_CLIENT_SECRET),
+ (
+ StatusCode::OK,
+ Json(json!({
+ "response_token": format!("{}", base64::encode(&sha256_hash)),
+ })),
+ )
+}
+
+#[allow(dead_code)]
+#[derive(Debug, Deserialize, Clone)]
+struct Webhook {
+ id: String,
+ url: String,
+ valid: bool,
+ created_at: String,
+}
+
+#[derive(Deserialize)]
+struct Environment {
+ webhooks: Vec,
+}
+
+#[derive(Deserialize)]
+struct Webhooks {
+ environments: Vec,
+}
+
+async fn fetch_webhooks() -> Result, Box> {
+ let hooks = HTTP_CLIENT
+ .get("https://api.twitter.com/1.1/account_activity/all/webhooks.json")
+ .header(
+ header::AUTHORIZATION,
+ oauth1::authorize(
+ "GET",
+ "https://api.twitter.com/1.1/account_activity/all/webhooks.json",
+ &*TWITTER_CONSUMER,
+ Some(&*DEV_ACCESS_TOKEN),
+ None,
+ ),
+ )
+ .send()
+ .await?
+ .json::()
+ .await?;
+
+ Ok(hooks
+ .environments
+ .iter()
+ .map(|e| e.webhooks.clone())
+ .flatten()
+ .collect::>())
+}
+
+async fn register_webhook() -> Result> {
+ let url = format!(
+ "https://api.twitter.com/1.1/account_activity/all/{}/webhooks.json",
+ *ENV_NAME
+ );
+
+ let params = [("url", format!("{}/webhook", &*SERVICE_API_PREFIX))];
+
+ HTTP_CLIENT
+ .post(url.clone())
+ .query(¶ms)
+ .header(
+ header::AUTHORIZATION,
+ oauth1::authorize(
+ "POST",
+ &url,
+ &*TWITTER_CONSUMER,
+ Some(&*DEV_ACCESS_TOKEN),
+ Some(
+ params
+ .into_iter()
+ .map(|item| (item.0, Cow::from(item.1)))
+ .collect(),
+ ),
+ ),
)
- .form(¶ms)
.send()
- .await;
-
- if let Ok(r) = response {
- if let Ok(at) = r.json::().await {
- let encrypted = serde_json::json!({
- "access_state": encrypt(&at.access_token),
- "refresh_state": encrypt(&at.refresh_token)
- });
- Ok((StatusCode::OK, serde_json::to_string(&encrypted).unwrap()))
- } else {
- Err((
- StatusCode::INTERNAL_SERVER_ERROR,
- "Failed to get access token".to_string(),
- ))
+ .await?
+ .json::()
+ .await
+ .map_err(Into::into)
+}
+
+async fn reenable_webhook(hook: &Webhook) -> Result<(), Box> {
+ let url = format!(
+ "https://api.twitter.com/1.1/account_activity/all/{}/webhooks/{}.json",
+ *ENV_NAME, hook.id
+ );
+
+ HTTP_CLIENT
+ .put(url.clone())
+ .header(
+ header::AUTHORIZATION,
+ oauth1::authorize(
+ "PUT",
+ &url,
+ &*TWITTER_CONSUMER,
+ Some(&*DEV_ACCESS_TOKEN),
+ None,
+ ),
+ )
+ .send()
+ .await?;
+
+ Ok(())
+}
+
+async fn init_webhook() -> Result<(), String> {
+ let hooks = fetch_webhooks()
+ .await
+ .map_err(|e| format!("fetch_webhooks failed: {}", e.to_string()))?;
+
+ let hook = if let Some(hook) = hooks.first() {
+ if !hook.valid {
+ reenable_webhook(hook)
+ .await
+ .map_err(|e| format!("reenable_webhook failed: {}", e.to_string()))?;
}
+ hooks.into_iter().next().unwrap()
} else {
- Err((
- StatusCode::INTERNAL_SERVER_ERROR,
- "Failed to get access token".to_string(),
+ register_webhook()
+ .await
+ .map_err(|e| format!("register_webhook failed: {}", e.to_string()))?
+ };
+
+ println!("init_webhook success:\n{:#?}", hook);
+
+ Ok(())
+}
+
+async fn events() -> impl IntoResponse {
+ Json(json!({
+ "list": [
+ {
+ "field": "Favorite",
+ "value": "favorite_events"
+ },
+ {
+ "field": "Follow",
+ "value": "follow_events"
+ },
+ {
+ "field": "Unfollow",
+ "value": "unfollow_events"
+ },
+ {
+ "field": "Block",
+ "value": "block_events"
+ },
+ {
+ "field": "Unblock",
+ "value": "unblock_events"
+ },
+ {
+ "field": "Mute",
+ "value": "mute_events"
+ },
+ {
+ "field": "Unmute",
+ "value": "unmute_events"
+ },
+ {
+ "field": "Direct message",
+ "value": "direct_message_events"
+ },
+ {
+ "field": "Tweet delete",
+ "value": "tweet_delete_events"
+ },
+ ]
+ }))
+}
+
+async fn subscribe(req: Json) -> impl IntoResponse {
+ let url = format!(
+ "https://api.twitter.com/1.1/account_activity/all/{}/subscriptions.json",
+ *ENV_NAME
+ );
+
+ let auth = serde_json::from_str::(&req.state)
+ .map_err(|e| (StatusCode::BAD_REQUEST, e.to_string()))?;
+
+ HTTP_CLIENT
+ .post(url.clone())
+ .header(
+ header::AUTHORIZATION,
+ oauth1::authorize(
+ "POST",
+ &url,
+ &*TWITTER_CONSUMER,
+ Some(&Token::new(
+ auth.oauth_token,
+ auth.oauth_token_secret.unwrap(),
+ )),
+ None,
+ ),
+ )
+ .send()
+ .await
+ .map(|_| {
+ Json(json!({
+ "revoke": format!("{}/unsubscribe?user_id={}", *SERVICE_API_PREFIX, req.user),
+ }))
+ })
+ .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))
+}
+
+#[derive(Deserialize)]
+struct UnsubscribeReq {
+ user_id: String,
+}
+
+async fn unsubscribe(req: Json) -> impl IntoResponse {
+ let url = format!(
+ "https://api.twitter.com/1.1/account_activity/all/{}/subscriptions/{}.json",
+ *ENV_NAME, req.user_id
+ );
+
+ HTTP_CLIENT
+ .delete(url.clone())
+ .header(
+ header::AUTHORIZATION,
+ oauth1::authorize(
+ "DELETE",
+ &url,
+ &*TWITTER_CONSUMER,
+ Some(&*DEV_ACCESS_TOKEN),
+ None,
+ ),
+ )
+ .send()
+ .await
+ .map(|_| (StatusCode::OK, "OK".to_string()))
+ .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))
+}
+
+#[derive(Deserialize)]
+#[allow(dead_code)]
+struct Payload {
+ for_user_id: String,
+ user_has_blocked: Option,
+
+ #[serde(flatten)]
+ extra: HashMap,
+}
+
+async fn capture_event(req: Json) -> impl IntoResponse {
+ tokio::spawn(async move {
+ if let Err(e) = capture_event_inner(&req).await {
+ println!("{}", e.to_string());
+ }
+ });
+
+ StatusCode::OK
+}
+
+async fn capture_event_inner(payload: &Payload) -> Result<(), String> {
+ let event = payload
+ .extra
+ .iter()
+ .find(|(name, _)| name.as_str().ne("users"))
+ .ok_or("Invalid event".to_string())?
+ .0;
+
+ post_event_to_haiku(
+ &payload.for_user_id,
+ event,
+ serde_json::to_string(&payload.extra).unwrap(),
+ )
+ .await
+}
+
+async fn post_authorized_state(
+ session_id: SE,
+ id: I,
+ name: N,
+ state: ST,
+) -> Result<(), String>
+where
+ SE: Into,
+ I: Into,
+ N: Into,
+ ST: Into,
+{
+ let response = HTTP_CLIENT
+ .post(format!(
+ "{}/api/_connectings/_authorized",
+ &*HAIKU_API_PREFIX
))
+ .header(header::AUTHORIZATION, &*HAIKU_AUTH_TOKEN)
+ .json(&json!({
+ "sessionId": session_id.into(),
+ "authorId": id.into(),
+ "authorName": name.into(),
+ "authorState": state.into(),
+ }))
+ .send()
+ .await
+ .map_err(|e| e.to_string())?;
+
+ match response.status().is_success() {
+ true => Ok(()),
+ false => Err(response.text().await.unwrap_or_default()),
+ }
+}
+
+async fn post_event_to_haiku, E: Into, B: Into>(
+ id: I,
+ event_type: E,
+ event_body: B,
+) -> Result<(), String> {
+ let body = json!({
+ "user": id.into(),
+ "text": event_body.into(),
+ "triggers": {
+ "event": event_type.into(),
+ }
+ });
+
+ let response = HTTP_CLIENT
+ .post(format!("{}/api/_funcs/_post", *HAIKU_API_PREFIX))
+ .header(header::AUTHORIZATION, &*HAIKU_AUTH_TOKEN)
+ .json(&body)
+ .send()
+ .await
+ .map_err(|e| e.to_string())?;
+
+ match response.status().is_success() {
+ true => Ok(()),
+ false => Err(response.text().await.unwrap_or_default()),
}
}
#[tokio::main]
-async fn main() {
+async fn main() -> Result<(), Box> {
let app = Router::new()
- .route("/connect", get(connect))
- .route("/poll-block", get(poll_block))
.route("/auth", get(auth))
- .route("/refresh", post(refresh_token))
.route("/actions", post(actions))
- .route("/post", post(post_msg));
+ .route("/post", post(post_msg))
+ .route("/connect", get(connect))
+ .route("/webhook", get(webhook_challenge).post(capture_event))
+ .route("/events", post(events))
+ .route("/subscribe", get(subscribe))
+ .route("/unsubscribe", get(unsubscribe));
let port = env::var("PORT").unwrap_or_else(|_| "8090".to_string());
let port = port.parse::().unwrap();
let addr = SocketAddr::from(([127, 0, 0, 1], port));
+ if let Err(e) = init_webhook().await {
+ eprintln!("init_webhook failed: {}", e);
+ }
+
axum::Server::bind(&addr)
.serve(app.into_make_service())
- .await
- .unwrap();
+ .await?;
+
+ Ok(())
}