Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 44 additions & 1 deletion cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
invmock "github.com/block/Version-Guard/pkg/inventory/mock"
"github.com/block/Version-Guard/pkg/inventory/wiz"
"github.com/block/Version-Guard/pkg/policy"
"github.com/block/Version-Guard/pkg/schedule"
"github.com/block/Version-Guard/pkg/snapshot"
"github.com/block/Version-Guard/pkg/store/memory"
"github.com/block/Version-Guard/pkg/types"
Expand Down Expand Up @@ -67,6 +68,12 @@ type ServerCLI struct {
TagEnvKeys string `help:"Comma-separated tag keys for environment" default:"environment,env" env:"TAG_ENV_KEYS"`
TagBrandKeys string `help:"Comma-separated tag keys for brand/business unit" default:"brand" env:"TAG_BRAND_KEYS"`

// Schedule configuration
ScheduleEnabled bool `help:"Enable scheduled scanning" default:"false" env:"SCHEDULE_ENABLED"`
ScheduleCron string `help:"Cron expression for scan schedule" default:"0 6 * * *" env:"SCHEDULE_CRON"`
ScheduleID string `help:"Temporal schedule ID" default:"version-guard-scan" env:"SCHEDULE_ID"`
ScheduleJitter string `help:"Schedule jitter duration" default:"5m" env:"SCHEDULE_JITTER"`

// Global flags
Verbose bool `short:"v" help:"Enable verbose logging"`
DryRun bool `help:"Run in dry-run mode (no Temporal workers started)"`
Expand Down Expand Up @@ -114,6 +121,12 @@ func (s *ServerCLI) Run(_ *kong.Context) error {
fmt.Printf(" Tag Keys - App: %s\n", s.TagAppKeys)
fmt.Printf(" Tag Keys - Env: %s\n", s.TagEnvKeys)
fmt.Printf(" Tag Keys - Brand: %s\n", s.TagBrandKeys)
if s.ScheduleEnabled {
fmt.Printf(" Schedule: enabled (cron: %s, id: %s, jitter: %s)\n",
s.ScheduleCron, s.ScheduleID, s.ScheduleJitter)
} else {
fmt.Printf(" Schedule: disabled\n")
}
}

if s.DryRun {
Expand Down Expand Up @@ -327,10 +340,40 @@ func (s *ServerCLI) Run(_ *kong.Context) error {
fmt.Println("⚠️ Orchestrator snapshot activity not registered (no S3 store)")
}

// Create schedule (if enabled)
if s.ScheduleEnabled {
jitter, parseErr := time.ParseDuration(s.ScheduleJitter)
if parseErr != nil {
fmt.Printf("⚠️ Invalid schedule jitter %q, using default 5m: %v\n", s.ScheduleJitter, parseErr)
jitter = 5 * time.Minute
}

scheduleMgr := schedule.NewManager(temporalClient)
schedCtx, schedCancel := context.WithTimeout(ctx, 10*time.Second)
defer schedCancel()
schedErr := scheduleMgr.EnsureSchedule(schedCtx, schedule.Config{
Enabled: true,
ScheduleID: s.ScheduleID,
CronExpression: s.ScheduleCron,
Jitter: jitter,
TaskQueue: s.TemporalTaskQueue,
})
if schedErr != nil {
fmt.Printf("⚠️ Failed to create/update schedule: %v\n", schedErr)
fmt.Println(" Worker will continue — trigger scans manually")
} else {
fmt.Printf("✓ Schedule configured: %s (cron: %s, jitter: %s)\n",
s.ScheduleID, s.ScheduleCron, s.ScheduleJitter)
}
}

// Start worker
fmt.Printf("\n✓ Temporal worker starting on queue: %s\n", s.TemporalTaskQueue)
fmt.Println("\nVersion Guard is ready!")
fmt.Println("\n📖 To trigger a scan, use the Temporal UI or CLI:")
if s.ScheduleEnabled {
fmt.Printf(" Scans will run automatically (schedule: %s)\n", s.ScheduleCron)
}
fmt.Println("\n📖 To trigger a scan manually, use the Temporal UI or CLI:")
fmt.Printf(" temporal workflow start --task-queue %s --type %s --input '{}'\n", s.TemporalTaskQueue, orchestrator.OrchestratorWorkflowType)
fmt.Println("\n📖 To query findings via gRPC:")
fmt.Printf(" grpcurl -plaintext localhost:%d list\n", s.GRPCPort)
Expand Down
121 changes: 121 additions & 0 deletions pkg/schedule/schedule.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package schedule

import (
"context"
"errors"
"fmt"
"time"

"go.temporal.io/sdk/client"
"go.temporal.io/sdk/temporal"

"github.com/block/Version-Guard/pkg/workflow/orchestrator"
)

// Config holds configuration for the Temporal schedule.
type Config struct {
ScheduleID string
CronExpression string
TaskQueue string
Jitter time.Duration
Enabled bool
Paused bool
}

// Creator abstracts the Temporal schedule client for testability.
type Creator interface {
Create(ctx context.Context, options client.ScheduleOptions) (client.ScheduleHandle, error)
GetHandle(ctx context.Context, scheduleID string) client.ScheduleHandle
}

// Manager handles Temporal schedule lifecycle.
type Manager struct {
scheduleClient Creator
}

// NewManager creates a Manager from a Temporal client.
func NewManager(c client.Client) *Manager {
return &Manager{scheduleClient: c.ScheduleClient()}
}

// NewManagerWithClient creates a Manager with an explicit Creator (for testing).
func NewManagerWithClient(sc Creator) *Manager {
return &Manager{scheduleClient: sc}
}

// EnsureSchedule creates the schedule if it doesn't exist, or updates it
// if the cron expression has changed.
func (m *Manager) EnsureSchedule(ctx context.Context, cfg Config) error {
if !cfg.Enabled {
return nil
}

opts := client.ScheduleOptions{
ID: cfg.ScheduleID,
Spec: client.ScheduleSpec{
CronExpressions: []string{cfg.CronExpression},
Jitter: cfg.Jitter,
},
Action: &client.ScheduleWorkflowAction{
Workflow: orchestrator.OrchestratorWorkflow,
Args: []interface{}{orchestrator.WorkflowInput{}},
TaskQueue: cfg.TaskQueue,
WorkflowExecutionTimeout: 2 * time.Hour,
},
Paused: cfg.Paused,
}

_, err := m.scheduleClient.Create(ctx, opts)
if err == nil {
return nil
}

// If the schedule already exists, check if we need to update it
if !isScheduleAlreadyRunning(err) {
return fmt.Errorf("failed to create schedule %q: %w", cfg.ScheduleID, err)
}

handle := m.scheduleClient.GetHandle(ctx, cfg.ScheduleID)
desc, err := handle.Describe(ctx)
if err != nil {
return fmt.Errorf("failed to describe existing schedule %q: %w", cfg.ScheduleID, err)
}

// Check if the cron expression or jitter has changed
existingSpec := desc.Schedule.Spec
if existingSpec == nil {
existingSpec = &client.ScheduleSpec{}
}
existingCrons := existingSpec.CronExpressions
if len(existingCrons) == 1 && existingCrons[0] == cfg.CronExpression && existingSpec.Jitter == cfg.Jitter {
fmt.Printf(" Schedule %q already configured (cron: %s)\n", cfg.ScheduleID, cfg.CronExpression)
return nil
}

// Update the schedule with the new spec
err = handle.Update(ctx, client.ScheduleUpdateOptions{
DoUpdate: func(input client.ScheduleUpdateInput) (*client.ScheduleUpdate, error) {
if input.Description.Schedule.Spec == nil {
input.Description.Schedule.Spec = &client.ScheduleSpec{}
}
input.Description.Schedule.Spec.CronExpressions = []string{cfg.CronExpression}
input.Description.Schedule.Spec.Jitter = cfg.Jitter
if action, ok := input.Description.Schedule.Action.(*client.ScheduleWorkflowAction); ok {
action.TaskQueue = cfg.TaskQueue
}
return &client.ScheduleUpdate{
Schedule: &input.Description.Schedule,
}, nil
},
})
if err != nil {
return fmt.Errorf("failed to update schedule %q: %w", cfg.ScheduleID, err)
}

fmt.Printf(" Schedule %q updated (cron: %s)\n", cfg.ScheduleID, cfg.CronExpression)
return nil
}

func isScheduleAlreadyRunning(err error) bool {
return errors.Is(err, temporal.ErrScheduleAlreadyRunning)
}
Loading
Loading