Skip to content

Commit c4daaaa

Browse files
committed
Optimize gTS + Refactor
1 parent 79ed19e commit c4daaaa

File tree

4 files changed

+75
-40
lines changed

4 files changed

+75
-40
lines changed
Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,23 @@
11
from task import Task
2+
from task_scheduler import TaskScheduler
23
from backup_geocml_db import backup_geocml_db
34
from restore_geocml_db_from_backups import restore_geocml_db_from_backups
45
from healthcheck_services import healthcheck_services
56
from register_with_drgon import register_with_drgon
67

7-
backup_geocml_db_task = Task(backup_geocml_db, (), 3600) # task runs every hour
8-
backup_geocml_db_task.start()
8+
scheduler = TaskScheduler()
99

10-
restore_geocml_db_task = Task(restore_geocml_db_from_backups, (), 1) # task runs every second until stopped
11-
restore_geocml_db_task.start()
10+
# DEFINE TASKS HERE
11+
backup_geocml_db_task = Task(backup_geocml_db, ())
12+
restore_geocml_db_task = Task(restore_geocml_db_from_backups, ())
13+
healthcheck_services_task = Task(healthcheck_services, ())
14+
register_with_drgon_task = Task(register_with_drgon, ())
1215

13-
healthcheck_services_task = Task(healthcheck_services, (), 60)
14-
healthcheck_services_task.start()
16+
# ADD TASKS TO SCHEDULER HERE
17+
scheduler.add(backup_geocml_db_task, 3600) # task runs every hour
18+
scheduler.add(restore_geocml_db_task, 1) # task runs every second until stopped
19+
scheduler.add(healthcheck_services_task, 60)
20+
scheduler.add(register_with_drgon_task, 60)
1521

16-
register_with_drgon_task = Task(register_with_drgon, (), 60)
17-
register_with_drgon_task.start()
1822

19-
while True:
20-
pass # keep schedule.py process running in container
23+
scheduler.tick()
Lines changed: 21 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,32 @@
11
from threading import Thread
2-
from time import sleep, time
2+
from time import time
33
from task_logger import log
44

5-
class Task(Thread):
6-
def __init__(self, target, args, timeout):
5+
class Task(object):
6+
def __init__(self, target, args):
77
"""
88
target: function
99
args: tuple
10-
timeout: int
1110
"""
1211
Thread.__init__(self)
1312
self.target = target
14-
self.args = args # TODO: do args work?
15-
self.timeout = timeout
13+
self.args = args
1614

17-
def run(self):
18-
"""
19-
Calls the target function every n seconds
20-
"""
21-
while True:
22-
sleep(self.timeout)
23-
start = time()
24-
log('Running \'{}\' at {}'.format(self.target.__name__, start), self.target)
15+
def __str__(self):
16+
return self.target.__name__
17+
18+
def start(self):
19+
start = time()
20+
log("Running \'{}\' at {}".format(str(self), str(self), start), self.target)
21+
ret_code = 1
22+
23+
try:
24+
ret_code = self.target(*self.args)
25+
log("Ran \'{}\' in {}".format(str(self), str(self), time() - start), self.target)
26+
if ret_code == 0: # ret_code 0: task stopped successfully, 1: task failed, will try again
27+
log("Stopping task \'{}\' at {} with exit code 0".format(str(self), str(self), time()),
28+
self.target)
29+
except Exception as e:
30+
log("FATAL: {} at {}".format(str(self), e, time()))
2531

26-
try:
27-
ret_code = self.target()
28-
log('Ran \'{}\' in {}'.format(self.target.__name__, time() - start), self.target)
29-
if ret_code == 0: # ret_code 0: task stopped successfully
30-
log('Stopping task \'{}\' at {} with exit code 0'.format(self.target.__name__, time()),
31-
self.target)
32-
return
33-
# TODO: ret_code 1: task stopped due to expected error
34-
# TODO: other ret_codes: log warning (ret_code {} is not valid)
35-
except Exception as e:
36-
log('FATAL: {} at {}'.format(e, time())) # TODO: should the task be stopped after this?
32+
return ret_code
Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
1+
import inspect
2+
13
task_log_path = 'task_log'
24

35
def log(message, task=None):
46
"""
57
task: function
68
message: str
79
"""
8-
f = open(task_log_path, 'a')
9-
if task == None:
10-
f.write('{}\n'.format(message))
11-
else:
12-
f.write('{}:{}\n'.format(task.__name__, message))
10+
current_frame = inspect.currentframe()
11+
f = open(task_log_path, "a")
12+
f.write("[{}]: {}\n".format(inspect.getouterframes(current_frame, 2)[1][3], message))
1313
f.close()
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
from task_logger import log
2+
from time import sleep
3+
4+
class TaskScheduler(object):
5+
def __init__(self):
6+
self.schedule = {}
7+
8+
def add(self, task, timeout):
9+
"""
10+
task: Task
11+
timeout: int
12+
"""
13+
try:
14+
self.schedule[str(task)] = {
15+
"task": task,
16+
"timeout": timeout,
17+
"last_ran": 0,
18+
"ret_code": -1
19+
}
20+
except KeyError:
21+
log('FATAL: {} is already scheduled'.format(str(task)))
22+
23+
def tick(self):
24+
while True:
25+
sleep(0.25)
26+
updated_schedule = self.schedule.copy()
27+
for key, value in self.schedule.items():
28+
if value["last_ran"] >= value["timeout"]:
29+
if value["ret_code"] == 0:
30+
del updated_schedule[key]
31+
continue
32+
33+
value["last_ran"] = 0
34+
value["ret_code"] = value["task"].start()
35+
value["last_ran"] += 0.25
36+
self.schedule = updated_schedule

0 commit comments

Comments
 (0)