Joshua Lochner commited on
Commit
a197a49
·
1 Parent(s): 07690ba

Format utils code

Browse files
Files changed (1) hide show
  1. src/utils.py +3 -8
src/utils.py CHANGED
@@ -27,7 +27,6 @@ class Task:
27
  return self.function(*self.args, **self.kwargs)
28
 
29
 
30
-
31
  class CallbackGenerator:
32
  def __init__(self, generator, callback):
33
  self.generator = generator
@@ -42,7 +41,6 @@ class CallbackGenerator:
42
  yield from self.generator
43
 
44
 
45
-
46
  def start_worker(q: JoinableQueue, stop_event: Event): # TODO make class?
47
  logger.info('Starting worker...')
48
  while True:
@@ -63,13 +61,14 @@ def start_worker(q: JoinableQueue, stop_event: Event): # TODO make class?
63
  break
64
 
65
  try:
66
- task.run() # Do the task
67
- except: # Will also catch KeyboardInterrupt
68
  logger.exception(f'Failed to process task {task}', )
69
  # Can implement some kind of retry handling here
70
  finally:
71
  q.task_done()
72
 
 
73
  class InterruptibleTaskPool:
74
 
75
  # https://the-fonz.gitlab.io/posts/python-multiprocessing/
@@ -97,7 +96,6 @@ class InterruptibleTaskPool:
97
  # This is a process-safe version of the 'panic' variable shown above
98
  self.stop_event = Event()
99
 
100
-
101
  # n_workers: Start this many processes
102
  # max_queue_size: If queue exceeds this size, block when putting items on the queue
103
  # grace_period: Send SIGINT to processes if they don't exit within this time after SIGINT/SIGTERM
@@ -106,7 +104,6 @@ class InterruptibleTaskPool:
106
  # self.on_task_complete = on_task_complete
107
  # self.raise_after_interrupt = raise_after_interrupt
108
 
109
-
110
  def __enter__(self):
111
  self.start()
112
  return self
@@ -114,8 +111,6 @@ class InterruptibleTaskPool:
114
  def __exit__(self, exc_type, exc_value, exc_traceback):
115
  pass
116
 
117
-
118
-
119
  def start(self) -> None:
120
  def handler(signalname):
121
  """
 
27
  return self.function(*self.args, **self.kwargs)
28
 
29
 
 
30
  class CallbackGenerator:
31
  def __init__(self, generator, callback):
32
  self.generator = generator
 
41
  yield from self.generator
42
 
43
 
 
44
  def start_worker(q: JoinableQueue, stop_event: Event): # TODO make class?
45
  logger.info('Starting worker...')
46
  while True:
 
61
  break
62
 
63
  try:
64
+ task.run() # Do the task
65
+ except: # Will also catch KeyboardInterrupt
66
  logger.exception(f'Failed to process task {task}', )
67
  # Can implement some kind of retry handling here
68
  finally:
69
  q.task_done()
70
 
71
+
72
  class InterruptibleTaskPool:
73
 
74
  # https://the-fonz.gitlab.io/posts/python-multiprocessing/
 
96
  # This is a process-safe version of the 'panic' variable shown above
97
  self.stop_event = Event()
98
 
 
99
  # n_workers: Start this many processes
100
  # max_queue_size: If queue exceeds this size, block when putting items on the queue
101
  # grace_period: Send SIGINT to processes if they don't exit within this time after SIGINT/SIGTERM
 
104
  # self.on_task_complete = on_task_complete
105
  # self.raise_after_interrupt = raise_after_interrupt
106
 
 
107
  def __enter__(self):
108
  self.start()
109
  return self
 
111
  def __exit__(self, exc_type, exc_value, exc_traceback):
112
  pass
113
 
 
 
114
  def start(self) -> None:
115
  def handler(signalname):
116
  """