Skip to content

Commit 937410c

Browse files
authored
Merge pull request #27 from kusha/failsafe
Make startup of the handler fail-safe
2 parents 101fe34 + 03dcbae commit 937410c

File tree

1 file changed

+69
-53
lines changed

1 file changed

+69
-53
lines changed

kafka_logger/handlers.py

Lines changed: 69 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -87,62 +87,70 @@ def __init__(
8787
8888
"""
8989
logging.Handler.__init__(self)
90+
self.enabled = False
9091

91-
if security_protocol == "SSL" and ssl_cafile is None:
92-
raise KafkaLoggerException("SSL CA file isn't provided.")
92+
try:
93+
if security_protocol == "SSL" and ssl_cafile is None:
94+
raise KafkaLoggerException("SSL CA file isn't provided.")
9395

94-
self.kafka_topic_name = topic
95-
self.unhandled_exception_logger = unhandled_exception_logger
96+
self.kafka_topic_name = topic
97+
self.unhandled_exception_logger = unhandled_exception_logger
9698

97-
self.buffer = []
98-
self.buffer_lock = Lock()
99-
self.max_buffer_size = (
100-
flush_buffer_size if flush_buffer_size is not None else float("inf")
101-
)
102-
self.flush_interval = flush_interval
103-
self.timer = None
104-
self.additional_fields = {}
105-
if additional_fields:
99+
self.buffer = []
100+
self.buffer_lock = Lock()
101+
self.max_buffer_size = (
102+
flush_buffer_size if flush_buffer_size is not None else float("inf")
103+
)
104+
self.flush_interval = flush_interval
105+
self.timer = None
106106
self.additional_fields = additional_fields.copy()
107-
self.additional_fields.update(
108-
{
109-
"host": socket.gethostname(),
110-
"host_ip": socket.gethostbyname(socket.gethostname()),
111-
}
112-
)
113-
self.log_preprocess = log_preprocess if log_preprocess is not None else []
114-
115-
if kafka_producer_args is None:
116-
kafka_producer_args = {}
117-
118-
self.producer = KafkaProducer(
119-
bootstrap_servers=hosts_list,
120-
security_protocol=security_protocol,
121-
ssl_cafile=ssl_cafile,
122-
value_serializer=lambda msg: json.dumps(msg).encode("utf-8"),
123-
**kafka_producer_args
124-
)
125-
126-
# setup exit hooks
127-
# exit hooks work only in main process
128-
# termination of child processes uses os.exit() and ignore any hooks
129-
atexit.register(self.at_exit)
130-
131-
# Dont touch sys.excepthook if no logger provided
132-
if self.unhandled_exception_logger is not None:
133-
sys.excepthook = self.unhandled_exception
134-
135-
# multiprocessing support
136-
self.main_process_pid = os.getpid()
137-
self.mp_log_queue = multiprocessing.Queue()
138-
# main process thread that will flush mp queue
139-
self.mp_log_handler_flush_lock = Lock()
140-
self.mp_log_handler_thread = Thread(
141-
target=self.mp_log_handler, name="Kafka Logger Multiprocessing Handler"
142-
)
143-
# daemon will terminate with the main process
144-
self.mp_log_handler_thread.setDaemon(True)
145-
self.mp_log_handler_thread.start()
107+
self.additional_fields.update(
108+
{
109+
"host": socket.gethostname(),
110+
"host_ip": socket.gethostbyname(socket.gethostname()),
111+
}
112+
)
113+
self.log_preprocess = log_preprocess if log_preprocess is not None else []
114+
115+
if kafka_producer_args is None:
116+
kafka_producer_args = {}
117+
118+
self.producer = KafkaProducer(
119+
bootstrap_servers=hosts_list,
120+
security_protocol=security_protocol,
121+
ssl_cafile=ssl_cafile,
122+
value_serializer=lambda msg: json.dumps(msg).encode("utf-8"),
123+
**kafka_producer_args
124+
)
125+
126+
# setup exit hooks
127+
# exit hooks work only in main process
128+
# termination of child processes uses os.exit() and ignore any hooks
129+
atexit.register(self.at_exit)
130+
131+
# Dont touch sys.excepthook if no logger provided
132+
if self.unhandled_exception_logger is not None:
133+
sys.excepthook = self.unhandled_exception
134+
135+
# multiprocessing support
136+
self.main_process_pid = os.getpid()
137+
self.mp_log_queue = Queue()
138+
# main process thread that will flush mp queue
139+
self.mp_log_handler_flush_lock = Lock()
140+
self.mp_log_handler_thread = Thread(
141+
target=self.mp_log_handler, name="Kafka Logger Multiprocessing Handler"
142+
)
143+
# daemon will terminate with the main process
144+
self.mp_log_handler_thread.setDaemon(True)
145+
self.mp_log_handler_thread.start()
146+
147+
self.enabled = True
148+
149+
except Exception:
150+
logging.exception("Startup error of the Kafka logging handler")
151+
152+
# teardown failed startup
153+
atexit.unregister(self.at_exit)
146154

147155
def prepare_record_dict(self, record):
148156
"""
@@ -204,6 +212,9 @@ def emit(self, record):
204212
if record.name == "kafka.client":
205213
return
206214

215+
if not self.enabled:
216+
return
217+
207218
record_dict = self.prepare_record_dict(record)
208219

209220
if os.getpid() == self.main_process_pid:
@@ -236,10 +247,15 @@ def flush(self):
236247
Skip if the buffer is empty.
237248
Uses multithreading lock to access buffer.
238249
"""
250+
# logging.shutdown() can trigger flush() directly
251+
# main_process_pid is unknown if startup failed
252+
if not self.enabled:
253+
return
254+
239255
# if flush is triggered in a child process => skip
240-
# logging.shutdown() can trigger flush()
241256
if os.getpid() != self.main_process_pid:
242257
return
258+
243259
# clean up the timer (reached max buffer size)
244260
if self.timer is not None and self.timer.is_alive():
245261
self.timer.cancel()

0 commit comments

Comments (0)