Skip to content

Commit e602b37

Browse files
committed
Right asyncio loop syntax (following #2237)
1 parent ad58425 commit e602b37

1 file changed

Lines changed: 7 additions & 2 deletions

File tree

compute_worker/compute_worker.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1213,21 +1213,23 @@ def start(self):
12131213

12141214
logger.info("Running scoring program, and then ingestion program")
12151215
loop = asyncio.new_event_loop()
1216+
asyncio.set_event_loop(loop)
12161217
gathered_tasks = asyncio.gather(
12171218
self._run_program_directory(program_dir, kind="program"),
12181219
self._run_program_directory(ingestion_program_dir, kind="ingestion"),
12191220
self.watch_detailed_results(),
1220-
loop=loop,
12211221
return_exceptions=True,
12221222
)
12231223

12241224
task_results = [] # will store results/exceptions from gather
12251225
signal.signal(signal.SIGALRM, alarm_handler)
12261226
signal.alarm(self.execution_time_limit)
1227+
12271228
try:
12281229
# run tasks
12291230
# keep what gather returned so we can detect async errors later
12301231
task_results = loop.run_until_complete(gathered_tasks) or []
1232+
12311233
except ExecutionTimeLimitExceeded:
12321234
error_message = f"Execution Time Limit exceeded. Limit was {self.execution_time_limit} seconds"
12331235
logger.error(error_message)
@@ -1261,7 +1263,10 @@ def start(self):
12611263
# Send error through web socket to the frontend
12621264
asyncio.run(self._send_data_through_socket(error_message))
12631265
raise SubmissionException(error_message)
1266+
12641267
finally:
1268+
signal.alarm(0)
1269+
asyncio.set_event_loop(None)
12651270
loop.close()
12661271
self.watch = False
12671272
for kind, logs in self.logs.items():
@@ -1308,7 +1313,6 @@ def start(self):
13081313

13091314
# set logs of this kind to None, since we handled them already
13101315
logger.info("Program finished")
1311-
signal.alarm(0)
13121316

13131317
if self.is_scoring:
13141318
# Check if scoring program failed
@@ -1327,6 +1331,7 @@ def start(self):
13271331
# Raise so upstream marks failed immediately
13281332
raise SubmissionException("Child task failed or non-zero return code")
13291333
self._update_status(STATUS_FINISHED)
1334+
13301335
else:
13311336
self._update_status(STATUS_SCORING)
13321337

0 commit comments

Comments
 (0)