Skip to content

Commit 259f4b9

Browse files
authored
Merge pull request #18 from kusha/fix-mp-shutdown
Fix mp shutdown
2 parents df1c5cb + dea6f25 commit 259f4b9

File tree

2 files changed

+125
-0
lines changed

2 files changed

+125
-0
lines changed

examples/verify_shutdown.py

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
"""Usage of kafka-logging-handler with multiprocessing and multithreading."""
2+
3+
from concurrent.futures import ThreadPoolExecutor
4+
import logging
5+
import multiprocessing
6+
from multiprocessing import Process
7+
import os
8+
import sys
9+
import threading
10+
11+
from kafka_logger.handlers import KafkaLoggingHandler
12+
13+
REQUIRED_ENV_VARS = ['KAFKA_SERVER', 'KAFKA_CERT', 'KAFKA_TOPIC']
14+
15+
CHILD_PROCESSES = 5
16+
CHILD_THREADS = 5
17+
18+
LOGGER = None
19+
20+
21+
def get_process_thread():
22+
"""Get string with process and thread names."""
23+
# you can get PID and thread ID as well:
24+
# os.getpid(), threading.current_thread().ident
25+
return "(process: {}, thread: {})".format(
26+
multiprocessing.current_process().name,
27+
threading.current_thread().name
28+
)
29+
30+
31+
def child_process(index):
32+
"""
33+
Log a message.
34+
35+
Args:
36+
index (int): index of the child process
37+
"""
38+
LOGGER.info("Hi, I'm child process #%d %s", index, get_process_thread())
39+
logging.shutdown()
40+
41+
42+
def thread_function(index):
43+
"""Log a message."""
44+
LOGGER.info("Hi, I'm a thread #%d in the main process %s",
45+
index, get_process_thread())
46+
logging.shutdown()
47+
48+
49+
def main():
50+
"""Setup logger and test logging."""
51+
global LOGGER
52+
53+
# validate that Kafka configuration is available
54+
assert all([(key in os.environ) for key in REQUIRED_ENV_VARS])
55+
56+
LOGGER = logging.getLogger("test.logger")
57+
LOGGER.propagate = False
58+
log_level = logging.DEBUG
59+
60+
log_format = logging.Formatter(
61+
'%(asctime)s %(name)-12s %(levelname)-8s %(message)s',
62+
'%Y-%m-%dT%H:%M:%S')
63+
64+
# create handler to show logs at stdout
65+
stdout_handler = logging.StreamHandler(sys.stdout)
66+
stdout_handler.setLevel(log_level)
67+
stdout_handler.setFormatter(log_format)
68+
LOGGER.addHandler(stdout_handler)
69+
70+
# create Kafka logging handler
71+
kafka_handler = KafkaLoggingHandler(
72+
os.environ['KAFKA_SERVER'],
73+
os.environ['KAFKA_TOPIC'],
74+
security_protocol='SSL',
75+
ssl_cafile=os.environ['KAFKA_CERT'],
76+
unhandled_exception_logger=LOGGER,
77+
additional_fields={
78+
"service": "test_service"
79+
}
80+
)
81+
kafka_handler.setFormatter(log_format)
82+
LOGGER.addHandler(kafka_handler)
83+
84+
LOGGER.setLevel(log_level)
85+
86+
LOGGER.info("Hi there, I'm the main process! %s", get_process_thread())
87+
88+
# test child processes
89+
child_processes = []
90+
for idx in range(CHILD_PROCESSES):
91+
child = Process(
92+
target=child_process,
93+
name="Child process #{}".format(idx),
94+
args=(idx,))
95+
child_processes.append(child)
96+
child.start()
97+
98+
import time
99+
time.sleep(1) # in the main proc only
100+
alive = [proc.is_alive() for proc in child_processes]
101+
assert not any(alive)
102+
103+
LOGGER.info('Multiprocessing logging.shutdown() works')
104+
105+
threads = []
106+
for idx in range(CHILD_THREADS):
107+
thread = threading.Thread(
108+
target=thread_function,
109+
name="Thread of the main process #{}".format(idx),
110+
args=(idx, ))
111+
threads.append(thread)
112+
thread.start()
113+
# wait for threads to finish
114+
for thread in threads:
115+
thread.join()
116+
117+
LOGGER.info('Multithreding logging.shutdown() works')
118+
119+
120+
if __name__ == '__main__':
121+
main()

kafka_logger/handlers.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,10 @@ def flush(self):
230230
Skip if the buffer is empty.
231231
Uses multithreading lock to access buffer.
232232
"""
233+
# if flush is triggered in a child process => skip
234+
# logging.shutdown() can trigger flush()
235+
if os.getpid() != self.main_process_pid:
236+
return
233237
# clean up the timer (reached max buffer size)
234238
if self.timer is not None and self.timer.is_alive():
235239
self.timer.cancel()

0 commit comments

Comments
 (0)