1717
1818use futures:: { Future , Stream } ;
1919use reqwest:: r#async:: { Client , Decoder } ;
20+ use reqwest:: { StatusCode , Url } ;
2021
2122use std:: mem;
2223
@@ -25,15 +26,45 @@ use crate::query::read_query::InfluxDbReadQuery;
2526use crate :: query:: write_query:: InfluxDbWriteQuery ;
2627use crate :: query:: InfluxDbQuery ;
2728
28- use url:: form_urlencoded;
29-
3029use std:: any:: Any ;
3130
31+ #[ derive( Clone , Debug ) ]
32+ /// Internal Authentication representation
33+ pub ( crate ) struct InfluxDbAuthentication {
34+ pub username : String ,
35+ pub password : String ,
36+ }
37+
38+ #[ derive( Clone , Debug ) ]
3239/// Internal Representation of a Client
3340pub struct InfluxDbClient {
3441 url : String ,
3542 database : String ,
36- // auth: Option<InfluxDbAuthentication>
43+ auth : Option < InfluxDbAuthentication > ,
44+ }
45+
46+ impl Into < Vec < ( String , String ) > > for InfluxDbClient {
47+ fn into ( self ) -> Vec < ( String , String ) > {
48+ let mut vec: Vec < ( String , String ) > = Vec :: new ( ) ;
49+ vec. push ( ( "db" . to_string ( ) , self . database ) ) ;
50+ if let Some ( auth) = self . auth {
51+ vec. push ( ( "u" . to_string ( ) , auth. username ) ) ;
52+ vec. push ( ( "p" . to_string ( ) , auth. password ) ) ;
53+ }
54+ vec
55+ }
56+ }
57+
58+ impl < ' a > Into < Vec < ( String , String ) > > for & ' a InfluxDbClient {
59+ fn into ( self ) -> Vec < ( String , String ) > {
60+ let mut vec: Vec < ( String , String ) > = Vec :: new ( ) ;
61+ vec. push ( ( "db" . to_string ( ) , self . database . to_owned ( ) ) ) ;
62+ if let Some ( auth) = & self . auth {
63+ vec. push ( ( "u" . to_string ( ) , auth. username . to_owned ( ) ) ) ;
64+ vec. push ( ( "p" . to_string ( ) , auth. password . to_owned ( ) ) ) ;
65+ }
66+ vec
67+ }
3768}
3869
3970impl InfluxDbClient {
@@ -59,9 +90,36 @@ impl InfluxDbClient {
5990 InfluxDbClient {
6091 url : url. to_string ( ) ,
6192 database : database. to_string ( ) ,
93+ auth : None ,
6294 }
6395 }
6496
97+ /// Add authentication/authorization information to [`InfluxDbClient`](crate::client::InfluxDbClient)
98+ ///
99+ /// # Arguments
100+ ///
101+ /// * username: The Username for InfluxDB.
102+ /// * password: THe Password for the user.
103+ ///
104+ /// # Examples
105+ ///
106+ /// ```rust
107+ /// use influxdb::client::InfluxDbClient;
108+ ///
109+ /// let _client = InfluxDbClient::new("http://localhost:9086", "test").with_auth("admin", "password");
110+ /// ```
111+ pub fn with_auth < ' a , S1 , S2 > ( mut self , username : S1 , password : S2 ) -> Self
112+ where
113+ S1 : ToString ,
114+ S2 : ToString ,
115+ {
116+ self . auth = Some ( InfluxDbAuthentication {
117+ username : username. to_string ( ) ,
118+ password : password. to_string ( ) ,
119+ } ) ;
120+ self
121+ }
122+
65123 /// Returns the name of the database the client is using
66124 pub fn database_name ( & self ) -> & str {
67125 & self . database
@@ -100,7 +158,7 @@ impl InfluxDbClient {
100158 } )
101159 }
102160
103- /// Sends a [`InfluxDbReadQuery`](crate::query::read_query::InfluxDbReadQuery) or [`InfluxDbWriteQuery`](crate::query::write_query::InfluxDbWriteQuery) to the InfluxDB Server.InfluxDbError
161+ /// Sends a [`InfluxDbReadQuery`](crate::query::read_query::InfluxDbReadQuery) or [`InfluxDbWriteQuery`](crate::query::write_query::InfluxDbWriteQuery) to the InfluxDB Server.
104162 ///
105163 /// A version capable of parsing the returned string is available under the [serde_integration](crate::integrations::serde_integration)
106164 ///
@@ -120,6 +178,12 @@ impl InfluxDbClient {
120178 /// .add_field("temperature", 82)
121179 /// );
122180 /// ```
181+ /// # Errors
182+ ///
183+ /// If the function can not finish the query,
184+ /// a [`InfluxDbError`] variant will be returned.
185+ ///
186+ /// [`InfluxDbError`]: enum.InfluxDbError.html
123187 pub fn query < Q > ( & self , q : & Q ) -> Box < dyn Future < Item = String , Error = InfluxDbError > >
124188 where
125189 Q : Any + InfluxDbQuery ,
@@ -137,48 +201,71 @@ impl InfluxDbClient {
137201 } ;
138202
139203 let any_value = q as & dyn Any ;
204+ let basic_parameters: Vec < ( String , String ) > = self . into ( ) ;
140205
141206 let client = if let Some ( _) = any_value. downcast_ref :: < InfluxDbReadQuery > ( ) {
142207 let read_query = query. get ( ) ;
143- let encoded: String = form_urlencoded:: Serializer :: new ( String :: new ( ) )
144- . append_pair ( "db" , self . database_name ( ) )
145- . append_pair ( "q" , & read_query)
146- . finish ( ) ;
147- let http_query_string = format ! (
148- "{url}/query?{encoded}" ,
149- url = self . database_url( ) ,
150- encoded = encoded
151- ) ;
208+
209+ let mut url = match Url :: parse_with_params (
210+ format ! ( "{url}/query" , url = self . database_url( ) ) . as_str ( ) ,
211+ basic_parameters,
212+ ) {
213+ Ok ( url) => url,
214+ Err ( err) => {
215+ let error = InfluxDbError :: UrlConstructionError {
216+ error : format ! ( "{}" , err) ,
217+ } ;
218+ return Box :: new ( future:: err :: < String , InfluxDbError > ( error) ) ;
219+ }
220+ } ;
221+ url. query_pairs_mut ( ) . append_pair ( "q" , & read_query. clone ( ) ) ;
222+
152223 if read_query. contains ( "SELECT" ) || read_query. contains ( "SHOW" ) {
153- Client :: new ( ) . get ( http_query_string . as_str ( ) )
224+ Client :: new ( ) . get ( url )
154225 } else {
155- Client :: new ( ) . post ( http_query_string . as_str ( ) )
226+ Client :: new ( ) . post ( url )
156227 }
157228 } else if let Some ( write_query) = any_value. downcast_ref :: < InfluxDbWriteQuery > ( ) {
158- Client :: new ( )
159- . post (
160- format ! (
161- "{url}/write?db={db}{precision_str}" ,
162- url = self . database_url( ) ,
163- db = self . database_name( ) ,
164- precision_str = write_query. get_precision_modifier( )
165- )
166- . as_str ( ) ,
167- )
168- . body ( query. get ( ) )
229+ let mut url = match Url :: parse_with_params (
230+ format ! ( "{url}/write" , url = self . database_url( ) ) . as_str ( ) ,
231+ basic_parameters,
232+ ) {
233+ Ok ( url) => url,
234+ Err ( err) => {
235+ let error = InfluxDbError :: InvalidQueryError {
236+ error : format ! ( "{}" , err) ,
237+ } ;
238+ return Box :: new ( future:: err :: < String , InfluxDbError > ( error) ) ;
239+ }
240+ } ;
241+ url. query_pairs_mut ( )
242+ . append_pair ( "precision" , & write_query. get_precision ( ) ) ;
243+ Client :: new ( ) . post ( url) . body ( query. get ( ) )
169244 } else {
170245 unreachable ! ( )
171246 } ;
172-
173247 Box :: new (
174248 client
175249 . send ( )
250+ . map_err ( |err| InfluxDbError :: ConnectionError { error : err } )
251+ . and_then (
252+ |res| -> future:: FutureResult < reqwest:: r#async:: Response , InfluxDbError > {
253+ match res. status ( ) {
254+ StatusCode :: UNAUTHORIZED => {
255+ futures:: future:: err ( InfluxDbError :: AuthorizationError )
256+ }
257+ StatusCode :: FORBIDDEN => {
258+ futures:: future:: err ( InfluxDbError :: AuthenticationError )
259+ }
260+ _ => futures:: future:: ok ( res) ,
261+ }
262+ } ,
263+ )
176264 . and_then ( |mut res| {
177265 let body = mem:: replace ( res. body_mut ( ) , Decoder :: empty ( ) ) ;
178- body. concat2 ( )
179- } )
180- . map_err ( |err| InfluxDbError :: ProtocolError {
181- error : format ! ( "{}" , err) ,
266+ body. concat2 ( ) . map_err ( |err| InfluxDbError :: ProtocolError {
267+ error : format ! ( "{}" , err) ,
268+ } )
182269 } )
183270 . and_then ( |body| {
184271 if let Ok ( utf8) = std:: str:: from_utf8 ( & body) {
@@ -201,3 +288,70 @@ impl InfluxDbClient {
201288 )
202289 }
203290}
291+
292+ #[ cfg( test) ]
293+ mod tests {
294+ use crate :: client:: InfluxDbClient ;
295+
296+ #[ test]
297+ fn test_fn_database ( ) {
298+ let client = InfluxDbClient :: new ( "http://localhost:8068" , "database" ) ;
299+ assert_eq ! ( "database" , client. database_name( ) ) ;
300+ }
301+
302+ #[ test]
303+ fn test_with_auth ( ) {
304+ let client = InfluxDbClient :: new ( "http://localhost:8068" , "database" ) ;
305+ assert_eq ! ( client. url, "http://localhost:8068" ) ;
306+ assert_eq ! ( client. database, "database" ) ;
307+ assert ! ( client. auth. is_none( ) ) ;
308+ let with_auth = client. with_auth ( "username" , "password" ) ;
309+ assert ! ( with_auth. auth. is_some( ) ) ;
310+ let auth = with_auth. auth . unwrap ( ) ;
311+ assert_eq ! ( & auth. username, "username" ) ;
312+ assert_eq ! ( & auth. password, "password" ) ;
313+ }
314+
315+ #[ test]
316+ fn test_into_impl ( ) {
317+ let client = InfluxDbClient :: new ( "http://localhost:8068" , "database" ) ;
318+ assert ! ( client. auth. is_none( ) ) ;
319+ let basic_parameters: Vec < ( String , String ) > = client. into ( ) ;
320+ assert_eq ! (
321+ vec![ ( "db" . to_string( ) , "database" . to_string( ) ) ] ,
322+ basic_parameters
323+ ) ;
324+
325+ let with_auth = InfluxDbClient :: new ( "http://localhost:8068" , "database" )
326+ . with_auth ( "username" , "password" ) ;
327+ let basic_parameters_with_auth: Vec < ( String , String ) > = with_auth. into ( ) ;
328+ assert_eq ! (
329+ vec![
330+ ( "db" . to_string( ) , "database" . to_string( ) ) ,
331+ ( "u" . to_string( ) , "username" . to_string( ) ) ,
332+ ( "p" . to_string( ) , "password" . to_string( ) )
333+ ] ,
334+ basic_parameters_with_auth
335+ ) ;
336+
337+ let client = InfluxDbClient :: new ( "http://localhost:8068" , "database" ) ;
338+ assert ! ( client. auth. is_none( ) ) ;
339+ let basic_parameters: Vec < ( String , String ) > = ( & client) . into ( ) ;
340+ assert_eq ! (
341+ vec![ ( "db" . to_string( ) , "database" . to_string( ) ) ] ,
342+ basic_parameters
343+ ) ;
344+
345+ let with_auth = InfluxDbClient :: new ( "http://localhost:8068" , "database" )
346+ . with_auth ( "username" , "password" ) ;
347+ let basic_parameters_with_auth: Vec < ( String , String ) > = ( & with_auth) . into ( ) ;
348+ assert_eq ! (
349+ vec![
350+ ( "db" . to_string( ) , "database" . to_string( ) ) ,
351+ ( "u" . to_string( ) , "username" . to_string( ) ) ,
352+ ( "p" . to_string( ) , "password" . to_string( ) )
353+ ] ,
354+ basic_parameters_with_auth
355+ ) ;
356+ }
357+ }
0 commit comments