From 53ebf03bf4c78d62b283511840c6937f8d7ae5b5 Mon Sep 17 00:00:00 2001 From: Idir Chikhoune Date: Tue, 31 Mar 2026 14:08:50 +0200 Subject: [PATCH 1/9] feature ok / needs to be tested --- src/apps/competitions/tasks.py | 62 +- src/celery_config.py | 9 + src/routing.py | 3 + .../riot/competitions/detail/detail.tag | 837 ++++++++++++------ src/urls.py | 1 - src/utils/consumers.py | 72 ++ 6 files changed, 731 insertions(+), 253 deletions(-) create mode 100644 src/utils/consumers.py diff --git a/src/apps/competitions/tasks.py b/src/apps/competitions/tasks.py index d72e9191a..95c30fd35 100644 --- a/src/apps/competitions/tasks.py +++ b/src/apps/competitions/tasks.py @@ -32,6 +32,9 @@ from utils.data import make_url_sassy from utils.email import codalab_send_markdown_email +from channels.layers import get_channel_layer +from asgiref.sync import async_to_sync + import logging logger = logging.getLogger(__name__) @@ -784,9 +787,66 @@ def submission_status_cleanup(): submissions = Submission.objects.filter(status=Submission.RUNNING, has_children=False).select_related('phase', 'parent') for sub in submissions: - # Check if the submission has been running for 24 hours longer than execution_time_limit if sub.started_when < now() - timedelta(milliseconds=(3600000 * 24) + sub.phase.execution_time_limit): if sub.parent is not None: sub.parent.cancel(status=Submission.FAILED) else: sub.cancel(status=Submission.FAILED) + + +def _broadcast_worker_state(payload): + channel_layer = get_channel_layer() + if not channel_layer: + return + + async_to_sync(channel_layer.group_send)( + "compute_workers", + { + "type": "worker.health", + "worker": payload, + }, + ) + + +@app.task(queue="site-worker", soft_time_limit=60) +def refresh_compute_worker_health(): + celery_app = app_or_default() + inspector = celery_app.control.inspect(timeout=1) + + if inspector is None: + logger.warning("Celery inspect returned None") + return + + try: + stats = inspector.stats() or {} + active = inspector.active() or {} + reserved = inspector.reserved() or {} + except Exception: + logger.exception("Unable to inspect Celery workers") + return + + for worker_name in stats.keys(): + if not worker_name.startswith("compute-worker"): + continue + + raw_running_jobs = len(active.get(worker_name, [])) + len(reserved.get(worker_name, [])) + status = "busy" if raw_running_jobs > 0 else "available" + + payload = { + "hostname": worker_name, + "status": status, + "running_jobs": raw_running_jobs, + "timestamp": now().timestamp(), + } + + r.set(f"worker:{worker_name}:heartbeat", json.dumps(payload), ex=35) + r.hset( + WORKERS_REGISTRY_KEY, + worker_name, + json.dumps({ + "hostname": worker_name, + "last_seen": payload["timestamp"], + }), + ) + + _broadcast_worker_state(payload) \ No newline at end of file diff --git a/src/celery_config.py b/src/celery_config.py index 760614783..995103fcc 100644 --- a/src/celery_config.py +++ b/src/celery_config.py @@ -36,3 +36,12 @@ def app_for_vhost(vhost): vhost_app.conf.task_queues = app.conf.task_queues _vhost_apps[vhost] = vhost_app return _vhost_apps[vhost] + + + +app.conf.beat_schedule = { + "refresh-compute-worker-health": { + "task": "chemin.vers.refresh_compute_worker_health", + "schedule": 5.0, + }, +} \ No newline at end of file diff --git a/src/routing.py b/src/routing.py index 2ef280e73..adf3f44cc 100644 --- a/src/routing.py +++ b/src/routing.py @@ -1,7 +1,10 @@ from django.urls import re_path from apps.competitions.consumers import SubmissionIOConsumer, SubmissionOutputConsumer +from utils.consumers import ComputeWorkersConsumer + websocket_urlpatterns = [ re_path(r'submission_input/(?P\d+)/(?P\d+)/(?P[^/]+)/$', SubmissionIOConsumer.as_asgi()), re_path(r'submission_output/$', SubmissionOutputConsumer.as_asgi()), + re_path(r"ws/workers/$", ComputeWorkersConsumer.as_asgi()), ] diff --git a/src/static/riot/competitions/detail/detail.tag b/src/static/riot/competitions/detail/detail.tag index f971675b3..f1dce44fa 100644 --- a/src/static/riot/competitions/detail/detail.tag +++ b/src/static/riot/competitions/detail/detail.tag @@ -6,25 +6,13 @@ - - -