Skip to content

Commit 559cb35

Browse files
mbirgerkusha
authored and committed
Add log preprocessing
The handler has an optional argument (a list of functions). Raw logs are processed by each function in turn before they are added to the buffer.
1 parent 2d4c0d1 commit 559cb35

File tree

2 files changed

+95
-1
lines changed

2 files changed

+95
-1
lines changed

examples/preprocess.py

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
"""Simple usage example of kafka-logging-handler."""
2+
3+
import logging
4+
import os
5+
import sys
6+
import time
7+
import random
8+
9+
from kafka_logger.handlers import KafkaLoggingHandler
10+
11+
# Environment variables that must be present before main() configures
# the Kafka handler (server address, CA certificate file and topic).
REQUIRED_ENV_VARS = ['KAFKA_SERVER', 'KAFKA_CERT', 'KAFKA_TOPIC']
12+
# Sample passwords: main() puts a random one into each test message and
# the hide_passwords preprocessor masks them.
PASSWORDS = ['qwerty', '12345', 'password']
13+
14+
15+
def main():
    """Set up a logger with stdout and Kafka handlers and emit test logs.

    The Kafka handler is given two example preprocessors through the
    ``log_preprocess`` argument; they are passed in the order
    ``[remove_lineno, hide_passwords]``.

    Raises:
        RuntimeError: if any variable listed in REQUIRED_ENV_VARS is not
            set in the environment.
    """
    # Validate that Kafka configuration is available. An explicit check is
    # used instead of `assert` so it still runs under `python -O` and the
    # error names the variables that are actually missing.
    missing = [key for key in REQUIRED_ENV_VARS if key not in os.environ]
    if missing:
        raise RuntimeError(
            'Missing required environment variables: ' + ', '.join(missing))

    logger = logging.getLogger("test.logger")
    logger.propagate = False
    log_level = logging.DEBUG

    log_format = logging.Formatter(
        '%(asctime)s %(name)-12s %(levelname)-8s %(message)s',
        '%Y-%m-%dT%H:%M:%S')

    # create handler to show logs at stdout
    stdout_handler = logging.StreamHandler(sys.stdout)
    stdout_handler.setLevel(log_level)
    stdout_handler.setFormatter(log_format)
    logger.addHandler(stdout_handler)

    def remove_lineno(log):
        """Example preprocessor: move the line number into the message.

        Appends the value of ``log['lineno']`` to ``log['message']``,
        deletes the ``lineno`` key and adds a ``custom_field`` key.
        The same (mutated) dict is returned.
        """
        log['message'] += ' (at line {})'.format(log['lineno'])
        del log['lineno']
        log['custom_field'] = 42
        return log

    def hide_passwords(log):
        """Example preprocessor: mask known passwords in the message.

        Every occurrence of a string from PASSWORDS in ``log['message']``
        is replaced by its first and last characters with asterisks in
        between. The same (mutated) dict is returned.
        """
        for password in PASSWORDS:
            hidden_password = (
                password[0] + '*' * (len(password) - 2) + password[-1])
            log['message'] = log['message'].replace(password, hidden_password)
        return log

    # create Kafka logging handler
    kafka_handler = KafkaLoggingHandler(
        os.environ['KAFKA_SERVER'],
        os.environ['KAFKA_TOPIC'],
        security_protocol='SSL',
        ssl_cafile=os.environ['KAFKA_CERT'],
        # you can configure how often logger will send logs to Kafka
        # flush_buffer_size=3,  # uncomment to see that it works slower
        # flush_interval=3.0,  # interval in seconds
        unhandled_exception_logger=logger,
        kafka_producer_args={
            'api_version_auto_timeout_ms': 1000000,
            'request_timeout_ms': 1000000,
        },
        # you can include arbitrary fields to all produced logs
        additional_fields={
            "service": "test_service"
        },
        # preprocessors are listed in the order they should be applied
        log_preprocess=[remove_lineno, hide_passwords]
    )
    kafka_handler.setFormatter(log_format)
    logger.addHandler(kafka_handler)

    logger.setLevel(log_level)

    # test logging
    logger.debug("Test debug level logs")
    for idx in range(3):
        logger.info("Test log #%d: %s", idx, random.choice(PASSWORDS))
        time.sleep(0.5)
81+
82+
83+
if __name__ == '__main__':
84+
main()

kafka_logger/handlers.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,8 @@ def __init__(self,
5858
additional_fields={},
5959
flush_buffer_size=None,
6060
flush_interval=5.0,
61-
unhandled_exception_logger=None):
61+
unhandled_exception_logger=None,
62+
log_preprocess=None):
6263
"""
6364
Initialize the handler.
6465
@@ -77,6 +78,9 @@ def __init__(self,
7778
flush_interval (int, optional): scheduled flush interval in seconds
7879
unhandled_exception_logger (None/logging.Logger, optional):
7980
logger that will be used to log unhandled top-level exception
81+
log_preprocess (None/list, optional):
82+
list of functions, handler will send the following to Kafka
83+
...preprocess[1](preprocess[0](raw_log))...
8084
8185
Raises:
8286
KafkaLoggerException: in case of incorrect logger configuration
@@ -101,6 +105,8 @@ def __init__(self,
101105
'host': socket.gethostname(),
102106
'host_ip': socket.gethostbyname(socket.gethostname())
103107
})
108+
self.log_preprocess = \
109+
log_preprocess if log_preprocess is not None else []
104110

105111
if kafka_producer_args is None:
106112
kafka_producer_args = {}
@@ -167,6 +173,10 @@ def prepare_record_dict(self, record):
167173
value = str(value)
168174
rec[key] = "" if value is None else value
169175

176+
# apply preprocessor(s)
177+
for preprocessor in self.log_preprocess:
178+
rec = preprocessor(rec)
179+
170180
return rec
171181

172182
def emit(self, record):

0 commit comments

Comments
 (0)