Commit 1578726

Merge pull request #11 from kusha/multiprocessing-support
Add multiprocessing support
2 parents 787a3a3 + dd94632

File tree

2 files changed: 274 additions & 9 deletions


examples/mt_mp.py

Lines changed: 179 additions & 0 deletions
"""Usage of kafka-logging-handler with multiprocessing and multithreading."""

from concurrent.futures import ThreadPoolExecutor
import logging
import multiprocessing
from multiprocessing import Process
import os
import sys
import threading

from kafka_logger.handlers import KafkaLoggingHandler

REQUIRED_ENV_VARS = ['KAFKA_SERVER', 'KAFKA_CERT', 'KAFKA_TOPIC']

MAIN_PROCESS_THREADS = 2
CHILD_PROCESSES = 2
THREAD_POOL_WORKERS = 2
THREAD_POOL_SIZE = 3

LOGGER = None


def get_process_thread():
    """Get a string with the process and thread names."""
    # you can get the PID and thread ID as well:
    # os.getpid(), threading.current_thread().ident
    return "(process: {}, thread: {})".format(
        multiprocessing.current_process().name,
        threading.current_thread().name
    )


def child_process(index):
    """
    Log a message.

    Args:
        index (int): index of the child process
    """
    LOGGER.info("Hi, I'm child process #%d %s", index, get_process_thread())


def grandchild_process():
    """Log a message."""
    LOGGER.info("Hi, I'm a sub-sub-process %s", get_process_thread())


def child_process_with_grandchild():
    """Spawn a grandchild process."""
    LOGGER.info("I'm going to spawn another child %s", get_process_thread())
    subprocess = Process(
        target=grandchild_process,
        name="Grandchild process")
    subprocess.start()
    subprocess.join()


def thread_worker(index):
    """Log a message."""
    LOGGER.info(
        "Hi, I'm a thread worker #%d in the child process thread pool %s",
        index, get_process_thread())


def child_process_with_threads():
    """Run a thread executor pool."""
    LOGGER.info("I'm going to spawn multiple threads %s", get_process_thread())
    with ThreadPoolExecutor(max_workers=THREAD_POOL_WORKERS) as executor:
        for thread_idx in range(THREAD_POOL_SIZE):
            executor.submit(thread_worker, thread_idx)


def main_process_thread(index):
    """Log a message."""
    LOGGER.info("Hi, I'm a thread #%d in the main process %s",
                index, get_process_thread())


def child_process_with_exception():
    """Raise an exception after start."""
    LOGGER.info("Hi, I'm a child process that is going to raise an exception %s",
                get_process_thread())
    raise Exception('This exception will not appear in Kafka!')


def main():
    """Set up the logger and test logging."""
    global LOGGER

    # validate that the Kafka configuration is available
    assert all([(key in os.environ) for key in REQUIRED_ENV_VARS])

    LOGGER = logging.getLogger("test.logger")
    LOGGER.propagate = False
    log_level = logging.DEBUG

    log_format = logging.Formatter(
        '%(asctime)s %(name)-12s %(levelname)-8s %(message)s',
        '%Y-%m-%dT%H:%M:%S')

    # create a handler to show logs at stdout
    stdout_handler = logging.StreamHandler(sys.stdout)
    stdout_handler.setLevel(log_level)
    stdout_handler.setFormatter(log_format)
    LOGGER.addHandler(stdout_handler)

    # create the Kafka logging handler
    kafka_handler = KafkaLoggingHandler(
        os.environ['KAFKA_SERVER'],
        os.environ['KAFKA_TOPIC'],
        security_protocol='SSL',
        ssl_cafile=os.environ['KAFKA_CERT'],
        unhandled_exception_logger=LOGGER,
        additional_fields={
            "service": "test_service"
        }
    )
    kafka_handler.setFormatter(log_format)
    LOGGER.addHandler(kafka_handler)

    LOGGER.setLevel(log_level)

    LOGGER.info("Hi there, I'm the main process! %s", get_process_thread())

    # test child processes
    child_processes = []
    for idx in range(CHILD_PROCESSES):
        child = Process(
            target=child_process,
            name="Child process #{}".format(idx),
            args=(idx,))
        child_processes.append(child)
        child.start()

    # test threads in the main process
    threads = []
    for idx in range(MAIN_PROCESS_THREADS):
        thread = threading.Thread(
            target=main_process_thread,
            name="Thread of the main process #{}".format(idx),
            args=(idx,))
        threads.append(thread)
        thread.start()
    # wait for threads to finish
    for thread in threads:
        thread.join()

    # there is a chance of log loss
    # if the main process terminates without joining child processes
    for child in child_processes:
        child.join()

    # test if a child of a child process logs correctly
    child_with_subprocess = Process(
        target=child_process_with_grandchild,
        name="Child process that spawns another child")
    child_with_subprocess.start()
    child_with_subprocess.join()

    # test threads in a child process
    child_with_threads = Process(
        target=child_process_with_threads,
        name="Child process that has a thread pool")
    child_with_threads.start()
    child_with_threads.join()

    # test an unhandled exception in a child process
    child_exception = Process(
        target=child_process_with_exception,
        name="Child process with an exception")
    child_exception.start()
    child_exception.join()

    # a top-level exception works only in the main process
    raise Exception('Testing top-level exception!')


if __name__ == '__main__':
    main()
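
To try the example, all three variables in REQUIRED_ENV_VARS must point at a reachable SSL-enabled Kafka broker. A minimal launcher sketch follows; the broker address, CA file path, and topic name are placeholders, not values from this commit:

    # Hypothetical launcher for examples/mt_mp.py; replace every value
    # below with a real broker, CA certificate, and topic before running.
    import os
    import runpy

    os.environ["KAFKA_SERVER"] = "broker.example.com:9093"  # placeholder
    os.environ["KAFKA_CERT"] = "/path/to/ca.pem"            # placeholder
    os.environ["KAFKA_TOPIC"] = "test-logs"                 # placeholder

    # run the script exactly as `python examples/mt_mp.py` would
    runpy.run_path("examples/mt_mp.py", run_name="__main__")

Note that the script deliberately ends by raising a top-level exception to exercise the unhandled-exception hook, so a final traceback is the expected outcome.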

kafka_logger/handlers.py

Lines changed: 95 additions & 9 deletions
@@ -3,9 +3,12 @@
 import atexit
 import json
 import logging
+from multiprocessing import Queue
+import os
 import socket
 import sys
-from threading import Lock, Timer
+from threading import Lock, Thread, Timer
+import time
 
 from kafka import KafkaProducer  # pylint: disable=import-error
 
@@ -21,11 +24,18 @@ class KafkaLoggingHandler(logging.Handler):
     This handler enables the user to forward logs to Kafka.
 
     Attributes:
+        additional_fields (dict): extra fields attached to logs
         buffer (list): logs (dict) waiting for a flush to Kafka
         buffer_lock (threading.Lock): multithreading lock for a buffer access
         flush_interval (float): scheduled flush interval in seconds
         kafka_topic_name (str): topic name
+        main_process_pid (int): pid of the process that initialized the logger
         max_buffer_size (int): flush if buffer > max size
+        mp_log_handler_flush_lock (threading.Lock):
+            locked while mp_log_handler_thread flushes logs
+        mp_log_handler_thread (threading.Thread): thread that flushes the mp queue
+        mp_log_queue (multiprocessing.Queue):
+            queue used to redirect logs of child processes
         producer (kafka.KafkaProducer): producer object
         timer (threading.Timer): thread with a scheduled logs flush
         unhandled_exception_logger (logging.Logger):
@@ -37,6 +47,7 @@ class KafkaLoggingHandler(logging.Handler):
                       'relativeCreated',
                       'levelno',
                       'created']
+    __MULTIPROCESSING_QUEUE_FLUSH_DELAY = 0.2
 
     def __init__(self,
                  hosts_list,
@@ -102,20 +113,35 @@ def __init__(self,
             **kafka_producer_args)
 
         # setup exit hooks
+        # exit hooks work only in the main process;
+        # child processes terminate via os._exit() and ignore any hooks
         atexit.register(self.at_exit)
         sys.excepthook = self.unhandled_exception
 
-    def emit(self, record):
+        # multiprocessing support
+        self.main_process_pid = os.getpid()
+        self.mp_log_queue = Queue()
+        # main process thread that will flush the mp queue
+        self.mp_log_handler_flush_lock = Lock()
+        self.mp_log_handler_thread = Thread(
+            target=self.mp_log_handler,
+            name="Kafka Logger Multiprocessing Handler")
+        # daemon will terminate with the main process
+        self.mp_log_handler_thread.setDaemon(True)
+        self.mp_log_handler_thread.start()
+
+    def prepare_record_dict(self, record):
         """
-        Add a new log to the buffer.
+        Prepare a dictionary log item.
+
+        Format a log record and extend the dictionary with default values.
 
         Args:
-            record: Logging message
-        """
-        # drop Kafka logging to avoid infinite recursion.
-        if record.name == 'kafka.client':
-            return
+            record (logging.LogRecord): log record
 
+        Returns:
+            dict: log item ready for Kafka
+        """
         # use default formatting
         # Update the msg dict to include all of the message attributes
         self.format(record)
@@ -137,8 +163,37 @@ def emit(self, record):
             value = tuple(arg.__repr__() for arg in value)
         rec[key] = "" if value is None else value
 
+        return rec
+
+    def emit(self, record):
+        """
+        Add a log to the buffer or forward it to the main process.
+
+        Args:
+            record: Logging message
+        """
+        # drop Kafka logging to avoid infinite recursion.
+        if record.name == 'kafka.client':
+            return
+
+        record_dict = self.prepare_record_dict(record)
+
+        if os.getpid() == self.main_process_pid:
+            self.append_to_buffer(record_dict)
+        else:  # if forked
+            self.mp_log_queue.put(record_dict)
+
+    def append_to_buffer(self, record_dict):
+        """
+        Place a log dictionary into the buffer.
+
+        Triggers/schedules a flush of the buffer.
+
+        Args:
+            record_dict (dict): log item
+        """
         with self.buffer_lock:
-            self.buffer.append(rec)
+            self.buffer.append(record_dict)
 
         # schedule a flush
         if len(self.buffer) >= self.max_buffer_size:
@@ -175,6 +230,16 @@ def schedule_flush(self):
         self.timer.setDaemon(True)
         self.timer.start()
 
+    def mp_log_handler(self):
+        """Emit logs from the multiprocessing queue."""
+        while True:
+            if self.mp_log_handler_flush_lock.locked():
+                # second+ iteration
+                self.mp_log_handler_flush_lock.release()
+            record_dict = self.mp_log_queue.get(block=True)  # wait for logs
+            self.mp_log_handler_flush_lock.acquire()
+            self.append_to_buffer(record_dict)
+
     def at_exit(self):
         """
         Flush logs at exit, close the producer.
@@ -183,6 +248,27 @@ def at_exit(self):
         Kafka raises RecordAccumulator in case of flushing in close method.
         """
         # Kafka's RecordAccumulator is still alive here
+        if self.unhandled_exception_logger is not None:
+            # check if there are running subprocesses and log a warning
+            try:
+                import psutil
+                main_process = psutil.Process(pid=self.main_process_pid)
+            except ImportError:
+                pass
+            except psutil.NoSuchProcess:
+                pass
+            else:
+                children = main_process.children(recursive=True)
+                if children:
+                    self.unhandled_exception_logger.warning(
+                        "There are %d child process(es) at the moment of the "
+                        "main process termination. This may cause log loss.",
+                        len(children))
+        while self.mp_log_queue.qsize() != 0:
+            time.sleep(KafkaLoggingHandler.__MULTIPROCESSING_QUEUE_FLUSH_DELAY)
+        # wait until everything in the multiprocessing queue is buffered
+        self.mp_log_handler_flush_lock.acquire()
+
         if self.timer is not None:
             self.flush()
         self.producer.close()
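
The routing in emit() above is the heart of the change: a record created in a forked child cannot reach the main process's buffer directly, so it is pushed onto a multiprocessing.Queue that a daemon thread in the main process drains into the buffer. Below is a self-contained sketch of just that mechanism, assuming the 'fork' start method; QueueRouter and its member names are illustrative, not part of kafka_logger:

    # Minimal sketch of the PID-check routing used by emit(); assumes the
    # default 'fork' start method on Linux/macOS. QueueRouter is illustrative.
    import os
    import threading
    import time
    from multiprocessing import Process, Queue


    class QueueRouter:
        """Illustrative stand-in for the handler's multiprocessing plumbing."""

        def __init__(self):
            self.main_pid = os.getpid()  # the process that owns the buffer
            self.queue = Queue()
            self.buffer = []
            # daemon thread dies with the main process,
            # mirroring mp_log_handler_thread above
            threading.Thread(target=self._drain, daemon=True).start()

        def emit(self, item):
            if os.getpid() == self.main_pid:
                self.buffer.append(item)  # main process: buffer directly
            else:
                self.queue.put(item)  # forked child: hand off to main process

        def _drain(self):
            while True:  # runs only in the main process
                self.buffer.append(self.queue.get(block=True))


    if __name__ == '__main__':
        router = QueueRouter()
        router.emit("from main")
        child = Process(target=router.emit, args=("from child",))
        child.start()
        child.join()
        time.sleep(0.2)  # give the drain thread a moment to catch up
        print(router.buffer)  # expect both messages

The sketch omits the handler's flush lock, which at_exit() uses to ensure the drain thread has finished buffering queued records before the final flush.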
