11"""Usage of kafka-logging-handler with multiprocessing and multithreading."""
2-
2+ # pylint: disable=global-statement
33from concurrent .futures import ThreadPoolExecutor
44import logging
55import multiprocessing
1010
1111from kafka_logger .handlers import KafkaLoggingHandler
1212
13- REQUIRED_ENV_VARS = [' KAFKA_SERVER' , ' KAFKA_CERT' , ' KAFKA_TOPIC' ]
13+ REQUIRED_ENV_VARS = [" KAFKA_SERVER" , " KAFKA_CERT" , " KAFKA_TOPIC" ]
1414
1515MAIN_PROCESS_THREADS = 2
1616CHILD_PROCESSES = 2
@@ -25,8 +25,7 @@ def get_process_thread():
2525 # you can get PID and thread ID as well:
2626 # os.getpid(), threading.current_thread().ident
2727 return "(process: {}, thread: {})" .format (
28- multiprocessing .current_process ().name ,
29- threading .current_thread ().name
28+ multiprocessing .current_process ().name , threading .current_thread ().name
3029 )
3130
3231
@@ -48,9 +47,7 @@ def grandchild_process():
4847def child_process_with_grandchild ():
4948 """Spawn grandchild process."""
5049 LOGGER .info ("I'm going to spawn another child %s" , get_process_thread ())
51- subprocess = Process (
52- target = grandchild_process ,
53- name = "Grandchild process" )
50+ subprocess = Process (target = grandchild_process , name = "Grandchild process" )
5451 subprocess .start ()
5552 subprocess .join ()
5653
@@ -59,7 +56,9 @@ def thread_worker(index):
5956 """Log a message."""
6057 LOGGER .info (
6158 "Hi, I'm a thread worker #%d in the child process thread pool %s" ,
62- index , get_process_thread ())
59+ index ,
60+ get_process_thread (),
61+ )
6362
6463
6564def child_process_with_threads ():
@@ -72,15 +71,18 @@ def child_process_with_threads():
7271
7372def main_process_thread (index ):
7473 """Log a message."""
75- LOGGER .info ("Hi, I'm a thread #%d in the main process %s" ,
76- index , get_process_thread ())
74+ LOGGER .info (
75+ "Hi, I'm a thread #%d in the main process %s" , index , get_process_thread ()
76+ )
7777
7878
7979def child_process_with_exception ():
8080 """Raise an exception after start."""
81- LOGGER .info ("Hi, I'm child process that is going to raise exception %s" ,
82- get_process_thread ())
83- raise Exception ('This exception will not occur in Kafka!' )
81+ LOGGER .info (
82+ "Hi, I'm child process that is going to raise exception %s" ,
83+ get_process_thread (),
84+ )
85+ raise Exception ("This exception will not occur in Kafka!" )
8486
8587
8688def main ():
@@ -95,8 +97,8 @@ def main():
9597 log_level = logging .DEBUG
9698
9799 log_format = logging .Formatter (
98- ' %(asctime)s %(name)-12s %(levelname)-8s %(message)s' ,
99- '%Y-%m-%dT%H:%M:%S' )
100+ " %(asctime)s %(name)-12s %(levelname)-8s %(message)s" , "%Y-%m-%dT%H:%M:%S"
101+ )
100102
101103 # create handler to show logs at stdout
102104 stdout_handler = logging .StreamHandler (sys .stdout )
@@ -106,14 +108,12 @@ def main():
106108
107109 # create Kafka logging handler
108110 kafka_handler = KafkaLoggingHandler (
109- os .environ [' KAFKA_SERVER' ],
110- os .environ [' KAFKA_TOPIC' ],
111- security_protocol = ' SSL' ,
112- ssl_cafile = os .environ [' KAFKA_CERT' ],
111+ os .environ [" KAFKA_SERVER" ],
112+ os .environ [" KAFKA_TOPIC" ],
113+ security_protocol = " SSL" ,
114+ ssl_cafile = os .environ [" KAFKA_CERT" ],
113115 unhandled_exception_logger = LOGGER ,
114- additional_fields = {
115- "service" : "test_service"
116- }
116+ additional_fields = {"service" : "test_service" },
117117 )
118118 kafka_handler .setFormatter (log_format )
119119 LOGGER .addHandler (kafka_handler )
@@ -126,9 +126,8 @@ def main():
126126 child_processes = []
127127 for idx in range (CHILD_PROCESSES ):
128128 child = Process (
129- target = child_process ,
130- name = "Child process #{}" .format (idx ),
131- args = (idx ,))
129+ target = child_process , name = "Child process #{}" .format (idx ), args = (idx ,)
130+ )
132131 child_processes .append (child )
133132 child .start ()
134133
@@ -138,7 +137,8 @@ def main():
138137 thread = threading .Thread (
139138 target = main_process_thread ,
140139 name = "Thread of the main process #{}" .format (idx ),
141- args = (idx , ))
140+ args = (idx ,),
141+ )
142142 threads .append (thread )
143143 thread .start ()
144144 # wait for threads to finish
@@ -153,27 +153,28 @@ def main():
153153 # test if a child of a child process logs correctly
154154 child_with_subprocess = Process (
155155 target = child_process_with_grandchild ,
156- name = "Child process that spawns another child" )
156+ name = "Child process that spawns another child" ,
157+ )
157158 child_with_subprocess .start ()
158159 child_with_subprocess .join ()
159160
160161 # test threads in a child process
161162 child_with_threads = Process (
162- target = child_process_with_threads ,
163- name = "Child process that has a thread pool" )
163+ target = child_process_with_threads , name = "Child process that has a thread pool"
164+ )
164165 child_with_threads .start ()
165166 child_with_threads .join ()
166167
167168 # test unhandled exception in a child process
168169 child_exception = Process (
169- target = child_process_with_exception ,
170- name = "Child process with an exception" )
170+ target = child_process_with_exception , name = "Child process with an exception"
171+ )
171172 child_exception .start ()
172173 child_exception .join ()
173174
174175 # top-level exception works only in the main process
175- raise Exception (' Testing top-level exception!' )
176+ raise Exception (" Testing top-level exception!" )
176177
177178
178- if __name__ == ' __main__' :
179+ if __name__ == " __main__" :
179180 main ()
0 commit comments