33import atexit
44import json
55import logging
6+ import socket
67import sys
78from threading import Lock , Timer
89
@@ -31,13 +32,18 @@ class KafkaLoggingHandler(logging.Handler):
3132 logger that will be used to log uhandled top-level exception
3233
3334 """
35+ __LOGGING_FILTER_FIELDS = ['msecs' ,
36+ 'relativeCreated' ,
37+ 'levelno' ,
38+ 'created' ]
3439
3540 def __init__ (self ,
3641 hosts_list ,
3742 topic ,
3843 security_protocol = 'SSL' ,
3944 ssl_cafile = None ,
4045 kafka_producer_args = None ,
46+ additional_fields = {},
4147 flush_buffer_size = None ,
4248 flush_interval = 5.0 ,
4349 unhandled_exception_logger = None ):
@@ -51,6 +57,9 @@ def __init__(self,
5157 ssl_cafile (None, optional): path to CA file
5258 kafka_producer_args (None, optional):
5359 extra arguments to pass to KafkaProducer
60+ additional_fields (None, optional):
61+ A dictionary with all the additional fields that you would like
62+ to add to the logs, such the application, environment, etc.
5463 flush_buffer_size (None/int, optional):
5564 flush if buffer > max size, None means there is no restriction
5665 flush_interval (int, optional): scheduled flush interval in seconds
@@ -75,9 +84,13 @@ def __init__(self,
7584 if flush_buffer_size is not None else float ("inf" )
7685 self .flush_interval = flush_interval
7786 self .timer = None
87+ self .additional_fields = additional_fields .copy ()
88+ self .additional_fields .update ({'host' : socket .gethostname (),
89+ 'host_ip' : socket .gethostbyname (socket .gethostname ())})
7890
7991 if kafka_producer_args is None :
8092 kafka_producer_args = {}
93+
8194 self .producer = KafkaProducer (
8295 bootstrap_servers = hosts_list ,
8396 security_protocol = security_protocol ,
@@ -109,8 +122,14 @@ def emit(self, record):
109122 record .msg = repr (record .msg )
110123 record .exc_info = repr (record .exc_info )
111124
125+ # Append additional fields
126+ rec = self .additional_fields .copy ()
127+ for key , value in record .__dict__ .items ():
128+ if key not in KafkaLoggingHandler .__LOGGING_FILTER_FIELDS :
129+ rec [key ] = "" if value is None else value
130+
112131 with self .buffer_lock :
113- self .buffer .append (record . __dict__ )
132+ self .buffer .append (rec )
114133
115134 # schedule a flush
116135 if len (self .buffer ) >= self .max_buffer_size :
0 commit comments