Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -166,17 +166,21 @@ async def fetch(self, limit: int) -> list[InstancePipelineItem]:
)
),
InstanceModel.deleted == False,
or_(
# Do not try to lock instances if the fleet is waiting for the lock.
InstanceModel.fleet_id.is_(None),
FleetModel.lock_owner.is_(None),
),
or_(
InstanceModel.last_processed_at <= now - self._min_processing_interval,
InstanceModel.last_processed_at == InstanceModel.created_at,
),
or_(
InstanceModel.lock_expires_at.is_(None),
and_(
# Do not try to lock instances if the fleet is waiting for the
# lock, but allow retrying instances whose own lock is stale
# because the fleet pipeline cannot reclaim stale instance locks.
or_(
InstanceModel.fleet_id.is_(None),
FleetModel.lock_owner.is_(None),
),
InstanceModel.lock_expires_at.is_(None),
),
InstanceModel.lock_expires_at < now,
),
or_(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,10 +197,14 @@ async def fetch(self, limit: int) -> list[JobRunningPipelineItem]:
),
RunModel.status.not_in([RunStatus.TERMINATING]),
JobModel.last_processed_at <= now - self._min_processing_interval,
# Do not try to lock jobs if the run is waiting for the lock.
RunModel.lock_owner.is_(None),
or_(
JobModel.lock_expires_at.is_(None),
and_(
# Do not try to lock jobs if the run is waiting for the lock,
# but allow retrying jobs whose own lock is stale because
# the run pipeline cannot reclaim stale job locks.
RunModel.lock_owner.is_(None),
JobModel.lock_expires_at.is_(None),
),
JobModel.lock_expires_at < now,
),
or_(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,9 +243,9 @@ async def fetch(self, limit: int) -> list[JobSubmittedPipelineItem]:
JobModel.last_processed_at <= now - self._min_processing_interval,
JobModel.last_processed_at == JobModel.submitted_at,
),
# Do not try to lock jobs if the run is waiting for the lock.
RunModel.lock_owner.is_(None),
or_(
# This pipeline does not check RunModel.lock_owner
# because we want to provision jobs ASAP and RunPipeline can wait.
JobModel.lock_expires_at.is_(None),
JobModel.lock_expires_at < now,
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from sqlalchemy.ext.asyncio import AsyncSession

from dstack._internal.core.models.instances import InstanceStatus
from dstack._internal.server.background.pipeline_tasks.fleets import FleetPipeline
from dstack._internal.server.background.pipeline_tasks.instances import (
InstanceFetcher,
InstancePipeline,
Expand Down Expand Up @@ -192,6 +193,64 @@ async def test_fetch_respects_order_and_limit(
assert middle.lock_owner == InstancePipeline.__name__
assert newest.lock_owner is None

async def test_fetch_allows_stale_instance_locks_if_fleet_is_waiting_for_instance_locks(
    self, test_db, session: AsyncSession, fetcher: InstanceFetcher
):
    """An instance whose own lock has already expired must still be fetched
    and re-locked, even while the fleet pipeline holds a pending lock claim
    on the fleet (fleet.lock_owner set, token/expiry unset)."""
    project = await create_project(session=session)
    fleet = await create_fleet(session=session, project=project)
    expired_at = get_current_datetime() - dt.timedelta(minutes=1)

    # Simulate the fleet pipeline waiting to acquire the instance locks.
    fleet.lock_owner = FleetPipeline.__name__
    fleet.lock_token = None
    fleet.lock_expires_at = None

    instance = await create_instance(
        name="stale-locked",
        status=InstanceStatus.IDLE,
        last_processed_at=expired_at - dt.timedelta(seconds=1),
        session=session,
        project=project,
        fleet=fleet,
    )
    # Lock the instance, then backdate the expiry so the lock is stale.
    lock_instance(instance)
    instance.lock_expires_at = expired_at
    await session.commit()

    fetched = await fetcher.fetch(limit=10)

    # The stale-locked instance is the only item returned, and the fetcher
    # took over its lock.
    assert [item.id for item in fetched] == [instance.id]
    await session.refresh(instance)
    assert instance.lock_owner == InstancePipeline.__name__

async def test_fetch_excludes_fresh_instances_when_fleet_is_waiting_for_instance_locks(
    self, test_db, session: AsyncSession, fetcher: InstanceFetcher
):
    """An unlocked instance must NOT be fetched while the fleet pipeline is
    waiting to lock the fleet's instances — only stale-locked instances get
    the retry exception."""
    project = await create_project(session=session)
    fleet = await create_fleet(session=session, project=project)
    cutoff = get_current_datetime() - dt.timedelta(minutes=1)

    # Simulate the fleet pipeline waiting to acquire the instance locks.
    fleet.lock_owner = FleetPipeline.__name__
    fleet.lock_token = None
    fleet.lock_expires_at = None

    # Old enough to be due for processing, but carrying no lock at all.
    instance = await create_instance(
        name="fresh-unlocked",
        status=InstanceStatus.IDLE,
        last_processed_at=cutoff - dt.timedelta(seconds=1),
        session=session,
        project=project,
        fleet=fleet,
    )
    await session.commit()

    fetched = await fetcher.fetch(limit=10)

    # Nothing is returned and the instance stays unlocked.
    assert fetched == []
    await session.refresh(instance)
    assert instance.lock_owner is None


@pytest.mark.asyncio
@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
_RunnerAvailability,
_SubmitJobToRunnerResult,
)
from dstack._internal.server.background.pipeline_tasks.runs import RunPipeline
from dstack._internal.server.models import JobModel, ProbeModel
from dstack._internal.server.schemas.runner import (
HealthcheckResponse,
Expand Down Expand Up @@ -307,6 +308,63 @@ async def test_fetch_excludes_jobs_from_terminating_runs(
assert active_job.lock_owner == JobRunningPipeline.__name__
assert terminating_run_job.lock_owner is None

async def test_fetch_allows_stale_job_locks_even_if_run_is_waiting_for_job_locks(
    self, test_db, session: AsyncSession, fetcher: JobRunningFetcher
):
    """A job whose own lock has already expired must still be fetched and
    re-locked, even while the run pipeline holds a pending lock claim on
    the run (run.lock_owner set, token/expiry unset)."""
    project = await create_project(session=session)
    user = await create_user(session=session)
    repo = await create_repo(session=session, project_id=project.id)
    run = await create_run(session=session, project=project, repo=repo, user=user)
    expired_at = get_current_datetime() - timedelta(minutes=1)

    # Simulate the run pipeline waiting to acquire the job locks.
    run.lock_owner = RunPipeline.__name__
    run.lock_token = None
    run.lock_expires_at = None

    job = await create_job(
        status=JobStatus.RUNNING,
        last_processed_at=expired_at - timedelta(seconds=1),
        session=session,
        run=run,
    )
    # Give the job a lock that is already past its expiry.
    _lock_job_expired_same_owner(job)
    await session.commit()

    fetched = await fetcher.fetch(limit=10)

    # The stale-locked job is the only item returned, and the fetcher took
    # over its lock.
    assert [item.id for item in fetched] == [job.id]
    await session.refresh(job)
    assert job.lock_owner == JobRunningPipeline.__name__

async def test_fetch_excludes_jobs_when_run_is_waiting_for_related_job_locks(
    self, test_db, session: AsyncSession, fetcher: JobRunningFetcher
):
    """An unlocked job must NOT be fetched while the run pipeline is waiting
    to lock the run's jobs — only stale-locked jobs get the retry
    exception."""
    project = await create_project(session=session)
    user = await create_user(session=session)
    repo = await create_repo(session=session, project_id=project.id)
    run = await create_run(session=session, project=project, repo=repo, user=user)
    cutoff = get_current_datetime() - timedelta(minutes=1)

    # Simulate the run pipeline waiting to acquire the job locks.
    run.lock_owner = RunPipeline.__name__
    run.lock_token = None
    run.lock_expires_at = None

    # Old enough to be due for processing, but carrying no lock at all.
    job = await create_job(
        status=JobStatus.RUNNING,
        last_processed_at=cutoff - timedelta(seconds=1),
        session=session,
        run=run,
    )
    await session.commit()

    fetched = await fetcher.fetch(limit=10)

    # Nothing is returned and the job stays unlocked.
    assert fetched == []
    await session.refresh(job)
    assert job.lock_owner is None

async def test_fetch_returns_oldest_jobs_first_up_to_limit(
self, test_db, session: AsyncSession, fetcher: JobRunningFetcher
):
Expand Down
Loading