Skip to content

Commit 6babb6f

Browse files
author
Robert Monegro
committed
Add option to provide additional fields
- Added the option to provide a dictionary with additional fields to be added to the logging record - Added hostname and hostip
1 parent 2f14a2f commit 6babb6f

File tree

1 file changed

+20
-1
lines changed

1 file changed

+20
-1
lines changed

kafka_logger/handlers.py

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import atexit
44
import json
55
import logging
6+
import socket
67
import sys
78
from threading import Lock, Timer
89

@@ -31,13 +32,18 @@ class KafkaLoggingHandler(logging.Handler):
3132
logger that will be used to log unhandled top-level exception
3233
3334
"""
35+
__LOGGING_FILTER_FIELDS = ['msecs',
36+
'relativeCreated',
37+
'levelno',
38+
'created']
3439

3540
def __init__(self,
3641
hosts_list,
3742
topic,
3843
security_protocol='SSL',
3944
ssl_cafile=None,
4045
kafka_producer_args=None,
46+
additional_fields={},
4147
flush_buffer_size=None,
4248
flush_interval=5.0,
4349
unhandled_exception_logger=None):
@@ -51,6 +57,9 @@ def __init__(self,
5157
ssl_cafile (None, optional): path to CA file
5258
kafka_producer_args (None, optional):
5359
extra arguments to pass to KafkaProducer
60+
additional_fields (None, optional):
61+
A dictionary with all the additional fields that you would like
62+
to add to the logs, such as the application, environment, etc.
5463
flush_buffer_size (None/int, optional):
5564
flush if buffer > max size, None means there is no restriction
5665
flush_interval (int, optional): scheduled flush interval in seconds
@@ -75,9 +84,13 @@ def __init__(self,
7584
if flush_buffer_size is not None else float("inf")
7685
self.flush_interval = flush_interval
7786
self.timer = None
87+
self.additional_fields = additional_fields.copy()
88+
self.additional_fields.update({'host': socket.gethostname(),
89+
'host_ip': socket.gethostbyname(socket.gethostname())})
7890

7991
if kafka_producer_args is None:
8092
kafka_producer_args = {}
93+
8194
self.producer = KafkaProducer(
8295
bootstrap_servers=hosts_list,
8396
security_protocol=security_protocol,
@@ -109,8 +122,14 @@ def emit(self, record):
109122
record.msg = repr(record.msg)
110123
record.exc_info = repr(record.exc_info)
111124

125+
# Append additional fields
126+
rec = self.additional_fields.copy()
127+
for key, value in record.__dict__.items():
128+
if key not in KafkaLoggingHandler.__LOGGING_FILTER_FIELDS:
129+
rec[key] = "" if value is None else value
130+
112131
with self.buffer_lock:
113-
self.buffer.append(record.__dict__)
132+
self.buffer.append(rec)
114133

115134
# schedule a flush
116135
if len(self.buffer) >= self.max_buffer_size:

0 commit comments

Comments
 (0)