diff --git a/twitter/Cargo.toml b/twitter/Cargo.toml index 7beb28e..5aaa92b 100644 --- a/twitter/Cargo.toml +++ b/twitter/Cargo.toml @@ -8,12 +8,15 @@ edition = "2021" [dependencies] serde = { version = "1.0.140", features = ["derive"] } serde_json = "1.0" +serde_urlencoded = "0.7.1" lazy_static = "1.4.0" hex = "0.4.0" reqwest = { version = "0.11.10", features = ["json", "multipart"] } -axum = { version="0.5", features = ["multipart"] } +oauth1 = { version = "1.0.0" } +axum = { version="0.5.13", features = ["multipart"] } tokio = { version = "1.20.0", features = ["full"] } -urlencoding = "2.1.0" -rsa = "0.6.0" +rsa = "0.7.0" rand = "0.8" rand_chacha = "0.3" +base64 = "0.13.0" +hmac-sha256 = "1.1.4" diff --git a/twitter/src/connect.html b/twitter/src/connect.html deleted file mode 100644 index b089644..0000000 --- a/twitter/src/connect.html +++ /dev/null @@ -1,55 +0,0 @@ - - - - - Connect with SendGrid - - - -
-

Connect with Twitter

- Connect -
- - - \ No newline at end of file diff --git a/twitter/src/main.rs b/twitter/src/main.rs index c0e1bcb..18be8ba 100644 --- a/twitter/src/main.rs +++ b/twitter/src/main.rs @@ -6,58 +6,53 @@ use axum::{ Router, }; use lazy_static::lazy_static; +use oauth1::Token; use rand::SeedableRng; -use rand::{distributions::Alphanumeric, Rng}; use rand_chacha::ChaCha8Rng; -use reqwest::{Client, ClientBuilder}; +use reqwest::{header, Client, ClientBuilder}; use rsa::{PaddingScheme, PublicKey, RsaPrivateKey, RsaPublicKey}; use serde::{Deserialize, Serialize}; -use serde_json::Value; -use std::{ - collections::HashMap, - env, - net::SocketAddr, - sync::{Arc, Mutex}, - time::{Duration, Instant}, -}; +use serde_json::{json, Value}; +use std::{borrow::Cow, collections::HashMap, env, net::SocketAddr, time::Duration}; const TIMEOUT: u64 = 120; const RSA_BITS: usize = 2048; -const STATE_MAP_MAX: usize = 100; -const STATE_BLOCK_EXPIRE_SEC: u64 = 10 * 60; - lazy_static! { - static ref REACTOR_API_PREFIX: String = - env::var("REACTOR_API_PREFIX").expect("Env variable REACTOR_API_PREFIX not set"); - static ref REACTOR_AUTH_TOKEN: String = - env::var("REACTOR_AUTH_TOKEN").expect("Env variable REACTOR_AUTH_TOKEN not set"); - static ref TWITTER_OAUTH_CLIENT_ID: String = - env::var("TWITTER_OAUTH_CLIENT_ID").expect("Env variable TWITTER_OAUTH_CLIENT_ID not set"); - static ref TWITTER_OAUTH_CLIENT_SECRET: String = env::var("TWITTER_OAUTH_CLIENT_SECRET") - .expect("Env variable TWITTER_OAUTH_CLIENT_SECRET not set"); - static ref TWITTER_OAUTH_REDIRECT_URL: String = env::var("TWITTER_OAUTH_REDIRECT_URL") - .expect("Env variable TWITTER_OAUTH_REDIRECT_URL not set"); + static ref HAIKU_API_PREFIX: String = + env::var("HAIKU_API_PREFIX").expect("Env variable HAIKU_API_PREFIX not set"); + static ref HAIKU_AUTH_TOKEN: String = + env::var("HAIKU_AUTH_TOKEN").expect("Env variable HAIKU_AUTH_TOKEN not set"); + static ref SERVICE_API_PREFIX: String = + env::var("SERVICE_API_PREFIX").expect("Env variable SERVICE_API_PREFIX not set"); + static ref TWITTER_CONSUMER: Token<'static> = Token::new( + env::var("TWITTER_CONSUMER_KEY").expect("Env variable TWITTER_CONSUMER_KEY not set"), + env::var("TWITTER_CONSUMER_SECRET").expect("Env variable TWITTER_CONSUMER_SECRET not set") + ); + static ref TWITTER_BEARER_TOKEN: String = + env::var("TWITTER_BEARER_TOKEN").expect("Env variable TWITTER_BEARER_TOKEN not set"); + static ref ACCESS_TOKEN: String = + env::var("ACCESS_TOKEN").expect("Env variable ACCESS_TOKEN not set"); + static ref ACCESS_TOKEN_SECRET: String = + env::var("ACCESS_TOKEN_SECRET").expect("Env variable ACCESS_TOKEN_SECRET not set"); + static ref ENV_NAME: String = env::var("ENV_NAME").expect("Env variable ENV_NAME not set"); static ref RSA_RAND_SEED: [u8; 32] = env::var("RSA_RAND_SEED") .expect("Env variable RSA_RAND_SEED not set") .as_bytes() .try_into() .unwrap(); + static ref DEV_ACCESS_TOKEN: Token<'static> = Token::new(&*ACCESS_TOKEN, &*ACCESS_TOKEN_SECRET); static ref CHACHA8RNG: ChaCha8Rng = ChaCha8Rng::from_seed(*RSA_RAND_SEED); static ref PRIVATE_KEY: RsaPrivateKey = RsaPrivateKey::new(&mut CHACHA8RNG.clone(), RSA_BITS).expect("failed to generate a key"); static ref PUBLIC_KEY: RsaPublicKey = RsaPublicKey::from(&*PRIVATE_KEY); - static ref STATE_MAP: Arc>> = - Arc::new(Mutex::new(HashMap::new())); static ref HTTP_CLIENT: Client = ClientBuilder::new() .timeout(Duration::from_secs(TIMEOUT)) .build() .expect("Can't build the reqwest client"); } -static CONNECT_HTML: &str = include_str!("./connect.html"); - fn encrypt(data: &str) -> String { hex::encode( PUBLIC_KEY @@ -82,188 +77,168 @@ fn decrypt(data: &str) -> String { .unwrap() } -struct StateBlock { - time_instant: Instant, - block_content: Option, -} - -fn clear_state_map(state_map: &mut HashMap) { - state_map.retain(|_, v| { - let elapsed_time = v.time_instant.elapsed(); - elapsed_time.as_secs() < STATE_BLOCK_EXPIRE_SEC - }); +#[derive(Deserialize)] +struct Session { + flows_auth_session: String, } -async fn connect() -> impl IntoResponse { - match STATE_MAP.lock() { - Ok(mut state_map) => { - if state_map.len() > STATE_MAP_MAX { - clear_state_map(&mut state_map); - } - let s: String = rand::thread_rng() - .sample_iter(&Alphanumeric) - .take(10) - .map(char::from) - .collect(); - - let html = CONNECT_HTML; - // let html = std::fs::read("/home/darumadocker/github/second-state/reactor-connector-rs/twitter/src/connect.html").unwrap(); - // let html = String::from_utf8_lossy(&html); - - let html = html - .replace("%STATE%", &s) - .replace( - "%TWITTER_OAUTH_CLIENT_ID%", - TWITTER_OAUTH_CLIENT_ID.as_str(), - ) - .replace( - "%TWITTER_OAUTH_REDIRECT_URL%", - TWITTER_OAUTH_REDIRECT_URL.as_str(), - ); - - state_map.insert( - s, - StateBlock { - time_instant: Instant::now(), - block_content: None, - }, - ); - - Ok((StatusCode::OK, [("Content-Type", "text/html")], html)) - } - Err(_) => Err(( +async fn connect(query: Query) -> impl IntoResponse { + let params = [( + "oauth_callback", + format!( + "{}/auth?session_id={}", + *SERVICE_API_PREFIX, query.flows_auth_session + ), + )]; + + let tokens = HTTP_CLIENT + .post("https://api.twitter.com/oauth/request_token") + .query(¶ms) + .header(header::CONTENT_TYPE, "application/x-www-form-urlencoded") + .header( + header::AUTHORIZATION, + oauth1::authorize( + "POST", + "https://api.twitter.com/oauth/request_token", + &*TWITTER_CONSUMER, + None, + Some( + params + .into_iter() + .map(|item| (item.0, Cow::from(item.1))) + .collect(), + ), + ), + ) + .send() + .await + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))? + .bytes() + .await + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string())) + .and_then(|b| { + serde_urlencoded::from_bytes::(&b) + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string())) + })?; + + if tokens.oauth_token.is_empty() { + return Err(( StatusCode::INTERNAL_SERVER_ERROR, - "Unexpected error".to_string(), - )), + "Parse OAuth body failed".to_string(), + )); } -} -#[derive(Deserialize)] -struct PollQuery { - state: String, -} + let location = format!( + "https://api.twitter.com/oauth/authorize?oauth_token={}", + tokens.oauth_token + ); -async fn poll_block(Query(pq): Query) -> impl IntoResponse { - match STATE_MAP.lock() { - Ok(state_map) => match state_map.get(&pq.state) { - Some(block) => match &block.block_content { - Some(location) => Ok((StatusCode::FOUND, Ok([("Location", location.clone())]))), - None => Ok((StatusCode::OK, Err(()))), - }, - None => Err((StatusCode::NOT_FOUND, "State not found")), - }, - Err(_) => Err((StatusCode::INTERNAL_SERVER_ERROR, "Unexpected error")), - } + Ok((StatusCode::FOUND, [("Location", location)])) } -#[derive(Deserialize, Serialize)] +#[derive(Serialize, Deserialize)] struct AuthBody { - code: Option, - state: String, + oauth_token: String, + oauth_token_secret: Option, + oauth_verifier: Option, + session_id: Option, } -async fn auth(Query(auth_body): Query) -> impl IntoResponse { - let code = auth_body.code; - if code.is_none() { - Err((StatusCode::BAD_REQUEST, "No code".to_string())) - } else { - match get_access_token(&code.unwrap()).await { - Ok(at) => match get_authed_user(&at.access_token).await { - Ok(person) => { - let location = format!( - "{}/api/connected?authorId={}&authorName={}&authorState={}&refreshState={}", - REACTOR_API_PREFIX.as_str(), - person.id, - urlencoding::encode(&person.name), - encrypt(&at.access_token), - encrypt(&at.refresh_token) - ); - match STATE_MAP.lock() { - Ok(mut state_map) => { - if let Some(block) = state_map.get_mut(&auth_body.state) { - block.block_content = Some(location); - } - Ok(( - StatusCode::OK, - [("Content-Type", "text/html")], - "", - )) - } - Err(_) => Err(( - StatusCode::INTERNAL_SERVER_ERROR, - "Unexpected error".to_string(), - )), - } - } - Err(err_msg) => Err((StatusCode::INTERNAL_SERVER_ERROR, err_msg)), - }, - Err(err_msg) => Err((StatusCode::INTERNAL_SERVER_ERROR, err_msg)), - } +async fn auth(req: Query) -> impl IntoResponse { + if req.oauth_token.is_empty() || req.oauth_verifier.is_none() { + return Err(( + StatusCode::BAD_REQUEST, + "Missing oauth_token or oauth_verifier".to_string(), + )); } -} -#[derive(Serialize, Deserialize)] -struct TokenBody { - access_token: String, - refresh_token: String, + let at = get_access_token(&req) + .await + .map_err(|e| (StatusCode::UNAUTHORIZED, e))?; + + let person = get_authed_user(&at) + .await + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?; + + post_authorized_state( + req.session_id + .as_ref() + .ok_or((StatusCode::BAD_REQUEST, "Missing session_id".to_string()))?, + &person.id, + &person.name, + encrypt(&serde_json::to_string(&at).unwrap()), + ) + .await + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e))?; + + Ok(( + StatusCode::OK, + [(header::CONTENT_TYPE, "text/html")], + "", + )) } -async fn get_access_token(code: &str) -> Result { +async fn get_access_token(auth: &AuthBody) -> Result { let params = [ - ("grant_type", "authorization_code"), - ("code", &code), - ("redirect_uri", TWITTER_OAUTH_REDIRECT_URL.as_str()), - ("code_verifier", "challenge"), + ("oauth_token", auth.oauth_token.clone()), + ("oauth_verifier", auth.oauth_verifier.clone().unwrap()), ]; - let response = HTTP_CLIENT - .post("https://api.twitter.com/2/oauth2/token") - .basic_auth( - &*TWITTER_OAUTH_CLIENT_ID, - Some(&*TWITTER_OAUTH_CLIENT_SECRET), - ) - .form(¶ms) + let body = HTTP_CLIENT + .post("https://api.twitter.com/oauth/access_token") + .query(¶ms) .send() - .await; - match response { - Ok(r) => { - let oauth_body = r.json::().await; - match oauth_body { - Ok(at) => Ok(at), - Err(_) => Err("Failed to get access token".to_string()), - } - } - Err(_) => Err("Failed to get access token".to_string()), + .await + .map_err(|e| e.to_string())? + .bytes() + .await + .map_err(|e| e.to_string())?; + + let auth = serde_urlencoded::from_bytes::(&body).map_err(|e| e.to_string())?; + + if auth.oauth_token.is_empty() || auth.oauth_token_secret.is_none() { + return Err("Failed to get access token".to_string()); } + + Ok(auth) +} + +#[derive(Deserialize)] +struct User { + data: UserData, } -struct Person { +#[derive(Deserialize)] +struct UserData { id: String, name: String, } -async fn get_authed_user(access_token: &str) -> Result { - let response = HTTP_CLIENT +async fn get_authed_user(access_token: &AuthBody) -> Result { + HTTP_CLIENT .get("https://api.twitter.com/2/users/me") - .bearer_auth(access_token) + .header( + header::AUTHORIZATION, + oauth1::authorize( + "GET", + "https://api.twitter.com/2/users/me", + &*TWITTER_CONSUMER, + Some(&Token::new( + access_token.oauth_token.clone(), + access_token.oauth_token_secret.clone().unwrap(), + )), + None, + ), + ) .send() - .await; - - match response { - Ok(res) => match res.text().await { - Ok(body) => { - if let Ok(v) = serde_json::from_str::(&body) { - let id = v["data"]["id"].as_str().unwrap().to_string(); - let name = v["data"]["name"].as_str().unwrap().to_string(); - Ok(Person { id, name }) - } else { - Err("Failed to get user's name".to_string()) - } - } - Err(_) => Err("Failed to get user's profile".to_string()), - }, - Err(_) => Err("Failed to get user's profile".to_string()), - } + .await + .map_err(|_| "Failed to get user's profile".to_string())? + .json::() + .await + .map(|u| u.data) + .map_err(|e| e.to_string()) } async fn actions() -> impl IntoResponse { @@ -278,112 +253,448 @@ async fn actions() -> impl IntoResponse { Json(events) } -#[derive(Debug, Deserialize, Serialize)] -struct ForwardRoute { - route: String, +#[derive(Default, Deserialize)] +struct RouteItem { + // field: String, value: String, } +#[derive(Default, Deserialize)] +struct Routes { + #[serde(default)] + action: Vec, +} + #[derive(Deserialize)] -struct PostBody { - // user: String, - text: String, +struct HaikuReqBody { + user: String, state: String, - forwards: Vec, + text: Option, + #[serde(default)] + forwards: Routes, } -async fn post_msg(Json(msg_body): Json) -> impl IntoResponse { - let routes = msg_body +async fn post_msg(req: Json) -> impl IntoResponse { + let action = &req .forwards - .iter() - .map(|route| (route.route.clone(), route.value.clone())) - .collect::>(); - - let action = if let Some(a) = routes.get("action") { - a - } else { - return Err((StatusCode::BAD_REQUEST, "Missing action")); - }; + .action + .first() + .ok_or((StatusCode::BAD_REQUEST, "Missing action item".to_string()))? + .value; + + let text = req + .text + .clone() + .ok_or((StatusCode::BAD_REQUEST, "Missing text".to_string()))?; + + let auth = serde_json::from_str::(&decrypt(&req.state)) + .map_err(|e| (StatusCode::BAD_REQUEST, e.to_string()))?; + + if auth.oauth_token.is_empty() || auth.oauth_token_secret.is_none() { + return Err(( + StatusCode::BAD_REQUEST, + "Missing oauth_token or oauth_token_secret".to_string(), + )); + } match action.as_str() { - "create-tweet" => { - let resp = HTTP_CLIENT - .post("https://api.twitter.com/2/tweets") - .bearer_auth(decrypt(&msg_body.state)) - .json(&serde_json::json!({ - "text": msg_body.text - })) - .send() - .await; - - if resp.is_ok() { - Ok((StatusCode::FOUND, "Ok")) - } else { - Err((StatusCode::INTERNAL_SERVER_ERROR, "Create tweet failed")) - } - } - _ => Err((StatusCode::BAD_REQUEST, "Unsupport action")), + "create-tweet" => HTTP_CLIENT + .post("https://api.twitter.com/2/tweets") + .header( + header::AUTHORIZATION, + oauth1::authorize( + "POST", + "https://api.twitter.com/2/tweets", + &*TWITTER_CONSUMER, + Some(&Token::new( + auth.oauth_token, + auth.oauth_token_secret.unwrap(), + )), + None, + ), + ) + .json(&json!({ "text": text })) + .send() + .await + .map(|_| ()) + .map_err(|_| { + ( + StatusCode::INTERNAL_SERVER_ERROR, + "Create tweet failed".to_string(), + ) + }), + _ => Err((StatusCode::BAD_REQUEST, "Unsupport action".to_string())), } } #[derive(Deserialize)] -struct RefreshState { - refresh_state: String, +struct CrcRequest { + crc_token: String, } -async fn refresh_token(Json(msg_body): Json) -> impl IntoResponse { - let params = [ - ("grant_type", "refresh_token"), - ("refresh_token", &decrypt(&msg_body.refresh_state)), - ]; +async fn webhook_challenge(req: Query) -> impl IntoResponse { + let sha256_hash = + hmac_sha256::HMAC::mac(TWITTER_CONSUMER.secret.as_bytes(), req.crc_token.as_bytes()); - let response = HTTP_CLIENT - .post("https://api.twitter.com/2/oauth2/token") - .basic_auth( - &*TWITTER_OAUTH_CLIENT_ID, - Some(&*TWITTER_OAUTH_CLIENT_SECRET), + ( + StatusCode::OK, + Json(json!({ + "response_token": format!("{}", base64::encode(&sha256_hash)), + })), + ) +} + +#[allow(dead_code)] +#[derive(Debug, Deserialize, Clone)] +struct Webhook { + id: String, + url: String, + valid: bool, + created_at: String, +} + +#[derive(Deserialize)] +struct Environment { + webhooks: Vec, +} + +#[derive(Deserialize)] +struct Webhooks { + environments: Vec, +} + +async fn fetch_webhooks() -> Result, Box> { + let hooks = HTTP_CLIENT + .get("https://api.twitter.com/1.1/account_activity/all/webhooks.json") + .header( + header::AUTHORIZATION, + oauth1::authorize( + "GET", + "https://api.twitter.com/1.1/account_activity/all/webhooks.json", + &*TWITTER_CONSUMER, + Some(&*DEV_ACCESS_TOKEN), + None, + ), + ) + .send() + .await? + .json::() + .await?; + + Ok(hooks + .environments + .iter() + .map(|e| e.webhooks.clone()) + .flatten() + .collect::>()) +} + +async fn register_webhook() -> Result> { + let url = format!( + "https://api.twitter.com/1.1/account_activity/all/{}/webhooks.json", + *ENV_NAME + ); + + let params = [("url", format!("{}/webhook", &*SERVICE_API_PREFIX))]; + + HTTP_CLIENT + .post(url.clone()) + .query(¶ms) + .header( + header::AUTHORIZATION, + oauth1::authorize( + "POST", + &url, + &*TWITTER_CONSUMER, + Some(&*DEV_ACCESS_TOKEN), + Some( + params + .into_iter() + .map(|item| (item.0, Cow::from(item.1))) + .collect(), + ), + ), ) - .form(¶ms) .send() - .await; - - if let Ok(r) = response { - if let Ok(at) = r.json::().await { - let encrypted = serde_json::json!({ - "access_state": encrypt(&at.access_token), - "refresh_state": encrypt(&at.refresh_token) - }); - Ok((StatusCode::OK, serde_json::to_string(&encrypted).unwrap())) - } else { - Err(( - StatusCode::INTERNAL_SERVER_ERROR, - "Failed to get access token".to_string(), - )) + .await? + .json::() + .await + .map_err(Into::into) +} + +async fn reenable_webhook(hook: &Webhook) -> Result<(), Box> { + let url = format!( + "https://api.twitter.com/1.1/account_activity/all/{}/webhooks/{}.json", + *ENV_NAME, hook.id + ); + + HTTP_CLIENT + .put(url.clone()) + .header( + header::AUTHORIZATION, + oauth1::authorize( + "PUT", + &url, + &*TWITTER_CONSUMER, + Some(&*DEV_ACCESS_TOKEN), + None, + ), + ) + .send() + .await?; + + Ok(()) +} + +async fn init_webhook() -> Result<(), String> { + let hooks = fetch_webhooks() + .await + .map_err(|e| format!("fetch_webhooks failed: {}", e.to_string()))?; + + let hook = if let Some(hook) = hooks.first() { + if !hook.valid { + reenable_webhook(hook) + .await + .map_err(|e| format!("reenable_webhook failed: {}", e.to_string()))?; } + hooks.into_iter().next().unwrap() } else { - Err(( - StatusCode::INTERNAL_SERVER_ERROR, - "Failed to get access token".to_string(), + register_webhook() + .await + .map_err(|e| format!("register_webhook failed: {}", e.to_string()))? + }; + + println!("init_webhook success:\n{:#?}", hook); + + Ok(()) +} + +async fn events() -> impl IntoResponse { + Json(json!({ + "list": [ + { + "field": "Favorite", + "value": "favorite_events" + }, + { + "field": "Follow", + "value": "follow_events" + }, + { + "field": "Unfollow", + "value": "unfollow_events" + }, + { + "field": "Block", + "value": "block_events" + }, + { + "field": "Unblock", + "value": "unblock_events" + }, + { + "field": "Mute", + "value": "mute_events" + }, + { + "field": "Unmute", + "value": "unmute_events" + }, + { + "field": "Direct message", + "value": "direct_message_events" + }, + { + "field": "Tweet delete", + "value": "tweet_delete_events" + }, + ] + })) +} + +async fn subscribe(req: Json) -> impl IntoResponse { + let url = format!( + "https://api.twitter.com/1.1/account_activity/all/{}/subscriptions.json", + *ENV_NAME + ); + + let auth = serde_json::from_str::(&req.state) + .map_err(|e| (StatusCode::BAD_REQUEST, e.to_string()))?; + + HTTP_CLIENT + .post(url.clone()) + .header( + header::AUTHORIZATION, + oauth1::authorize( + "POST", + &url, + &*TWITTER_CONSUMER, + Some(&Token::new( + auth.oauth_token, + auth.oauth_token_secret.unwrap(), + )), + None, + ), + ) + .send() + .await + .map(|_| { + Json(json!({ + "revoke": format!("{}/unsubscribe?user_id={}", *SERVICE_API_PREFIX, req.user), + })) + }) + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string())) +} + +#[derive(Deserialize)] +struct UnsubscribeReq { + user_id: String, +} + +async fn unsubscribe(req: Json) -> impl IntoResponse { + let url = format!( + "https://api.twitter.com/1.1/account_activity/all/{}/subscriptions/{}.json", + *ENV_NAME, req.user_id + ); + + HTTP_CLIENT + .delete(url.clone()) + .header( + header::AUTHORIZATION, + oauth1::authorize( + "DELETE", + &url, + &*TWITTER_CONSUMER, + Some(&*DEV_ACCESS_TOKEN), + None, + ), + ) + .send() + .await + .map(|_| (StatusCode::OK, "OK".to_string())) + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, e.to_string())) +} + +#[derive(Deserialize)] +#[allow(dead_code)] +struct Payload { + for_user_id: String, + user_has_blocked: Option, + + #[serde(flatten)] + extra: HashMap, +} + +async fn capture_event(req: Json) -> impl IntoResponse { + tokio::spawn(async move { + if let Err(e) = capture_event_inner(&req).await { + println!("{}", e.to_string()); + } + }); + + StatusCode::OK +} + +async fn capture_event_inner(payload: &Payload) -> Result<(), String> { + let event = payload + .extra + .iter() + .find(|(name, _)| name.as_str().ne("users")) + .ok_or("Invalid event".to_string())? + .0; + + post_event_to_haiku( + &payload.for_user_id, + event, + serde_json::to_string(&payload.extra).unwrap(), + ) + .await +} + +async fn post_authorized_state( + session_id: SE, + id: I, + name: N, + state: ST, +) -> Result<(), String> +where + SE: Into, + I: Into, + N: Into, + ST: Into, +{ + let response = HTTP_CLIENT + .post(format!( + "{}/api/_connectings/_authorized", + &*HAIKU_API_PREFIX )) + .header(header::AUTHORIZATION, &*HAIKU_AUTH_TOKEN) + .json(&json!({ + "sessionId": session_id.into(), + "authorId": id.into(), + "authorName": name.into(), + "authorState": state.into(), + })) + .send() + .await + .map_err(|e| e.to_string())?; + + match response.status().is_success() { + true => Ok(()), + false => Err(response.text().await.unwrap_or_default()), + } +} + +async fn post_event_to_haiku, E: Into, B: Into>( + id: I, + event_type: E, + event_body: B, +) -> Result<(), String> { + let body = json!({ + "user": id.into(), + "text": event_body.into(), + "triggers": { + "event": event_type.into(), + } + }); + + let response = HTTP_CLIENT + .post(format!("{}/api/_funcs/_post", *HAIKU_API_PREFIX)) + .header(header::AUTHORIZATION, &*HAIKU_AUTH_TOKEN) + .json(&body) + .send() + .await + .map_err(|e| e.to_string())?; + + match response.status().is_success() { + true => Ok(()), + false => Err(response.text().await.unwrap_or_default()), } } #[tokio::main] -async fn main() { +async fn main() -> Result<(), Box> { let app = Router::new() - .route("/connect", get(connect)) - .route("/poll-block", get(poll_block)) .route("/auth", get(auth)) - .route("/refresh", post(refresh_token)) .route("/actions", post(actions)) - .route("/post", post(post_msg)); + .route("/post", post(post_msg)) + .route("/connect", get(connect)) + .route("/webhook", get(webhook_challenge).post(capture_event)) + .route("/events", post(events)) + .route("/subscribe", get(subscribe)) + .route("/unsubscribe", get(unsubscribe)); let port = env::var("PORT").unwrap_or_else(|_| "8090".to_string()); let port = port.parse::().unwrap(); let addr = SocketAddr::from(([127, 0, 0, 1], port)); + if let Err(e) = init_webhook().await { + eprintln!("init_webhook failed: {}", e); + } + axum::Server::bind(&addr) .serve(app.into_make_service()) - .await - .unwrap(); + .await?; + + Ok(()) }