Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
a6cb066
Add RunModel pipeline columns
r4victor Mar 19, 2026
4bc1850
Add RunPipeline scaffolding
r4victor Mar 19, 2026
1599545
Add run pipeline terminating worker
r4victor Mar 19, 2026
5c00eb5
Implement pending path for non-services
r4victor Mar 20, 2026
b9d8993
Implement active path for non-services
r4victor Mar 20, 2026
a9e20fe
Fix stale run.jobs view
r4victor Mar 20, 2026
6392c71
Document Locking related resources before refetch
r4victor Mar 20, 2026
15fe2f1
Fix stale fleet.instances view
r4victor Mar 20, 2026
5b8c129
Early return on fleet termination
r4victor Mar 20, 2026
a7dd28d
Implement pending path for services
r4victor Mar 23, 2026
540c5e2
Implement active services scaling
r4victor Mar 23, 2026
386af7c
Implement active services rolling deployment
r4victor Mar 23, 2026
49da969
Clarify scaling and rolling deploy conflict
r4victor Mar 23, 2026
d34b9ff
Wire
r4victor Mar 24, 2026
e45a26b
Hint run fetch
r4victor Mar 24, 2026
c5992cd
Add ix_runs_pipeline_fetch_q index
r4victor Mar 24, 2026
c0b53a0
Merge branch 'master' into issue_3551_run_pipeline
r4victor Mar 24, 2026
15a5817
Cleanup
r4victor Mar 24, 2026
fffc746
Fix _load_active_context extra return param
r4victor Mar 24, 2026
cbcdd2d
Fix double unlock
r4victor Mar 24, 2026
454e2c5
Fix unfinished jobs in retry replica
r4victor Mar 24, 2026
b474ad4
Add tests
r4victor Mar 24, 2026
363cd14
Respect parent pipeline priority
r4victor Mar 24, 2026
f013fb7
Fix unions syntax
r4victor Mar 24, 2026
f9a9d8b
Decrease fleets min_processing_interval
r4victor Mar 24, 2026
94fe7a7
Process legacy instances without fleets
r4victor Mar 24, 2026
beb55ae
Fix tests
r4victor Mar 24, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions contributing/PIPELINES.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ It's ok not to force all pipelines into one exact shape.

When writing processing results, update the main row with a filter by both `id` and `lock_token`. This guarantees that only the worker that still owns the lock can apply its results. If the update affects no rows, treat the item as stale and skip applying other changes (status changes, related updates, events). A stale item means another worker or replica already continued processing.

**Locking related resources before refetch**

If you refetch the main resource first and only lock the related resources afterwards, you must ensure that the worker does not get a stale view of the related resources, or that it works correctly even with a stale view. It is often more robust to lock the related resources first and then refetch the main resource while the related resources are already locked.

**Locking many related resources**

A pipeline may need to lock a potentially big set of related resources, e.g. the fleet pipeline locking all of a fleet's instances. For this, do one SELECT FOR UPDATE of non-locked instances and one SELECT to see how many instances there are in total, and check whether you managed to lock all of them. If you fail to lock all of them, release the main lock and retry processing on another fetch iteration. You may keep `lock_owner` on the main resource, or set `lock_owner` on the locked related resources and make other pipelines respect it, to guarantee the eventual locking of all related resources and avoid lock starvation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from dstack._internal.server.background.pipeline_tasks.placement_groups import (
PlacementGroupPipeline,
)
from dstack._internal.server.background.pipeline_tasks.runs import RunPipeline
from dstack._internal.server.background.pipeline_tasks.volumes import VolumePipeline
from dstack._internal.utils.logging import get_logger

Expand All @@ -32,6 +33,7 @@ def __init__(self) -> None:
JobTerminatingPipeline(),
InstancePipeline(),
PlacementGroupPipeline(),
RunPipeline(),
VolumePipeline(),
]
self._hinter = PipelineHinter(self._pipelines)
Expand Down
102 changes: 67 additions & 35 deletions src/dstack/_internal/server/background/pipeline_tasks/fleets.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ def __init__(
workers_num: int = 10,
queue_lower_limit_factor: float = 0.5,
queue_upper_limit_factor: float = 2.0,
min_processing_interval: timedelta = timedelta(seconds=60),
min_processing_interval: timedelta = timedelta(seconds=30),
lock_timeout: timedelta = timedelta(seconds=20),
heartbeat_trigger: timedelta = timedelta(seconds=10),
) -> None:
Expand Down Expand Up @@ -199,19 +199,13 @@ async def process(self, item: PipelineItem):
process_context = await _load_process_context(item)
if process_context is None:
return
result = await _process_fleet(
process_context.fleet_model,
consolidation_fleet_spec=process_context.consolidation_fleet_spec,
consolidation_instances=process_context.consolidation_instances,
)
result = await _process_fleet(process_context.fleet_model)
await _apply_process_result(item, process_context, result)


@dataclass
class _ProcessContext:
fleet_model: FleetModel
consolidation_fleet_spec: Optional[FleetSpec]
consolidation_instances: Optional[list[InstanceModel]]
locked_instance_ids: set[uuid.UUID] = field(default_factory=set)


Expand Down Expand Up @@ -260,34 +254,64 @@ def has_changes(self) -> bool:

async def _load_process_context(item: PipelineItem) -> Optional[_ProcessContext]:
async with get_session_ctx() as session:
fleet_model = await _refetch_locked_fleet(session=session, item=item)
fleet_model = await _refetch_locked_fleet_for_lock_decision(session=session, item=item)
if fleet_model is None:
log_lock_token_mismatch(logger, item)
return None

consolidation_fleet_spec = _get_fleet_spec_if_ready_for_consolidation(fleet_model)
consolidation_instances = None
if consolidation_fleet_spec is not None:
consolidation_instances = await _lock_fleet_instances_for_consolidation(
session=session,
item=item,
)
if consolidation_instances is None:
return None
locked_instance_ids = await _lock_fleet_instances_for_processing(
session=session,
item=item,
fleet_model=fleet_model,
)
if locked_instance_ids is None:
return None

fleet_model = await _refetch_locked_fleet_for_processing(session=session, item=item)
if fleet_model is None:
log_lock_token_mismatch(logger, item)
if locked_instance_ids:
await _unlock_fleet_locked_instances(
session=session,
item=item,
locked_instance_ids=locked_instance_ids,
)
await session.commit()
return None

return _ProcessContext(
fleet_model=fleet_model,
consolidation_fleet_spec=consolidation_fleet_spec,
consolidation_instances=consolidation_instances,
locked_instance_ids=(
set()
if consolidation_instances is None
else {i.id for i in consolidation_instances}
),
locked_instance_ids=locked_instance_ids,
)


async def _refetch_locked_fleet(
async def _refetch_locked_fleet_for_lock_decision(
    session: AsyncSession,
    item: PipelineItem,
) -> Optional[FleetModel]:
    """Refetch the locked fleet with only the columns needed to decide what else to lock.

    Filters on both ``id`` and ``lock_token`` so that a worker holding a stale
    token gets ``None`` (another worker has since re-acquired the lock) instead
    of someone else's view of the row.

    Returns the minimally-loaded ``FleetModel`` or ``None`` on token mismatch.
    """
    res = await session.execute(
        select(FleetModel)
        .where(
            FleetModel.id == item.id,
            # Lock-token check: matches no row if the lock was re-acquired elsewhere.
            FleetModel.lock_token == item.lock_token,
        )
        .options(
            # Load only the fields consulted before locking related resources;
            # the full relationship view is fetched later by the processing refetch.
            load_only(
                FleetModel.id,
                FleetModel.status,
                FleetModel.spec,
                FleetModel.current_master_instance_id,
                FleetModel.consolidation_attempt,
                FleetModel.last_consolidated_at,
                FleetModel.last_processed_at,
            )
        )
        # Overwrite any stale identity-map copy with fresh DB state.
        .execution_options(populate_existing=True)
    )
    return res.unique().scalar_one_or_none()


async def _refetch_locked_fleet_for_processing(
session: AsyncSession,
item: PipelineItem,
) -> Optional[FleetModel]:
Expand All @@ -308,6 +332,7 @@ async def _refetch_locked_fleet(
FleetModel.runs.and_(RunModel.status.not_in(RunStatus.finished_statuses()))
).load_only(RunModel.status)
)
.execution_options(populate_existing=True)
)
return res.unique().scalar_one_or_none()

Expand All @@ -326,10 +351,17 @@ def _get_fleet_spec_if_ready_for_consolidation(fleet_model: FleetModel) -> Optio
return consolidation_fleet_spec


async def _lock_fleet_instances_for_consolidation(
async def _lock_fleet_instances_for_processing(
session: AsyncSession,
item: PipelineItem,
) -> Optional[list[InstanceModel]]:
fleet_model: FleetModel,
) -> Optional[set[uuid.UUID]]:
if _get_fleet_spec_if_ready_for_consolidation(fleet_model) is None:
if fleet_model.current_master_instance_id is None:
return set()
if not _is_cloud_cluster_fleet_spec(get_fleet_spec(fleet_model)):
return set()

instance_lock, _ = get_locker(get_db().dialect_name).get_lockset(InstanceModel.__tablename__)
async with instance_lock:
res = await session.execute(
Expand All @@ -347,6 +379,7 @@ async def _lock_fleet_instances_for_consolidation(
),
)
.with_for_update(skip_locked=True, key_share=True, of=InstanceModel)
.options(load_only(InstanceModel.id))
)
locked_instance_models = list(res.scalars().all())
locked_instance_ids = {instance_model.id for instance_model in locked_instance_models}
Expand Down Expand Up @@ -389,7 +422,7 @@ async def _lock_fleet_instances_for_consolidation(
instance_model.lock_token = item.lock_token
instance_model.lock_owner = FleetPipeline.__name__
await session.commit()
return locked_instance_models
return locked_instance_ids


async def _apply_process_result(
Expand Down Expand Up @@ -461,30 +494,29 @@ async def _apply_process_result(

async def _process_fleet(
fleet_model: FleetModel,
consolidation_fleet_spec: Optional[FleetSpec] = None,
consolidation_instances: Optional[Sequence[InstanceModel]] = None,
) -> _ProcessResult:
result = _ProcessResult()
effective_instances = list(consolidation_instances or fleet_model.instances)
consolidation_fleet_spec = _get_fleet_spec_if_ready_for_consolidation(fleet_model)
if consolidation_fleet_spec is not None:
result = _consolidate_fleet_state_with_spec(
fleet_model,
consolidation_fleet_spec=consolidation_fleet_spec,
consolidation_instances=effective_instances,
consolidation_instances=fleet_model.instances,
)
if len(result.new_instance_creates) == 0 and _should_delete_fleet(fleet_model):
result.fleet_update_map["status"] = FleetStatus.TERMINATED
result.fleet_update_map["deleted"] = True
result.fleet_update_map["deleted_at"] = NOW_PLACEHOLDER
return result
_set_fail_instances_on_master_bootstrap_failure(
fleet_model=fleet_model,
instance_models=effective_instances,
instance_models=fleet_model.instances,
instance_id_to_update_map=result.instance_id_to_update_map,
)
_set_current_master_instance_id(
fleet_model=fleet_model,
fleet_update_map=result.fleet_update_map,
instance_models=effective_instances,
instance_models=fleet_model.instances,
instance_id_to_update_map=result.instance_id_to_update_map,
new_instance_creates=result.new_instance_creates,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
)
from dstack._internal.server.db import get_db, get_session_ctx
from dstack._internal.server.models import (
FleetModel,
InstanceHealthCheckModel,
InstanceModel,
JobModel,
Expand Down Expand Up @@ -147,6 +148,7 @@ async def fetch(self, limit: int) -> list[InstancePipelineItem]:
now = get_current_datetime()
res = await session.execute(
select(InstanceModel)
.join(InstanceModel.fleet, isouter=True)
.where(
InstanceModel.status.in_(
[
Expand All @@ -164,6 +166,11 @@ async def fetch(self, limit: int) -> list[InstancePipelineItem]:
)
),
InstanceModel.deleted == False,
or_(
# Do not try to lock instances if the fleet is waiting for the lock.
InstanceModel.fleet_id.is_(None),
FleetModel.lock_owner.is_(None),
),
or_(
InstanceModel.last_processed_at <= now - self._min_processing_interval,
InstanceModel.last_processed_at == InstanceModel.created_at,
Expand Down Expand Up @@ -239,6 +246,9 @@ async def process(self, item: InstancePipelineItem):
if process_context is None:
return

# Keep apply centralized here because every instance path returns the same
# `ProcessResult` shape for one primary model, with only a small set of
# optional side effects such as health checks or placement-group scheduling.
await _apply_process_result(
item=item,
instance_model=process_context.instance_model,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,8 @@ async def fetch(self, limit: int) -> list[JobRunningPipelineItem]:
),
RunModel.status.not_in([RunStatus.TERMINATING]),
JobModel.last_processed_at <= now - self._min_processing_interval,
# Do not try to lock jobs if the run is waiting for the lock.
RunModel.lock_owner.is_(None),
or_(
JobModel.lock_expires_at.is_(None),
JobModel.lock_expires_at < now,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,8 @@ async def fetch(self, limit: int) -> list[JobSubmittedPipelineItem]:
JobModel.last_processed_at <= now - self._min_processing_interval,
JobModel.last_processed_at == JobModel.submitted_at,
),
# Do not try to lock jobs if the run is waiting for the lock.
RunModel.lock_owner.is_(None),
or_(
JobModel.lock_expires_at.is_(None),
JobModel.lock_expires_at < now,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -582,7 +582,7 @@ async def _process_terminating_job(
"""
Stops the job: tells shim to stop the container, detaches the job from the instance,
and detaches volumes from the instance.
Graceful stop should already be done by `process_terminating_run`.
Graceful stop should already be done by the run terminating path.
"""
instance_update_map = None if instance_model is None else _InstanceUpdateMap()
result = _ProcessResult(instance_update_map=instance_update_map)
Expand Down
Loading
Loading