From 911cc30db306f7caa613593ca051d40c3d324b10 Mon Sep 17 00:00:00 2001 From: Andrey Cheptsov Date: Sun, 22 Mar 2026 23:05:23 -0700 Subject: [PATCH 1/8] Add data transfer quota to terminate jobs exceeding outbound traffic limits MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds a configurable per-job outbound data transfer quota (AWS only) that terminates jobs when the total external traffic exceeds the threshold. Metering uses iptables byte counters on the shim (host-level), excluding private/VPC traffic. The shim notifies the runner via a new /api/terminate endpoint so the server reads the termination reason through the existing /api/pull flow — same pattern as log quota. Configured via DSTACK_SERVER_DATA_TRANSFER_QUOTA_PER_JOB_AWS (bytes, 0=unlimited). Co-Authored-By: Claude Opus 4.6 (1M context) --- runner/internal/common/types/types.go | 3 +- runner/internal/runner/api/http.go | 18 ++ runner/internal/runner/api/server.go | 1 + runner/internal/runner/schemas/schemas.go | 5 + runner/internal/shim/docker.go | 77 ++++- runner/internal/shim/docker_test.go | 4 + runner/internal/shim/models.go | 8 +- runner/internal/shim/netmeter/netmeter.go | 264 ++++++++++++++++++ .../internal/shim/netmeter/netmeter_test.go | 98 +++++++ src/dstack/_internal/core/models/runs.py | 3 + .../background/pipeline_tasks/jobs_running.py | 5 + .../scheduled_tasks/running_jobs.py | 5 + src/dstack/_internal/server/schemas/runner.py | 1 + .../server/services/runner/client.py | 2 + src/dstack/_internal/server/settings.py | 14 + 15 files changed, 503 insertions(+), 5 deletions(-) create mode 100644 runner/internal/shim/netmeter/netmeter.go create mode 100644 runner/internal/shim/netmeter/netmeter_test.go diff --git a/runner/internal/common/types/types.go b/runner/internal/common/types/types.go index 057c0248c..c0a34c6e7 100644 --- a/runner/internal/common/types/types.go +++ b/runner/internal/common/types/types.go @@ -10,5 +10,6 @@ const ( TerminationReasonTerminatedByUser TerminationReason = "terminated_by_user" TerminationReasonTerminatedByServer TerminationReason = "terminated_by_server" TerminationReasonMaxDurationExceeded TerminationReason = "max_duration_exceeded" - TerminationReasonLogQuotaExceeded TerminationReason = "log_quota_exceeded" + TerminationReasonLogQuotaExceeded TerminationReason = "log_quota_exceeded" + TerminationReasonDataTransferQuotaExceeded TerminationReason = "data_transfer_quota_exceeded" ) diff --git a/runner/internal/runner/api/http.go b/runner/internal/runner/api/http.go index 34220acc6..800da516f 100644 --- a/runner/internal/runner/api/http.go +++ b/runner/internal/runner/api/http.go @@ -194,6 +194,24 @@ func (s *Server) stopPostHandler(w http.ResponseWriter, r *http.Request) (interf return nil, nil } +func (s *Server) terminatePostHandler(w http.ResponseWriter, r *http.Request) (interface{}, error) { + var body schemas.TerminateBody + if err := api.DecodeJSONBody(w, r, &body, true); err != nil { + return nil, err + } + ctx := r.Context() + log.Error(ctx, "Terminate requested", "reason", body.Reason, "message", body.Message) + // No executor.Lock() needed — SetJobStateWithTerminationReason acquires its own lock. + // Using the external lock would deadlock with io.Copy holding it during job execution. + s.executor.SetJobStateWithTerminationReason( + ctx, + schemas.JobStateFailed, + body.Reason, + body.Message, + ) + return nil, nil +} + func isMaxBytesError(err error) bool { var maxBytesError *http.MaxBytesError return errors.As(err, &maxBytesError) diff --git a/runner/internal/runner/api/server.go b/runner/internal/runner/api/server.go index 11b76d887..478361232 100644 --- a/runner/internal/runner/api/server.go +++ b/runner/internal/runner/api/server.go @@ -68,6 +68,7 @@ func NewServer(ctx context.Context, address string, version string, ex executor. r.AddHandler("POST", "/api/run", s.runPostHandler) r.AddHandler("GET", "/api/pull", s.pullGetHandler) r.AddHandler("POST", "/api/stop", s.stopPostHandler) + r.AddHandler("POST", "/api/terminate", s.terminatePostHandler) r.AddHandler("GET", "/logs_ws", s.logsWsGetHandler) return s, nil } diff --git a/runner/internal/runner/schemas/schemas.go b/runner/internal/runner/schemas/schemas.go index 47706228c..a3f83fee0 100644 --- a/runner/internal/runner/schemas/schemas.go +++ b/runner/internal/runner/schemas/schemas.go @@ -39,6 +39,11 @@ type SubmitBody struct { LogQuotaHour int `json:"log_quota_hour"` // bytes per hour, 0 = unlimited } +type TerminateBody struct { + Reason types.TerminationReason `json:"reason"` + Message string `json:"message"` +} + type PullResponse struct { JobStates []JobStateEvent `json:"job_states"` JobLogs []LogEvent `json:"job_logs"` diff --git a/runner/internal/shim/docker.go b/runner/internal/shim/docker.go index 6acfb27a5..733420257 100644 --- a/runner/internal/shim/docker.go +++ b/runner/internal/shim/docker.go @@ -9,6 +9,7 @@ import ( "errors" "fmt" "io" + "net/http" "os" "os/exec" "os/user" @@ -37,6 +38,7 @@ import ( "github.com/dstackai/dstack/runner/internal/common/types" "github.com/dstackai/dstack/runner/internal/shim/backends" "github.com/dstackai/dstack/runner/internal/shim/host" + "github.com/dstackai/dstack/runner/internal/shim/netmeter" ) // TODO: Allow for configuration via cli arguments or environment variables. @@ -380,7 +382,8 @@ func (d *DockerRunner) Run(ctx context.Context, taskID string) error { if err := d.tasks.Update(task); err != nil { return fmt.Errorf("%w: failed to update task %s: %w", ErrInternal, task.ID, err) } - err = d.waitContainer(ctx, &task) + + err = d.waitContainerWithQuota(ctx, &task, cfg) } if err != nil { log.Error(ctx, "failed to run container", "err", err) @@ -910,6 +913,49 @@ func (d *DockerRunner) waitContainer(ctx context.Context, task *Task) error { return nil } +// waitContainerWithQuota waits for the container to finish, optionally enforcing +// a data transfer quota. If the quota is exceeded, it notifies the runner +// (so the server reads the termination reason via /api/pull) and stops the container. +func (d *DockerRunner) waitContainerWithQuota(ctx context.Context, task *Task, cfg TaskConfig) error { + if cfg.DataTransferQuota <= 0 { + return d.waitContainer(ctx, task) + } + + nm := netmeter.New(task.ID, cfg.DataTransferQuota) + if err := nm.Start(ctx); err != nil { + errMessage := fmt.Sprintf("data transfer quota configured but metering unavailable: %s", err) + log.Error(ctx, errMessage) + task.SetStatusTerminated(string(types.TerminationReasonExecutorError), errMessage) + return fmt.Errorf("data transfer meter: %w", err) + } + defer nm.Stop() + + waitDone := make(chan error, 1) + go func() { waitDone <- d.waitContainer(ctx, task) }() + + select { + case err := <-waitDone: + return err + case <-nm.Exceeded(): + log.Error(ctx, "Data transfer quota exceeded", "task", task.ID, "quota", cfg.DataTransferQuota) + terminateMsg := fmt.Sprintf("Outbound data transfer exceeded quota of %d bytes", cfg.DataTransferQuota) + if err := terminateRunner(ctx, d.dockerParams.RunnerHTTPPort(), + types.TerminationReasonDataTransferQuotaExceeded, terminateMsg); err != nil { + log.Error(ctx, "failed to notify runner of termination", "err", err) + } + stopTimeout := 10 + stopOpts := container.StopOptions{Timeout: &stopTimeout} + if err := d.client.ContainerStop(ctx, task.containerID, stopOpts); err != nil { + log.Error(ctx, "failed to stop container after quota exceeded", "err", err) + } + <-waitDone + // The runner already set the job state with the termination reason. + // The server will read it via /api/pull. + task.SetStatusTerminated(string(types.TerminationReasonDoneByRunner), "") + return nil + } +} + func encodeRegistryAuth(username string, password string) (string, error) { if username == "" && password == "" { return "", nil @@ -1180,6 +1226,31 @@ func getContainerLastLogs(ctx context.Context, client docker.APIClient, containe return lines, nil } +// terminateRunner calls the runner's /api/terminate endpoint to set the job termination state. +// This allows the server to read the termination reason via /api/pull before the container dies. +func terminateRunner(ctx context.Context, runnerPort int, reason types.TerminationReason, message string) error { + url := fmt.Sprintf("http://localhost:%d/api/terminate", runnerPort) + body := fmt.Sprintf(`{"reason":%q,"message":%q}`, reason, message) + // 5s is generous for a localhost HTTP call; if the runner doesn't respond in time, + // we proceed with stopping the container anyway (the server will handle the termination). + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + req, err := http.NewRequestWithContext(ctx, "POST", url, strings.NewReader(body)) + if err != nil { + return fmt.Errorf("create request: %w", err) + } + req.Header.Set("Content-Type", "application/json") + resp, err := http.DefaultClient.Do(req) + if err != nil { + return fmt.Errorf("request failed: %w", err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("unexpected status: %d", resp.StatusCode) + } + return nil +} + /* DockerParameters interface implementation for CLIArgs */ func (c *CLIArgs) DockerPrivileged() bool { @@ -1228,6 +1299,10 @@ func (c *CLIArgs) DockerPorts() []int { return []int{c.Runner.HTTPPort, c.Runner.SSHPort} } +func (c *CLIArgs) RunnerHTTPPort() int { + return c.Runner.HTTPPort +} + func (c *CLIArgs) MakeRunnerDir(name string) (string, error) { runnerTemp := filepath.Join(c.Shim.HomeDir, "runners", name) if err := os.MkdirAll(runnerTemp, 0o755); err != nil { diff --git a/runner/internal/shim/docker_test.go b/runner/internal/shim/docker_test.go index 18f8c31fc..3723f53e3 100644 --- a/runner/internal/shim/docker_test.go +++ b/runner/internal/shim/docker_test.go @@ -123,6 +123,10 @@ func (c *dockerParametersMock) DockerPorts() []int { return []int{} } +func (c *dockerParametersMock) RunnerHTTPPort() int { + return 10999 +} + func (c *dockerParametersMock) DockerMounts(string) ([]mount.Mount, error) { return nil, nil } diff --git a/runner/internal/shim/models.go b/runner/internal/shim/models.go index d50fe6e29..473ec3b8e 100644 --- a/runner/internal/shim/models.go +++ b/runner/internal/shim/models.go @@ -9,6 +9,7 @@ type DockerParameters interface { DockerShellCommands(authorizedKeys []string, runnerHttpAddress string) []string DockerMounts(string) ([]mount.Mount, error) DockerPorts() []int + RunnerHTTPPort() int MakeRunnerDir(name string) (string, error) DockerPJRTDevice() string } @@ -98,9 +99,10 @@ type TaskConfig struct { // GPUDevices allows the server to set gpu devices instead of relying on the runner default logic. // E.g. passing nvidia devices directly instead of using nvidia-container-toolkit. GPUDevices []GPUDevice `json:"gpu_devices"` - HostSshUser string `json:"host_ssh_user"` - HostSshKeys []string `json:"host_ssh_keys"` - ContainerSshKeys []string `json:"container_ssh_keys"` + HostSshUser string `json:"host_ssh_user"` + HostSshKeys []string `json:"host_ssh_keys"` + ContainerSshKeys []string `json:"container_ssh_keys"` + DataTransferQuota int64 `json:"data_transfer_quota"` // total bytes for job lifetime; 0 = unlimited } type TaskListItem struct { diff --git a/runner/internal/shim/netmeter/netmeter.go b/runner/internal/shim/netmeter/netmeter.go new file mode 100644 index 000000000..bf438b946 --- /dev/null +++ b/runner/internal/shim/netmeter/netmeter.go @@ -0,0 +1,264 @@ +package netmeter + +import ( + "bytes" + "context" + "fmt" + "os/exec" + "strconv" + "strings" + "sync" + "time" + + "github.com/dstackai/dstack/runner/internal/common/log" +) + +const ( + pollInterval = 10 * time.Second + chainPrefix = "dstack-nm-" +) + +// NetMeter monitors outbound data transfer using iptables byte counters. +// It excludes private/VPC traffic and counts only external (billable) bytes. +// When cumulative bytes exceed the configured quota, the Exceeded() channel is closed. +type NetMeter struct { + quota int64 // total bytes for job lifetime + chainName string // unique iptables chain name + + exceeded chan struct{} + exceededOnce sync.Once + stopCh chan struct{} + stopped chan struct{} +} + +// New creates a new NetMeter with the given quota in bytes. +func New(taskID string, quota int64) *NetMeter { + // Use first 8 chars of task ID for chain name uniqueness + suffix := taskID + if len(suffix) > 8 { + suffix = suffix[:8] + } + return &NetMeter{ + quota: quota, + chainName: chainPrefix + suffix, + exceeded: make(chan struct{}), + stopCh: make(chan struct{}), + stopped: make(chan struct{}), + } +} + +// Start sets up iptables rules and begins polling byte counters. +func (m *NetMeter) Start(ctx context.Context) error { + if err := checkIptables(); err != nil { + return fmt.Errorf("iptables not available: %w", err) + } + + if err := m.setupChain(ctx); err != nil { + return fmt.Errorf("setup iptables chain: %w", err) + } + + go m.pollLoop(ctx) + return nil +} + +// Stop signals the poll loop to stop and cleans up iptables rules. +func (m *NetMeter) Stop() { + close(m.stopCh) + <-m.stopped +} + +// Exceeded returns a channel that is closed when the quota is exceeded. +func (m *NetMeter) Exceeded() <-chan struct{} { + return m.exceeded +} + +func checkIptables() error { + _, err := exec.LookPath("iptables") + return err +} + +func (m *NetMeter) setupChain(ctx context.Context) error { + // Create the chain + if err := iptables(ctx, "-N", m.chainName); err != nil { + return fmt.Errorf("create chain: %w", err) + } + + // Add exclusion rules for private/internal traffic (these RETURN without counting) + privateCIDRs := []struct { + cidr string + comment string + }{ + {"10.0.0.0/8", "VPC/private"}, + {"172.16.0.0/12", "VPC/private"}, + {"192.168.0.0/16", "VPC/private"}, + {"169.254.0.0/16", "link-local/metadata"}, + {"127.0.0.0/8", "loopback"}, + } + for _, p := range privateCIDRs { + if err := iptables(ctx, "-A", m.chainName, "-d", p.cidr, "-j", "RETURN"); err != nil { + m.cleanup(ctx) + return fmt.Errorf("add exclusion rule for %s: %w", p.comment, err) + } + } + + // Add catch-all counting rule (counts all remaining = external/billable bytes) + if err := iptables(ctx, "-A", m.chainName, "-j", "RETURN"); err != nil { + m.cleanup(ctx) + return fmt.Errorf("add counting rule: %w", err) + } + + // Insert jump from OUTPUT chain (catches host-mode Docker and host processes) + if err := iptables(ctx, "-I", "OUTPUT", "-j", m.chainName); err != nil { + m.cleanup(ctx) + return fmt.Errorf("insert OUTPUT jump: %w", err) + } + + // Insert jump from FORWARD chain (catches bridge-mode Docker traffic) + if err := iptables(ctx, "-I", "FORWARD", "-j", m.chainName); err != nil { + m.cleanup(ctx) + return fmt.Errorf("insert FORWARD jump: %w", err) + } + + return nil +} + +func (m *NetMeter) cleanup(ctx context.Context) { + // Remove jumps from OUTPUT and FORWARD (ignore errors — may not exist if setup failed partway) + _ = iptables(ctx, "-D", "OUTPUT", "-j", m.chainName) + _ = iptables(ctx, "-D", "FORWARD", "-j", m.chainName) + // Flush and delete chain + _ = iptables(ctx, "-F", m.chainName) + _ = iptables(ctx, "-X", m.chainName) +} + +func (m *NetMeter) pollLoop(ctx context.Context) { + defer close(m.stopped) + defer m.cleanup(ctx) + + ticker := time.NewTicker(pollInterval) + defer ticker.Stop() + + for { + select { + case <-m.stopCh: + return + case <-ticker.C: + bytes, err := m.readCounter(ctx) + if err != nil { + log.Error(ctx, "failed to read network counter", "chain", m.chainName, "err", err) + continue + } + if bytes > m.quota { + log.Error(ctx, "data transfer quota exceeded", + "chain", m.chainName, "bytes", bytes, "quota", m.quota) + m.exceededOnce.Do(func() { close(m.exceeded) }) + return + } + } + } +} + +// readCounter reads the cumulative byte count from the catch-all rule (last rule in chain). +func (m *NetMeter) readCounter(ctx context.Context) (int64, error) { + output, err := iptablesOutput(ctx, "-L", m.chainName, "-v", "-x", "-n") + if err != nil { + return 0, err + } + return parseByteCounter(output, m.chainName) +} + +// parseByteCounter extracts the byte count from the last rule (catch-all counting rule) +// in the iptables -L -v -x -n output. +// +// Example output: +// +// Chain dstack-nm-abcd1234 (1 references) +// pkts bytes target prot opt in out source destination +// 0 0 RETURN all -- * * 0.0.0.0/0 10.0.0.0/8 +// 0 0 RETURN all -- * * 0.0.0.0/0 172.16.0.0/12 +// 0 0 RETURN all -- * * 0.0.0.0/0 192.168.0.0/16 +// 0 0 RETURN all -- * * 0.0.0.0/0 169.254.0.0/16 +// 0 0 RETURN all -- * * 0.0.0.0/0 127.0.0.0/8 +// 123 456789 RETURN all -- * * 0.0.0.0/0 0.0.0.0/0 +// +// The last rule (destination 0.0.0.0/0) is the catch-all; its bytes field is what we want. +func parseByteCounter(output string, chainName string) (int64, error) { + lines := strings.Split(strings.TrimSpace(output), "\n") + + // Find lines that are rule entries (skip header lines) + var lastRuleLine string + for _, line := range lines { + trimmed := strings.TrimSpace(line) + if trimmed == "" { + continue + } + // Skip "Chain ..." and column header lines + if strings.HasPrefix(trimmed, "Chain ") { + continue + } + if strings.HasPrefix(trimmed, "pkts") { + continue + } + lastRuleLine = trimmed + } + + if lastRuleLine == "" { + return 0, fmt.Errorf("no rules found in chain %s", chainName) + } + + // Parse the bytes field (second field in the line) + fields := strings.Fields(lastRuleLine) + if len(fields) < 2 { + return 0, fmt.Errorf("unexpected rule format: %q", lastRuleLine) + } + + byteCount, err := strconv.ParseInt(fields[1], 10, 64) + if err != nil { + return 0, fmt.Errorf("parse byte count %q: %w", fields[1], err) + } + + return byteCount, nil +} + +func iptables(ctx context.Context, args ...string) error { + cmd := exec.CommandContext(ctx, "iptables", args...) + var stderr bytes.Buffer + cmd.Stderr = &stderr + if err := cmd.Run(); err != nil { + return fmt.Errorf("iptables %s: %s: %w", strings.Join(args, " "), stderr.String(), err) + } + return nil +} + +func iptablesOutput(ctx context.Context, args ...string) (string, error) { + cmd := exec.CommandContext(ctx, "iptables", args...) + var stdout, stderr bytes.Buffer + cmd.Stdout = &stdout + cmd.Stderr = &stderr + if err := cmd.Run(); err != nil { + return "", fmt.Errorf("iptables %s: %s: %w", strings.Join(args, " "), stderr.String(), err) + } + return stdout.String(), nil +} + +// CleanupOrphanedChains removes any leftover dstack-nm-* chains from previous runs. +// Call this on shim startup. +func CleanupOrphanedChains(ctx context.Context) { + output, err := iptablesOutput(ctx, "-L", "-n") + if err != nil { + return + } + for _, line := range strings.Split(output, "\n") { + if strings.HasPrefix(line, "Chain "+chainPrefix) { + fields := strings.Fields(line) + if len(fields) >= 2 { + chainName := fields[1] + log.Info(ctx, "cleaning up orphaned data transfer meter chain", "chain", chainName) + _ = iptables(ctx, "-D", "OUTPUT", "-j", chainName) + _ = iptables(ctx, "-D", "FORWARD", "-j", chainName) + _ = iptables(ctx, "-F", chainName) + _ = iptables(ctx, "-X", chainName) + } + } + } +} diff --git a/runner/internal/shim/netmeter/netmeter_test.go b/runner/internal/shim/netmeter/netmeter_test.go new file mode 100644 index 000000000..055843e93 --- /dev/null +++ b/runner/internal/shim/netmeter/netmeter_test.go @@ -0,0 +1,98 @@ +package netmeter + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestParseByteCounter(t *testing.T) { + tests := []struct { + name string + output string + chain string + expected int64 + expectErr bool + }{ + { + name: "typical output with traffic", + output: `Chain dstack-nm-abcd1234 (1 references) + pkts bytes target prot opt in out source destination + 0 0 RETURN all -- * * 0.0.0.0/0 10.0.0.0/8 + 0 0 RETURN all -- * * 0.0.0.0/0 172.16.0.0/12 + 0 0 RETURN all -- * * 0.0.0.0/0 192.168.0.0/16 + 0 0 RETURN all -- * * 0.0.0.0/0 169.254.0.0/16 + 0 0 RETURN all -- * * 0.0.0.0/0 127.0.0.0/8 + 123 456789 RETURN all -- * * 0.0.0.0/0 0.0.0.0/0 +`, + chain: "dstack-nm-abcd1234", + expected: 456789, + }, + { + name: "zero traffic", + output: `Chain dstack-nm-abcd1234 (1 references) + pkts bytes target prot opt in out source destination + 0 0 RETURN all -- * * 0.0.0.0/0 10.0.0.0/8 + 0 0 RETURN all -- * * 0.0.0.0/0 172.16.0.0/12 + 0 0 RETURN all -- * * 0.0.0.0/0 192.168.0.0/16 + 0 0 RETURN all -- * * 0.0.0.0/0 169.254.0.0/16 + 0 0 RETURN all -- * * 0.0.0.0/0 127.0.0.0/8 + 0 0 RETURN all -- * * 0.0.0.0/0 0.0.0.0/0 +`, + chain: "dstack-nm-abcd1234", + expected: 0, + }, + { + name: "large byte count", + output: `Chain dstack-nm-test1234 (1 references) + pkts bytes target prot opt in out source destination + 10000 5000000 RETURN all -- * * 0.0.0.0/0 10.0.0.0/8 + 0 0 RETURN all -- * * 0.0.0.0/0 172.16.0.0/12 + 0 0 RETURN all -- * * 0.0.0.0/0 192.168.0.0/16 + 0 0 RETURN all -- * * 0.0.0.0/0 169.254.0.0/16 + 0 0 RETURN all -- * * 0.0.0.0/0 127.0.0.0/8 + 500000 107374182400 RETURN all -- * * 0.0.0.0/0 0.0.0.0/0 +`, + chain: "dstack-nm-test1234", + expected: 107374182400, // ~100 GB + }, + { + name: "empty output", + output: "", + chain: "dstack-nm-abcd1234", + expectErr: true, + }, + { + name: "only headers no rules", + output: `Chain dstack-nm-abcd1234 (1 references) + pkts bytes target prot opt in out source destination +`, + chain: "dstack-nm-abcd1234", + expectErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result, err := parseByteCounter(tt.output, tt.chain) + if tt.expectErr { + assert.Error(t, err) + } else { + require.NoError(t, err) + assert.Equal(t, tt.expected, result) + } + }) + } +} + +func TestNew(t *testing.T) { + nm := New("abcdefghijklmnop", 1000000) + assert.Equal(t, int64(1000000), nm.quota) + assert.Equal(t, "dstack-nm-abcdefgh", nm.chainName) +} + +func TestNew_ShortID(t *testing.T) { + nm := New("abc", 500) + assert.Equal(t, "dstack-nm-abc", nm.chainName) +} diff --git a/src/dstack/_internal/core/models/runs.py b/src/dstack/_internal/core/models/runs.py index 0c48664a1..175e08a4d 100644 --- a/src/dstack/_internal/core/models/runs.py +++ b/src/dstack/_internal/core/models/runs.py @@ -152,6 +152,7 @@ class JobTerminationReason(str, Enum): EXECUTOR_ERROR = "executor_error" MAX_DURATION_EXCEEDED = "max_duration_exceeded" LOG_QUOTA_EXCEEDED = "log_quota_exceeded" + DATA_TRANSFER_QUOTA_EXCEEDED = "data_transfer_quota_exceeded" def to_status(self) -> JobStatus: mapping = { @@ -175,6 +176,7 @@ def to_status(self) -> JobStatus: self.EXECUTOR_ERROR: JobStatus.FAILED, self.MAX_DURATION_EXCEEDED: JobStatus.TERMINATED, self.LOG_QUOTA_EXCEEDED: JobStatus.FAILED, + self.DATA_TRANSFER_QUOTA_EXCEEDED: JobStatus.FAILED, } return mapping[self] @@ -208,6 +210,7 @@ def to_error(self) -> Optional[str]: JobTerminationReason.EXECUTOR_ERROR: "executor error", JobTerminationReason.MAX_DURATION_EXCEEDED: "max duration exceeded", JobTerminationReason.LOG_QUOTA_EXCEEDED: "log quota exceeded", + JobTerminationReason.DATA_TRANSFER_QUOTA_EXCEEDED: "data transfer quota exceeded", } return error_mapping.get(self) diff --git a/src/dstack/_internal/server/background/pipeline_tasks/jobs_running.py b/src/dstack/_internal/server/background/pipeline_tasks/jobs_running.py index 5e061ee77..6f45a9d16 100644 --- a/src/dstack/_internal/server/background/pipeline_tasks/jobs_running.py +++ b/src/dstack/_internal/server/background/pipeline_tasks/jobs_running.py @@ -34,6 +34,7 @@ RunStatus, ) from dstack._internal.core.models.volumes import InstanceMountPoint, Volume, VolumeMountPoint +from dstack._internal.server import settings as server_settings from dstack._internal.server.background.pipeline_tasks.base import ( Fetcher, Heartbeater, @@ -1106,6 +1107,9 @@ def _process_provisioning_with_shim( memory = None network_mode = NetworkMode.HOST image_name = resolve_provisioning_image_name(job_spec, jpd) + data_transfer_quota = 0 + if jpd.backend == BackendType.AWS: + data_transfer_quota = server_settings.SERVER_DATA_TRANSFER_QUOTA_PER_JOB_AWS if shim_client.is_api_v2_supported(): shim_client.submit_task( task_id=job_model.id, @@ -1128,6 +1132,7 @@ def _process_provisioning_with_shim( host_ssh_keys=[ssh_key] if ssh_key else [], container_ssh_keys=public_keys, instance_id=jpd.instance_id, + data_transfer_quota=data_transfer_quota, ) else: submitted = shim_client.submit( diff --git a/src/dstack/_internal/server/background/scheduled_tasks/running_jobs.py b/src/dstack/_internal/server/background/scheduled_tasks/running_jobs.py index 72d60b581..185e3d751 100644 --- a/src/dstack/_internal/server/background/scheduled_tasks/running_jobs.py +++ b/src/dstack/_internal/server/background/scheduled_tasks/running_jobs.py @@ -37,6 +37,7 @@ RunStatus, ) from dstack._internal.core.models.volumes import InstanceMountPoint, Volume, VolumeMountPoint +from dstack._internal.server import settings as server_settings from dstack._internal.server.background.scheduled_tasks.common import get_provisioning_timeout from dstack._internal.server.db import get_db, get_session_ctx from dstack._internal.server.models import ( @@ -786,6 +787,9 @@ def _process_provisioning_with_shim( memory = None network_mode = NetworkMode.HOST image_name = resolve_provisioning_image_name(job_spec, jpd) + data_transfer_quota = 0 + if jpd.backend == BackendType.AWS: + data_transfer_quota = server_settings.SERVER_DATA_TRANSFER_QUOTA_PER_JOB_AWS if shim_client.is_api_v2_supported(): shim_client.submit_task( task_id=job_model.id, @@ -808,6 +812,7 @@ def _process_provisioning_with_shim( host_ssh_keys=[ssh_key] if ssh_key else [], container_ssh_keys=public_keys, instance_id=jpd.instance_id, + data_transfer_quota=data_transfer_quota, ) else: submitted = shim_client.submit( diff --git a/src/dstack/_internal/server/schemas/runner.py b/src/dstack/_internal/server/schemas/runner.py index 549ff7914..5fadde06d 100644 --- a/src/dstack/_internal/server/schemas/runner.py +++ b/src/dstack/_internal/server/schemas/runner.py @@ -245,6 +245,7 @@ class TaskSubmitRequest(CoreModel): host_ssh_user: str host_ssh_keys: list[str] container_ssh_keys: list[str] + data_transfer_quota: int = 0 # total bytes; 0 = unlimited class TaskTerminateRequest(CoreModel): diff --git a/src/dstack/_internal/server/services/runner/client.py b/src/dstack/_internal/server/services/runner/client.py index 4b78eefee..c3500e4a9 100644 --- a/src/dstack/_internal/server/services/runner/client.py +++ b/src/dstack/_internal/server/services/runner/client.py @@ -416,6 +416,7 @@ def submit_task( host_ssh_keys: list[str], container_ssh_keys: list[str], instance_id: str, + data_transfer_quota: int = 0, ) -> None: if not self.is_api_v2_supported(): raise ShimAPIVersionError() @@ -439,6 +440,7 @@ def submit_task( host_ssh_user=host_ssh_user, host_ssh_keys=host_ssh_keys, container_ssh_keys=container_ssh_keys, + data_transfer_quota=data_transfer_quota, ) self._request("POST", "/api/tasks", body, raise_for_status=True) diff --git a/src/dstack/_internal/server/settings.py b/src/dstack/_internal/server/settings.py index e005a6cb5..f0a434aab 100644 --- a/src/dstack/_internal/server/settings.py +++ b/src/dstack/_internal/server/settings.py @@ -145,6 +145,20 @@ os.getenv("DSTACK_SERVER_LOG_QUOTA_PER_JOB_HOUR", 50 * 1024 * 1024) # 50 MB ) +# Per-job data transfer quota for AWS backend: maximum total outbound bytes to external IPs. +# 0 = unlimited. Only applied to instances running on AWS. +# Limitations: +# - Meters all outbound traffic to non-private IPs (excludes 10.0.0.0/8, 172.16.0.0/12, +# 192.168.0.0/16, 169.254.0.0/16). This covers inter-region and internet egress. +# - Does not differentiate by destination region — the same quota applies regardless of +# whether traffic goes to another AWS region ($0.01-0.02/GB) or the internet ($0.09/GB). +# - Only effective on Linux instances with iptables available. +# Task fails with executor_error on systems without iptables if quota is set. +# To add support for other backends, add DSTACK_SERVER_DATA_TRANSFER_QUOTA_PER_JOB_GCP, etc. +SERVER_DATA_TRANSFER_QUOTA_PER_JOB_AWS = int( + os.getenv("DSTACK_SERVER_DATA_TRANSFER_QUOTA_PER_JOB_AWS", 0) # disabled by default +) + # Development settings SQL_ECHO_ENABLED = os.getenv("DSTACK_SQL_ECHO_ENABLED") is not None From 7d1836ad24a8d6f179ef8b0f29dbfaaf204a34c3 Mon Sep 17 00:00:00 2001 From: Andrey Cheptsov Date: Sun, 22 Mar 2026 23:41:20 -0700 Subject: [PATCH 2/8] Fix golangci-lint issues: errcheck, gci formatting Co-Authored-By: Claude Opus 4.6 (1M context) --- runner/internal/common/types/types.go | 2 +- runner/internal/shim/docker.go | 2 +- runner/internal/shim/models.go | 8 ++++---- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/runner/internal/common/types/types.go b/runner/internal/common/types/types.go index c0a34c6e7..524147f03 100644 --- a/runner/internal/common/types/types.go +++ b/runner/internal/common/types/types.go @@ -10,6 +10,6 @@ const ( TerminationReasonTerminatedByUser TerminationReason = "terminated_by_user" TerminationReasonTerminatedByServer TerminationReason = "terminated_by_server" TerminationReasonMaxDurationExceeded TerminationReason = "max_duration_exceeded" - TerminationReasonLogQuotaExceeded TerminationReason = "log_quota_exceeded" + TerminationReasonLogQuotaExceeded TerminationReason = "log_quota_exceeded" TerminationReasonDataTransferQuotaExceeded TerminationReason = "data_transfer_quota_exceeded" ) diff --git a/runner/internal/shim/docker.go b/runner/internal/shim/docker.go index 733420257..019ec7e36 100644 --- a/runner/internal/shim/docker.go +++ b/runner/internal/shim/docker.go @@ -1244,7 +1244,7 @@ func terminateRunner(ctx context.Context, runnerPort int, reason types.Terminati if err != nil { return fmt.Errorf("request failed: %w", err) } - defer resp.Body.Close() + defer func() { _ = resp.Body.Close() }() if resp.StatusCode != http.StatusOK { return fmt.Errorf("unexpected status: %d", resp.StatusCode) } diff --git a/runner/internal/shim/models.go b/runner/internal/shim/models.go index 473ec3b8e..0cdcc6f4c 100644 --- a/runner/internal/shim/models.go +++ b/runner/internal/shim/models.go @@ -99,10 +99,10 @@ type TaskConfig struct { // GPUDevices allows the server to set gpu devices instead of relying on the runner default logic. // E.g. passing nvidia devices directly instead of using nvidia-container-toolkit. GPUDevices []GPUDevice `json:"gpu_devices"` - HostSshUser string `json:"host_ssh_user"` - HostSshKeys []string `json:"host_ssh_keys"` - ContainerSshKeys []string `json:"container_ssh_keys"` - DataTransferQuota int64 `json:"data_transfer_quota"` // total bytes for job lifetime; 0 = unlimited + HostSshUser string `json:"host_ssh_user"` + HostSshKeys []string `json:"host_ssh_keys"` + ContainerSshKeys []string `json:"container_ssh_keys"` + DataTransferQuota int64 `json:"data_transfer_quota"` // total bytes for job lifetime; 0 = unlimited } type TaskListItem struct { From e679bbebbde7d7e7f69e34dfc82321c557fad802 Mon Sep 17 00:00:00 2001 From: Andrey Cheptsov Date: Mon, 23 Mar 2026 00:01:47 -0700 Subject: [PATCH 3/8] Fix golangci-lint gci formatting and update tests for data_transfer_quota Co-Authored-By: Claude Opus 4.6 (1M context) --- runner/internal/common/types/types.go | 14 +++++++------- runner/internal/shim/models.go | 10 +++++----- .../background/pipeline_tasks/test_running_jobs.py | 1 + .../scheduled_tasks/test_running_jobs.py | 1 + .../server/services/runner/test_client.py | 1 + 5 files changed, 15 insertions(+), 12 deletions(-) diff --git a/runner/internal/common/types/types.go b/runner/internal/common/types/types.go index 524147f03..ac2755856 100644 --- a/runner/internal/common/types/types.go +++ b/runner/internal/common/types/types.go @@ -3,13 +3,13 @@ package types type TerminationReason string const ( - TerminationReasonExecutorError TerminationReason = "executor_error" - TerminationReasonCreatingContainerError TerminationReason = "creating_container_error" - TerminationReasonContainerExitedWithError TerminationReason = "container_exited_with_error" - TerminationReasonDoneByRunner TerminationReason = "done_by_runner" - TerminationReasonTerminatedByUser TerminationReason = "terminated_by_user" - TerminationReasonTerminatedByServer TerminationReason = "terminated_by_server" - TerminationReasonMaxDurationExceeded TerminationReason = "max_duration_exceeded" + TerminationReasonExecutorError TerminationReason = "executor_error" + TerminationReasonCreatingContainerError TerminationReason = "creating_container_error" + TerminationReasonContainerExitedWithError TerminationReason = "container_exited_with_error" + TerminationReasonDoneByRunner TerminationReason = "done_by_runner" + TerminationReasonTerminatedByUser TerminationReason = "terminated_by_user" + TerminationReasonTerminatedByServer TerminationReason = "terminated_by_server" + TerminationReasonMaxDurationExceeded TerminationReason = "max_duration_exceeded" TerminationReasonLogQuotaExceeded TerminationReason = "log_quota_exceeded" TerminationReasonDataTransferQuotaExceeded TerminationReason = "data_transfer_quota_exceeded" ) diff --git a/runner/internal/shim/models.go b/runner/internal/shim/models.go index 0cdcc6f4c..40e5cb8a1 100644 --- a/runner/internal/shim/models.go +++ b/runner/internal/shim/models.go @@ -98,11 +98,11 @@ type TaskConfig struct { InstanceMounts []InstanceMountPoint `json:"instance_mounts"` // GPUDevices allows the server to set gpu devices instead of relying on the runner default logic. // E.g. passing nvidia devices directly instead of using nvidia-container-toolkit. - GPUDevices []GPUDevice `json:"gpu_devices"` - HostSshUser string `json:"host_ssh_user"` - HostSshKeys []string `json:"host_ssh_keys"` - ContainerSshKeys []string `json:"container_ssh_keys"` - DataTransferQuota int64 `json:"data_transfer_quota"` // total bytes for job lifetime; 0 = unlimited + GPUDevices []GPUDevice `json:"gpu_devices"` + HostSshUser string `json:"host_ssh_user"` + HostSshKeys []string `json:"host_ssh_keys"` + ContainerSshKeys []string `json:"container_ssh_keys"` + DataTransferQuota int64 `json:"data_transfer_quota"` // total bytes for job lifetime; 0 = unlimited } type TaskListItem struct { diff --git a/src/tests/_internal/server/background/pipeline_tasks/test_running_jobs.py b/src/tests/_internal/server/background/pipeline_tasks/test_running_jobs.py index a52924a55..eb0cb6b70 100644 --- a/src/tests/_internal/server/background/pipeline_tasks/test_running_jobs.py +++ b/src/tests/_internal/server/background/pipeline_tasks/test_running_jobs.py @@ -573,6 +573,7 @@ async def test_provisioning_shim_with_volumes( host_ssh_keys=["user_ssh_key"], container_ssh_keys=[project_ssh_pub_key, "user_ssh_key"], instance_id=job_provisioning_data.instance_id, + data_transfer_quota=0, ) await session.refresh(job) assert job.status == JobStatus.PULLING diff --git a/src/tests/_internal/server/background/scheduled_tasks/test_running_jobs.py b/src/tests/_internal/server/background/scheduled_tasks/test_running_jobs.py index 66b38f331..862565c64 100644 --- a/src/tests/_internal/server/background/scheduled_tasks/test_running_jobs.py +++ b/src/tests/_internal/server/background/scheduled_tasks/test_running_jobs.py @@ -389,6 +389,7 @@ async def test_provisioning_shim_with_volumes( host_ssh_keys=["user_ssh_key"], container_ssh_keys=[project_ssh_pub_key, "user_ssh_key"], instance_id=job_provisioning_data.instance_id, + data_transfer_quota=0, ) await session.refresh(job) assert job.status == JobStatus.PULLING diff --git a/src/tests/_internal/server/services/runner/test_client.py b/src/tests/_internal/server/services/runner/test_client.py index 588c231a1..dbbece0a5 100644 --- a/src/tests/_internal/server/services/runner/test_client.py +++ b/src/tests/_internal/server/services/runner/test_client.py @@ -455,6 +455,7 @@ def test_submit_task(self, client: ShimClient, adapter: requests_mock.Adapter): "host_ssh_user": "dstack", "host_ssh_keys": ["host_key"], "container_ssh_keys": ["project_key", "user_key"], + "data_transfer_quota": 0, } self.assert_request(adapter, 1, "POST", "/api/tasks", expected_request) From 0a408849381d4cb322e734a874d498969282951f Mon Sep 17 00:00:00 2001 From: Andrey Cheptsov Date: Wed, 25 Mar 2026 14:00:53 +0100 Subject: [PATCH 4/8] Refactor data transfer metering from quota enforcement to passive reporting Replace the per-job quota termination approach with per-instance passive metering. The shim starts an iptables-based netmeter at startup that continuously tracks outbound external bytes. The server reads this via the existing /api/instance/health endpoint during periodic health checks (~60s) and captures a final reading before instance termination. Changes: - Netmeter: per-instance chain (dstack-nm), no quota, exposes Bytes() - Shim: starts netmeter at boot, reports via InstanceHealthResponse - Server: stores data_transfer_bytes on InstanceModel, final read at termination - Removed: quota enforcement, /api/terminate endpoint, DATA_TRANSFER_QUOTA_EXCEEDED Co-Authored-By: Claude Opus 4.6 (1M context) --- .gitignore | 2 + .justfile | 2 +- examples/.dstack.yml | 7 +- runner/cmd/shim/main.go | 12 ++ runner/internal/common/types/types.go | 17 ++- runner/internal/runner/api/http.go | 18 --- runner/internal/runner/api/server.go | 1 - runner/internal/runner/schemas/schemas.go | 5 - runner/internal/shim/api/handlers.go | 4 + runner/internal/shim/api/handlers_test.go | 4 +- runner/internal/shim/api/schemas.go | 3 +- runner/internal/shim/api/server.go | 6 + runner/internal/shim/docker.go | 72 +---------- runner/internal/shim/models.go | 9 +- runner/internal/shim/netmeter/netmeter.go | 121 ++++++------------ .../internal/shim/netmeter/netmeter_test.go | 27 ++-- src/dstack/_internal/core/models/instances.py | 1 + src/dstack/_internal/core/models/runs.py | 3 - .../pipeline_tasks/instances/check.py | 4 + .../pipeline_tasks/instances/common.py | 1 + .../pipeline_tasks/instances/termination.py | 39 +++++- .../background/pipeline_tasks/jobs_running.py | 5 - .../background/scheduled_tasks/instances.py | 30 +++++ .../scheduled_tasks/running_jobs.py | 5 - ...f6_add_data_transfer_bytes_to_instances.py | 27 ++++ src/dstack/_internal/server/models.py | 3 + .../_internal/server/schemas/instances.py | 5 +- src/dstack/_internal/server/schemas/runner.py | 2 +- .../_internal/server/services/instances.py | 1 + .../server/services/runner/client.py | 2 - src/dstack/_internal/server/settings.py | 14 -- .../pipeline_tasks/test_running_jobs.py | 1 - .../scheduled_tasks/test_running_jobs.py | 1 - .../server/services/runner/test_client.py | 1 - 34 files changed, 205 insertions(+), 250 deletions(-) create mode 100644 src/dstack/_internal/server/migrations/versions/2026/03_25_1200_a1b2c3d4e5f6_add_data_transfer_bytes_to_instances.py diff --git a/.gitignore b/.gitignore index bed3aa2cc..4c7f02696 100644 --- a/.gitignore +++ b/.gitignore @@ -27,3 +27,5 @@ uv.lock profiling_results.html docs/docs/reference/api/rest/openapi.json +.agents/skills +.claude/skills diff --git a/.justfile b/.justfile index 59defcfa1..5be2f0879 100644 --- a/.justfile +++ b/.justfile @@ -17,4 +17,4 @@ import "runner/.justfile" import "frontend/.justfile" docs-serve: - uv run mkdocs serve --livereload -w examples -s + DYLD_FALLBACK_LIBRARY_PATH=/opt/homebrew/lib mkdocs serve --livereload -w examples -s diff --git a/examples/.dstack.yml b/examples/.dstack.yml index dc8a9c58c..352d9e0d1 100644 --- a/examples/.dstack.yml +++ b/examples/.dstack.yml @@ -1,12 +1,11 @@ type: dev-environment -name: cursor +name: dev python: 3.12 -ide: cursor # Mount the repo directory to `/workflow` (the default working directory) repos: - .. -resources: - gpu: 1 +#resources: +# gpu: 1 diff --git a/runner/cmd/shim/main.go b/runner/cmd/shim/main.go index c696bd467..95cdf448b 100644 --- a/runner/cmd/shim/main.go +++ b/runner/cmd/shim/main.go @@ -22,6 +22,7 @@ import ( "github.com/dstackai/dstack/runner/internal/shim/api" "github.com/dstackai/dstack/runner/internal/shim/components" "github.com/dstackai/dstack/runner/internal/shim/dcgm" + "github.com/dstackai/dstack/runner/internal/shim/netmeter" ) // Version is a build-time variable. The value is overridden by ldflags. @@ -270,11 +271,22 @@ func start(ctx context.Context, args shim.CLIArgs, serviceMode bool) (err error) } } + var nm *netmeter.NetMeter + nm = netmeter.New() + if err := nm.Start(ctx); err != nil { + log.Warning(ctx, "data transfer metering unavailable", "err", err) + nm = nil + } else { + log.Info(ctx, "data transfer metering started") + defer nm.Stop() + } + address := fmt.Sprintf("localhost:%d", args.Shim.HTTPPort) shimServer := api.NewShimServer( ctx, address, Version, dockerRunner, dcgmExporter, dcgmWrapper, runnerManager, shimManager, + nm, ) if serviceMode { diff --git a/runner/internal/common/types/types.go b/runner/internal/common/types/types.go index ac2755856..057c0248c 100644 --- a/runner/internal/common/types/types.go +++ b/runner/internal/common/types/types.go @@ -3,13 +3,12 @@ package types type TerminationReason string const ( - TerminationReasonExecutorError TerminationReason = "executor_error" - TerminationReasonCreatingContainerError TerminationReason = "creating_container_error" - TerminationReasonContainerExitedWithError TerminationReason = "container_exited_with_error" - TerminationReasonDoneByRunner TerminationReason = "done_by_runner" - TerminationReasonTerminatedByUser TerminationReason = "terminated_by_user" - TerminationReasonTerminatedByServer TerminationReason = "terminated_by_server" - TerminationReasonMaxDurationExceeded TerminationReason = "max_duration_exceeded" - TerminationReasonLogQuotaExceeded TerminationReason = "log_quota_exceeded" - TerminationReasonDataTransferQuotaExceeded TerminationReason = "data_transfer_quota_exceeded" + TerminationReasonExecutorError TerminationReason = "executor_error" + TerminationReasonCreatingContainerError TerminationReason = "creating_container_error" + TerminationReasonContainerExitedWithError TerminationReason = "container_exited_with_error" + TerminationReasonDoneByRunner TerminationReason = "done_by_runner" + TerminationReasonTerminatedByUser TerminationReason = "terminated_by_user" + TerminationReasonTerminatedByServer TerminationReason = "terminated_by_server" + TerminationReasonMaxDurationExceeded TerminationReason = "max_duration_exceeded" + TerminationReasonLogQuotaExceeded TerminationReason = "log_quota_exceeded" ) diff --git a/runner/internal/runner/api/http.go b/runner/internal/runner/api/http.go index 800da516f..34220acc6 100644 --- a/runner/internal/runner/api/http.go +++ b/runner/internal/runner/api/http.go @@ -194,24 +194,6 @@ func (s *Server) stopPostHandler(w http.ResponseWriter, r *http.Request) (interf return nil, nil } -func (s *Server) terminatePostHandler(w http.ResponseWriter, r *http.Request) (interface{}, error) { - var body schemas.TerminateBody - if err := api.DecodeJSONBody(w, r, &body, true); err != nil { - return nil, err - } - ctx := r.Context() - log.Error(ctx, "Terminate requested", "reason", body.Reason, "message", body.Message) - // No executor.Lock() needed — SetJobStateWithTerminationReason acquires its own lock. - // Using the external lock would deadlock with io.Copy holding it during job execution. - s.executor.SetJobStateWithTerminationReason( - ctx, - schemas.JobStateFailed, - body.Reason, - body.Message, - ) - return nil, nil -} - func isMaxBytesError(err error) bool { var maxBytesError *http.MaxBytesError return errors.As(err, &maxBytesError) diff --git a/runner/internal/runner/api/server.go b/runner/internal/runner/api/server.go index 478361232..11b76d887 100644 --- a/runner/internal/runner/api/server.go +++ b/runner/internal/runner/api/server.go @@ -68,7 +68,6 @@ func NewServer(ctx context.Context, address string, version string, ex executor. r.AddHandler("POST", "/api/run", s.runPostHandler) r.AddHandler("GET", "/api/pull", s.pullGetHandler) r.AddHandler("POST", "/api/stop", s.stopPostHandler) - r.AddHandler("POST", "/api/terminate", s.terminatePostHandler) r.AddHandler("GET", "/logs_ws", s.logsWsGetHandler) return s, nil } diff --git a/runner/internal/runner/schemas/schemas.go b/runner/internal/runner/schemas/schemas.go index a3f83fee0..47706228c 100644 --- a/runner/internal/runner/schemas/schemas.go +++ b/runner/internal/runner/schemas/schemas.go @@ -39,11 +39,6 @@ type SubmitBody struct { LogQuotaHour int `json:"log_quota_hour"` // bytes per hour, 0 = unlimited } -type TerminateBody struct { - Reason types.TerminationReason `json:"reason"` - Message string `json:"message"` -} - type PullResponse struct { JobStates []JobStateEvent `json:"job_states"` JobLogs []LogEvent `json:"job_logs"` diff --git a/runner/internal/shim/api/handlers.go b/runner/internal/shim/api/handlers.go index b3382d0f2..37de925a6 100644 --- a/runner/internal/shim/api/handlers.go +++ b/runner/internal/shim/api/handlers.go @@ -47,6 +47,10 @@ func (s *ShimServer) InstanceHealthHandler(w http.ResponseWriter, r *http.Reques response.DCGM = &dcgmHealth } } + if s.netMeter != nil { + b := s.netMeter.Bytes() + response.DataTransferBytes = &b + } return &response, nil } diff --git a/runner/internal/shim/api/handlers_test.go b/runner/internal/shim/api/handlers_test.go index bb19ebbf1..751151c81 100644 --- a/runner/internal/shim/api/handlers_test.go +++ b/runner/internal/shim/api/handlers_test.go @@ -13,7 +13,7 @@ func TestHealthcheck(t *testing.T) { request := httptest.NewRequest("GET", "/api/healthcheck", nil) responseRecorder := httptest.NewRecorder() - server := NewShimServer(context.Background(), ":12345", "0.0.1.dev2", NewDummyRunner(), nil, nil, nil, nil) + server := NewShimServer(context.Background(), ":12345", "0.0.1.dev2", NewDummyRunner(), nil, nil, nil, nil, nil) f := commonapi.JSONResponseHandler(server.HealthcheckHandler) f(responseRecorder, request) @@ -30,7 +30,7 @@ func TestHealthcheck(t *testing.T) { } func TestTaskSubmit(t *testing.T) { - server := NewShimServer(context.Background(), ":12340", "0.0.1.dev2", NewDummyRunner(), nil, nil, nil, nil) + server := NewShimServer(context.Background(), ":12340", "0.0.1.dev2", NewDummyRunner(), nil, nil, nil, nil, nil) requestBody := `{ "id": "dummy-id", "name": "dummy-name", diff --git a/runner/internal/shim/api/schemas.go b/runner/internal/shim/api/schemas.go index cd0db6a20..adc272375 100644 --- a/runner/internal/shim/api/schemas.go +++ b/runner/internal/shim/api/schemas.go @@ -16,7 +16,8 @@ type ShutdownRequest struct { } type InstanceHealthResponse struct { - DCGM *dcgm.Health `json:"dcgm"` + DCGM *dcgm.Health `json:"dcgm"` + DataTransferBytes *int64 `json:"data_transfer_bytes,omitempty"` } type TaskListResponse struct { diff --git a/runner/internal/shim/api/server.go b/runner/internal/shim/api/server.go index 9008aa2ef..98378bce7 100644 --- a/runner/internal/shim/api/server.go +++ b/runner/internal/shim/api/server.go @@ -13,6 +13,7 @@ import ( "github.com/dstackai/dstack/runner/internal/shim" "github.com/dstackai/dstack/runner/internal/shim/components" "github.com/dstackai/dstack/runner/internal/shim/dcgm" + "github.com/dstackai/dstack/runner/internal/shim/netmeter" ) type TaskRunner interface { @@ -45,6 +46,8 @@ type ShimServer struct { runnerManager components.ComponentManager shimManager components.ComponentManager + netMeter *netmeter.NetMeter // may be nil if metering is unavailable + version string } @@ -52,6 +55,7 @@ func NewShimServer( ctx context.Context, address string, version string, runner TaskRunner, dcgmExporter *dcgm.DCGMExporter, dcgmWrapper dcgm.DCGMWrapperInterface, runnerManager components.ComponentManager, shimManager components.ComponentManager, + nm *netmeter.NetMeter, ) *ShimServer { bgJobsCtx, bgJobsCancel := context.WithCancel(ctx) if dcgmWrapper != nil && reflect.ValueOf(dcgmWrapper).IsNil() { @@ -78,6 +82,8 @@ func NewShimServer( runnerManager: runnerManager, shimManager: shimManager, + netMeter: nm, + version: version, } diff --git a/runner/internal/shim/docker.go b/runner/internal/shim/docker.go index 019ec7e36..ad26b590a 100644 --- a/runner/internal/shim/docker.go +++ b/runner/internal/shim/docker.go @@ -9,7 +9,6 @@ import ( "errors" "fmt" "io" - "net/http" "os" "os/exec" "os/user" @@ -38,7 +37,6 @@ import ( "github.com/dstackai/dstack/runner/internal/common/types" "github.com/dstackai/dstack/runner/internal/shim/backends" "github.com/dstackai/dstack/runner/internal/shim/host" - "github.com/dstackai/dstack/runner/internal/shim/netmeter" ) // TODO: Allow for configuration via cli arguments or environment variables. @@ -383,7 +381,7 @@ func (d *DockerRunner) Run(ctx context.Context, taskID string) error { return fmt.Errorf("%w: failed to update task %s: %w", ErrInternal, task.ID, err) } - err = d.waitContainerWithQuota(ctx, &task, cfg) + err = d.waitContainer(ctx, &task) } if err != nil { log.Error(ctx, "failed to run container", "err", err) @@ -913,49 +911,6 @@ func (d *DockerRunner) waitContainer(ctx context.Context, task *Task) error { return nil } -// waitContainerWithQuota waits for the container to finish, optionally enforcing -// a data transfer quota. If the quota is exceeded, it notifies the runner -// (so the server reads the termination reason via /api/pull) and stops the container. -func (d *DockerRunner) waitContainerWithQuota(ctx context.Context, task *Task, cfg TaskConfig) error { - if cfg.DataTransferQuota <= 0 { - return d.waitContainer(ctx, task) - } - - nm := netmeter.New(task.ID, cfg.DataTransferQuota) - if err := nm.Start(ctx); err != nil { - errMessage := fmt.Sprintf("data transfer quota configured but metering unavailable: %s", err) - log.Error(ctx, errMessage) - task.SetStatusTerminated(string(types.TerminationReasonExecutorError), errMessage) - return fmt.Errorf("data transfer meter: %w", err) - } - defer nm.Stop() - - waitDone := make(chan error, 1) - go func() { waitDone <- d.waitContainer(ctx, task) }() - - select { - case err := <-waitDone: - return err - case <-nm.Exceeded(): - log.Error(ctx, "Data transfer quota exceeded", "task", task.ID, "quota", cfg.DataTransferQuota) - terminateMsg := fmt.Sprintf("Outbound data transfer exceeded quota of %d bytes", cfg.DataTransferQuota) - if err := terminateRunner(ctx, d.dockerParams.RunnerHTTPPort(), - types.TerminationReasonDataTransferQuotaExceeded, terminateMsg); err != nil { - log.Error(ctx, "failed to notify runner of termination", "err", err) - } - stopTimeout := 10 - stopOpts := container.StopOptions{Timeout: &stopTimeout} - if err := d.client.ContainerStop(ctx, task.containerID, stopOpts); err != nil { - log.Error(ctx, "failed to stop container after quota exceeded", "err", err) - } - <-waitDone - // The runner already set the job state with the termination reason. - // The server will read it via /api/pull. - task.SetStatusTerminated(string(types.TerminationReasonDoneByRunner), "") - return nil - } -} - func encodeRegistryAuth(username string, password string) (string, error) { if username == "" && password == "" { return "", nil @@ -1226,31 +1181,6 @@ func getContainerLastLogs(ctx context.Context, client docker.APIClient, containe return lines, nil } -// terminateRunner calls the runner's /api/terminate endpoint to set the job termination state. -// This allows the server to read the termination reason via /api/pull before the container dies. -func terminateRunner(ctx context.Context, runnerPort int, reason types.TerminationReason, message string) error { - url := fmt.Sprintf("http://localhost:%d/api/terminate", runnerPort) - body := fmt.Sprintf(`{"reason":%q,"message":%q}`, reason, message) - // 5s is generous for a localhost HTTP call; if the runner doesn't respond in time, - // we proceed with stopping the container anyway (the server will handle the termination). - ctx, cancel := context.WithTimeout(ctx, 5*time.Second) - defer cancel() - req, err := http.NewRequestWithContext(ctx, "POST", url, strings.NewReader(body)) - if err != nil { - return fmt.Errorf("create request: %w", err) - } - req.Header.Set("Content-Type", "application/json") - resp, err := http.DefaultClient.Do(req) - if err != nil { - return fmt.Errorf("request failed: %w", err) - } - defer func() { _ = resp.Body.Close() }() - if resp.StatusCode != http.StatusOK { - return fmt.Errorf("unexpected status: %d", resp.StatusCode) - } - return nil -} - /* DockerParameters interface implementation for CLIArgs */ func (c *CLIArgs) DockerPrivileged() bool { diff --git a/runner/internal/shim/models.go b/runner/internal/shim/models.go index 40e5cb8a1..e7913aced 100644 --- a/runner/internal/shim/models.go +++ b/runner/internal/shim/models.go @@ -98,11 +98,10 @@ type TaskConfig struct { InstanceMounts []InstanceMountPoint `json:"instance_mounts"` // GPUDevices allows the server to set gpu devices instead of relying on the runner default logic. // E.g. passing nvidia devices directly instead of using nvidia-container-toolkit. - GPUDevices []GPUDevice `json:"gpu_devices"` - HostSshUser string `json:"host_ssh_user"` - HostSshKeys []string `json:"host_ssh_keys"` - ContainerSshKeys []string `json:"container_ssh_keys"` - DataTransferQuota int64 `json:"data_transfer_quota"` // total bytes for job lifetime; 0 = unlimited + GPUDevices []GPUDevice `json:"gpu_devices"` + HostSshUser string `json:"host_ssh_user"` + HostSshKeys []string `json:"host_ssh_keys"` + ContainerSshKeys []string `json:"container_ssh_keys"` } type TaskListItem struct { diff --git a/runner/internal/shim/netmeter/netmeter.go b/runner/internal/shim/netmeter/netmeter.go index bf438b946..43993e4a5 100644 --- a/runner/internal/shim/netmeter/netmeter.go +++ b/runner/internal/shim/netmeter/netmeter.go @@ -7,7 +7,7 @@ import ( "os/exec" "strconv" "strings" - "sync" + "sync/atomic" "time" "github.com/dstackai/dstack/runner/internal/common/log" @@ -15,35 +15,23 @@ import ( const ( pollInterval = 10 * time.Second - chainPrefix = "dstack-nm-" + chainName = "dstack-nm" ) // NetMeter monitors outbound data transfer using iptables byte counters. // It excludes private/VPC traffic and counts only external (billable) bytes. -// When cumulative bytes exceed the configured quota, the Exceeded() channel is closed. +// The meter runs for the lifetime of the shim process (per-instance, not per-task). type NetMeter struct { - quota int64 // total bytes for job lifetime - chainName string // unique iptables chain name - - exceeded chan struct{} - exceededOnce sync.Once - stopCh chan struct{} - stopped chan struct{} + bytes atomic.Int64 + stopCh chan struct{} + stopped chan struct{} } -// New creates a new NetMeter with the given quota in bytes. -func New(taskID string, quota int64) *NetMeter { - // Use first 8 chars of task ID for chain name uniqueness - suffix := taskID - if len(suffix) > 8 { - suffix = suffix[:8] - } +// New creates a new NetMeter. +func New() *NetMeter { return &NetMeter{ - quota: quota, - chainName: chainPrefix + suffix, - exceeded: make(chan struct{}), - stopCh: make(chan struct{}), - stopped: make(chan struct{}), + stopCh: make(chan struct{}), + stopped: make(chan struct{}), } } @@ -53,7 +41,10 @@ func (m *NetMeter) Start(ctx context.Context) error { return fmt.Errorf("iptables not available: %w", err) } - if err := m.setupChain(ctx); err != nil { + // Clean up any orphaned chain from a previous shim process + cleanupChain(ctx) + + if err := setupChain(ctx); err != nil { return fmt.Errorf("setup iptables chain: %w", err) } @@ -67,9 +58,9 @@ func (m *NetMeter) Stop() { <-m.stopped } -// Exceeded returns a channel that is closed when the quota is exceeded. -func (m *NetMeter) Exceeded() <-chan struct{} { - return m.exceeded +// Bytes returns the cumulative external outbound byte count (thread-safe). +func (m *NetMeter) Bytes() int64 { + return m.bytes.Load() } func checkIptables() error { @@ -77,9 +68,9 @@ func checkIptables() error { return err } -func (m *NetMeter) setupChain(ctx context.Context) error { +func setupChain(ctx context.Context) error { // Create the chain - if err := iptables(ctx, "-N", m.chainName); err != nil { + if err := iptables(ctx, "-N", chainName); err != nil { return fmt.Errorf("create chain: %w", err) } @@ -95,45 +86,43 @@ func (m *NetMeter) setupChain(ctx context.Context) error { {"127.0.0.0/8", "loopback"}, } for _, p := range privateCIDRs { - if err := iptables(ctx, "-A", m.chainName, "-d", p.cidr, "-j", "RETURN"); err != nil { - m.cleanup(ctx) + if err := iptables(ctx, "-A", chainName, "-d", p.cidr, "-j", "RETURN"); err != nil { + cleanupChain(ctx) return fmt.Errorf("add exclusion rule for %s: %w", p.comment, err) } } // Add catch-all counting rule (counts all remaining = external/billable bytes) - if err := iptables(ctx, "-A", m.chainName, "-j", "RETURN"); err != nil { - m.cleanup(ctx) + if err := iptables(ctx, "-A", chainName, "-j", "RETURN"); err != nil { + cleanupChain(ctx) return fmt.Errorf("add counting rule: %w", err) } // Insert jump from OUTPUT chain (catches host-mode Docker and host processes) - if err := iptables(ctx, "-I", "OUTPUT", "-j", m.chainName); err != nil { - m.cleanup(ctx) + if err := iptables(ctx, "-I", "OUTPUT", "-j", chainName); err != nil { + cleanupChain(ctx) return fmt.Errorf("insert OUTPUT jump: %w", err) } // Insert jump from FORWARD chain (catches bridge-mode Docker traffic) - if err := iptables(ctx, "-I", "FORWARD", "-j", m.chainName); err != nil { - m.cleanup(ctx) + if err := iptables(ctx, "-I", "FORWARD", "-j", chainName); err != nil { + cleanupChain(ctx) return fmt.Errorf("insert FORWARD jump: %w", err) } return nil } -func (m *NetMeter) cleanup(ctx context.Context) { - // Remove jumps from OUTPUT and FORWARD (ignore errors — may not exist if setup failed partway) - _ = iptables(ctx, "-D", "OUTPUT", "-j", m.chainName) - _ = iptables(ctx, "-D", "FORWARD", "-j", m.chainName) - // Flush and delete chain - _ = iptables(ctx, "-F", m.chainName) - _ = iptables(ctx, "-X", m.chainName) +func cleanupChain(ctx context.Context) { + _ = iptables(ctx, "-D", "OUTPUT", "-j", chainName) + _ = iptables(ctx, "-D", "FORWARD", "-j", chainName) + _ = iptables(ctx, "-F", chainName) + _ = iptables(ctx, "-X", chainName) } func (m *NetMeter) pollLoop(ctx context.Context) { defer close(m.stopped) - defer m.cleanup(ctx) + defer cleanupChain(ctx) ticker := time.NewTicker(pollInterval) defer ticker.Stop() @@ -143,28 +132,24 @@ func (m *NetMeter) pollLoop(ctx context.Context) { case <-m.stopCh: return case <-ticker.C: - bytes, err := m.readCounter(ctx) + b, err := readCounter(ctx) if err != nil { - log.Error(ctx, "failed to read network counter", "chain", m.chainName, "err", err) + log.Error(ctx, "failed to read data transfer counter", "err", err) continue } - if bytes > m.quota { - log.Error(ctx, "data transfer quota exceeded", - "chain", m.chainName, "bytes", bytes, "quota", m.quota) - m.exceededOnce.Do(func() { close(m.exceeded) }) - return - } + m.bytes.Store(b) + log.Debug(ctx, "data transfer meter poll", "bytes", b) } } } // readCounter reads the cumulative byte count from the catch-all rule (last rule in chain). -func (m *NetMeter) readCounter(ctx context.Context) (int64, error) { - output, err := iptablesOutput(ctx, "-L", m.chainName, "-v", "-x", "-n") +func readCounter(ctx context.Context) (int64, error) { + output, err := iptablesOutput(ctx, "-L", chainName, "-v", "-x", "-n") if err != nil { return 0, err } - return parseByteCounter(output, m.chainName) + return parseByteCounter(output) } // parseByteCounter extracts the byte count from the last rule (catch-all counting rule) @@ -172,7 +157,7 @@ func (m *NetMeter) readCounter(ctx context.Context) (int64, error) { // // Example output: // -// Chain dstack-nm-abcd1234 (1 references) +// Chain dstack-nm (1 references) // pkts bytes target prot opt in out source destination // 0 0 RETURN all -- * * 0.0.0.0/0 10.0.0.0/8 // 0 0 RETURN all -- * * 0.0.0.0/0 172.16.0.0/12 @@ -182,7 +167,7 @@ func (m *NetMeter) readCounter(ctx context.Context) (int64, error) { // 123 456789 RETURN all -- * * 0.0.0.0/0 0.0.0.0/0 // // The last rule (destination 0.0.0.0/0) is the catch-all; its bytes field is what we want. -func parseByteCounter(output string, chainName string) (int64, error) { +func parseByteCounter(output string) (int64, error) { lines := strings.Split(strings.TrimSpace(output), "\n") // Find lines that are rule entries (skip header lines) @@ -240,25 +225,3 @@ func iptablesOutput(ctx context.Context, args ...string) (string, error) { } return stdout.String(), nil } - -// CleanupOrphanedChains removes any leftover dstack-nm-* chains from previous runs. -// Call this on shim startup. -func CleanupOrphanedChains(ctx context.Context) { - output, err := iptablesOutput(ctx, "-L", "-n") - if err != nil { - return - } - for _, line := range strings.Split(output, "\n") { - if strings.HasPrefix(line, "Chain "+chainPrefix) { - fields := strings.Fields(line) - if len(fields) >= 2 { - chainName := fields[1] - log.Info(ctx, "cleaning up orphaned data transfer meter chain", "chain", chainName) - _ = iptables(ctx, "-D", "OUTPUT", "-j", chainName) - _ = iptables(ctx, "-D", "FORWARD", "-j", chainName) - _ = iptables(ctx, "-F", chainName) - _ = iptables(ctx, "-X", chainName) - } - } - } -} diff --git a/runner/internal/shim/netmeter/netmeter_test.go b/runner/internal/shim/netmeter/netmeter_test.go index 055843e93..5af00c61e 100644 --- a/runner/internal/shim/netmeter/netmeter_test.go +++ b/runner/internal/shim/netmeter/netmeter_test.go @@ -11,13 +11,12 @@ func TestParseByteCounter(t *testing.T) { tests := []struct { name string output string - chain string expected int64 expectErr bool }{ { name: "typical output with traffic", - output: `Chain dstack-nm-abcd1234 (1 references) + output: `Chain dstack-nm (1 references) pkts bytes target prot opt in out source destination 0 0 RETURN all -- * * 0.0.0.0/0 10.0.0.0/8 0 0 RETURN all -- * * 0.0.0.0/0 172.16.0.0/12 @@ -26,12 +25,11 @@ func TestParseByteCounter(t *testing.T) { 0 0 RETURN all -- * * 0.0.0.0/0 127.0.0.0/8 123 456789 RETURN all -- * * 0.0.0.0/0 0.0.0.0/0 `, - chain: "dstack-nm-abcd1234", expected: 456789, }, { name: "zero traffic", - output: `Chain dstack-nm-abcd1234 (1 references) + output: `Chain dstack-nm (1 references) pkts bytes target prot opt in out source destination 0 0 RETURN all -- * * 0.0.0.0/0 10.0.0.0/8 0 0 RETURN all -- * * 0.0.0.0/0 172.16.0.0/12 @@ -40,12 +38,11 @@ func TestParseByteCounter(t *testing.T) { 0 0 RETURN all -- * * 0.0.0.0/0 127.0.0.0/8 0 0 RETURN all -- * * 0.0.0.0/0 0.0.0.0/0 `, - chain: "dstack-nm-abcd1234", expected: 0, }, { name: "large byte count", - output: `Chain dstack-nm-test1234 (1 references) + output: `Chain dstack-nm (1 references) pkts bytes target prot opt in out source destination 10000 5000000 RETURN all -- * * 0.0.0.0/0 10.0.0.0/8 0 0 RETURN all -- * * 0.0.0.0/0 172.16.0.0/12 @@ -54,28 +51,25 @@ func TestParseByteCounter(t *testing.T) { 0 0 RETURN all -- * * 0.0.0.0/0 127.0.0.0/8 500000 107374182400 RETURN all -- * * 0.0.0.0/0 0.0.0.0/0 `, - chain: "dstack-nm-test1234", expected: 107374182400, // ~100 GB }, { name: "empty output", output: "", - chain: "dstack-nm-abcd1234", expectErr: true, }, { name: "only headers no rules", - output: `Chain dstack-nm-abcd1234 (1 references) + output: `Chain dstack-nm (1 references) pkts bytes target prot opt in out source destination `, - chain: "dstack-nm-abcd1234", expectErr: true, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - result, err := parseByteCounter(tt.output, tt.chain) + result, err := parseByteCounter(tt.output) if tt.expectErr { assert.Error(t, err) } else { @@ -87,12 +81,7 @@ func TestParseByteCounter(t *testing.T) { } func TestNew(t *testing.T) { - nm := New("abcdefghijklmnop", 1000000) - assert.Equal(t, int64(1000000), nm.quota) - assert.Equal(t, "dstack-nm-abcdefgh", nm.chainName) -} - -func TestNew_ShortID(t *testing.T) { - nm := New("abc", 500) - assert.Equal(t, "dstack-nm-abc", nm.chainName) + nm := New() + assert.NotNil(t, nm) + assert.Equal(t, int64(0), nm.Bytes()) } diff --git a/src/dstack/_internal/core/models/instances.py b/src/dstack/_internal/core/models/instances.py index 11a1aca51..392f4bb51 100644 --- a/src/dstack/_internal/core/models/instances.py +++ b/src/dstack/_internal/core/models/instances.py @@ -356,3 +356,4 @@ class Instance(CoreModel): price: Optional[float] = None total_blocks: Optional[int] = None busy_blocks: int = 0 + data_transfer_bytes: int = 0 diff --git a/src/dstack/_internal/core/models/runs.py b/src/dstack/_internal/core/models/runs.py index 175e08a4d..0c48664a1 100644 --- a/src/dstack/_internal/core/models/runs.py +++ b/src/dstack/_internal/core/models/runs.py @@ -152,7 +152,6 @@ class JobTerminationReason(str, Enum): EXECUTOR_ERROR = "executor_error" MAX_DURATION_EXCEEDED = "max_duration_exceeded" LOG_QUOTA_EXCEEDED = "log_quota_exceeded" - DATA_TRANSFER_QUOTA_EXCEEDED = "data_transfer_quota_exceeded" def to_status(self) -> JobStatus: mapping = { @@ -176,7 +175,6 @@ def to_status(self) -> JobStatus: self.EXECUTOR_ERROR: JobStatus.FAILED, self.MAX_DURATION_EXCEEDED: JobStatus.TERMINATED, self.LOG_QUOTA_EXCEEDED: JobStatus.FAILED, - self.DATA_TRANSFER_QUOTA_EXCEEDED: JobStatus.FAILED, } return mapping[self] @@ -210,7 +208,6 @@ def to_error(self) -> Optional[str]: JobTerminationReason.EXECUTOR_ERROR: "executor error", JobTerminationReason.MAX_DURATION_EXCEEDED: "max duration exceeded", JobTerminationReason.LOG_QUOTA_EXCEEDED: "log quota exceeded", - JobTerminationReason.DATA_TRANSFER_QUOTA_EXCEEDED: "data transfer quota exceeded", } return error_mapping.get(self) diff --git a/src/dstack/_internal/server/background/pipeline_tasks/instances/check.py b/src/dstack/_internal/server/background/pipeline_tasks/instances/check.py index d23d536cd..cf1998dc4 100644 --- a/src/dstack/_internal/server/background/pipeline_tasks/instances/check.py +++ b/src/dstack/_internal/server/background/pipeline_tasks/instances/check.py @@ -166,6 +166,10 @@ async def check_instance(instance_model: InstanceModel) -> ProcessResult: status=health_status, response=instance_check.health_response.json(), ) + if instance_check.health_response.data_transfer_bytes is not None: + result.instance_update_map["data_transfer_bytes"] = ( + instance_check.health_response.data_transfer_bytes + ) set_health_update( update_map=result.instance_update_map, diff --git a/src/dstack/_internal/server/background/pipeline_tasks/instances/common.py b/src/dstack/_internal/server/background/pipeline_tasks/instances/common.py index 34e80311f..59937a72c 100644 --- a/src/dstack/_internal/server/background/pipeline_tasks/instances/common.py +++ b/src/dstack/_internal/server/background/pipeline_tasks/instances/common.py @@ -52,6 +52,7 @@ class InstanceUpdateMap(ItemUpdateMap, total=False): job_provisioning_data: str total_blocks: int busy_blocks: int + data_transfer_bytes: int deleted: bool deleted_at: UpdateMapDateTime diff --git a/src/dstack/_internal/server/background/pipeline_tasks/instances/termination.py b/src/dstack/_internal/server/background/pipeline_tasks/instances/termination.py index eb1f3c8a3..28f09c8be 100644 --- a/src/dstack/_internal/server/background/pipeline_tasks/instances/termination.py +++ b/src/dstack/_internal/server/background/pipeline_tasks/instances/termination.py @@ -1,6 +1,8 @@ +from dstack._internal.core.consts import DSTACK_SHIM_HTTP_PORT from dstack._internal.core.errors import BackendError, NotYetTerminated from dstack._internal.core.models.backends.base import BackendType from dstack._internal.core.models.instances import InstanceStatus +from dstack._internal.core.models.runs import JobProvisioningData from dstack._internal.server.background.pipeline_tasks.base import NOW_PLACEHOLDER from dstack._internal.server.background.pipeline_tasks.instances.common import ( ProcessResult, @@ -10,7 +12,12 @@ ) from dstack._internal.server.models import InstanceModel from dstack._internal.server.services import backends as backends_services -from dstack._internal.server.services.instances import get_instance_provisioning_data +from dstack._internal.server.services.instances import ( + get_instance_provisioning_data, + get_instance_ssh_private_keys, +) +from dstack._internal.server.services.runner import client as runner_client +from dstack._internal.server.services.runner.ssh import runner_ssh_tunnel from dstack._internal.utils.common import get_current_datetime, run_async from dstack._internal.utils.logging import get_logger @@ -39,6 +46,7 @@ async def terminate_instance(instance_model: InstanceModel) -> ProcessResult: job_provisioning_data.backend, ) else: + await _capture_final_data_transfer_bytes(instance_model, job_provisioning_data, result) logger.debug("Terminating runner instance %s", job_provisioning_data.hostname) try: await run_async( @@ -86,3 +94,32 @@ async def terminate_instance(instance_model: InstanceModel) -> ProcessResult: new_status=InstanceStatus.TERMINATED, ) return result + + +async def _capture_final_data_transfer_bytes( + instance_model: InstanceModel, + jpd: JobProvisioningData, + result: ProcessResult, +) -> None: + """Best-effort final read of data_transfer_bytes before the instance is destroyed.""" + try: + health_response = await run_async( + _read_instance_health, + get_instance_ssh_private_keys(instance_model), + jpd, + instance=instance_model, + ) + if health_response is not None and health_response.data_transfer_bytes is not None: + result.instance_update_map["data_transfer_bytes"] = health_response.data_transfer_bytes + except Exception as exc: + logger.debug( + "Failed to capture final data_transfer_bytes for %s: %s", + instance_model.name, + exc, + ) + + +@runner_ssh_tunnel(ports=[DSTACK_SHIM_HTTP_PORT], retries=1) +def _read_instance_health(ports, *, instance): + shim_client = runner_client.ShimClient(port=ports[DSTACK_SHIM_HTTP_PORT]) + return shim_client.get_instance_health() diff --git a/src/dstack/_internal/server/background/pipeline_tasks/jobs_running.py b/src/dstack/_internal/server/background/pipeline_tasks/jobs_running.py index 6f45a9d16..5e061ee77 100644 --- a/src/dstack/_internal/server/background/pipeline_tasks/jobs_running.py +++ b/src/dstack/_internal/server/background/pipeline_tasks/jobs_running.py @@ -34,7 +34,6 @@ RunStatus, ) from dstack._internal.core.models.volumes import InstanceMountPoint, Volume, VolumeMountPoint -from dstack._internal.server import settings as server_settings from dstack._internal.server.background.pipeline_tasks.base import ( Fetcher, Heartbeater, @@ -1107,9 +1106,6 @@ def _process_provisioning_with_shim( memory = None network_mode = NetworkMode.HOST image_name = resolve_provisioning_image_name(job_spec, jpd) - data_transfer_quota = 0 - if jpd.backend == BackendType.AWS: - data_transfer_quota = server_settings.SERVER_DATA_TRANSFER_QUOTA_PER_JOB_AWS if shim_client.is_api_v2_supported(): shim_client.submit_task( task_id=job_model.id, @@ -1132,7 +1128,6 @@ def _process_provisioning_with_shim( host_ssh_keys=[ssh_key] if ssh_key else [], container_ssh_keys=public_keys, instance_id=jpd.instance_id, - data_transfer_quota=data_transfer_quota, ) else: submitted = shim_client.submit( diff --git a/src/dstack/_internal/server/background/scheduled_tasks/instances.py b/src/dstack/_internal/server/background/scheduled_tasks/instances.py index 5c041dc2b..972ad0200 100644 --- a/src/dstack/_internal/server/background/scheduled_tasks/instances.py +++ b/src/dstack/_internal/server/background/scheduled_tasks/instances.py @@ -763,6 +763,8 @@ async def _check_instance(session: AsyncSession, instance: InstanceModel) -> Non response=instance_check.health_response.json(), ) session.add(health_check_model) + if instance_check.health_response.data_transfer_bytes is not None: + instance.data_transfer_bytes = instance_check.health_response.data_transfer_bytes _set_health(session, instance, health_status) _set_unreachable(session, instance, unreachable=not instance_check.reachable) @@ -1127,6 +1129,7 @@ async def _terminate(session: AsyncSession, instance: InstanceModel) -> None: jpd.backend, ) else: + await _capture_final_data_transfer_bytes(instance, jpd) logger.debug("Terminating runner instance %s", jpd.hostname) try: await run_async( @@ -1165,6 +1168,33 @@ async def _terminate(session: AsyncSession, instance: InstanceModel) -> None: switch_instance_status(session, instance, InstanceStatus.TERMINATED) +async def _capture_final_data_transfer_bytes( + instance: InstanceModel, jpd: JobProvisioningData +) -> None: + """Best-effort final read of data_transfer_bytes before the instance is destroyed.""" + try: + health_response = await run_async( + _read_instance_health, + get_instance_ssh_private_keys(instance), + jpd, + instance=instance, + ) + if health_response is not None and health_response.data_transfer_bytes is not None: + instance.data_transfer_bytes = health_response.data_transfer_bytes + except Exception as exc: + logger.debug( + "Failed to capture final data_transfer_bytes for %s: %s", + instance.name, + exc, + ) + + +@runner_ssh_tunnel(ports=[DSTACK_SHIM_HTTP_PORT], retries=1) +def _read_instance_health(ports, *, instance): + shim_client = runner_client.ShimClient(port=ports[DSTACK_SHIM_HTTP_PORT]) + return shim_client.get_instance_health() + + def _set_health(session: AsyncSession, instance: InstanceModel, health: HealthStatus) -> None: if instance.health != health: events.emit( diff --git a/src/dstack/_internal/server/background/scheduled_tasks/running_jobs.py b/src/dstack/_internal/server/background/scheduled_tasks/running_jobs.py index 185e3d751..72d60b581 100644 --- a/src/dstack/_internal/server/background/scheduled_tasks/running_jobs.py +++ b/src/dstack/_internal/server/background/scheduled_tasks/running_jobs.py @@ -37,7 +37,6 @@ RunStatus, ) from dstack._internal.core.models.volumes import InstanceMountPoint, Volume, VolumeMountPoint -from dstack._internal.server import settings as server_settings from dstack._internal.server.background.scheduled_tasks.common import get_provisioning_timeout from dstack._internal.server.db import get_db, get_session_ctx from dstack._internal.server.models import ( @@ -787,9 +786,6 @@ def _process_provisioning_with_shim( memory = None network_mode = NetworkMode.HOST image_name = resolve_provisioning_image_name(job_spec, jpd) - data_transfer_quota = 0 - if jpd.backend == BackendType.AWS: - data_transfer_quota = server_settings.SERVER_DATA_TRANSFER_QUOTA_PER_JOB_AWS if shim_client.is_api_v2_supported(): shim_client.submit_task( task_id=job_model.id, @@ -812,7 +808,6 @@ def _process_provisioning_with_shim( host_ssh_keys=[ssh_key] if ssh_key else [], container_ssh_keys=public_keys, instance_id=jpd.instance_id, - data_transfer_quota=data_transfer_quota, ) else: submitted = shim_client.submit( diff --git a/src/dstack/_internal/server/migrations/versions/2026/03_25_1200_a1b2c3d4e5f6_add_data_transfer_bytes_to_instances.py b/src/dstack/_internal/server/migrations/versions/2026/03_25_1200_a1b2c3d4e5f6_add_data_transfer_bytes_to_instances.py new file mode 100644 index 000000000..4e7d20d66 --- /dev/null +++ b/src/dstack/_internal/server/migrations/versions/2026/03_25_1200_a1b2c3d4e5f6_add_data_transfer_bytes_to_instances.py @@ -0,0 +1,27 @@ +"""Add data_transfer_bytes to instances + +Revision ID: a1b2c3d4e5f6 +Revises: 8b6d5d8c1b9a +Create Date: 2026-03-25 12:00:00.000000+00:00 + +""" + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision = "a1b2c3d4e5f6" +down_revision = "8b6d5d8c1b9a" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + op.add_column( + "instances", + sa.Column("data_transfer_bytes", sa.BigInteger(), nullable=False, server_default="0"), + ) + + +def downgrade() -> None: + op.drop_column("instances", "data_transfer_bytes") diff --git a/src/dstack/_internal/server/models.py b/src/dstack/_internal/server/models.py index b599c4314..156f502d7 100644 --- a/src/dstack/_internal/server/models.py +++ b/src/dstack/_internal/server/models.py @@ -776,6 +776,9 @@ class InstanceModel(PipelineModelMixin, BaseModel): """`total_blocks` uses `NULL` to mean `auto` during provisioning; once ready it is not `NULL`.""" busy_blocks: Mapped[int] = mapped_column(Integer, default=0) + data_transfer_bytes: Mapped[int] = mapped_column(BigInteger, default=0) + """Cumulative outbound data transfer bytes (external/billable traffic only).""" + jobs: Mapped[list["JobModel"]] = relationship(back_populates="instance") last_job_processed_at: Mapped[Optional[datetime]] = mapped_column(NaiveDateTime) diff --git a/src/dstack/_internal/server/schemas/instances.py b/src/dstack/_internal/server/schemas/instances.py index 8f87935b9..fd72bbf66 100644 --- a/src/dstack/_internal/server/schemas/instances.py +++ b/src/dstack/_internal/server/schemas/instances.py @@ -37,7 +37,10 @@ def get_health_status(self) -> HealthStatus: def has_health_checks(self) -> bool: if self.health_response is None: return False - return self.health_response.dcgm is not None + return ( + self.health_response.dcgm is not None + or self.health_response.data_transfer_bytes is not None + ) class GetInstanceHealthChecksRequest(CoreModel): diff --git a/src/dstack/_internal/server/schemas/runner.py b/src/dstack/_internal/server/schemas/runner.py index 5fadde06d..fe6d14cb5 100644 --- a/src/dstack/_internal/server/schemas/runner.py +++ b/src/dstack/_internal/server/schemas/runner.py @@ -128,6 +128,7 @@ class HealthcheckResponse(CoreModel): class InstanceHealthResponse(CoreModel): dcgm: Optional[DCGMHealthResponse] = None + data_transfer_bytes: Optional[int] = None class ShutdownRequest(CoreModel): @@ -245,7 +246,6 @@ class TaskSubmitRequest(CoreModel): host_ssh_user: str host_ssh_keys: list[str] container_ssh_keys: list[str] - data_transfer_quota: int = 0 # total bytes; 0 = unlimited class TaskTerminateRequest(CoreModel): diff --git a/src/dstack/_internal/server/services/instances.py b/src/dstack/_internal/server/services/instances.py index d54ec8b68..710cf2444 100644 --- a/src/dstack/_internal/server/services/instances.py +++ b/src/dstack/_internal/server/services/instances.py @@ -241,6 +241,7 @@ def instance_model_to_instance(instance_model: InstanceModel) -> Instance: finished_at=instance_model.finished_at, total_blocks=instance_model.total_blocks, busy_blocks=instance_model.busy_blocks, + data_transfer_bytes=instance_model.data_transfer_bytes, ) offer = get_instance_offer(instance_model) diff --git a/src/dstack/_internal/server/services/runner/client.py b/src/dstack/_internal/server/services/runner/client.py index c3500e4a9..4b78eefee 100644 --- a/src/dstack/_internal/server/services/runner/client.py +++ b/src/dstack/_internal/server/services/runner/client.py @@ -416,7 +416,6 @@ def submit_task( host_ssh_keys: list[str], container_ssh_keys: list[str], instance_id: str, - data_transfer_quota: int = 0, ) -> None: if not self.is_api_v2_supported(): raise ShimAPIVersionError() @@ -440,7 +439,6 @@ def submit_task( host_ssh_user=host_ssh_user, host_ssh_keys=host_ssh_keys, container_ssh_keys=container_ssh_keys, - data_transfer_quota=data_transfer_quota, ) self._request("POST", "/api/tasks", body, raise_for_status=True) diff --git a/src/dstack/_internal/server/settings.py b/src/dstack/_internal/server/settings.py index f0a434aab..e005a6cb5 100644 --- a/src/dstack/_internal/server/settings.py +++ b/src/dstack/_internal/server/settings.py @@ -145,20 +145,6 @@ os.getenv("DSTACK_SERVER_LOG_QUOTA_PER_JOB_HOUR", 50 * 1024 * 1024) # 50 MB ) -# Per-job data transfer quota for AWS backend: maximum total outbound bytes to external IPs. -# 0 = unlimited. Only applied to instances running on AWS. -# Limitations: -# - Meters all outbound traffic to non-private IPs (excludes 10.0.0.0/8, 172.16.0.0/12, -# 192.168.0.0/16, 169.254.0.0/16). This covers inter-region and internet egress. -# - Does not differentiate by destination region — the same quota applies regardless of -# whether traffic goes to another AWS region ($0.01-0.02/GB) or the internet ($0.09/GB). -# - Only effective on Linux instances with iptables available. -# Task fails with executor_error on systems without iptables if quota is set. -# To add support for other backends, add DSTACK_SERVER_DATA_TRANSFER_QUOTA_PER_JOB_GCP, etc. -SERVER_DATA_TRANSFER_QUOTA_PER_JOB_AWS = int( - os.getenv("DSTACK_SERVER_DATA_TRANSFER_QUOTA_PER_JOB_AWS", 0) # disabled by default -) - # Development settings SQL_ECHO_ENABLED = os.getenv("DSTACK_SQL_ECHO_ENABLED") is not None diff --git a/src/tests/_internal/server/background/pipeline_tasks/test_running_jobs.py b/src/tests/_internal/server/background/pipeline_tasks/test_running_jobs.py index eb0cb6b70..a52924a55 100644 --- a/src/tests/_internal/server/background/pipeline_tasks/test_running_jobs.py +++ b/src/tests/_internal/server/background/pipeline_tasks/test_running_jobs.py @@ -573,7 +573,6 @@ async def test_provisioning_shim_with_volumes( host_ssh_keys=["user_ssh_key"], container_ssh_keys=[project_ssh_pub_key, "user_ssh_key"], instance_id=job_provisioning_data.instance_id, - data_transfer_quota=0, ) await session.refresh(job) assert job.status == JobStatus.PULLING diff --git a/src/tests/_internal/server/background/scheduled_tasks/test_running_jobs.py b/src/tests/_internal/server/background/scheduled_tasks/test_running_jobs.py index 862565c64..66b38f331 100644 --- a/src/tests/_internal/server/background/scheduled_tasks/test_running_jobs.py +++ b/src/tests/_internal/server/background/scheduled_tasks/test_running_jobs.py @@ -389,7 +389,6 @@ async def test_provisioning_shim_with_volumes( host_ssh_keys=["user_ssh_key"], container_ssh_keys=[project_ssh_pub_key, "user_ssh_key"], instance_id=job_provisioning_data.instance_id, - data_transfer_quota=0, ) await session.refresh(job) assert job.status == JobStatus.PULLING diff --git a/src/tests/_internal/server/services/runner/test_client.py b/src/tests/_internal/server/services/runner/test_client.py index dbbece0a5..588c231a1 100644 --- a/src/tests/_internal/server/services/runner/test_client.py +++ b/src/tests/_internal/server/services/runner/test_client.py @@ -455,7 +455,6 @@ def test_submit_task(self, client: ShimClient, adapter: requests_mock.Adapter): "host_ssh_user": "dstack", "host_ssh_keys": ["host_key"], "container_ssh_keys": ["project_key", "user_key"], - "data_transfer_quota": 0, } self.assert_request(adapter, 1, "POST", "/api/tasks", expected_request) From 53c29c75b1d99a56bc30119160a186f761d0a578 Mon Sep 17 00:00:00 2001 From: Andrey Cheptsov Date: Wed, 25 Mar 2026 14:06:58 +0100 Subject: [PATCH 5/8] Fix pyright: add missing job_runtime_data arg, handle False return from SSH tunnel Co-Authored-By: Claude Opus 4.6 (1M context) --- .../background/pipeline_tasks/instances/termination.py | 7 ++++++- .../server/background/scheduled_tasks/instances.py | 7 ++++++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/src/dstack/_internal/server/background/pipeline_tasks/instances/termination.py b/src/dstack/_internal/server/background/pipeline_tasks/instances/termination.py index 28f09c8be..7c9fea35d 100644 --- a/src/dstack/_internal/server/background/pipeline_tasks/instances/termination.py +++ b/src/dstack/_internal/server/background/pipeline_tasks/instances/termination.py @@ -107,9 +107,14 @@ async def _capture_final_data_transfer_bytes( _read_instance_health, get_instance_ssh_private_keys(instance_model), jpd, + None, instance=instance_model, ) - if health_response is not None and health_response.data_transfer_bytes is not None: + if ( + health_response is not False + and health_response is not None + and health_response.data_transfer_bytes is not None + ): result.instance_update_map["data_transfer_bytes"] = health_response.data_transfer_bytes except Exception as exc: logger.debug( diff --git a/src/dstack/_internal/server/background/scheduled_tasks/instances.py b/src/dstack/_internal/server/background/scheduled_tasks/instances.py index 972ad0200..b70b37ffb 100644 --- a/src/dstack/_internal/server/background/scheduled_tasks/instances.py +++ b/src/dstack/_internal/server/background/scheduled_tasks/instances.py @@ -1177,9 +1177,14 @@ async def _capture_final_data_transfer_bytes( _read_instance_health, get_instance_ssh_private_keys(instance), jpd, + None, instance=instance, ) - if health_response is not None and health_response.data_transfer_bytes is not None: + if ( + health_response is not False + and health_response is not None + and health_response.data_transfer_bytes is not None + ): instance.data_transfer_bytes = health_response.data_transfer_bytes except Exception as exc: logger.debug( From 26fcbe390c7d87d6e69edfea97d2d499bb19d0a2 Mon Sep 17 00:00:00 2001 From: Andrey Cheptsov Date: Wed, 25 Mar 2026 14:07:50 +0100 Subject: [PATCH 6/8] Revert unintended change to examples/.dstack.yml Co-Authored-By: Claude Opus 4.6 (1M context) --- examples/.dstack.yml | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/examples/.dstack.yml b/examples/.dstack.yml index 352d9e0d1..dc8a9c58c 100644 --- a/examples/.dstack.yml +++ b/examples/.dstack.yml @@ -1,11 +1,12 @@ type: dev-environment -name: dev +name: cursor python: 3.12 +ide: cursor # Mount the repo directory to `/workflow` (the default working directory) repos: - .. -#resources: -# gpu: 1 +resources: + gpu: 1 From 8ec5b15435cdc6ba07b37644d688c08fe6b12a85 Mon Sep 17 00:00:00 2001 From: Andrey Cheptsov Date: Wed, 25 Mar 2026 14:24:49 +0100 Subject: [PATCH 7/8] Fix tests: add data_transfer_bytes to expected API responses Co-Authored-By: Claude Opus 4.6 (1M context) --- ..._1200_a1b2c3d4e5f6_add_data_transfer_bytes_to_instances.py | 4 ++-- src/dstack/_internal/server/services/instances.py | 2 +- src/tests/_internal/server/routers/test_fleets.py | 4 ++++ 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/src/dstack/_internal/server/migrations/versions/2026/03_25_1200_a1b2c3d4e5f6_add_data_transfer_bytes_to_instances.py b/src/dstack/_internal/server/migrations/versions/2026/03_25_1200_a1b2c3d4e5f6_add_data_transfer_bytes_to_instances.py index 4e7d20d66..1727e1f62 100644 --- a/src/dstack/_internal/server/migrations/versions/2026/03_25_1200_a1b2c3d4e5f6_add_data_transfer_bytes_to_instances.py +++ b/src/dstack/_internal/server/migrations/versions/2026/03_25_1200_a1b2c3d4e5f6_add_data_transfer_bytes_to_instances.py @@ -1,7 +1,7 @@ """Add data_transfer_bytes to instances Revision ID: a1b2c3d4e5f6 -Revises: 8b6d5d8c1b9a +Revises: c1c2ecaee45c Create Date: 2026-03-25 12:00:00.000000+00:00 """ @@ -11,7 +11,7 @@ # revision identifiers, used by Alembic. revision = "a1b2c3d4e5f6" -down_revision = "8b6d5d8c1b9a" +down_revision = "c1c2ecaee45c" branch_labels = None depends_on = None diff --git a/src/dstack/_internal/server/services/instances.py b/src/dstack/_internal/server/services/instances.py index 710cf2444..9dc196e16 100644 --- a/src/dstack/_internal/server/services/instances.py +++ b/src/dstack/_internal/server/services/instances.py @@ -241,7 +241,7 @@ def instance_model_to_instance(instance_model: InstanceModel) -> Instance: finished_at=instance_model.finished_at, total_blocks=instance_model.total_blocks, busy_blocks=instance_model.busy_blocks, - data_transfer_bytes=instance_model.data_transfer_bytes, + data_transfer_bytes=instance_model.data_transfer_bytes or 0, ) offer = get_instance_offer(instance_model) diff --git a/src/tests/_internal/server/routers/test_fleets.py b/src/tests/_internal/server/routers/test_fleets.py index 47544b921..5b7ac55e8 100644 --- a/src/tests/_internal/server/routers/test_fleets.py +++ b/src/tests/_internal/server/routers/test_fleets.py @@ -1025,6 +1025,7 @@ async def test_creates_fleet(self, test_db, session: AsyncSession, client: Async "price": None, "total_blocks": 1, "busy_blocks": 0, + "data_transfer_bytes": 0, } ], } @@ -1167,6 +1168,7 @@ async def test_creates_ssh_fleet(self, test_db, session: AsyncSession, client: A "price": 0.0, "total_blocks": 1, "busy_blocks": 0, + "data_transfer_bytes": 0, } ], } @@ -1347,6 +1349,7 @@ async def test_updates_ssh_fleet(self, test_db, session: AsyncSession, client: A "price": 0.0, "total_blocks": 1, "busy_blocks": 0, + "data_transfer_bytes": 0, }, { "id": SomeUUID4Str(), @@ -1382,6 +1385,7 @@ async def test_updates_ssh_fleet(self, test_db, session: AsyncSession, client: A "price": 0.0, "total_blocks": 1, "busy_blocks": 0, + "data_transfer_bytes": 0, }, ], } From 22bed96c96d1f65b447a9502dd1ca6fd8d58dbd8 Mon Sep 17 00:00:00 2001 From: Andrey Cheptsov Date: Wed, 25 Mar 2026 14:37:41 +0100 Subject: [PATCH 8/8] Revert unintended changes to .justfile and .gitignore Co-Authored-By: Claude Opus 4.6 (1M context) --- .gitignore | 2 -- .justfile | 2 +- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/.gitignore b/.gitignore index 4c7f02696..bed3aa2cc 100644 --- a/.gitignore +++ b/.gitignore @@ -27,5 +27,3 @@ uv.lock profiling_results.html docs/docs/reference/api/rest/openapi.json -.agents/skills -.claude/skills diff --git a/.justfile b/.justfile index 5be2f0879..59defcfa1 100644 --- a/.justfile +++ b/.justfile @@ -17,4 +17,4 @@ import "runner/.justfile" import "frontend/.justfile" docs-serve: - DYLD_FALLBACK_LIBRARY_PATH=/opt/homebrew/lib mkdocs serve --livereload -w examples -s + uv run mkdocs serve --livereload -w examples -s