Skip to content

Commit 2dfe398

Browse files
authored
Implement instance pipeline (#3636)
* Move delete_instance_health_checks to a separate module * Move utils/provisioning.py to ssh_fleets/provisioning.py * Use SSHProvisioningError for ssh instances errors * Fix _add_remote() nested try-excepts * Refactor _resolve_ssh_instance_network * Refactor _process_instance() into thin dispatcher * Refactor instance check code * Add fetchers tests * Add TestInstanceWorker * Run pyright for pipeline tests * WIP: InstanceWorker * Fix volumes pipeline processing active * Refactor log_lock_token * Build instance events from update map * Rename * Refactor instance pipeline into modules * Inline _get_effective_ helpers * Process new instance immediately * Do not refetch status * Fix sibling_update_rows * Drop redundant synchronize_session=False * Add ProcessContext * Simplify placement groups code * Drop _PlacementGroupState * Restore comments * Fix result.sibling_update_rows append * Fix unset typing * Add migration * Lock instances in fleet pipeline * Optimize instance lock in fleet pipeline * Respect instance lock in delete_fleets * Skip locked instances in process_next_terminating_job * Respect instance lock in submitted_jobs * Add ix_instances_pipeline_fetch_q_index * Wire instance pipeline * Set current_master_instance * Refactor current_master_instance * Terminate instances with MASTER_FAILED if the master dies with NO_OFFERS * Fix instance unlock in fleet pipeline * Remove extra fleet_model_to_fleet * Wire pipeline_hinter * Remove extra fleet_model_to_fleet * Fix index name * Rebase migrations * Fix redundant fleet.instances loads in instance pipeline * Do not lock all instances in delete_fleets * Add FIXME * Fix tests * Retry instance lock in delete_fleets * Retry lock in all delete endpoints * Fix created_at and last_processed_at init values
1 parent 8bedd73 commit 2dfe398

52 files changed

Lines changed: 7535 additions & 566 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

AGENTS.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
- Python targets 3.9+ with 4-space indentation and max line length of 99 (see `ruff.toml`; `E501` is ignored but keep lines readable).
1919
- Imports are sorted via Ruff’s isort settings (`dstack` treated as first-party).
2020
- Keep primary/public functions before local helper functions in a module section.
21+
- Keep private classes, exceptions, and similar implementation-specific types close to the private functions that use them unless they are shared more broadly in the module.
2122
- Prefer pydantic-style models in `core/models`.
2223
- Tests use `test_*.py` modules and `test_*` functions; fixtures live near usage.
2324

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ include = [
106106
"src/dstack/_internal/core/backends/runpod",
107107
"src/dstack/_internal/cli/services/configurators",
108108
"src/dstack/_internal/cli/commands",
109+
"src/tests/_internal/server/background/pipeline_tasks",
109110
]
110111
ignore = [
111112
"src/dstack/_internal/server/migrations/versions",

src/dstack/_internal/core/errors.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,10 @@ class ConfigurationError(DstackError):
136136
pass
137137

138138

139+
class SSHProvisioningError(DstackError):
140+
pass
141+
142+
139143
class SSHError(DstackError):
140144
pass
141145

src/dstack/_internal/server/background/pipeline_tasks/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from dstack._internal.server.background.pipeline_tasks.compute_groups import ComputeGroupPipeline
55
from dstack._internal.server.background.pipeline_tasks.fleets import FleetPipeline
66
from dstack._internal.server.background.pipeline_tasks.gateways import GatewayPipeline
7+
from dstack._internal.server.background.pipeline_tasks.instances import InstancePipeline
78
from dstack._internal.server.background.pipeline_tasks.placement_groups import (
89
PlacementGroupPipeline,
910
)
@@ -19,6 +20,7 @@ def __init__(self) -> None:
1920
ComputeGroupPipeline(),
2021
FleetPipeline(),
2122
GatewayPipeline(),
23+
InstancePipeline(),
2224
PlacementGroupPipeline(),
2325
VolumePipeline(),
2426
]

src/dstack/_internal/server/background/pipeline_tasks/base.py

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
import asyncio
2+
import logging
23
import math
34
import random
5+
import time
46
import uuid
57
from abc import ABC, abstractmethod
68
from collections.abc import Iterable, Sequence
@@ -331,14 +333,20 @@ async def start(self):
331333
self._running = True
332334
while self._running:
333335
item = await self._queue.get()
336+
start_time = time.time()
334337
logger.debug("Processing %s item %s", item.__tablename__, item.id)
335338
try:
336339
await self.process(item)
337340
except Exception:
338341
logger.exception("Unexpected exception when processing item")
339342
finally:
340343
await self._heartbeater.untrack(item)
341-
logger.debug("Processed %s item %s", item.__tablename__, item.id)
344+
logger.debug(
345+
"Processed %s item %s in %.3f",
346+
item.__tablename__,
347+
item.id,
348+
time.time() - start_time,
349+
)
342350

343351
def stop(self):
344352
self._running = False
@@ -416,3 +424,40 @@ def resolve_now_placeholders(update_values: _ResolveNowInput, now: datetime):
416424
for key, value in update_values.items():
417425
if value is NOW_PLACEHOLDER:
418426
update_values[key] = now
427+
428+
429+
def log_lock_token_mismatch(
430+
logger: logging.Logger,
431+
item: PipelineItem,
432+
action: str = "process",
433+
) -> None:
434+
logger.warning(
435+
"Failed to %s %s item %s: lock_token mismatch."
436+
" The item is expected to be processed and updated on another fetch iteration.",
437+
action,
438+
item.__tablename__,
439+
item.id,
440+
)
441+
442+
443+
def log_lock_token_changed_after_processing(
444+
logger: logging.Logger,
445+
item: PipelineItem,
446+
action: str = "update",
447+
expected_outcome: str = "updated",
448+
) -> None:
449+
logger.warning(
450+
"Failed to %s %s item %s after processing: lock_token changed."
451+
" The item is expected to be processed and %s on another fetch iteration.",
452+
action,
453+
item.__tablename__,
454+
item.id,
455+
expected_outcome,
456+
)
457+
458+
459+
def log_lock_token_changed_on_reset(logger: logging.Logger) -> None:
460+
logger.warning(
461+
"Failed to reset lock: lock_token changed."
462+
" The item is expected to be processed and updated on another fetch iteration."
463+
)

src/dstack/_internal/server/background/pipeline_tasks/compute_groups.py

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
PipelineItem,
2121
UpdateMapDateTime,
2222
Worker,
23+
log_lock_token_changed_after_processing,
24+
log_lock_token_mismatch,
2325
resolve_now_placeholders,
2426
set_processed_update_map_fields,
2527
set_unlock_update_map_fields,
@@ -194,12 +196,7 @@ async def process(self, item: PipelineItem):
194196
)
195197
compute_group_model = res.unique().scalar_one_or_none()
196198
if compute_group_model is None:
197-
logger.warning(
198-
"Failed to process %s item %s: lock_token mismatch."
199-
" The item is expected to be processed and updated on another fetch iteration.",
200-
item.__tablename__,
201-
item.id,
202-
)
199+
log_lock_token_mismatch(logger, item)
203200
return
204201

205202
result = _TerminateResult()
@@ -228,12 +225,7 @@ async def process(self, item: PipelineItem):
228225
)
229226
updated_ids = list(res.scalars().all())
230227
if len(updated_ids) == 0:
231-
logger.warning(
232-
"Failed to update %s item %s after processing: lock_token changed."
233-
" The item is expected to be processed and updated on another fetch iteration.",
234-
item.__tablename__,
235-
item.id,
236-
)
228+
log_lock_token_changed_after_processing(logger, item)
237229
return
238230
if not result.instances_update_map:
239231
return
@@ -249,6 +241,8 @@ async def process(self, item: PipelineItem):
249241
instance_model=instance_model,
250242
old_status=instance_model.status,
251243
new_status=InstanceStatus.TERMINATED,
244+
termination_reason=instance_model.termination_reason,
245+
termination_reason_message=instance_model.termination_reason_message,
252246
)
253247

254248

0 commit comments

Comments
 (0)