From 42148262575285f6a517f35c6a02692be372104f Mon Sep 17 00:00:00 2001 From: didayolo Date: Sat, 28 Feb 2026 05:48:23 +0100 Subject: [PATCH 1/9] Improve update status and logs in compute worker --- compute_worker/compute_worker.py | 140 +++++++++++++++++++------------ 1 file changed, 87 insertions(+), 53 deletions(-) diff --git a/compute_worker/compute_worker.py b/compute_worker/compute_worker.py index 2e39c1800..a31fd2a9e 100644 --- a/compute_worker/compute_worker.py +++ b/compute_worker/compute_worker.py @@ -3,6 +3,7 @@ import hashlib import json import os +import traceback import shutil import signal import socket @@ -18,7 +19,6 @@ from rich.progress import Progress from rich.pretty import pprint import requests - import websockets import yaml from billiard.exceptions import SoftTimeLimitExceeded @@ -89,22 +89,31 @@ def show_progress(line, progress): try: - if "Status: Image is up to date" in line["status"]: - logger.info(line["status"]) + status = line.get("status") or "" + layer_id = line.get("id") + detail = line.get("progressDetail") or {} + current = detail.get("current") + total = detail.get("total") + + if "Status: Image is up to date" in status: + logger.info(status) + + if not layer_id: + return completed = False - if line["status"] == "Download complete": + if status == "Download complete": description = ( - f"[blue][Download complete, waiting for extraction {line['id']}]" + f"[blue][Download complete, waiting for extraction {layer_id}]" ) completed = True - elif line["status"] == "Downloading": - description = f"[bold][Downloading {line['id']}]" - elif line["status"] == "Pull complete": - description = f"[green][Extraction complete {line['id']}]" + elif status == "Downloading": + description = f"[bold][Downloading {layer_id}]" + elif status == "Pull complete": + description = f"[green][Extraction complete {layer_id}]" completed = True - elif line["status"] == "Extracting": - description = f"[blue][Extracting {line['id']}]" + elif status == "Extracting": + description = f"[blue][Extracting {layer_id}]" else: # skip other statuses, but show extraction progress @@ -121,7 +130,7 @@ def show_progress(line, progress): ) else: tasks[task_id] = progress.add_task( - description, total=line["progressDetail"]["total"] + description, total=total ) else: if completed: @@ -134,12 +143,11 @@ def show_progress(line, progress): else: progress.update( tasks[task_id], - completed=line["progressDetail"]["current"], - total=line["progressDetail"]["total"], + completed=current, + total=total, ) except Exception as e: - logger.error("There was an error showing the progress bar") - logger.error(e) + logger.exception("There was an error showing the progress bar") # ----------------------------------------------- @@ -242,20 +250,25 @@ def rewrite_bundle_url_if_needed(url): def run_wrapper(run_args): logger.info(f"Received run arguments: \n {colorize_run_args(json.dumps(run_args))}") run = Run(run_args) - try: run.prepare() run.start() if run.is_scoring: run.push_scores() run.push_output() - except DockerImagePullException as e: - run._update_status(STATUS_FAILED, str(e)) - except SubmissionException as e: - run._update_status(STATUS_FAILED, str(e)) - except SoftTimeLimitExceeded: - run._update_status(STATUS_FAILED, "Soft time limit exceeded!") + except (DockerImagePullException, SubmissionException, SoftTimeLimitExceeded) as e: + run._update_status(STATUS_FAILED, traceback.format_exc()) + raise + except Exception as e: + # Catch any exception to avoid getting stuck in Running status + run._update_status(STATUS_FAILED, traceback.format_exc()) + raise finally: + try: + # Try to push logs before cleanup + run.push_logs() + except Exception: + logger.exception("push_logs failed") run.clean_up() @@ -444,6 +457,22 @@ async def watch_detailed_results(self): if file_path: await self.send_detailed_results(file_path) + def push_logs(self): + """Upload any collected logs, even in case of crash. + """ + try: + for kind, logs in (self.logs or {}).items(): + for stream_key in ("stdout", "stderr"): + entry = logs.get(stream_key) if isinstance(logs, dict) else None + if not entry: + continue + location = entry.get("location") + data = entry.get("data") or b"" + if location: + self._put_file(location, raw_data=data) + except Exception as e: + logger.exception(f"Failed best-effort log upload: {e}") + def get_detailed_results_file_path(self): default_detailed_results_path = os.path.join( self.output_dir, "detailed_results.html" @@ -465,7 +494,7 @@ async def send_detailed_results(self, file_path): ) websocket_url = f"{self.websocket_url}?kind=detailed_results" logger.info(f"Connecting to {websocket_url} for detailed results") - # Wrap this with a Try ... Except otherwise a failure here will make the submission get stuck on Running + # Wrap this with a Try block to avoid getting stuck on Running try: websocket = await asyncio.wait_for( websockets.connect(websocket_url), timeout=30.0 @@ -483,9 +512,7 @@ async def send_detailed_results(self, file_path): ) if os.environ.get("LOG_LEVEL", "info").lower() == "debug": logger.exception(e) - raise SubmissionException( - "Could not connect to instance to update detailed result" - ) + return def _get_stdout_stderr_file_names(self, run_args): # run_args should be the run_args argument passed to __init__ from the run_wrapper. @@ -511,7 +538,7 @@ def _update_submission(self, data): logger.info(f"Updating submission @ {url} with data = {data}") - resp = self.requests_session.patch(url, data, timeout=150) + resp = self.requests_session.patch(url, data=data, timeout=150) if resp.status_code == 200: logger.info("Submission updated successfully!") else: @@ -521,23 +548,17 @@ def _update_submission(self, data): raise SubmissionException("Failure updating submission data.") def _update_status(self, status, extra_information=None): + # Update submission status if status not in AVAILABLE_STATUSES: raise SubmissionException( f"Status '{status}' is not in available statuses: {AVAILABLE_STATUSES}" ) - - data = { - "status": status, - "status_details": extra_information, - } - - # TODO: figure out if we should pull this task code later(submission.task should always be set) - # When we start - # if status == STATUS_SCORING: - # data.update({ - # "task_pk": self.task_pk, - # }) - self._update_submission(data) + data = {"status": status, "status_details": extra_information} + try: + self._update_submission(data) + except Exception as e: + # Always catch exception and never raise error + logger.exception(f"Failed to update submission status to {status}: {e}") def _get_container_image(self, image_name): logger.info("Running pull for image: {}".format(image_name)) @@ -547,6 +568,8 @@ def _get_container_image(self, image_name): with Progress() as progress: resp = client.pull(image_name, stream=True, decode=True) for line in resp: + if isinstance(line, dict) and line.get("error"): + raise DockerImagePullException(line["error"]) show_progress(line, progress) break # Break if the loop is successful to exit "with Progress() as progress" @@ -684,6 +707,7 @@ async def _run_container_engine_cmd(self, container, kind): # Create a websocket to send the logs in real time to the codabench instance # We need to set a timeout for the websocket connection otherwise the program will get stuck if he websocket does not connect. + websocket = None try: websocket_url = f"{self.websocket_url}?kind={kind}" logger.debug( @@ -733,18 +757,20 @@ async def _run_container_engine_cmd(self, container, kind): if str(log[0]) != "None": logger.info(log[0].decode()) try: - await websocket.send( - json.dumps({"kind": kind, "message": log[0].decode()}) - ) + if websocket is not None: + await websocket.send( + json.dumps({"kind": kind, "message": log[0].decode()}) + ) except Exception as e: logger.error(e) elif str(log[1]) != "None": logger.error(log[1].decode()) try: - await websocket.send( - json.dumps({"kind": kind, "message": log[1].decode()}) - ) + if websocket is not None: + await websocket.send( + json.dumps({"kind": kind, "message": log[1].decode()}) + ) except Exception as e: logger.error(e) @@ -765,7 +791,8 @@ async def _run_container_engine_cmd(self, container, kind): logger.debug( f"WORKER_MARKER: Disconnecting from {websocket_url}, program counter = {self.completed_program_counter}" ) - await websocket.close() + if websocket is not None: + await websocket.close() client.remove_container(container, force=True) logger.debug( @@ -783,6 +810,13 @@ async def _run_container_engine_cmd(self, container, kind): logger.error(e) return_Code = {"StatusCode": 1} + finally: + try: + # Last chance of removing container + client.remove_container(container_id, force=True) + except Exception: + pass + self.logs[kind] = { "returncode": return_Code["StatusCode"], "start": start, @@ -1053,9 +1087,8 @@ async def _run_program_directory(self, program_dir, kind): try: return await self._run_container_engine_cmd(container, kind=kind) except Exception as e: - logger.error(e) - if os.environ.get("LOG_LEVEL", "info").lower() == "debug": - logger.exception(e) + logger.exception("Program directory execution failed") + raise SubmissionException(str(e)) def _put_dir(self, url, directory): """Zip the directory and send it to the given URL using _put_file.""" @@ -1097,7 +1130,7 @@ def _put_file(self, url, file=None, raw_data=None, content_type="application/zip logger.info("Putting file %s in %s" % (file, url)) data = open(file, "rb") headers["Content-Length"] = str(os.path.getsize(file)) - elif raw_data: + elif raw_data is not None: logger.info("Putting raw data %s in %s" % (raw_data, url)) data = raw_data else: @@ -1232,6 +1265,7 @@ def start(self): asyncio.run(self._send_data_through_socket(error_message)) raise SubmissionException(error_message) finally: + loop.close() self.watch = False for kind, logs in self.logs.items(): if logs["end"] is not None: @@ -1287,7 +1321,7 @@ def start(self): program_results, BaseException ) and not isinstance(program_results, asyncio.CancelledError) program_rc = getattr(self, "program_exit_code", None) - failed_rc = program_rc not in (0, None) + failed_rc = (program_rc is None) or (program_rc != 0) if had_async_exc or failed_rc: self._update_status( STATUS_FAILED, From 529c8120025d117cc484c8155b886e875bb19aba Mon Sep 17 00:00:00 2001 From: didayolo Date: Sat, 28 Feb 2026 06:30:07 +0100 Subject: [PATCH 2/9] Increase test timeout to check if the issue is delay --- tests/test_submission.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/test_submission.py b/tests/test_submission.py index eb99d4f4f..48077fa6d 100644 --- a/tests/test_submission.py +++ b/tests/test_submission.py @@ -43,7 +43,7 @@ def run_tests(page, competition, submission) -> None: expect(page.get_by_role("cell", name="Finished")).to_be_visible(timeout=25000) except: page.reload() - expect(page.get_by_role("cell", name="Finished")).to_be_visible(timeout=2000) + expect(page.get_by_role("cell", name="Finished")).to_be_visible(timeout=25000) # Add to leaderboard and see if shows text = page.locator(".submission_row").first.inner_text() submission_Id = text.split(None, 1) @@ -123,7 +123,7 @@ def test_v2_multiTask(page: Page) -> None: expect(page.get_by_role("cell", name="Finished")).to_be_visible(timeout=35000) except: page.reload() - expect(page.get_by_role("cell", name="Finished")).to_be_visible(timeout=2000) + expect(page.get_by_role("cell", name="Finished")).to_be_visible(timeout=25000) # Add to leaderboard and see if shows text = page.locator(".submission_row").first.inner_text() submission_Id = text.split(None, 1) @@ -183,7 +183,7 @@ def test_v2_multiTaskFactSheet(page: Page) -> None: expect(page.get_by_role("cell", name="Finished")).to_be_visible(timeout=35000) except: page.reload() - expect(page.get_by_role("cell", name="Finished")).to_be_visible(timeout=2000) + expect(page.get_by_role("cell", name="Finished")).to_be_visible(timeout=25000) # Add to leaderboard and see if shows text = page.locator(".submission_row").first.inner_text() submission_Id = text.split(None, 1) From e606e527f20f15b8df86d628d621a9aaaa04e8b6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrien=20Pav=C3=A3o?= Date: Mon, 2 Mar 2026 16:14:33 +0100 Subject: [PATCH 3/9] Reverse test update (Update test_submission.py) --- tests/test_submission.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/test_submission.py b/tests/test_submission.py index 48077fa6d..8a584b7dd 100644 --- a/tests/test_submission.py +++ b/tests/test_submission.py @@ -43,7 +43,7 @@ def run_tests(page, competition, submission) -> None: expect(page.get_by_role("cell", name="Finished")).to_be_visible(timeout=25000) except: page.reload() - expect(page.get_by_role("cell", name="Finished")).to_be_visible(timeout=25000) + expect(page.get_by_role("cell", name="Finished")).to_be_visible(timeout=3000) # Add to leaderboard and see if shows text = page.locator(".submission_row").first.inner_text() submission_Id = text.split(None, 1) @@ -123,7 +123,7 @@ def test_v2_multiTask(page: Page) -> None: expect(page.get_by_role("cell", name="Finished")).to_be_visible(timeout=35000) except: page.reload() - expect(page.get_by_role("cell", name="Finished")).to_be_visible(timeout=25000) + expect(page.get_by_role("cell", name="Finished")).to_be_visible(timeout=3000) # Add to leaderboard and see if shows text = page.locator(".submission_row").first.inner_text() submission_Id = text.split(None, 1) @@ -183,7 +183,7 @@ def test_v2_multiTaskFactSheet(page: Page) -> None: expect(page.get_by_role("cell", name="Finished")).to_be_visible(timeout=35000) except: page.reload() - expect(page.get_by_role("cell", name="Finished")).to_be_visible(timeout=25000) + expect(page.get_by_role("cell", name="Finished")).to_be_visible(timeout=3000) # Add to leaderboard and see if shows text = page.locator(".submission_row").first.inner_text() submission_Id = text.split(None, 1) From 83d227b4023b9e32964627f78fe727fd509faf9b Mon Sep 17 00:00:00 2001 From: didayolo Date: Tue, 3 Mar 2026 14:41:27 +0100 Subject: [PATCH 4/9] Simplify exception --- compute_worker/compute_worker.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/compute_worker/compute_worker.py b/compute_worker/compute_worker.py index a31fd2a9e..e0650d0b8 100644 --- a/compute_worker/compute_worker.py +++ b/compute_worker/compute_worker.py @@ -507,11 +507,7 @@ async def send_detailed_results(self, file_path): ) ) except Exception as e: - logger.error( - f"This error might result in a Execution Time Exceeded error: {e}" - ) - if os.environ.get("LOG_LEVEL", "info").lower() == "debug": - logger.exception(e) + logger.exception(e) return def _get_stdout_stderr_file_names(self, run_args): From ed0fef5145e587c35c6c1a374aae82d722a81374 Mon Sep 17 00:00:00 2001 From: didayolo Date: Wed, 4 Mar 2026 09:11:45 +0100 Subject: [PATCH 5/9] Fix submission timing --- src/apps/competitions/tasks.py | 66 +++++++++++++++++----------------- tests/test_submission.py | 6 ++-- 2 files changed, 35 insertions(+), 37 deletions(-) diff --git a/src/apps/competitions/tasks.py b/src/apps/competitions/tasks.py index 4c5e495b8..32372cc7f 100644 --- a/src/apps/competitions/tasks.py +++ b/src/apps/competitions/tasks.py @@ -15,6 +15,7 @@ from django.core.exceptions import ObjectDoesNotExist from django.core.files.base import ContentFile from django.db.models import Subquery, OuterRef, Count, Case, When, Value, F +from django.db import transaction from django.utils.text import slugify from django.utils.timezone import now from rest_framework.exceptions import ValidationError @@ -141,14 +142,6 @@ def _send_to_compute_worker(submission, is_scoring): submission = Submission.objects.get(id=submission.id) task = submission.task - # priority of scoring tasks is higher, we don't want to wait around for - # many submissions to be scored while we're waiting for results - if is_scoring: - # higher numbers are higher priority - priority = 10 - else: - priority = 0 - if not is_scoring: run_args['prediction_result'] = make_url_sassy( path=submission.prediction_result.name, @@ -201,40 +194,45 @@ def _send_to_compute_worker(submission, is_scoring): time_padding = 60 * 20 # 20 minutes time_limit = submission.phase.execution_time_limit + time_padding - if submission.phase.competition.queue: # if the competition is running on a custom queue, not the default queue - submission.queue_name = submission.phase.competition.queue.name or '' - run_args['execution_time_limit'] = submission.phase.execution_time_limit # use the competition time limit - submission.save() - - # Send to special queue? Using `celery_app` var name here since we'd be overriding the imported `app` - # variable above - celery_app = app_or_default() - with celery_app.connection() as new_connection: - new_connection.virtual_host = str(submission.phase.competition.queue.vhost) - task = celery_app.send_task( + def _enqueue_after_commit(): + # priority of scoring tasks is higher, we don't want to wait around for + # many submissions to be scored while we're waiting for results + priority = 10 if is_scoring else 0 + + if submission.phase.competition.queue: # if the competition is running on a custom queue, not the default queue + submission.queue_name = submission.phase.competition.queue.name or '' + run_args['execution_time_limit'] = submission.phase.execution_time_limit # use the competition time limit + submission.save(update_fields=["queue_name"]) + celery_app = app_or_default() + with celery_app.connection() as new_connection: + new_connection.virtual_host = str(submission.phase.competition.queue.vhost) + task = celery_app.send_task( + 'compute_worker_run', + args=(run_args,), + queue='compute-worker', + soft_time_limit=time_limit, + connection=new_connection, + priority=priority, + ) + else: + task = app.send_task( 'compute_worker_run', args=(run_args,), queue='compute-worker', soft_time_limit=time_limit, - connection=new_connection, priority=priority, ) - else: - task = app.send_task( - 'compute_worker_run', - args=(run_args,), - queue='compute-worker', - soft_time_limit=time_limit, - priority=priority, - ) - submission.celery_task_id = task.id - if submission.status == Submission.SUBMITTING: - # Don't want to mark an already-prepared submission as "submitted" again, so - # only do this if we were previously "SUBMITTING" - submission.status = Submission.SUBMITTED + submission.celery_task_id = task.id + + if submission.status == Submission.SUBMITTING: + # Don't want to mark an already-prepared submission as "submitted" again, so + # only do this if we were previously "SUBMITTING" + submission.status = Submission.SUBMITTED + + submission.save(update_fields=["celery_task_id", "status"]) - submission.save() + transaction.on_commit(_enqueue_after_commit) def create_detailed_output_file(detail_name, submission): diff --git a/tests/test_submission.py b/tests/test_submission.py index 8a584b7dd..eb99d4f4f 100644 --- a/tests/test_submission.py +++ b/tests/test_submission.py @@ -43,7 +43,7 @@ def run_tests(page, competition, submission) -> None: expect(page.get_by_role("cell", name="Finished")).to_be_visible(timeout=25000) except: page.reload() - expect(page.get_by_role("cell", name="Finished")).to_be_visible(timeout=3000) + expect(page.get_by_role("cell", name="Finished")).to_be_visible(timeout=2000) # Add to leaderboard and see if shows text = page.locator(".submission_row").first.inner_text() submission_Id = text.split(None, 1) @@ -123,7 +123,7 @@ def test_v2_multiTask(page: Page) -> None: expect(page.get_by_role("cell", name="Finished")).to_be_visible(timeout=35000) except: page.reload() - expect(page.get_by_role("cell", name="Finished")).to_be_visible(timeout=3000) + expect(page.get_by_role("cell", name="Finished")).to_be_visible(timeout=2000) # Add to leaderboard and see if shows text = page.locator(".submission_row").first.inner_text() submission_Id = text.split(None, 1) @@ -183,7 +183,7 @@ def test_v2_multiTaskFactSheet(page: Page) -> None: expect(page.get_by_role("cell", name="Finished")).to_be_visible(timeout=35000) except: page.reload() - expect(page.get_by_role("cell", name="Finished")).to_be_visible(timeout=3000) + expect(page.get_by_role("cell", name="Finished")).to_be_visible(timeout=2000) # Add to leaderboard and see if shows text = page.locator(".submission_row").first.inner_text() submission_Id = text.split(None, 1) From c4e2211e23387f084e3af6c63daf13308848632e Mon Sep 17 00:00:00 2001 From: didayolo Date: Wed, 4 Mar 2026 09:35:49 +0100 Subject: [PATCH 6/9] Fix scope --- src/apps/competitions/tasks.py | 25 ++++++++++++------------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/src/apps/competitions/tasks.py b/src/apps/competitions/tasks.py index 32372cc7f..5d71a8e4d 100644 --- a/src/apps/competitions/tasks.py +++ b/src/apps/competitions/tasks.py @@ -194,15 +194,21 @@ def _send_to_compute_worker(submission, is_scoring): time_padding = 60 * 20 # 20 minutes time_limit = submission.phase.execution_time_limit + time_padding + if submission.phase.competition.queue: # if the competition is running on a custom queue, not the default queue + submission.queue_name = submission.phase.competition.queue.name or '' + run_args['execution_time_limit'] = submission.phase.execution_time_limit # use the competition time limit + submission.save(update_fields=["queue_name"]) + if submission.status == Submission.SUBMITTING: + # Don't want to mark an already-prepared submission as "submitted" again, so + # only do this if we were previously "SUBMITTING" + submission.status = Submission.SUBMITTED + submission.save(update_fields=["status"]) + def _enqueue_after_commit(): # priority of scoring tasks is higher, we don't want to wait around for # many submissions to be scored while we're waiting for results priority = 10 if is_scoring else 0 - - if submission.phase.competition.queue: # if the competition is running on a custom queue, not the default queue - submission.queue_name = submission.phase.competition.queue.name or '' - run_args['execution_time_limit'] = submission.phase.execution_time_limit # use the competition time limit - submission.save(update_fields=["queue_name"]) + if submission.phase.competition.queue: celery_app = app_or_default() with celery_app.connection() as new_connection: new_connection.virtual_host = str(submission.phase.competition.queue.vhost) @@ -222,15 +228,8 @@ def _enqueue_after_commit(): soft_time_limit=time_limit, priority=priority, ) - submission.celery_task_id = task.id - - if submission.status == Submission.SUBMITTING: - # Don't want to mark an already-prepared submission as "submitted" again, so - # only do this if we were previously "SUBMITTING" - submission.status = Submission.SUBMITTED - - submission.save(update_fields=["celery_task_id", "status"]) + submission.save(update_fields=["celery_task_id"]) transaction.on_commit(_enqueue_after_commit) From 8f8714229205241ab04619b58fa1e86dd8cae9dc Mon Sep 17 00:00:00 2001 From: didayolo Date: Wed, 4 Mar 2026 09:53:59 +0100 Subject: [PATCH 7/9] Ensure task is called in the tests --- src/apps/competitions/tests/test_submissions.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/apps/competitions/tests/test_submissions.py b/src/apps/competitions/tests/test_submissions.py index a7ae024f2..ee5cdc850 100644 --- a/src/apps/competitions/tests/test_submissions.py +++ b/src/apps/competitions/tests/test_submissions.py @@ -247,7 +247,8 @@ def __init__(self): task = Task() celery_app.return_value = task mock_sassy.return_value = '' - run_submission(submission.pk) + with self.captureOnCommitCallbacks(execute=True): + run_submission(submission.pk) return celery_app def test_making_submission_creates_parent_sub_and_additional_sub_per_task(self): From 8e7a9d536830fb0d6e51c30a6dac610751d392ed Mon Sep 17 00:00:00 2001 From: didayolo Date: Wed, 4 Mar 2026 11:45:58 +0100 Subject: [PATCH 8/9] Add progress bar error only if log level is debug --- compute_worker/compute_worker.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/compute_worker/compute_worker.py b/compute_worker/compute_worker.py index e0650d0b8..8dd916336 100644 --- a/compute_worker/compute_worker.py +++ b/compute_worker/compute_worker.py @@ -147,7 +147,8 @@ def show_progress(line, progress): total=total, ) except Exception as e: - logger.exception("There was an error showing the progress bar") + if os.environ.get("LOG_LEVEL", "info").lower() == "debug": + logger.exception("There was an error showing the progress bar") # ----------------------------------------------- From a92fc2d57b2ec31877887df2c70b2e8e00028f7e Mon Sep 17 00:00:00 2001 From: didayolo Date: Sat, 7 Mar 2026 05:41:54 +0100 Subject: [PATCH 9/9] Right asyncio loop syntax (following #2237) --- compute_worker/compute_worker.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/compute_worker/compute_worker.py b/compute_worker/compute_worker.py index 8dd916336..ef57868b3 100644 --- a/compute_worker/compute_worker.py +++ b/compute_worker/compute_worker.py @@ -1213,21 +1213,23 @@ def start(self): logger.info("Running scoring program, and then ingestion program") loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) gathered_tasks = asyncio.gather( self._run_program_directory(program_dir, kind="program"), self._run_program_directory(ingestion_program_dir, kind="ingestion"), self.watch_detailed_results(), - loop=loop, return_exceptions=True, ) task_results = [] # will store results/exceptions from gather signal.signal(signal.SIGALRM, alarm_handler) signal.alarm(self.execution_time_limit) + try: # run tasks # keep what gather returned so we can detect async errors later task_results = loop.run_until_complete(gathered_tasks) or [] + except ExecutionTimeLimitExceeded: error_message = f"Execution Time Limit exceeded. Limit was {self.execution_time_limit} seconds" logger.error(error_message) @@ -1261,7 +1263,10 @@ def start(self): # Send error through web socket to the frontend asyncio.run(self._send_data_through_socket(error_message)) raise SubmissionException(error_message) + finally: + signal.alarm(0) + asyncio.set_event_loop(None) loop.close() self.watch = False for kind, logs in self.logs.items(): @@ -1308,7 +1313,6 @@ def start(self): # set logs of this kind to None, since we handled them already logger.info("Program finished") - signal.alarm(0) if self.is_scoring: # Check if scoring program failed @@ -1327,6 +1331,7 @@ def start(self): # Raise so upstream marks failed immediately raise SubmissionException("Child task failed or non-zero return code") self._update_status(STATUS_FINISHED) + else: self._update_status(STATUS_SCORING)