1616import com .clickhouse .client .api .transport .Endpoint ;
1717import com .clickhouse .data .ClickHouseFormat ;
1818import net .jpountz .lz4 .LZ4Factory ;
19+ import org .apache .commons .compress .compressors .CompressorStreamFactory ;
1920import org .apache .hc .client5 .http .ConnectTimeoutException ;
2021import org .apache .hc .client5 .http .classic .methods .HttpPost ;
2122import org .apache .hc .client5 .http .config .ConnectionConfig ;
@@ -105,6 +106,8 @@ public class HttpAPIClientHelper {
105106
106107 private static final int ERROR_BODY_BUFFER_SIZE = 1024 ; // Error messages are usually small
107108
109+ private final String DEFAULT_HTTP_COMPRESSION_ALGO = "lz4" ;
110+
108111 private static final Pattern PATTERN_HEADER_VALUE_ASCII = Pattern .compile (
109112 "\\ p{Graph}+(?:[ ]\\ p{Graph}+)*" );
110113
@@ -322,6 +325,8 @@ public CloseableHttpClient createHttpClient(boolean initSslContext, Map<String,
322325 clientBuilder .setKeepAliveStrategy ((response , context ) -> TimeValue .ofMilliseconds (keepAliveTimeout ));
323326 }
324327
328+ clientBuilder .disableContentCompression (); // will handle ourselves
329+
325330 return clientBuilder .build ();
326331 }
327332
@@ -427,14 +432,12 @@ public ClassicHttpResponse executeRequest(Endpoint server, Map<String, Object> r
427432// req.setVersion(new ProtocolVersion("HTTP", 1, 0)); // to disable chunk transfer encoding
428433 addHeaders (req , requestConfig );
429434
430- boolean clientCompression = ClientConfigProperties .COMPRESS_CLIENT_REQUEST .getOrDefault (requestConfig );
431- boolean useHttpCompression = ClientConfigProperties .USE_HTTP_COMPRESSION .getOrDefault (requestConfig );
432- boolean appCompressedData = ClientConfigProperties .APP_COMPRESSED_DATA .getOrDefault (requestConfig );
433-
434435
435436 // setting entity. wrapping if compression is enabled
436- req .setEntity (wrapRequestEntity (new EntityTemplate (-1 , CONTENT_TYPE , null , writeCallback ),
437- clientCompression , useHttpCompression , appCompressedData , lz4Factory , requestConfig ));
437+ String contentEncoding = req .containsHeader (HttpHeaders .CONTENT_ENCODING ) ? req .getHeader (HttpHeaders .CONTENT_ENCODING ).getValue () : null ;
438+ req .setEntity (wrapRequestEntity (new EntityTemplate (-1 , CONTENT_TYPE , contentEncoding , writeCallback ),
439+ lz4Factory ,
440+ requestConfig ));
438441
439442 HttpClientContext context = HttpClientContext .create ();
440443 Number responseTimeout = ClientConfigProperties .SOCKET_OPERATION_TIMEOUT .getOrDefault (requestConfig );
@@ -448,8 +451,11 @@ public ClassicHttpResponse executeRequest(Endpoint server, Map<String, Object> r
448451 ClassicHttpResponse httpResponse = null ;
449452 try {
450453 httpResponse = httpClient .executeOpen (null , req , context );
451- boolean serverCompression = ClientConfigProperties .COMPRESS_SERVER_RESPONSE .getOrDefault (requestConfig );
452- httpResponse .setEntity (wrapResponseEntity (httpResponse .getEntity (), httpResponse .getCode (), serverCompression , useHttpCompression , lz4Factory , requestConfig ));
454+
455+ httpResponse .setEntity (wrapResponseEntity (httpResponse .getEntity (),
456+ httpResponse .getCode (),
457+ lz4Factory ,
458+ requestConfig ));
453459
454460 if (httpResponse .getCode () == HttpStatus .SC_PROXY_AUTHENTICATION_REQUIRED ) {
455461 throw new ClientMisconfigurationException ("Proxy authentication required. Please check your proxy settings." );
@@ -493,30 +499,30 @@ public static void closeQuietly(ClassicHttpResponse httpResponse) {
493499 private static final ContentType CONTENT_TYPE = ContentType .create (ContentType .TEXT_PLAIN .getMimeType (), "UTF-8" );
494500
495501 private void addHeaders (HttpPost req , Map <String , Object > requestConfig ) {
496- addHeader (req , HttpHeaders .CONTENT_TYPE , CONTENT_TYPE .getMimeType ());
502+ setHeader (req , HttpHeaders .CONTENT_TYPE , CONTENT_TYPE .getMimeType ());
497503 if (requestConfig .containsKey (ClientConfigProperties .INPUT_OUTPUT_FORMAT .getKey ())) {
498- addHeader (
504+ setHeader (
499505 req ,
500506 ClickHouseHttpProto .HEADER_FORMAT ,
501507 ((ClickHouseFormat ) requestConfig .get (ClientConfigProperties .INPUT_OUTPUT_FORMAT .getKey ())).name ());
502508 }
503509 if (requestConfig .containsKey (ClientConfigProperties .QUERY_ID .getKey ())) {
504- addHeader (
510+ setHeader (
505511 req ,
506512 ClickHouseHttpProto .HEADER_QUERY_ID ,
507513 (String ) requestConfig .get (ClientConfigProperties .QUERY_ID .getKey ()));
508514 }
509- addHeader (
515+ setHeader (
510516 req ,
511517 ClickHouseHttpProto .HEADER_DATABASE ,
512518 ClientConfigProperties .DATABASE .getOrDefault (requestConfig ));
513519
514520 if (ClientConfigProperties .SSL_AUTH .<Boolean >getOrDefault (requestConfig ).booleanValue ()) {
515- addHeader (
521+ setHeader (
516522 req ,
517523 ClickHouseHttpProto .HEADER_DB_USER ,
518524 ClientConfigProperties .USER .getOrDefault (requestConfig ));
519- addHeader (
525+ setHeader (
520526 req ,
521527 ClickHouseHttpProto .HEADER_SSL_CERT_AUTH ,
522528 "on" );
@@ -529,11 +535,11 @@ private void addHeaders(HttpPost req, Map<String, Object> requestConfig) {
529535 "Basic " + Base64 .getEncoder ().encodeToString (
530536 (user + ":" + password ).getBytes (StandardCharsets .UTF_8 )));
531537 } else {
532- addHeader (
538+ setHeader (
533539 req ,
534540 ClickHouseHttpProto .HEADER_DB_USER ,
535541 ClientConfigProperties .USER .getOrDefault (requestConfig ));
536- addHeader (
542+ setHeader (
537543 req ,
538544 ClickHouseHttpProto .HEADER_DB_PASSWORD ,
539545 ClientConfigProperties .PASSWORD .getOrDefault (requestConfig ));
@@ -551,18 +557,19 @@ private void addHeaders(HttpPost req, Map<String, Object> requestConfig) {
551557
552558 if (useHttpCompression ) {
553559 if (serverCompression ) {
554- addHeader (req , HttpHeaders .ACCEPT_ENCODING , "lz4" );
560+ setHeader (req , HttpHeaders .ACCEPT_ENCODING , DEFAULT_HTTP_COMPRESSION_ALGO );
555561 }
562+
556563 if (clientCompression && !appCompressedData ) {
557- addHeader (req , HttpHeaders .CONTENT_ENCODING , "lz4" );
564+ setHeader (req , HttpHeaders .CONTENT_ENCODING , DEFAULT_HTTP_COMPRESSION_ALGO );
558565 }
559566 }
560567
561568 for (String key : requestConfig .keySet ()) {
562569 if (key .startsWith (ClientConfigProperties .HTTP_HEADER_PREFIX )) {
563570 Object val = requestConfig .get (key );
564571 if (val != null ) {
565- addHeader (
572+ setHeader (
566573 req ,
567574 key .substring (ClientConfigProperties .HTTP_HEADER_PREFIX .length ()),
568575 String .valueOf (val ));
@@ -626,11 +633,20 @@ private void addQueryParams(URIBuilder req, Map<String, Object> requestConfig) {
626633 }
627634 }
628635
629- private HttpEntity wrapRequestEntity (HttpEntity httpEntity , boolean clientCompression , boolean useHttpCompression ,
630- boolean appControlledCompression , LZ4Factory lz4Factory , Map <String , Object > requestConfig ) {
631- LOG .debug ("wrapRequestEntity: client compression: {}, http compression: {}" , clientCompression , useHttpCompression );
636+ private HttpEntity wrapRequestEntity (HttpEntity httpEntity , LZ4Factory lz4Factory , Map <String , Object > requestConfig ) {
637+
638+ boolean clientCompression = ClientConfigProperties .COMPRESS_CLIENT_REQUEST .getOrDefault (requestConfig );
639+ boolean useHttpCompression = ClientConfigProperties .USE_HTTP_COMPRESSION .getOrDefault (requestConfig );
640+ boolean appCompressedData = ClientConfigProperties .APP_COMPRESSED_DATA .getOrDefault (requestConfig );
641+
642+ LOG .debug ("wrapRequestEntity: client compression: {}, http compression: {}, content encoding: {}" ,
643+ clientCompression , useHttpCompression , httpEntity .getContentEncoding ());
632644
633- if (clientCompression && !appControlledCompression ) {
645+ if (httpEntity .getContentEncoding () != null && !appCompressedData ) {
646+ // http header is set and data is not compressed
647+ String algo = getCompressionAlgoName (httpEntity .getContentEncoding ());
648+ return new CompressedEntity (httpEntity , false , CompressorStreamFactory .getSingleton (), algo );
649+ } else if (clientCompression && !appCompressedData ) {
634650 int buffSize = ClientConfigProperties .COMPRESSION_LZ4_UNCOMPRESSED_BUF_SIZE .getOrDefault (requestConfig );
635651 return new LZ4Entity (httpEntity , useHttpCompression , false , true ,
636652 buffSize , false , lz4Factory );
@@ -639,30 +655,38 @@ private HttpEntity wrapRequestEntity(HttpEntity httpEntity, boolean clientCompre
639655 }
640656 }
641657
642- private HttpEntity wrapResponseEntity (HttpEntity httpEntity , int httpStatus , boolean serverCompression , boolean useHttpCompression , LZ4Factory lz4Factory , Map <String , Object > requestConfig ) {
643- LOG .debug ("wrapResponseEntity: server compression: {}, http compression: {}" , serverCompression , useHttpCompression );
644-
645- if (serverCompression ) {
646- // Server doesn't compress certain errors like 403
647- switch (httpStatus ) {
648- case HttpStatus .SC_OK :
649- case HttpStatus .SC_CREATED :
650- case HttpStatus .SC_ACCEPTED :
651- case HttpStatus .SC_NO_CONTENT :
652- case HttpStatus .SC_PARTIAL_CONTENT :
653- case HttpStatus .SC_RESET_CONTENT :
654- case HttpStatus .SC_NOT_MODIFIED :
655- case HttpStatus .SC_BAD_REQUEST :
656- case HttpStatus .SC_INTERNAL_SERVER_ERROR :
657- case HttpStatus .SC_NOT_FOUND :
658- int buffSize = ClientConfigProperties .COMPRESSION_LZ4_UNCOMPRESSED_BUF_SIZE .getOrDefault (requestConfig );
659- return new LZ4Entity (httpEntity , useHttpCompression , true , false , buffSize , true , lz4Factory );
660- }
658+ private HttpEntity wrapResponseEntity (HttpEntity httpEntity , int httpStatus , LZ4Factory lz4Factory , Map <String , Object > requestConfig ) {
659+ boolean serverCompression = ClientConfigProperties .COMPRESS_SERVER_RESPONSE .getOrDefault (requestConfig );
660+ boolean useHttpCompression = ClientConfigProperties .USE_HTTP_COMPRESSION .getOrDefault (requestConfig );
661+
662+ LOG .debug ("wrapResponseEntity: server compression: {}, http compression: {}, content encoding: {}" ,
663+ serverCompression , useHttpCompression , httpEntity .getContentEncoding ());
664+
665+ if (httpEntity .getContentEncoding () != null ) {
666+ // http compressed response
667+ String algo = getCompressionAlgoName (httpEntity .getContentEncoding ());
668+ return new CompressedEntity (httpEntity , true , CompressorStreamFactory .getSingleton (), algo );
669+ }
670+
671+ // data compression
672+ if (serverCompression && !(httpStatus == HttpStatus .SC_FORBIDDEN || httpStatus == HttpStatus .SC_UNAUTHORIZED )) {
673+ int buffSize = ClientConfigProperties .COMPRESSION_LZ4_UNCOMPRESSED_BUF_SIZE .getOrDefault (requestConfig );
674+ return new LZ4Entity (httpEntity , useHttpCompression , true , false , buffSize , true , lz4Factory );
661675 }
662676
663677 return httpEntity ;
664678 }
665679
680+ private String getCompressionAlgoName (String contentEncoding ) {
681+ String algo = contentEncoding ;
682+ if (algo .equalsIgnoreCase ("gzip" )) {
683+ algo = CompressorStreamFactory .GZIP ;
684+ } else if (algo .equalsIgnoreCase ("lz4" )) {
685+ algo = CompressorStreamFactory .LZ4_FRAMED ;
686+ }
687+ return algo ;
688+ }
689+
666690 public static int getHeaderInt (Header header , int defaultValue ) {
667691 return getHeaderVal (header , defaultValue , Integer ::parseInt );
668692 }
@@ -803,8 +827,8 @@ public void close() {
803827 httpClient .close (CloseMode .IMMEDIATE );
804828 }
805829
806- private static <T > void addHeader (HttpRequest req , String headerName ,
807- String value )
830+ private static <T > void setHeader (HttpRequest req , String headerName ,
831+ String value )
808832 {
809833 if (value == null ) {
810834 return ;
@@ -814,10 +838,10 @@ private static <T> void addHeader(HttpRequest req, String headerName,
814838 return ;
815839 }
816840 if (PATTERN_HEADER_VALUE_ASCII .matcher (value ).matches ()) {
817- req .addHeader (headerName , value );
841+ req .setHeader (headerName , value );
818842 } else {
819843 try {
820- req .addHeader (
844+ req .setHeader (
821845 headerName + "*" ,
822846 "UTF-8''" + URLEncoder .encode (value , StandardCharsets .UTF_8 .name ()));
823847 } catch (UnsupportedEncodingException e ) {
0 commit comments