From 7c59044baab4733d8f4f13034506276818cef196 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Fri, 27 Mar 2026 11:03:48 +0500 Subject: [PATCH 1/2] Fix pipeline fetcher deadlock --- .../pipeline_tasks/instances/__init__.py | 16 +++-- .../background/pipeline_tasks/jobs_running.py | 10 ++- .../pipeline_tasks/jobs_submitted.py | 10 ++- .../test_instances/test_pipeline.py | 59 ++++++++++++++++++ .../pipeline_tasks/test_running_jobs.py | 58 +++++++++++++++++ .../pipeline_tasks/test_submitted_jobs.py | 62 +++++++++++++++++++ 6 files changed, 203 insertions(+), 12 deletions(-) diff --git a/src/dstack/_internal/server/background/pipeline_tasks/instances/__init__.py b/src/dstack/_internal/server/background/pipeline_tasks/instances/__init__.py index 8c31cd537..dfc4c3d41 100644 --- a/src/dstack/_internal/server/background/pipeline_tasks/instances/__init__.py +++ b/src/dstack/_internal/server/background/pipeline_tasks/instances/__init__.py @@ -166,17 +166,21 @@ async def fetch(self, limit: int) -> list[InstancePipelineItem]: ) ), InstanceModel.deleted == False, - or_( - # Do not try to lock instances if the fleet is waiting for the lock. - InstanceModel.fleet_id.is_(None), - FleetModel.lock_owner.is_(None), - ), or_( InstanceModel.last_processed_at <= now - self._min_processing_interval, InstanceModel.last_processed_at == InstanceModel.created_at, ), or_( - InstanceModel.lock_expires_at.is_(None), + and_( + # Do not try to lock instances if the fleet is waiting for the + # lock, but allow retrying instances whose own lock is stale + # because the fleet pipeline cannot reclaim stale instance locks. + or_( + InstanceModel.fleet_id.is_(None), + FleetModel.lock_owner.is_(None), + ), + InstanceModel.lock_expires_at.is_(None), + ), InstanceModel.lock_expires_at < now, ), or_( diff --git a/src/dstack/_internal/server/background/pipeline_tasks/jobs_running.py b/src/dstack/_internal/server/background/pipeline_tasks/jobs_running.py index 5e061ee77..73af7d302 100644 --- a/src/dstack/_internal/server/background/pipeline_tasks/jobs_running.py +++ b/src/dstack/_internal/server/background/pipeline_tasks/jobs_running.py @@ -197,10 +197,14 @@ async def fetch(self, limit: int) -> list[JobRunningPipelineItem]: ), RunModel.status.not_in([RunStatus.TERMINATING]), JobModel.last_processed_at <= now - self._min_processing_interval, - # Do not try to lock jobs if the run is waiting for the lock. - RunModel.lock_owner.is_(None), or_( - JobModel.lock_expires_at.is_(None), + and_( + # Do not try to lock jobs if the run is waiting for the lock, + # but allow retrying jobs whose own lock is stale because + # the run pipeline cannot reclaim stale job locks. + RunModel.lock_owner.is_(None), + JobModel.lock_expires_at.is_(None), + ), JobModel.lock_expires_at < now, ), or_( diff --git a/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py b/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py index 81c407a05..3cf97d647 100644 --- a/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py +++ b/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py @@ -243,10 +243,14 @@ async def fetch(self, limit: int) -> list[JobSubmittedPipelineItem]: JobModel.last_processed_at <= now - self._min_processing_interval, JobModel.last_processed_at == JobModel.submitted_at, ), - # Do not try to lock jobs if the run is waiting for the lock. - RunModel.lock_owner.is_(None), or_( - JobModel.lock_expires_at.is_(None), + and_( + # Do not try to lock jobs if the run is waiting for the lock, + # but allow retrying jobs whose own lock is stale because + # the run pipeline cannot reclaim stale job locks. + RunModel.lock_owner.is_(None), + JobModel.lock_expires_at.is_(None), + ), JobModel.lock_expires_at < now, ), or_( diff --git a/src/tests/_internal/server/background/pipeline_tasks/test_instances/test_pipeline.py b/src/tests/_internal/server/background/pipeline_tasks/test_instances/test_pipeline.py index 012c7fdb3..2e31567ae 100644 --- a/src/tests/_internal/server/background/pipeline_tasks/test_instances/test_pipeline.py +++ b/src/tests/_internal/server/background/pipeline_tasks/test_instances/test_pipeline.py @@ -6,6 +6,7 @@ from sqlalchemy.ext.asyncio import AsyncSession from dstack._internal.core.models.instances import InstanceStatus +from dstack._internal.server.background.pipeline_tasks.fleets import FleetPipeline from dstack._internal.server.background.pipeline_tasks.instances import ( InstanceFetcher, InstancePipeline, @@ -192,6 +193,64 @@ async def test_fetch_respects_order_and_limit( assert middle.lock_owner == InstancePipeline.__name__ assert newest.lock_owner is None + async def test_fetch_allows_stale_instance_locks_if_fleet_is_waiting_for_instance_locks( + self, test_db, session: AsyncSession, fetcher: InstanceFetcher + ): + project = await create_project(session=session) + fleet = await create_fleet(session=session, project=project) + stale = get_current_datetime() - dt.timedelta(minutes=1) + + fleet.lock_owner = FleetPipeline.__name__ + fleet.lock_token = None + fleet.lock_expires_at = None + + instance = await create_instance( + session=session, + project=project, + fleet=fleet, + status=InstanceStatus.IDLE, + name="stale-locked", + last_processed_at=stale - dt.timedelta(seconds=1), + ) + lock_instance(instance) + instance.lock_expires_at = stale + await session.commit() + + items = await fetcher.fetch(limit=10) + + assert [item.id for item in items] == [instance.id] + + await session.refresh(instance) + assert instance.lock_owner == InstancePipeline.__name__ + + async def test_fetch_excludes_fresh_instances_when_fleet_is_waiting_for_instance_locks( + self, test_db, session: AsyncSession, fetcher: InstanceFetcher + ): + project = await create_project(session=session) + fleet = await create_fleet(session=session, project=project) + stale = get_current_datetime() - dt.timedelta(minutes=1) + + fleet.lock_owner = FleetPipeline.__name__ + fleet.lock_token = None + fleet.lock_expires_at = None + + instance = await create_instance( + session=session, + project=project, + fleet=fleet, + status=InstanceStatus.IDLE, + name="fresh-unlocked", + last_processed_at=stale - dt.timedelta(seconds=1), + ) + await session.commit() + + items = await fetcher.fetch(limit=10) + + assert items == [] + + await session.refresh(instance) + assert instance.lock_owner is None + @pytest.mark.asyncio @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) diff --git a/src/tests/_internal/server/background/pipeline_tasks/test_running_jobs.py b/src/tests/_internal/server/background/pipeline_tasks/test_running_jobs.py index a52924a55..5f113a142 100644 --- a/src/tests/_internal/server/background/pipeline_tasks/test_running_jobs.py +++ b/src/tests/_internal/server/background/pipeline_tasks/test_running_jobs.py @@ -40,6 +40,7 @@ _RunnerAvailability, _SubmitJobToRunnerResult, ) +from dstack._internal.server.background.pipeline_tasks.runs import RunPipeline from dstack._internal.server.models import JobModel, ProbeModel from dstack._internal.server.schemas.runner import ( HealthcheckResponse, @@ -307,6 +308,63 @@ async def test_fetch_excludes_jobs_from_terminating_runs( assert active_job.lock_owner == JobRunningPipeline.__name__ assert terminating_run_job.lock_owner is None + async def test_fetch_allows_stale_job_locks_even_if_run_is_waiting_for_job_locks( + self, test_db, session: AsyncSession, fetcher: JobRunningFetcher + ): + project = await create_project(session=session) + user = await create_user(session=session) + repo = await create_repo(session=session, project_id=project.id) + run = await create_run(session=session, project=project, repo=repo, user=user) + stale = get_current_datetime() - timedelta(minutes=1) + + run.lock_owner = RunPipeline.__name__ + run.lock_token = None + run.lock_expires_at = None + + job = await create_job( + session=session, + run=run, + status=JobStatus.RUNNING, + last_processed_at=stale - timedelta(seconds=1), + ) + _lock_job_expired_same_owner(job) + await session.commit() + + items = await fetcher.fetch(limit=10) + + assert [item.id for item in items] == [job.id] + + await session.refresh(job) + assert job.lock_owner == JobRunningPipeline.__name__ + + async def test_fetch_excludes_jobs_when_run_is_waiting_for_related_job_locks( + self, test_db, session: AsyncSession, fetcher: JobRunningFetcher + ): + project = await create_project(session=session) + user = await create_user(session=session) + repo = await create_repo(session=session, project_id=project.id) + run = await create_run(session=session, project=project, repo=repo, user=user) + stale = get_current_datetime() - timedelta(minutes=1) + + run.lock_owner = RunPipeline.__name__ + run.lock_token = None + run.lock_expires_at = None + + job = await create_job( + session=session, + run=run, + status=JobStatus.RUNNING, + last_processed_at=stale - timedelta(seconds=1), + ) + await session.commit() + + items = await fetcher.fetch(limit=10) + + assert items == [] + + await session.refresh(job) + assert job.lock_owner is None + async def test_fetch_returns_oldest_jobs_first_up_to_limit( self, test_db, session: AsyncSession, fetcher: JobRunningFetcher ): diff --git a/src/tests/_internal/server/background/pipeline_tasks/test_submitted_jobs.py b/src/tests/_internal/server/background/pipeline_tasks/test_submitted_jobs.py index 22de460ce..d6f2df29f 100644 --- a/src/tests/_internal/server/background/pipeline_tasks/test_submitted_jobs.py +++ b/src/tests/_internal/server/background/pipeline_tasks/test_submitted_jobs.py @@ -28,6 +28,7 @@ JobSubmittedPipelineItem, JobSubmittedWorker, ) +from dstack._internal.server.background.pipeline_tasks.runs import RunPipeline from dstack._internal.server.models import ( ComputeGroupModel, InstanceModel, @@ -251,6 +252,67 @@ async def test_fetch_selects_eligible_jobs_and_sets_lock_fields( assert recent_retry.lock_owner is None assert foreign_locked.lock_owner == "OtherPipeline" + async def test_fetch_allows_stale_job_locks_even_if_run_is_waiting_for_job_locks( + self, test_db, session: AsyncSession, fetcher: JobSubmittedFetcher + ): + project = await create_project(session=session) + user = await create_user(session=session) + repo = await create_repo(session=session, project_id=project.id) + fleet = await create_fleet(session=session, project=project) + run = await create_run(session=session, project=project, repo=repo, user=user, fleet=fleet) + stale = get_current_datetime() - timedelta(minutes=1) + + run.lock_owner = RunPipeline.__name__ + run.lock_token = None + run.lock_expires_at = None + + job = await create_job( + session=session, + run=run, + status=JobStatus.SUBMITTED, + submitted_at=stale - timedelta(minutes=1), + last_processed_at=stale - timedelta(seconds=1), + ) + _lock_job_expired_same_owner(job) + await session.commit() + + items = await fetcher.fetch(limit=10) + + assert [item.id for item in items] == [job.id] + + await session.refresh(job) + assert job.lock_owner == JobSubmittedPipeline.__name__ + + async def test_fetch_excludes_fresh_jobs_when_run_is_waiting_for_job_locks( + self, test_db, session: AsyncSession, fetcher: JobSubmittedFetcher + ): + project = await create_project(session=session) + user = await create_user(session=session) + repo = await create_repo(session=session, project_id=project.id) + fleet = await create_fleet(session=session, project=project) + run = await create_run(session=session, project=project, repo=repo, user=user, fleet=fleet) + stale = get_current_datetime() - timedelta(minutes=1) + + run.lock_owner = RunPipeline.__name__ + run.lock_token = None + run.lock_expires_at = None + + job = await create_job( + session=session, + run=run, + status=JobStatus.SUBMITTED, + submitted_at=stale - timedelta(minutes=1), + last_processed_at=stale - timedelta(seconds=1), + ) + await session.commit() + + items = await fetcher.fetch(limit=10) + + assert items == [] + + await session.refresh(job) + assert job.lock_owner is None + async def test_fetch_orders_by_priority_then_last_processed_at( self, test_db, session: AsyncSession, fetcher: JobSubmittedFetcher ): From 25659a6bd94ab13189910e3eda4a182021106845 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Fri, 27 Mar 2026 11:45:56 +0500 Subject: [PATCH 2/2] Prioritize JobSubmittedPipeline over RunPipeline --- .../pipeline_tasks/jobs_submitted.py | 10 +-- .../pipeline_tasks/test_submitted_jobs.py | 62 ------------------- 2 files changed, 3 insertions(+), 69 deletions(-) diff --git a/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py b/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py index 3cf97d647..2e7270462 100644 --- a/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py +++ b/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py @@ -244,13 +244,9 @@ async def fetch(self, limit: int) -> list[JobSubmittedPipelineItem]: JobModel.last_processed_at == JobModel.submitted_at, ), or_( - and_( - # Do not try to lock jobs if the run is waiting for the lock, - # but allow retrying jobs whose own lock is stale because - # the run pipeline cannot reclaim stale job locks. - RunModel.lock_owner.is_(None), - JobModel.lock_expires_at.is_(None), - ), + # This pipeline does not check RunModel.lock_owner + # because we want to provision jobs ASAP and RunPipeline can wait. + JobModel.lock_expires_at.is_(None), JobModel.lock_expires_at < now, ), or_( diff --git a/src/tests/_internal/server/background/pipeline_tasks/test_submitted_jobs.py b/src/tests/_internal/server/background/pipeline_tasks/test_submitted_jobs.py index d6f2df29f..22de460ce 100644 --- a/src/tests/_internal/server/background/pipeline_tasks/test_submitted_jobs.py +++ b/src/tests/_internal/server/background/pipeline_tasks/test_submitted_jobs.py @@ -28,7 +28,6 @@ JobSubmittedPipelineItem, JobSubmittedWorker, ) -from dstack._internal.server.background.pipeline_tasks.runs import RunPipeline from dstack._internal.server.models import ( ComputeGroupModel, InstanceModel, @@ -252,67 +251,6 @@ async def test_fetch_selects_eligible_jobs_and_sets_lock_fields( assert recent_retry.lock_owner is None assert foreign_locked.lock_owner == "OtherPipeline" - async def test_fetch_allows_stale_job_locks_even_if_run_is_waiting_for_job_locks( - self, test_db, session: AsyncSession, fetcher: JobSubmittedFetcher - ): - project = await create_project(session=session) - user = await create_user(session=session) - repo = await create_repo(session=session, project_id=project.id) - fleet = await create_fleet(session=session, project=project) - run = await create_run(session=session, project=project, repo=repo, user=user, fleet=fleet) - stale = get_current_datetime() - timedelta(minutes=1) - - run.lock_owner = RunPipeline.__name__ - run.lock_token = None - run.lock_expires_at = None - - job = await create_job( - session=session, - run=run, - status=JobStatus.SUBMITTED, - submitted_at=stale - timedelta(minutes=1), - last_processed_at=stale - timedelta(seconds=1), - ) - _lock_job_expired_same_owner(job) - await session.commit() - - items = await fetcher.fetch(limit=10) - - assert [item.id for item in items] == [job.id] - - await session.refresh(job) - assert job.lock_owner == JobSubmittedPipeline.__name__ - - async def test_fetch_excludes_fresh_jobs_when_run_is_waiting_for_job_locks( - self, test_db, session: AsyncSession, fetcher: JobSubmittedFetcher - ): - project = await create_project(session=session) - user = await create_user(session=session) - repo = await create_repo(session=session, project_id=project.id) - fleet = await create_fleet(session=session, project=project) - run = await create_run(session=session, project=project, repo=repo, user=user, fleet=fleet) - stale = get_current_datetime() - timedelta(minutes=1) - - run.lock_owner = RunPipeline.__name__ - run.lock_token = None - run.lock_expires_at = None - - job = await create_job( - session=session, - run=run, - status=JobStatus.SUBMITTED, - submitted_at=stale - timedelta(minutes=1), - last_processed_at=stale - timedelta(seconds=1), - ) - await session.commit() - - items = await fetcher.fetch(limit=10) - - assert items == [] - - await session.refresh(job) - assert job.lock_owner is None - async def test_fetch_orders_by_priority_then_last_processed_at( self, test_db, session: AsyncSession, fetcher: JobSubmittedFetcher ):