55import traceback
66import dataclasses
77from contextlib import contextmanager
8- from datetime import datetime
8+ from datetime import datetime , timedelta
99
1010import sqlalchemy as sa
1111import sqlalchemy .orm as orm
1414EPOCH = datetime (1970 , 1 , 1 )
1515
1616
17- class JobLockedError (Exception ):
17+ class JobExecutionError (Exception ):
18+ pass
19+
20+
21+ class JobLockedError (JobExecutionError ):
1822 """
1923 Raised when try to start a locked job.
2024 """
2125
2226 pass
2327
2428
25- class JobIgnoredError (Exception ):
29+ class JobIsNotReadyToStartError (JobExecutionError ):
30+ """
31+ Raised when try to start job that the current status shows that it is not
32+ ready to start.
33+ """
34+
35+ pass
36+
37+
38+ class JobAlreadySucceededError (JobIsNotReadyToStartError ):
39+ """
40+ Raised when try to start a succeeded (failed too many times) job.
41+ """
42+
43+ pass
44+
45+
46+ class JobIgnoredError (JobIsNotReadyToStartError ):
2647 """
27- Raised when try to start a ignored (failed too many times) job.
48+ Raised when try to start an ignored (failed too many times) job.
2849 """
2950
3051 pass
@@ -56,8 +77,15 @@ class JobMixin:
5677 进行加锁操作. 如果在 #1 之后, #3 之前有人把这个 Job 锁上了, 这个 SQL 就不会执行成功,
5778 我们也就视为获取锁失败.
5879
59- 注, 这里我们故意没有用 ``SELECT ... WHERE ... FOR UPDATE`` 的行锁语法, 因为我们
80+ 注 1 , 这里我们故意没有用 ``SELECT ... WHERE ... FOR UPDATE`` 的行锁语法, 因为我们
6081 需要显式的维护这个锁的开关和生命周期.
82+
83+ 注 2, 我们是先获得这个 job, 检查是否上锁, 然后再 update 上锁. 你可能会担心在检查成功后
84+ 到 update 上锁期间如果有其他人把这个锁锁上了怎么办? 这个问题是不存在的, 因为 update 里的
85+ where 会保证如果尝试上锁的时候已经被上锁了, 这个 update 会失败. 再一个你可能会问为什么不
86+ 先 update 上锁, 再获取 job. 因为我们希望当这个 job 已经被上锁时, 其他的并发 worker 能够
87+ 用最小的代价了解到这个 job 已经被上锁了. 而明显 get 的代价比 update 要小得多, 所以
88+ 优先用 get 来获得 job 检查锁的状态.
6189 """
6290
6391 # fmt: off
@@ -191,7 +219,8 @@ def _lock_it(
191219 self .lock_at = utc_now
192220 if debug : # pragma: no cover
193221 print (" Successfully lock the job!" )
194- else :
222+ # if someone else locked the job before us, we will enter this branch
223+ else : # pragma: no cover
195224 if debug : # pragma: no cover
196225 print (" Failed to lock the job" )
197226 raise JobLockedError (f"Job { self .id !r} " )
@@ -248,12 +277,15 @@ def start(
248277 cls ,
249278 engine : sa .Engine ,
250279 id : str ,
251- in_process_status : int ,
280+ pending_status : int ,
281+ in_progress_status : int ,
252282 failed_status : int ,
253- success_status : int ,
254- ignore_status : int ,
283+ succeeded_status : int ,
284+ ignored_status : int ,
255285 expire : int ,
256286 max_retry : int ,
287+ more_pending_status : T .Optional [T .Union [int , T .List [int ]]] = None ,
288+ traceback_stack_limit : int = 10 ,
257289 skip_error : bool = False ,
258290 debug : bool = False ,
259291 ) -> T .ContextManager [T .Tuple ["T_JOB" , "Updates" ]]:
@@ -273,6 +305,7 @@ def start(
273305 with Job.start(
274306 engine=engine,
275307 id="job-1",
308+ pending_status=10,
276309 in_process_status=20,
277310 failed_status=30,
278311 success_status=40,
@@ -290,36 +323,89 @@ def start(
290323
291324 :param engine: SQLAlchemy engine. A life-cycle of a job has to be done
292325 in a new session.
326+ :param id: unique job id, usually the primary key of the job table.
327+ todo, add support to allow compound primary key.
328+ :param pending_status: pending status code in integer.
329+ :param in_progress_status: in_progress status code in integer.
330+ :param failed_status: failed status code in integer.
331+ :param succeeded_status: succeeded status code in integer.
332+ :param ignored_status: ignored status code in integer.
333+ :param more_pending_status: additional pending status code that logically
334+ equal to "pending" status.
335+ :param max_retry: how many retry is allowed before we ignore it
336+ :param expire: how long the lock will expire
337+ :param skip_error: if True, ignore the error during the job execution logics.
338+ note that this flag won't ignore the error during the context manager
339+ start up and clean up. For example, it won't ignore the :class:`JobLockedError`.
340+ :param debug: if True, print debug message.
341+
342+ 注: 这里的设计跟 pynamodb_mate 中的 status tracker 模块不同. 这里没有
343+ detailed_error 这个参数. 这是因为在 sql 中我们会先 get job 再 update 获取锁, 所以
344+ 在获取锁失败时我们无需再次查询数据库来了解错误原因. 而 dynamodb 是先 update 获取锁,
345+ 出错后如需了解详细的错误原因需要一次额外的 get 操作.
293346 """
294347 if debug : # pragma: no cover
295348 print ("{msg:-^80}" .format (msg = (f" ▶️ start Job { id !r} " )))
296349
297350 updates = Updates ()
298351
299352 with orm .Session (engine ) as ses :
300- job = ses .get (cls , id )
353+ job : T . Optional [ "T_JOB" ] = ses .get (cls , id )
301354 if job is None : # pragma: no cover
302355 raise ValueError
303356
304357 if job .is_locked (expire = expire ):
305358 if debug : # pragma: no cover
306- print (f"Job { id !r} is locked." )
359+ print (f"❌ Job { id !r} is locked." )
307360 raise JobLockedError (f"Job { id !r} is locked." )
308361
309- if job .status == ignore_status :
310- if debug : # pragma: no cover
311- print (f"↪️ the job is ignored, do nothing!" )
312- raise JobIgnoredError (
313- f"Job { id !r} retry count already exceeded { max_retry } , "
314- f"ignore it."
315- )
362+ ready_to_start_status = [
363+ pending_status ,
364+ failed_status ,
365+ ]
366+ if more_pending_status is None :
367+ pass
368+ elif isinstance (more_pending_status , int ):
369+ ready_to_start_status .append (more_pending_status )
370+ else :
371+ ready_to_start_status .extend (more_pending_status )
372+
373+ if job .status not in ready_to_start_status :
374+ if job .status == succeeded_status :
375+ if debug : # pragma: no cover
376+ print (f"❌ Job { id !r} is already succeeded, do nothing." )
377+ raise JobAlreadySucceededError (
378+ f"Job { id !r} is already succeeded, do nothing."
379+ )
380+ elif job .status == ignored_status :
381+ if debug : # pragma: no cover
382+ print (f"❌ Job { id !r} is ignored, do nothing." )
383+ raise JobIgnoredError (
384+ f"Job { id !r} retry count already exceeded { max_retry } , "
385+ f"ignore it."
386+ )
387+ elif job .status not in ready_to_start_status :
388+ if debug : # pragma: no cover
389+ print (
390+ f"❌ Job { id !r} status is { job .status } , "
391+ f"it is not any of the ready-to-start status: { ready_to_start_status } ."
392+ )
393+ raise JobIsNotReadyToStartError (
394+ f"Job { id !r} status is { job .status } , "
395+ f"it is not any of the ready-to-start status: { ready_to_start_status } ."
396+ )
397+ else :
398+ raise NotImplementedError (
399+ f"You found a bug! This error should be handled but not implemented yet, "
400+ f"please report to https://github.com/MacHu-GWU/sqlalchemy_mate-project/issues;"
401+ )
316402
317- lock , lock_at = job .lock_it (
403+ _ , _ = job .lock_it (
318404 engine_or_session = ses ,
319- in_progress_status = in_process_status ,
405+ in_progress_status = in_progress_status ,
320406 debug = debug ,
321407 )
322- updates .values ["status" ] = in_process_status
408+ updates .values ["status" ] = in_progress_status
323409
324410 try :
325411 # print("before yield")
@@ -328,9 +414,9 @@ def start(
328414 if debug : # pragma: no cover
329415 print (
330416 f"✅ 🔐 job succeeded, "
331- f"set status = { success_status } and unlock the job."
417+ f"set status = { succeeded_status } and unlock the job."
332418 )
333- updates .values ["status" ] = success_status
419+ updates .values ["status" ] = succeeded_status
334420 updates .values ["update_at" ] = datetime .utcnow ()
335421 updates .values ["lock" ] = None
336422 updates .values ["retry" ] = 0
@@ -342,9 +428,9 @@ def start(
342428 if debug : # pragma: no cover
343429 print (
344430 f"❌ 🔐 job failed { max_retry } times already, "
345- f"set status = { ignore_status } and unlock the job."
431+ f"set status = { ignored_status } and unlock the job."
346432 )
347- failed_updates .values ["status" ] = ignore_status
433+ failed_updates .values ["status" ] = ignored_status
348434 else :
349435 if debug : # pragma: no cover
350436 print (
@@ -356,7 +442,7 @@ def start(
356442 failed_updates .values ["lock" ] = None
357443 failed_updates .values ["errors" ] = {
358444 "error" : repr (e ),
359- "traceback" : traceback .format_exc (limit = 10 ),
445+ "traceback" : traceback .format_exc (limit = traceback_stack_limit ),
360446 }
361447 failed_updates .values ["retry" ] = job .retry + 1
362448 job .update (engine_or_session = ses , updates = failed_updates )
0 commit comments