1515package output
1616
1717import (
18+ "encoding/json"
19+
1820 "github.com/cisco-open/operator-tools/pkg/secret"
1921
2022 "github.com/kube-logging/logging-operator/pkg/sdk/logging/model/types"
@@ -355,6 +357,21 @@ func (e *KafkaOutputConfig) ToDirective(secretLoader secret.SecretLoader, id str
355357 } else {
356358 kafka .Params = params
357359 }
360+
361+ if e .RdkafkaOptions != nil {
362+ if rdkafkaOptions , err := types .NewStructToStringMapper (secretLoader ).StringsMap (e .RdkafkaOptions ); err != nil {
363+ return nil , err
364+ } else {
365+ if len (rdkafkaOptions ) > 0 {
366+ marshaledRdkafkaOptions , err := json .Marshal (rdkafkaOptions )
367+ if err != nil {
368+ return nil , err
369+ }
370+ kafka .Params ["rdkafka_options" ] = string (marshaledRdkafkaOptions )
371+ }
372+ }
373+ }
374+
358375 if e .Buffer == nil {
359376 e .Buffer = & Buffer {}
360377 }
@@ -363,13 +380,6 @@ func (e *KafkaOutputConfig) ToDirective(secretLoader secret.SecretLoader, id str
363380 } else {
364381 kafka .SubDirectives = append (kafka .SubDirectives , buffer )
365382 }
366- if e .RdkafkaOptions != nil {
367- if rdkafkaOptions , err := e .RdkafkaOptions .ToDirective (secretLoader , id ); err != nil {
368- return nil , err
369- } else {
370- kafka .SubDirectives = append (kafka .SubDirectives , rdkafkaOptions )
371- }
372- }
373383
374384 if e .Format != nil {
375385 if format , err := e .Format .ToDirective (secretLoader , "" ); err != nil {
@@ -383,9 +393,3 @@ func (e *KafkaOutputConfig) ToDirective(secretLoader secret.SecretLoader, id str
383393 delete (kafka .Params , "use_rdkafka" )
384394 return kafka , nil
385395}
386-
387- func (o * RdkafkaOptions ) ToDirective (secretLoader secret.SecretLoader , id string ) (types.Directive , error ) {
388- return types .NewFlatDirective (types.PluginMeta {
389- Directive : "rdkafka_options" ,
390- }, o , secretLoader )
391- }
0 commit comments