diff --git a/.claude/agents/security-reviewer.md b/.claude/agents/security-reviewer.md new file mode 100644 index 00000000000..d8f8d83e9e2 --- /dev/null +++ b/.claude/agents/security-reviewer.md @@ -0,0 +1,178 @@ +--- +name: security-reviewer +description: Performs OpenFn-specific security checks on PR changes. Verifies project-scoped data access, authorization policies, and audit trail coverage. +tools: Read, Grep, Glob, LS +model: sonnet +--- + +You are a security reviewer for the OpenFn Lightning platform. Your job is to +analyze PR changes against three critical security requirements specific to this +codebase. You must read the changed files, trace their implications, and report +findings with precise file:line references. + +## The Three Security Checks + +### S0: Project-Scoped Data Access + +**Requirement:** All access to project data (dataclips, runs, work orders, +collections, workflows, project_credentials, triggers, edges, jobs) MUST be +scoped by the current project. A user in Project A must never be able to read or +modify data belonging to Project B. + +**How to check:** + +1. Read the PR diff to identify any new or modified database queries, context + functions, LiveView mounts/handle_events, controller actions, or API + endpoints. +2. For each query that touches project-owned resources, verify it filters by + `project_id` — either directly (`where: r.project_id == ^project_id`) or + transitively through joins (e.g., run -> work_order -> workflow -> + project_id). +3. Check that the calling code obtains the project from an authenticated + source (the current user's project membership), not from user-supplied + input that could be spoofed (e.g., a raw ID from query params without + membership verification). +4. 
Look at the existing patterns for reference: + - `lib/lightning/workflows/query.ex` — `workflows_for/1`, `jobs_for/1` + - `lib/lightning/invocation/query.ex` — `work_orders_for/1`, `runs_for/1` + - `lib/lightning/projects.ex` — direct `project_id` filtering + +**Red flags:** +- Queries using only a resource ID without joining/filtering on project +- New API endpoints or LiveView actions that accept a `project_id` from params + without verifying the user is a member of that project via `project_users` +- `Repo.get/2` or `Repo.get!/2` calls on project-scoped resources without a + subsequent project membership check +- Missing `where` clauses on `project_id` in new Ecto queries + +### S1: Authorization Policies + +**Requirement:** All new actions that create, read, update, or delete +project-scoped resources must be protected by Bodyguard authorization policies +with appropriate role checks (`:owner`, `:admin`, `:editor`, `:viewer`). + +**How to check:** + +1. Identify new actions introduced by the PR (new LiveView handle_events, new + controller actions, new context functions exposed to the web layer). +2. For each action, verify that `Permissions.can?/4` or `Permissions.can/4` is + called before the operation is performed, using the correct policy module. +3. Check that the corresponding policy module in `lib/lightning/policies/` has + an `authorize/3` clause covering the new action with appropriate role + restrictions. +4. Verify that tests exist in `test/lightning/policies/` covering the new + authorization rules — specifically that permitted roles succeed and + non-permitted roles are denied. 
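To make check steps 2–3 concrete, here is a minimal sketch of the expected shape. This is an illustration only: the module, action, and helper names are hypothetical, not real codebase identifiers, and `role_in?/3` stands in for however the real policies resolve `project_users` roles.

```elixir
# Hypothetical Bodyguard policy, e.g. lib/lightning/policies/example_resource.ex.
# All names below are illustrative assumptions, not actual Lightning modules.
defmodule Lightning.Policies.ExampleResource do
  @behaviour Bodyguard.Policy

  # Only owners, admins, and editors may mutate; viewers are read-only.
  def authorize(:edit_example, user, project),
    do: role_in?(user, project, [:owner, :admin, :editor])

  def authorize(:view_example, user, project),
    do: role_in?(user, project, [:owner, :admin, :editor, :viewer])

  defp role_in?(_user, _project, _roles) do
    # Stand-in for the real project membership/role lookup.
    true
  end
end

# The corresponding gate in a LiveView handle_event, before any mutation —
# mirroring the `PolicyModule |> Permissions.can?(...)` call shape:
if Lightning.Policies.ExampleResource
   |> Lightning.Policies.Permissions.can?(:edit_example, user, project) do
  # perform the operation
else
  # deny: return an error flash / {:noreply, socket} without changes
end
```

A PR that adds `:edit_example` to the policy but never calls `Permissions.can?` before the mutation — or vice versa — should be flagged under S1.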
+ +**Reference patterns:** +- Policy modules: `lib/lightning/policies/*.ex` +- Permission checks: `Lightning.Policies.Permissions.can?/4` +- Test pattern: + ```elixir + assert PolicyModule |> Permissions.can?(:action_name, user, resource) + refute PolicyModule |> Permissions.can?(:action_name, viewer, resource) + ``` + +**Red flags:** +- New LiveView `handle_event` callbacks with no `Permissions.can?` gate +- New controller actions missing `authorize/3` calls +- Policy modules updated with new actions but no corresponding test coverage +- Overly permissive roles (e.g., `:viewer` allowed to mutate data) + +### S2: Audit Trail Coverage + +**Requirement:** Any new operation that modifies the configuration of a project +or instance must produce an audit trail entry. This includes changes to +workflows, credentials, project settings, webhook auth methods, OAuth clients, +version control settings, and similar configuration resources. + +**How to check:** + +1. Identify operations in the PR that create, update, or delete configuration + resources. +2. Verify that the relevant `Ecto.Multi` pipeline (or equivalent) includes an + audit event insertion step. +3. Check that an appropriate audit module exists under the domain (e.g., + `Lightning.Credentials.Audit`, `Lightning.Workflows.Audit`). If the PR + introduces a new auditable resource type, a new audit module should be + created using the `use Lightning.Auditing.Audit` macro. +4. Verify the audit event name is descriptive (e.g., `"created"`, `"updated"`, + `"deleted"`) and that the changeset is passed so before/after diffs are + captured. 
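To ground check step 3, a hedged sketch of what a new audit module and its transactional usage might look like. The resource and module names are illustrative assumptions, and the `user_initiated_event` call shape mirrors the existing audit modules rather than a verified signature (an extra-data argument may also be accepted):

```elixir
# Hypothetical audit module for a newly auditable resource type.
# "example_resource" and the module name are illustrative only.
defmodule Lightning.ExampleResources.Audit do
  use Lightning.Auditing.Audit,
    repo: Lightning.Repo,
    item: "example_resource",
    events: ["created", "updated", "deleted"]
end

# The mutation and its audit entry should commit in the same transaction,
# passing the changeset so before/after diffs are captured:
Ecto.Multi.new()
|> Ecto.Multi.update(:resource, changeset)
|> Ecto.Multi.insert(:audit, fn %{resource: resource} ->
  Lightning.ExampleResources.Audit.user_initiated_event(
    "updated",
    resource,
    changeset
  )
end)
|> Lightning.Repo.transaction()
```

A bare `Repo.update(changeset)` on a configuration resource, with the audit insertion absent or performed outside the transaction, should be flagged under S2.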
+ +**Reference patterns:** +- Audit macro: `use Lightning.Auditing.Audit, repo: Lightning.Repo, item: "resource_name", events: [...]` +- Event creation inside Multi: + ```elixir + |> Multi.insert(:audit, fn %{resource: resource} -> + Audit.user_initiated_event("created", resource, changeset, extra_data) + end) + ``` +- Existing audit modules: + - `lib/lightning/credentials/audit.ex` + - `lib/lightning/projects/audit.ex` + - `lib/lightning/workflows/audit.ex` + - `lib/lightning/workflows/webhook_auth_method_audit.ex` + - `lib/lightning/workorders/export_audit.ex` + - `lib/lightning/invocation/dataclip_audit.ex` + - `lib/lightning/credentials/oauth_client_audit.ex` + - `lib/lightning/version_control/audit.ex` + +**Red flags:** +- New `Repo.insert/update/delete` calls on configuration resources with no + corresponding audit event in the same transaction +- Existing audit modules not updated when new event types are introduced +- Audit events missing the changeset (so before/after diffs are empty) + +## Review Process + +1. **Read the PR diff** to understand what changed. +2. **For each changed file**, determine which security checks (S0, S1, S2) are + relevant. Not every file will be relevant to all three checks. +3. **Trace the code paths** — read referenced modules, query functions, and + policy modules as needed to verify compliance. +4. **Report findings** using the output format below. 
+ +## Output Format + +Structure your review as follows: + +``` +## Security Review + +### S0: Project-Scoped Data Access +- **Status:** PASS | FAIL | N/A +- **Findings:** [List specific issues with file:line references, or "No issues found"] + +### S1: Authorization Policies +- **Status:** PASS | FAIL | N/A +- **Findings:** [List specific issues with file:line references, or "No issues found"] + +### S2: Audit Trail Coverage +- **Status:** PASS | FAIL | N/A +- **Findings:** [List specific issues with file:line references, or "No issues found"] + +### Summary +[1-2 sentence overall assessment] +``` + +Use **N/A** when the PR changes do not touch areas relevant to that check (e.g., +a pure frontend styling change has no S0/S1/S2 implications). + +Use **PASS** when the check is relevant and the PR satisfies the requirement. + +Use **FAIL** when the check is relevant and the PR is missing required +protections. Always include specific file:line references and a clear +description of what is missing. + +## Important Guidelines + +- **Be precise.** Always cite file:line for every finding. +- **Read the actual code.** Do not guess based on file names alone. +- **Check tests too.** Authorization policy tests and audit trail tests are + part of the security posture. +- **Minimize false positives.** Only flag issues you can substantiate by + reading the code. If you are uncertain, say so rather than asserting a + failure. +- **Stay focused.** Only evaluate S0, S1, and S2. Do not flag general code + quality, performance, or style issues. 
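A hypothetical filled-in review, for illustration only — the file path, line number, and findings below are invented:

```
## Security Review

### S0: Project-Scoped Data Access
- **Status:** FAIL
- **Findings:** `lib/lightning/foo.ex:42` — `Repo.get!` is called with a
  user-supplied resource ID and no subsequent project membership check.

### S1: Authorization Policies
- **Status:** PASS
- **Findings:** No issues found

### S2: Audit Trail Coverage
- **Status:** N/A
- **Findings:** No configuration resources are modified by this PR.

### Summary
One project-scoping gap must be resolved before merge; authorization
coverage is otherwise complete and audit requirements are not in scope.
```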
diff --git a/.github/workflows/security-review.yml b/.github/workflows/security-review.yml new file mode 100644 index 00000000000..abd3f1b3536 --- /dev/null +++ b/.github/workflows/security-review.yml @@ -0,0 +1,33 @@ +name: Security Review + +on: + pull_request: + types: [opened, ready_for_review, synchronize] + +permissions: + contents: read + pull-requests: write + id-token: write + +jobs: + security-review: + if: ${{ !github.event.pull_request.draft }} + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + with: + fetch-depth: 0 + + - uses: anthropics/claude-code-action@v1 + with: + anthropic_api_key: ${{ secrets.ANTHROPIC_API_KEY }} + prompt: | + You are performing a security review on this PR using the + instructions defined in .claude/agents/security-reviewer.md. + + Read that file first, then follow its instructions exactly. + Review only the changes introduced by this PR. + Post your findings as a structured review comment. + claude_args: | + --max-turns 10 + --model claude-sonnet-4-6 diff --git a/assets/js/collaborative-editor/components/ManualRunPanel.tsx b/assets/js/collaborative-editor/components/ManualRunPanel.tsx index 226c14b19d4..b56a0556ace 100644 --- a/assets/js/collaborative-editor/components/ManualRunPanel.tsx +++ b/assets/js/collaborative-editor/components/ManualRunPanel.tsx @@ -4,7 +4,7 @@ import { QueueListIcon, XMarkIcon, } from '@heroicons/react/24/outline'; -import { useCallback, useEffect, useMemo, useState } from 'react'; +import { useCallback, useEffect, useMemo, useRef, useState } from 'react'; import { useURLState } from '#/react/lib/use-url-state'; import _logger from '#/utils/logger'; @@ -97,6 +97,10 @@ export function ManualRunPanel({ const [dataclips, setDataclips] = useState([]); const [manuallyUnselected, setManuallyUnselected] = useState(false); + // Ref to avoid stale closure in async fetch callback + const selectedDataclipRef = useRef(selectedDataclip); + selectedDataclipRef.current = selectedDataclip; + const 
setSelectedTab = useCallback( (tab: TabValue) => { setSelectedTabInternal(tab); @@ -294,8 +298,7 @@ export function ManualRunPanel({ response.next_cron_run_dataclip_id && !disableAutoSelection && !followedRunId && - !isDataclipControlled && - !selectedDataclip && + !selectedDataclipRef.current && !manuallyUnselected ) { const nextCronDataclip = response.data.find( diff --git a/assets/js/collaborative-editor/components/ide/FullScreenIDE.tsx b/assets/js/collaborative-editor/components/ide/FullScreenIDE.tsx index 88363664847..ce55f3da01f 100644 --- a/assets/js/collaborative-editor/components/ide/FullScreenIDE.tsx +++ b/assets/js/collaborative-editor/components/ide/FullScreenIDE.tsx @@ -1299,7 +1299,7 @@ export function FullScreenIDE({ selectedTab={selectedTab} selectedDataclip={selectedDataclipState} customBody={customBody} - disableAutoSelection + disableAutoSelection={manuallyUnselectedDataclip} /> ) : null} diff --git a/assets/js/collaborative-editor/components/inspector/TriggerForm.tsx b/assets/js/collaborative-editor/components/inspector/TriggerForm.tsx index 31145c2e6a7..76088d8dc1b 100644 --- a/assets/js/collaborative-editor/components/inspector/TriggerForm.tsx +++ b/assets/js/collaborative-editor/components/inspector/TriggerForm.tsx @@ -51,6 +51,9 @@ export function TriggerForm({ trigger }: TriggerFormProps) { const kafkaTriggersEnabled = sessionContext.config?.kafka_triggers_enabled ?? false; + // Get jobs for cron input source dropdown + const jobs = useWorkflowState(state => state.jobs); + // Get active trigger auth methods from workflow store const activeTriggerAuthMethods = useWorkflowState( state => state.activeTriggerAuthMethods @@ -443,7 +446,6 @@ export function TriggerForm({ trigger }: TriggerFormProps) { if (currentType === 'cron') { return (
- {/*
*/} {/* Cron Expression Field */} - {/*
*/} + + {/* Cron Input Source */} +
+ + {field => ( +
+
+ + + + +
+ +

+ Choose which step's output to use as input + for cron-triggered runs. +

+ {field.state.meta.errors.map(error => ( +

+ {error} +

+ ))} +
+ )} +
+
); } diff --git a/assets/js/collaborative-editor/hooks/useUnsavedChanges.ts b/assets/js/collaborative-editor/hooks/useUnsavedChanges.ts index 4581776db16..bffa8d11577 100644 --- a/assets/js/collaborative-editor/hooks/useUnsavedChanges.ts +++ b/assets/js/collaborative-editor/hooks/useUnsavedChanges.ts @@ -99,6 +99,7 @@ function transformTrigger(trigger: Trigger) { switch (trigger.type) { case 'cron': output.cron_expression = trigger.cron_expression ?? '0 0 * * *'; // default cron expression + output.cron_cursor_job_id = trigger.cron_cursor_job_id ?? null; break; case 'kafka': output.kafka_configuration = trigger.kafka_configuration; diff --git a/assets/js/collaborative-editor/types/trigger.ts b/assets/js/collaborative-editor/types/trigger.ts index adfbe40f7dd..6605f9f212f 100644 --- a/assets/js/collaborative-editor/types/trigger.ts +++ b/assets/js/collaborative-editor/types/trigger.ts @@ -18,6 +18,7 @@ const baseTriggerSchema = z.object({ const webhookTriggerSchema = baseTriggerSchema.extend({ type: z.literal('webhook'), cron_expression: z.null().default(null), + cron_cursor_job_id: z.null().default(null), kafka_configuration: z.null().default(null), webhook_reply: z .enum(['before_start', 'after_completion']) @@ -46,6 +47,7 @@ const cronTriggerSchema = baseTriggerSchema.extend({ 'Invalid cron expression. 
Use format: minute hour day month weekday', } ), + cron_cursor_job_id: z.string().uuid().nullable().default(null), kafka_configuration: z.null().default(null), webhook_reply: z.null().default(null).catch(null), }); @@ -99,6 +101,7 @@ const kafkaConfigSchema = z const kafkaTriggerSchema = baseTriggerSchema.extend({ type: z.literal('kafka'), cron_expression: z.null().default(null), + cron_cursor_job_id: z.null().default(null), kafka_configuration: kafkaConfigSchema, webhook_reply: z.null().default(null).catch(null), }); @@ -131,6 +134,7 @@ export const createDefaultTrigger = ( ...base, type: 'webhook' as const, cron_expression: null, + cron_cursor_job_id: null, kafka_configuration: null, webhook_reply: 'before_start' as const, }; @@ -140,6 +144,7 @@ export const createDefaultTrigger = ( ...base, type: 'cron' as const, cron_expression: '0 0 * * *', // Daily at midnight default + cron_cursor_job_id: null, kafka_configuration: null, webhook_reply: null, }; @@ -149,6 +154,7 @@ export const createDefaultTrigger = ( ...base, type: 'kafka' as const, cron_expression: null, + cron_cursor_job_id: null, kafka_configuration: { hosts_string: '', topics_string: '', diff --git a/assets/test/collaborative-editor/components/ManualRunPanel.test.tsx b/assets/test/collaborative-editor/components/ManualRunPanel.test.tsx index f327a7c8e60..ed0fd77210f 100644 --- a/assets/test/collaborative-editor/components/ManualRunPanel.test.tsx +++ b/assets/test/collaborative-editor/components/ManualRunPanel.test.tsx @@ -1388,5 +1388,166 @@ describe('ManualRunPanel', () => { expect(runButton).toBeDisabled(); }); }); + + test('shows cron banner when controlled selectedDataclip matches next_cron_run_dataclip_id', async () => { + // FullScreenIDE passes selectedDataclip as a controlled prop after it receives + // the onDataclipChange callback. This test verifies that when the parent passes + // the cron dataclip back via selectedDataclip, the banner is displayed. 
+ vi.mocked(dataclipApi.searchDataclips).mockResolvedValue({ + data: [mockDataclip], + next_cron_run_dataclip_id: 'dataclip-1', + can_edit_dataclip: true, + }); + + renderManualRunPanel({ + workflow: mockWorkflow, + projectId: 'project-1', + workflowId: 'workflow-1', + jobId: 'job-1', + onClose: () => {}, + renderMode: 'embedded', + selectedDataclip: mockDataclip, + }); + + await waitFor(() => { + expect( + screen.getByText('Default Next Input for Cron') + ).toBeInTheDocument(); + expect( + screen.getByText(/This workflow has a "cron" trigger/) + ).toBeInTheDocument(); + }); + }); + + test('does not show cron banner when controlled selectedDataclip does not match next_cron_run_dataclip_id', async () => { + const otherDataclip: dataclipApi.Dataclip = { + ...mockDataclip, + id: 'dataclip-other', + name: 'Other Dataclip', + }; + + vi.mocked(dataclipApi.searchDataclips).mockResolvedValue({ + data: [mockDataclip, otherDataclip], + next_cron_run_dataclip_id: 'dataclip-1', + can_edit_dataclip: true, + }); + + renderManualRunPanel({ + workflow: mockWorkflow, + projectId: 'project-1', + workflowId: 'workflow-1', + jobId: 'job-1', + onClose: () => {}, + renderMode: 'embedded', + selectedDataclip: otherDataclip, + }); + + await waitFor(() => { + expect(screen.getByText('Other Dataclip')).toBeInTheDocument(); + }); + + expect( + screen.queryByText('Default Next Input for Cron') + ).not.toBeInTheDocument(); + }); + }); + + describe('cron auto-selection with disableAutoSelection', () => { + test('does not auto-select cron dataclip when disableAutoSelection is true', async () => { + // This simulates FullScreenIDE after the user manually unselects a dataclip: + // manuallyUnselectedDataclip=true → disableAutoSelection=true is passed down. 
+ vi.mocked(dataclipApi.searchDataclips).mockResolvedValue({ + data: [mockDataclip], + next_cron_run_dataclip_id: 'dataclip-1', + can_edit_dataclip: true, + }); + + renderManualRunPanel({ + workflow: mockWorkflow, + projectId: 'project-1', + workflowId: 'workflow-1', + jobId: 'job-1', + onClose: () => {}, + disableAutoSelection: true, + }); + + // Give time for the fetch to complete and any auto-selection to occur + await waitFor(() => { + expect(dataclipApi.searchDataclips).toHaveBeenCalledOnce(); + }); + + // The cron banner must not appear — auto-selection was suppressed + expect( + screen.queryByText('Default Next Input for Cron') + ).not.toBeInTheDocument(); + }); + + test('auto-selects cron dataclip in embedded mode when disableAutoSelection is false', async () => { + // This simulates the initial FullScreenIDE load where the user has not yet + // manually unselected anything: manuallyUnselectedDataclip=false. + vi.mocked(dataclipApi.searchDataclips).mockResolvedValue({ + data: [mockDataclip], + next_cron_run_dataclip_id: 'dataclip-1', + can_edit_dataclip: true, + }); + + const onDataclipChange = vi.fn(); + + renderManualRunPanel({ + workflow: mockWorkflow, + projectId: 'project-1', + workflowId: 'workflow-1', + jobId: 'job-1', + onClose: () => {}, + renderMode: 'embedded', + disableAutoSelection: false, + onDataclipChange, + }); + + // The component should auto-select and notify the parent via onDataclipChange + await waitFor(() => { + expect(onDataclipChange).toHaveBeenCalledWith(mockDataclip); + }); + }); + + test('shows cron banner when user manually selects the cron dataclip', async () => { + const user = userEvent.setup(); + + vi.mocked(dataclipApi.searchDataclips).mockResolvedValue({ + data: [mockDataclip], + next_cron_run_dataclip_id: 'dataclip-1', + can_edit_dataclip: true, + }); + + // disableAutoSelection=true prevents initial auto-select, but the user can + // still click the dataclip in the list and the banner should appear. 
+ renderManualRunPanel({ + workflow: mockWorkflow, + projectId: 'project-1', + workflowId: 'workflow-1', + jobId: 'job-1', + onClose: () => {}, + disableAutoSelection: true, + }); + + // Switch to Existing tab and manually click the cron dataclip + await user.click(screen.getByText('Existing')); + + await waitFor(() => { + expect(screen.getByText('Test Dataclip')).toBeInTheDocument(); + }); + + await user.click(screen.getByText('Test Dataclip')); + + // Banner must appear because the selected dataclip IS the cron dataclip + await waitFor(() => { + expect( + screen.getByText('Default Next Input for Cron') + ).toBeInTheDocument(); + expect( + screen.getByText(/This workflow has a "cron" trigger/) + ).toBeInTheDocument(); + }); + }); }); }); diff --git a/lib/lightning/collaboration/workflow_serializer.ex b/lib/lightning/collaboration/workflow_serializer.ex index 4db1d65bee0..c65a2215e62 100644 --- a/lib/lightning/collaboration/workflow_serializer.ex +++ b/lib/lightning/collaboration/workflow_serializer.ex @@ -223,6 +223,7 @@ defmodule Lightning.Collaboration.WorkflowSerializer do Yex.MapPrelim.from(%{ "kafka_configuration" => kafka_configuration, "cron_expression" => trigger.cron_expression, + "cron_cursor_job_id" => trigger.cron_cursor_job_id, "enabled" => trigger.enabled, "id" => trigger.id, "type" => trigger.type |> to_string(), @@ -273,7 +274,7 @@ defmodule Lightning.Collaboration.WorkflowSerializer do |> Enum.map(fn trigger -> trigger |> Map.take( - ~w(id type enabled cron_expression webhook_reply kafka_configuration) + ~w(id type enabled cron_expression cron_cursor_job_id webhook_reply kafka_configuration) ) |> normalize_kafka_configuration() end) diff --git a/lib/lightning/invocation.ex b/lib/lightning/invocation.ex index 645c007f246..35eab4c0079 100644 --- a/lib/lightning/invocation.ex +++ b/lib/lightning/invocation.ex @@ -13,6 +13,7 @@ defmodule Lightning.Invocation do alias Lightning.Projects.File, as: ProjectFile alias Lightning.Projects.Project alias 
Lightning.Repo + alias Lightning.Run alias Lightning.Workflows.Edge alias Lightning.Workflows.Job alias Lightning.Workflows.Trigger @@ -217,43 +218,54 @@ defmodule Lightning.Invocation do end @doc """ - Gets the next cron run dataclip for a job. + Returns the dataclip that the scheduler will use for the next cron run + of the given trigger. Branches on cron_cursor_job_id. + """ + def get_next_cron_run_dataclip(%Trigger{} = trigger) do + case trigger.cron_cursor_job_id do + nil -> last_run_final_dataclip(trigger) + job_id -> last_successful_step_dataclip(job_id) + end + end - Returns the most recent output dataclip from a successful step for the given job, - filtered by the provided database filters. + @doc """ + Returns the final dataclip from the last successful run for a trigger. + Used when cron_cursor_job_id is nil (use final run state). """ - @spec get_next_cron_run_dataclip( - job_id :: Ecto.UUID.t(), - db_filters :: Ecto.Query.dynamic_expr() - ) :: - map() | nil - def get_next_cron_run_dataclip(job_id, db_filters) do + def last_run_final_dataclip(%Trigger{id: trigger_id}) do + from(r in Run, + join: wo in assoc(r, :work_order), + where: wo.trigger_id == ^trigger_id, + where: r.state == :success, + where: not is_nil(r.final_dataclip_id), + order_by: [desc: r.finished_at], + limit: 1, + preload: [:final_dataclip] + ) + |> Repo.one() + |> case do + nil -> nil + run -> run.final_dataclip + end + end + + @doc """ + Returns the output dataclip from the last successful step for a job. + Used when cron_cursor_job_id is set to a specific job. + """ + def last_successful_step_dataclip(job_id) do from(d in Dataclip, join: s in Step, on: s.output_dataclip_id == d.id, where: s.job_id == ^job_id and s.exit_reason == "success" and is_nil(d.wiped_at), - where: ^db_filters, order_by: [desc: s.finished_at], limit: 1 ) |> Repo.one() end - @doc """ - Checks if a job is triggered by a cron trigger. 
- """ - @spec cron_triggered_job?(job_id :: Ecto.UUID.t()) :: boolean() - def cron_triggered_job?(job_id) do - from(e in Edge, - join: t in Trigger, - on: e.source_trigger_id == t.id, - where: e.target_job_id == ^job_id and t.type == :cron - ) - |> Repo.exists?() - end - @doc """ Lists dataclips for a job, including next cron run state if cron-triggered. @@ -272,11 +284,12 @@ defmodule Lightning.Invocation do user_filters, opts ) do - if cron_triggered_job?(job_id) do - list_dataclips_with_cron_state(job_id, user_filters, opts) - else - dataclips = list_dataclips_for_job(job, user_filters, opts) - {dataclips, nil} + case get_cron_trigger_for_job(job_id) do + nil -> + {list_dataclips_for_job(job, user_filters, opts), nil} + + %Trigger{} = trigger -> + list_dataclips_with_cron_state(trigger, job_id, user_filters, opts) end end @@ -907,10 +920,20 @@ defmodule Lightning.Invocation do |> Repo.transaction() end + defp get_cron_trigger_for_job(job_id) do + from(t in Trigger, + join: e in Edge, + on: e.source_trigger_id == t.id, + where: e.target_job_id == ^job_id and t.type == :cron, + limit: 1 + ) + |> Repo.one() + end + # Query dataclips for cron-triggered jobs, including the next run state - defp list_dataclips_with_cron_state(job_id, user_filters, opts) do + defp list_dataclips_with_cron_state(trigger, job_id, user_filters, opts) do # Get the next cron run dataclip (always needed for the ID) - next_cron_dataclip = get_next_cron_run_dataclip(job_id, dynamic(true)) + next_cron_dataclip = get_next_cron_run_dataclip(trigger) next_cron_run_dataclip_id = next_cron_dataclip && next_cron_dataclip.id # Check if next cron dataclip matches user filters diff --git a/lib/lightning/runs/handlers.ex b/lib/lightning/runs/handlers.ex index e30827de9e0..81aa72fb644 100644 --- a/lib/lightning/runs/handlers.ex +++ b/lib/lightning/runs/handlers.ex @@ -61,36 +61,60 @@ defmodule Lightning.Runs.Handlers do defmodule CompleteRun do @moduledoc """ Schema to validate the input attributes of a 
completed run. + + The worker may send either: + - `final_dataclip_id` — reuse an existing step output dataclip (single leaf) + - `final_state` — a new map to persist as a dataclip (multiple leaves) + + These are mutually exclusive. If both are sent, `final_dataclip_id` wins. """ use Lightning.Schema import Lightning.ChangesetUtils + alias Lightning.Invocation.Dataclip + @primary_key false embedded_schema do field :state, :string field :reason, :string field :error_type, :string + field :final_dataclip_id, Ecto.UUID + field :final_state, :map + field :project_id, Ecto.UUID field :timestamp, Lightning.UnixDateTime end def call(run, params) do with {:ok, complete_run} <- params |> new() |> apply_action(:validate) do - run - |> Run.complete(to_run_params(complete_run)) - |> case do - %{valid?: false} = changeset -> - {:error, changeset} - - changeset -> - Runs.update_run(changeset) - end + Repo.transact(fn -> + with {:ok, run_params} <- + resolve_final_dataclip(complete_run, run.options) do + run + |> Run.complete(run_params) + |> case do + %{valid?: false} = changeset -> + {:error, changeset} + + changeset -> + Runs.update_run(changeset) + end + end + end) end end def new(params) do %__MODULE__{} - |> cast(params, [:state, :reason, :error_type, :timestamp]) + |> cast(params, [ + :state, + :reason, + :error_type, + :final_dataclip_id, + :final_state, + :project_id, + :timestamp + ]) |> put_new_change(:timestamp, Lightning.current_time()) |> then(fn changeset -> if reason = get_change(changeset, :reason) do @@ -119,6 +143,59 @@ defmodule Lightning.Runs.Handlers do |> Map.take([:state, :error_type]) |> Map.put(:finished_at, complete_run.timestamp) end + + # When the worker sends an existing dataclip ID, use it directly. 
+ defp resolve_final_dataclip( + %__MODULE__{final_dataclip_id: id} = complete_run, + _options + ) + when is_binary(id) do + {:ok, to_run_params(complete_run) |> Map.put(:final_dataclip_id, id)} + end + + # When the worker sends a new final_state map, insert a new dataclip. + defp resolve_final_dataclip( + %__MODULE__{final_state: final_state, project_id: project_id} = + complete_run, + options + ) + when is_map(final_state) and is_binary(project_id) do + case save_final_dataclip(final_state, project_id, options) do + {:ok, %Dataclip{id: id}} -> + {:ok, to_run_params(complete_run) |> Map.put(:final_dataclip_id, id)} + + error -> + error + end + end + + # Neither provided (e.g., mark_run_lost, or worker didn't send final state). + defp resolve_final_dataclip(complete_run, _options) do + {:ok, to_run_params(complete_run)} + end + + defp save_final_dataclip( + _final_state, + project_id, + %Runs.RunOptions{save_dataclips: false} + ) do + Dataclip.new(%{ + project_id: project_id, + body: nil, + wiped_at: Lightning.current_time() |> DateTime.truncate(:second), + type: :step_result + }) + |> Repo.insert() + end + + defp save_final_dataclip(final_state, project_id, _options) do + Dataclip.new(%{ + project_id: project_id, + body: final_state, + type: :step_result + }) + |> Repo.insert() + end end defmodule StartStep do diff --git a/lib/lightning/runs/run.ex b/lib/lightning/runs/run.ex index 2a7450a75d1..8a2156ab9e1 100644 --- a/lib/lightning/runs/run.ex +++ b/lib/lightning/runs/run.ex @@ -71,6 +71,7 @@ defmodule Lightning.Run do belongs_to :starting_trigger, Trigger belongs_to :created_by, User belongs_to :dataclip, Lightning.Invocation.Dataclip + belongs_to :final_dataclip, Lightning.Invocation.Dataclip has_one :workflow, through: [:work_order, :workflow] belongs_to :snapshot, Snapshot @@ -169,7 +170,7 @@ defmodule Lightning.Run do run |> change() |> put_change(:state, nil) - |> cast(params, [:state, :error_type, :finished_at]) + |> cast(params, [:state, :error_type, 
:finished_at, :final_dataclip_id]) |> validate_required([:state, :finished_at]) |> validate_inclusion(:state, @final_states) |> validate_state_change() diff --git a/lib/lightning/workflows/scheduler.ex b/lib/lightning/workflows/scheduler.ex index dec8846902e..2747483d3a0 100644 --- a/lib/lightning/workflows/scheduler.ex +++ b/lib/lightning/workflows/scheduler.ex @@ -11,7 +11,6 @@ defmodule Lightning.Workflows.Scheduler do unique: [period: 59] alias Lightning.Invocation - alias Lightning.Repo alias Lightning.Workflows alias Lightning.Workflows.Edge alias Lightning.WorkOrders @@ -38,7 +37,9 @@ defmodule Lightning.Workflows.Scheduler do defp invoke_cronjob(%Edge{target_job: job, source_trigger: trigger}) do with %{project_id: project_id} <- job.workflow, :ok <- WorkOrders.limit_run_creation(project_id) do - case last_state_for_job(job.id) do + dataclip = Invocation.get_next_cron_run_dataclip(trigger) + + case dataclip do nil -> Logger.debug(fn -> # coveralls-ignore-start @@ -46,12 +47,6 @@ defmodule Lightning.Workflows.Scheduler do # coveralls-ignore-stop end) - # Add a facility to specify _which_ global state should be use as - # the first initial state for a cron-triggered job. 
- # The implementation would look like: - # default_state_for_job(id) - # %{id: uuid, type: :global, body: %{arbitrary: true}} - WorkOrders.create_for(trigger, dataclip: %{ type: :global, @@ -75,16 +70,4 @@ defmodule Lightning.Workflows.Scheduler do end end end - - defp last_state_for_job(id) do - step = - %Workflows.Job{id: id} - |> Invocation.Query.last_successful_step_for_job() - |> Repo.one() - - case step do - nil -> nil - step -> Invocation.get_output_dataclip_query(step) |> Repo.one() - end - end end diff --git a/lib/lightning/workflows/snapshot.ex b/lib/lightning/workflows/snapshot.ex index 2db0dea99ff..07108390739 100644 --- a/lib/lightning/workflows/snapshot.ex +++ b/lib/lightning/workflows/snapshot.ex @@ -55,6 +55,7 @@ defmodule Lightning.Workflows.Snapshot do field :custom_path, :string field :cron_expression, :string field :enabled, :boolean + field :cron_cursor_job_id, :binary_id field :kafka_configuration, :map field :type, Ecto.Enum, values: [:webhook, :cron, :kafka] field :has_auth_method, :boolean, virtual: true diff --git a/lib/lightning/workflows/trigger.ex b/lib/lightning/workflows/trigger.ex index ce20196bc3f..db769875af9 100644 --- a/lib/lightning/workflows/trigger.ex +++ b/lib/lightning/workflows/trigger.ex @@ -14,6 +14,7 @@ defmodule Lightning.Workflows.Trigger do use Lightning.Schema import Ecto.Query + alias Lightning.Workflows.Job alias Lightning.Workflows.Triggers.KafkaConfiguration alias Lightning.Workflows.Workflow @@ -32,6 +33,7 @@ defmodule Lightning.Workflows.Trigger do :comment, :custom_path, :cron_expression, + :cron_cursor_job_id, :type, :enabled, :webhook_reply @@ -45,6 +47,7 @@ defmodule Lightning.Workflows.Trigger do field :webhook_reply, Ecto.Enum, values: @webhook_reply_types belongs_to :workflow, Workflow + belongs_to :cron_cursor_job, Job has_many :edges, Lightning.Workflows.Edge, foreign_key: :source_trigger_id @@ -88,6 +91,7 @@ defmodule Lightning.Workflows.Trigger do :type, :workflow_id, :cron_expression, + 
:cron_cursor_job_id, :has_auth_method, :webhook_reply ]) @@ -123,6 +127,7 @@ defmodule Lightning.Workflows.Trigger do :webhook -> changeset |> put_change(:cron_expression, nil) + |> put_change(:cron_cursor_job_id, nil) |> put_change(:kafka_configuration, nil) |> put_default(:webhook_reply, :before_start) @@ -136,6 +141,7 @@ defmodule Lightning.Workflows.Trigger do :kafka -> changeset |> put_change(:cron_expression, nil) + |> put_change(:cron_cursor_job_id, nil) |> validate_required([:kafka_configuration]) |> put_change(:webhook_reply, nil) diff --git a/lib/lightning_web/channels/run_channel.ex b/lib/lightning_web/channels/run_channel.ex index a080b38db7f..4ff9d65ceef 100644 --- a/lib/lightning_web/channels/run_channel.ex +++ b/lib/lightning_web/channels/run_channel.ex @@ -107,6 +107,8 @@ defmodule LightningWeb.RunChannel do end def handle_in("run:complete", payload, socket) do + payload = Map.put(payload, "project_id", socket.assigns.project_id) + case Runs.complete_run(socket.assigns.run, payload) do {:ok, run} -> # TODO: Turn FailureAlerter into an Oban worker and process async diff --git a/priv/repo/migrations/20260313110903_add_final_dataclip_to_runs.exs b/priv/repo/migrations/20260313110903_add_final_dataclip_to_runs.exs new file mode 100644 index 00000000000..de50ce75798 --- /dev/null +++ b/priv/repo/migrations/20260313110903_add_final_dataclip_to_runs.exs @@ -0,0 +1,9 @@ +defmodule Lightning.Repo.Migrations.AddFinalDataclipToRuns do + use Ecto.Migration + + def change do + alter table(:runs) do + add :final_dataclip_id, references(:dataclips, type: :binary_id), null: true + end + end +end diff --git a/priv/repo/migrations/20260314161323_add_cron_cursor_job_id_to_triggers.exs b/priv/repo/migrations/20260314161323_add_cron_cursor_job_id_to_triggers.exs new file mode 100644 index 00000000000..8318c5f14ec --- /dev/null +++ b/priv/repo/migrations/20260314161323_add_cron_cursor_job_id_to_triggers.exs @@ -0,0 +1,29 @@ +defmodule 
Lightning.Repo.Migrations.AddCronCursorJobIdToTriggers do + use Ecto.Migration + + def up do + alter table(:triggers) do + add :cron_cursor_job_id, + references(:jobs, type: :binary_id, on_delete: :nilify_all), + null: true + end + + flush() + + # Backfill existing cron triggers to preserve current behavior: + # point at the first downstream job so the old per-step lookup is used. + execute(""" + UPDATE triggers + SET cron_cursor_job_id = workflow_edges.target_job_id + FROM workflow_edges + WHERE workflow_edges.source_trigger_id = triggers.id + AND triggers.type = 'cron' + """) + end + + def down do + alter table(:triggers) do + remove :cron_cursor_job_id + end + end +end diff --git a/test/integration/web_and_worker_test.exs b/test/integration/web_and_worker_test.exs index f6af34b6df9..d77592d38d4 100644 --- a/test/integration/web_and_worker_test.exs +++ b/test/integration/web_and_worker_test.exs @@ -108,8 +108,9 @@ defmodule Lightning.WebAndWorkerTest do assert %{state: :success} = WorkOrders.get(workorder_id) - # There was an initial http_request dataclip and 7 run_result dataclips - assert Repo.all(Lightning.Invocation.Dataclip) |> Enum.count() == 8 + # There was an initial http_request dataclip, 7 step_result dataclips, + # and 1 final_dataclip saved on the run + assert Repo.all(Lightning.Invocation.Dataclip) |> Enum.count() == 9 end @tag :integration @@ -572,6 +573,177 @@ defmodule Lightning.WebAndWorkerTest do assert work_order.state == :failed end + + @tag :integration + @tag timeout: 120_000 + test "returns final state with multiple leaf nodes from a branching workflow", + %{uri: uri} do + # + # +---+ + # | T | (webhook trigger) + # +---+ + # | + # +-+-+ + # | 1 | + # +-+-+ + # / \ + # / \ + # +-+-+ +-+-+ + # | 2 | | 3 | + # +-+-+ +-+-+ + # / \ | + # / \ | + # +-+-+ +-+-+ | + # | 4 | | 5 |<--+ + # +---+ +---+ + # + # Step 5 is reached twice (from step 2 and step 3). + # Step 4 is reached once (from step 2 only). 
+ # The final_state payload should have 3 keys. + # + + project = insert(:project) + + webhook_trigger = build(:trigger, type: :webhook, enabled: true) + + job_1 = + build(:job, + adaptor: "@openfn/language-common@latest", + body: """ + fn(state => { + state.x = state.data.x * 2; + console.log("job1: x=" + state.x); + return state; + }); + """, + name: "step-1" + ) + + job_2 = + build(:job, + adaptor: "@openfn/language-common@latest", + body: """ + fn(state => { + state.x = state.x * 3; + console.log("job2: x=" + state.x); + return state; + }); + """, + name: "step-2" + ) + + job_3 = + build(:job, + adaptor: "@openfn/language-common@latest", + body: """ + fn(state => { + state.x = state.x * 5; + console.log("job3: x=" + state.x); + return state; + }); + """, + name: "step-3" + ) + + job_4 = + build(:job, + adaptor: "@openfn/language-common@latest", + body: """ + fn(state => { + state.x = state.x + 1; + console.log("job4: x=" + state.x); + return state; + }); + """, + name: "step-4" + ) + + job_5 = + build(:job, + adaptor: "@openfn/language-common@latest", + body: """ + fn(state => { + state.x = state.x + 100; + console.log("job5: x=" + state.x); + return state; + }); + """, + name: "step-5" + ) + + workflow = + build(:workflow, project: project) + |> with_trigger(webhook_trigger) + |> with_job(job_1) + |> with_edge({webhook_trigger, job_1}, condition_type: :always) + |> with_job(job_2) + |> with_edge({job_1, job_2}, condition_type: :on_job_success) + |> with_job(job_3) + |> with_edge({job_1, job_3}, condition_type: :on_job_success) + |> with_job(job_4) + |> with_edge({job_2, job_4}, condition_type: :on_job_success) + |> with_job(job_5) + |> with_edge({job_2, job_5}, condition_type: :on_job_success) + |> with_edge({job_3, job_5}, condition_type: :on_job_success) + |> insert() + + [trigger] = workflow.triggers + + trigger = + trigger + |> Ecto.Changeset.change(webhook_reply: :after_completion) + |> Repo.update!() + + Snapshot.create(workflow |> Repo.reload!()) + + # 
input x=1 + # step 1: x = 1 * 2 = 2 + # step 2: x = 2 * 3 = 6 (from step 1) + # step 3: x = 2 * 5 = 10 (from step 1) + # step 4: x = 6 + 1 = 7 (from step 2) — leaf + # step 5 via step 2: x = 6 + 100 = 106 — leaf + # step 5 via step 3: x = 10 + 100 = 110 — leaf + webhook_body = %{"x" => 1} + + response = + Tesla.client( + [ + {Tesla.Middleware.BaseUrl, uri}, + Tesla.Middleware.JSON + ], + {Tesla.Adapter.Finch, name: Lightning.Finch} + ) + |> Tesla.post!("/i/#{trigger.id}", webhook_body) + + assert response.status == 201 + + assert %{"data" => final_state, "meta" => meta} = response.body + + assert meta["state"] == "success" + + # The final_state is keyed by job ID. When the same job is reached + # twice (step 5), the second entry gets a "-1" suffix. + assert map_size(final_state) == 3 + + job_4_id = job_4.id + job_5_id = job_5.id + + # job 4 appears once, keyed by its job ID + assert %{"x" => 7} = final_state[job_4_id] + + # job 5 appears twice: once as job_5_id, once as job_5_id <> "-1" + assert %{"x" => x5a} = final_state[job_5_id] + assert %{"x" => x5b} = final_state[job_5_id <> "-1"] + + assert Enum.sort([x5a, x5b]) == [106, 110] + + # Verify all steps completed successfully + %{entries: steps} = Invocation.list_steps_for_project(project) + + # 1 root + 2 branches from root + 2 from step 2 + 1 from step 3 = 6 steps + assert Enum.count(steps) == 6 + assert Enum.all?(steps, fn step -> step.exit_reason == "success" end) + end end defp webhook_expression do diff --git a/test/lightning/collaboration/workflow_serializer_test.exs b/test/lightning/collaboration/workflow_serializer_test.exs index d561061d737..3df6484e57d 100644 --- a/test/lightning/collaboration/workflow_serializer_test.exs +++ b/test/lightning/collaboration/workflow_serializer_test.exs @@ -759,6 +759,7 @@ defmodule Lightning.Collaboration.WorkflowSerializerTest do "type" => "webhook", "enabled" => true, "cron_expression" => nil, + "cron_cursor_job_id" => nil, "kafka_configuration" => nil, 
"webhook_reply" => "before_start" } == extracted_trigger @@ -980,6 +981,7 @@ defmodule Lightning.Collaboration.WorkflowSerializerTest do "type" => to_string(original_trigger.type), "enabled" => original_trigger.enabled, "cron_expression" => original_trigger.cron_expression, + "cron_cursor_job_id" => original_trigger.cron_cursor_job_id, "kafka_configuration" => nil, "webhook_reply" => nil } == extracted_trigger @@ -1290,6 +1292,7 @@ defmodule Lightning.Collaboration.WorkflowSerializerTest do "type" => "cron", "enabled" => true, "cron_expression" => "0 */6 * * *", + "cron_cursor_job_id" => original_trigger.cron_cursor_job_id, "kafka_configuration" => nil, "webhook_reply" => nil } == trigger diff --git a/test/lightning/workflow_versions_test.exs b/test/lightning/workflow_versions_test.exs index 251ca02d12b..e0bb6e4f779 100644 --- a/test/lightning/workflow_versions_test.exs +++ b/test/lightning/workflow_versions_test.exs @@ -6,7 +6,7 @@ defmodule Lightning.WorkflowVersionsTest do alias Lightning.Repo alias Lightning.WorkflowVersions - alias Lightning.Workflows.{Workflow, WorkflowVersion} + alias Lightning.Workflows.WorkflowVersion @a "aaaaaaaaaaaa" @b "bbbbbbbbbbbb" diff --git a/test/lightning/workflows/scheduler_test.exs b/test/lightning/workflows/scheduler_test.exs index d0bd0cd96cc..c5821da435f 100644 --- a/test/lightning/workflows/scheduler_test.exs +++ b/test/lightning/workflows/scheduler_test.exs @@ -8,7 +8,7 @@ defmodule Lightning.Workflows.SchedulerTest do alias Lightning.Workflows.Scheduler describe "enqueue_cronjobs/1" do - test "enqueues a cron job that's never been run before" do + test "cron_cursor_job_id nil, no prior run: creates empty global dataclip" do job = insert(:job) trigger = @@ -45,7 +45,103 @@ defmodule Lightning.Workflows.SchedulerTest do assert Jason.decode!(run.dataclip.body) == %{} end - test "enqueues a cron job that has been run before" do + test "cron_cursor_job_id nil, prior successful run: uses final_dataclip_id" do + job = 
insert(:job) + + trigger = + insert(:trigger, %{ + type: :cron, + cron_expression: "* * * * *", + workflow: job.workflow + }) + + insert(:edge, %{ + workflow: job.workflow, + source_trigger: trigger, + target_job: job + }) + + final_dataclip = + insert(:dataclip, + type: :step_result, + body: %{"final" => "state"} + ) + + with_snapshot(job.workflow) + + insert(:run, + work_order: + build(:workorder, + workflow: job.workflow, + dataclip: insert(:dataclip), + trigger: trigger, + state: :success + ), + starting_trigger: trigger, + state: :success, + dataclip: insert(:dataclip), + final_dataclip: final_dataclip, + finished_at: DateTime.utc_now(), + steps: [] + ) + + Mox.expect( + Lightning.Extensions.MockUsageLimiter, + :limit_action, + fn _action, _context -> :ok end + ) + + Scheduler.enqueue_cronjobs() + + new_run = + Run + |> last(:inserted_at) + |> preload(dataclip: ^Invocation.Query.dataclip_with_body()) + |> Repo.one() + + assert new_run.dataclip.type == :step_result + assert Jason.decode!(new_run.dataclip.body) == %{"final" => "state"} + end + + test "cron_cursor_job_id set, no prior run: creates empty global dataclip" do + job = insert(:job) + + trigger = + insert(:trigger, %{ + type: :cron, + cron_expression: "* * * * *", + workflow: job.workflow, + cron_cursor_job_id: job.id + }) + + insert(:edge, %{ + workflow: job.workflow, + source_trigger: trigger, + target_job: job + }) + + with_snapshot(job.workflow) + + Mox.expect( + Lightning.Extensions.MockUsageLimiter, + :limit_action, + fn _action, _context -> :ok end + ) + + Scheduler.enqueue_cronjobs() + + run = Repo.one(Run) + + assert run.starting_trigger_id == trigger.id + + run = + Repo.preload(run, dataclip: Invocation.Query.dataclip_with_body()) + + assert run.dataclip.type == :global + assert Jason.decode!(run.dataclip.body) == %{} + end + + test "cron_cursor_job_id set, prior successful run: uses that job's output" do job = insert(:job, body: "fn(state => { console.log(state); return { changed: true }; 
})" @@ -55,7 +151,8 @@ defmodule Lightning.Workflows.SchedulerTest do insert(:trigger, %{ type: :cron, cron_expression: "* * * * *", - workflow: job.workflow + workflow: job.workflow, + cron_cursor_job_id: job.id }) insert(:edge, %{ diff --git a/test/lightning/workflows/trigger_test.exs b/test/lightning/workflows/trigger_test.exs index 9e0c80f9f35..20425c1400e 100644 --- a/test/lightning/workflows/trigger_test.exs +++ b/test/lightning/workflows/trigger_test.exs @@ -217,6 +217,91 @@ defmodule Lightning.Workflows.TriggerTest do assert get_field(changeset, :webhook_reply) == nil end + test "cron_cursor_job_id is cast for cron triggers" do + job_id = Ecto.UUID.generate() + + changeset = + Trigger.changeset(%Trigger{}, %{ + type: :cron, + cron_expression: "* * * * *", + cron_cursor_job_id: job_id + }) + + assert get_field(changeset, :cron_cursor_job_id) == job_id + end + + test "cron_cursor_job_id is cleared when type changes to :webhook" do + job_id = Ecto.UUID.generate() + + changeset = + Trigger.changeset(%Trigger{}, %{ + type: :webhook, + cron_cursor_job_id: job_id + }) + + assert get_field(changeset, :cron_cursor_job_id) == nil + + # Also when converting from cron to webhook + changeset = + Trigger.changeset( + %Trigger{ + type: :cron, + cron_expression: "* * * * *", + cron_cursor_job_id: job_id + }, + %{type: :webhook} + ) + + assert get_field(changeset, :cron_cursor_job_id) == nil + end + + test "cron_cursor_job_id is cleared when type changes to :kafka" do + job_id = Ecto.UUID.generate() + + changeset = + Trigger.changeset(%Trigger{}, %{ + type: :kafka, + cron_cursor_job_id: job_id, + kafka_configuration: %{ + group_id: "group_id", + hosts: [["host1", "9092"]], + hosts_string: "host1:9092", + initial_offset_reset_policy: "earliest", + sasl: "plain", + ssl: true, + topics: ["foo"], + topics_string: "foo" + } + }) + + assert get_field(changeset, :cron_cursor_job_id) == nil + + # Also when converting from cron to kafka + changeset = + Trigger.changeset( + %Trigger{ 
+ type: :cron, + cron_expression: "* * * * *", + cron_cursor_job_id: job_id + }, + %{ + type: :kafka, + kafka_configuration: %{ + group_id: "group_id", + hosts: [["host1", "9092"]], + hosts_string: "host1:9092", + initial_offset_reset_policy: "earliest", + sasl: "plain", + ssl: true, + topics: ["foo"], + topics_string: "foo" + } + } + ) + + assert get_field(changeset, :cron_cursor_job_id) == nil + end + test "allows webhook_reply to be set for webhook triggers" do changeset = Trigger.changeset(%Trigger{}, %{ diff --git a/test/lightning_web/live/workflow_live/new_manual_run_test.exs b/test/lightning_web/live/workflow_live/new_manual_run_test.exs index 12b0a7c19b6..5b159921b52 100644 --- a/test/lightning_web/live/workflow_live/new_manual_run_test.exs +++ b/test/lightning_web/live/workflow_live/new_manual_run_test.exs @@ -50,330 +50,277 @@ defmodule LightningWeb.WorkflowLive.NewManualRunTest do assert changeset.errors |> Enum.any?(&match?({:type, {"is invalid", _}}, &1)) end - describe "search_selectable_dataclips/4" do - test "returns next cron run for cron-triggered workflows with successful runs" do - project = insert(:project) - - # Create a cron-triggered workflow + describe "cron cursor: next cron run dataclip" do + defp build_two_job_cron_workflow(project, opts \\ []) do cron_trigger = build(:trigger, type: :cron, cron_expression: "0 0 * * *") - job = build(:job) + job_a = build(:job, name: "Job A") + job_b = build(:job, name: "Job B") workflow = build(:workflow, project: project) |> with_trigger(cron_trigger) - |> with_job(job) - |> with_edge({cron_trigger, job}) + |> with_job(job_a) + |> with_job(job_b) + |> with_edge({cron_trigger, job_a}) + |> with_edge({job_a, job_b}) |> insert() |> with_snapshot() - # Create a successful step with output dataclip - input_dataclip = - insert(:dataclip, project: project, body: %{"input" => "data"}) - - output_dataclip = - insert(:dataclip, - project: project, - body: %{"output" => "result"}, - type: :step_result - ) + 
job_a_record = Enum.find(workflow.jobs, &(&1.name == "Job A")) + job_b_record = Enum.find(workflow.jobs, &(&1.name == "Job B")) + trigger_record = hd(workflow.triggers) + + if cursor_job_id = opts[:cron_cursor_job_id] do + trigger_record + |> Ecto.Changeset.change(%{cron_cursor_job_id: cursor_job_id}) + |> Lightning.Repo.update!() + end + + %{ + workflow: workflow, + trigger: trigger_record, + job_a: job_a_record, + job_b: job_b_record + } + end - insert(:step, - job: workflow.jobs |> List.first(), - input_dataclip: input_dataclip, - output_dataclip: output_dataclip, - exit_reason: "success", - finished_at: DateTime.utc_now() - ) + defp create_successful_run(workflow, trigger, project, final_dataclip) do + snapshot = Lightning.Workflows.Snapshot.get_current_for(workflow) - job_id = workflow.jobs |> List.first() |> Map.get(:id) + work_order = + insert(:workorder, + workflow: workflow, + trigger_id: trigger.id, + snapshot: snapshot + ) + |> with_run( + build(:run, + dataclip: + insert(:dataclip, project: project, body: %{"init" => true}), + snapshot: snapshot, + starting_trigger: trigger + ) + ) - {:ok, result} = - NewManualRun.search_selectable_dataclips(job_id, "query=+", 10, 0) + run = hd(work_order.runs) - assert %{ - dataclips: dataclips, - next_cron_run_dataclip_id: next_cron_run_dataclip_id - } = result + run + |> Ecto.Changeset.change(%{state: :claimed}) + |> Lightning.Repo.update!() - assert next_cron_run_dataclip_id == output_dataclip.id - assert List.first(dataclips).id == output_dataclip.id + run + |> Ecto.Changeset.change(%{ + state: :success, + finished_at: DateTime.utc_now(), + final_dataclip_id: final_dataclip.id + }) + |> Lightning.Repo.update!() end - test "returns next cron run for cron-triggered workflows with successful runs that matches name" do + test "cursor nil: uses final_dataclip_id from the last successful run, not step outputs" do project = insert(:project) - # Create a cron-triggered workflow - cron_trigger = build(:trigger, type: :cron, 
cron_expression: "0 0 * * *") - job = build(:job) + %{workflow: workflow, trigger: trigger, job_a: job_a, job_b: job_b} = + build_two_job_cron_workflow(project) - workflow = - build(:workflow, project: project) - |> with_trigger(cron_trigger) - |> with_job(job) - |> with_edge({cron_trigger, job}) - |> insert() - |> with_snapshot() + # The run's final_dataclip -- this is what the cron scheduler should pick + final_dataclip = + insert(:dataclip, + project: project, + body: %{"final" => "run state"}, + type: :step_result + ) + + create_successful_run(workflow, trigger, project, final_dataclip) - # Create a successful step with output dataclip - input_dataclip = - insert(:dataclip, project: project, body: %{"input" => "data"}) + # Create step outputs for both jobs -- these should NOT be chosen + job_a_output = + insert(:dataclip, + project: project, + body: %{"job_a" => "output"}, + type: :step_result + ) - output_dataclip = + job_b_output = insert(:dataclip, - name: "123abc246", project: project, - body: %{"output" => "result"}, + body: %{"job_b" => "output"}, type: :step_result ) insert(:step, - job: workflow.jobs |> List.first(), - input_dataclip: input_dataclip, - output_dataclip: output_dataclip, + job: job_a, + input_dataclip: insert(:dataclip, project: project, body: %{}), + output_dataclip: job_a_output, exit_reason: "success", finished_at: DateTime.utc_now() ) - job_id = workflow.jobs |> List.first() |> Map.get(:id) + insert(:step, + job: job_b, + input_dataclip: insert(:dataclip, project: project, body: %{}), + output_dataclip: job_b_output, + exit_reason: "success", + finished_at: DateTime.utc_now() + ) {:ok, result} = - NewManualRun.search_selectable_dataclips(job_id, "query=abc", 10, 0) - - assert %{ - dataclips: dataclips, - next_cron_run_dataclip_id: next_cron_run_dataclip_id - } = result + NewManualRun.search_selectable_dataclips( + job_a.id, + "query=+", + 10, + 0 + ) - assert next_cron_run_dataclip_id == output_dataclip.id - assert 
List.first(dataclips).id == output_dataclip.id
+      # Must be the run's final_dataclip, not any step output
+      assert result.next_cron_run_dataclip_id == final_dataclip.id
+      refute result.next_cron_run_dataclip_id == job_a_output.id
+      refute result.next_cron_run_dataclip_id == job_b_output.id
     end
 
-    test "does not return next cron run for cron-triggered that don't match the name" do
+    test "cursor set to job_b: uses job_b's step output, not job_a's or the run's final_dataclip" do
       project = insert(:project)
 
-      # Create a cron-triggered workflow
-      cron_trigger = build(:trigger, type: :cron, cron_expression: "0 0 * * *")
-      job = build(:job)
-
-      workflow =
-        build(:workflow, project: project)
-        |> with_trigger(cron_trigger)
-        |> with_job(job)
-        |> with_edge({cron_trigger, job})
-        |> insert()
-        |> with_snapshot()
+      %{workflow: workflow, trigger: trigger, job_a: job_a, job_b: job_b} =
+        build_two_job_cron_workflow(project)
 
-      # Create a successful step with output dataclip
-      input_dataclip =
-        insert(:dataclip, project: project, body: %{"input" => "data"})
+      # job_b's id is only known once the workflow has been inserted,
+      # so point the trigger's cursor at job_b here rather than via
+      # the build helper's opts
+      trigger
+      |> Ecto.Changeset.change(%{cron_cursor_job_id: job_b.id})
+      |> Lightning.Repo.update!()
 
-      output_dataclip =
+      # Create a run with a final_dataclip that should NOT be chosen
+      final_dataclip =
         insert(:dataclip,
           project: project,
-          body: %{"output" => "result"},
+          body: %{"final" => "run state"},
           type: :step_result
         )
 
-      insert(:step,
-        job: hd(workflow.jobs),
-        input_dataclip: input_dataclip,
-        output_dataclip: output_dataclip,
-        exit_reason: "success",
-        finished_at: DateTime.utc_now()
-      )
-
-      job_id = workflow.jobs |> hd() |> Map.get(:id)
-
-      {:ok, result} =
-        NewManualRun.search_selectable_dataclips(job_id, "query=abc", 10, 0)
-
-      assert %{
-               dataclips: [],
-               next_cron_run_dataclip_id: next_cron_run_dataclip_id
-             } = result
-
-      assert next_cron_run_dataclip_id == output_dataclip.id
-    end
-
-    test "does not 
webhook-triggered workflows" do - project = insert(:project) - - # Create a webhook-triggered workflow - webhook_trigger = build(:trigger, type: :webhook) - job = build(:job) - - workflow = - build(:workflow, project: project) - |> with_trigger(webhook_trigger) - |> with_job(job) - |> with_edge({webhook_trigger, job}) - |> insert() - |> with_snapshot() + create_successful_run(workflow, trigger, project, final_dataclip) - # Create a successful step with output dataclip - input_dataclip = - insert(:dataclip, project: project, body: %{"input" => "data"}) + # Step outputs: different dataclips for each job + job_a_output = + insert(:dataclip, + project: project, + body: %{"job_a" => "output"}, + type: :step_result + ) - output_dataclip = + job_b_output = insert(:dataclip, project: project, - body: %{"output" => "result"}, + body: %{"job_b" => "output"}, type: :step_result ) insert(:step, - job: workflow.jobs |> List.first(), - input_dataclip: input_dataclip, - output_dataclip: output_dataclip, + job: job_a, + input_dataclip: insert(:dataclip, project: project, body: %{}), + output_dataclip: job_a_output, exit_reason: "success", finished_at: DateTime.utc_now() ) - job_id = workflow.jobs |> List.first() |> Map.get(:id) + insert(:step, + job: job_b, + input_dataclip: insert(:dataclip, project: project, body: %{}), + output_dataclip: job_b_output, + exit_reason: "success", + finished_at: DateTime.utc_now() + ) {:ok, result} = - NewManualRun.search_selectable_dataclips(job_id, "query=+", 10, 0) - - assert %{ - dataclips: _dataclips, - next_cron_run_dataclip_id: next_cron_run_dataclip_id - } = - result + NewManualRun.search_selectable_dataclips( + job_a.id, + "query=+", + 10, + 0 + ) - assert next_cron_run_dataclip_id == nil + # Must be job_b's step output specifically + assert result.next_cron_run_dataclip_id == job_b_output.id + refute result.next_cron_run_dataclip_id == job_a_output.id + refute result.next_cron_run_dataclip_id == final_dataclip.id end - test "does not 
return next cron run for cron-triggered workflows with no runs" do + test "cursor nil, no successful runs: returns nil" do project = insert(:project) - # Create a cron-triggered workflow without any runs - cron_trigger = build(:trigger, type: :cron, cron_expression: "0 0 * * *") - job = build(:job) - - workflow = - build(:workflow, project: project) - |> with_trigger(cron_trigger) - |> with_job(job) - |> with_edge({cron_trigger, job}) - |> insert() - |> with_snapshot() - - job_id = workflow.jobs |> List.first() |> Map.get(:id) + %{job_a: job_a} = build_two_job_cron_workflow(project) {:ok, result} = - NewManualRun.search_selectable_dataclips(job_id, "query=+", 10, 0) - - assert %{ - dataclips: _dataclips, - next_cron_run_dataclip_id: next_cron_run_dataclip_id - } = - result + NewManualRun.search_selectable_dataclips( + job_a.id, + "query=+", + 10, + 0 + ) - assert next_cron_run_dataclip_id == nil + assert result.next_cron_run_dataclip_id == nil end - test "does not return next cron run for cron-triggered workflows with only failed runs" do + test "cursor set to job_b, no successful steps: returns nil" do project = insert(:project) - # Create a cron-triggered workflow - cron_trigger = build(:trigger, type: :cron, cron_expression: "0 0 * * *") - job = build(:job) - - workflow = - build(:workflow, project: project) - |> with_trigger(cron_trigger) - |> with_job(job) - |> with_edge({cron_trigger, job}) - |> insert() - |> with_snapshot() - - # Create a failed step (no output dataclip for failed steps) - input_dataclip = - insert(:dataclip, project: project, body: %{"input" => "data"}) - - insert(:step, - job: workflow.jobs |> List.first(), - input_dataclip: input_dataclip, - output_dataclip: nil, - exit_reason: "failed", - finished_at: DateTime.utc_now() - ) + %{trigger: trigger, job_a: job_a, job_b: job_b} = + build_two_job_cron_workflow(project) - job_id = workflow.jobs |> List.first() |> Map.get(:id) + trigger + |> Ecto.Changeset.change(%{cron_cursor_job_id: job_b.id}) 
+ |> Lightning.Repo.update!() {:ok, result} = - NewManualRun.search_selectable_dataclips(job_id, "query=+", 10, 0) - - assert %{ - dataclips: _dataclips, - next_cron_run_dataclip_id: next_cron_run_dataclip_id - } = - result + NewManualRun.search_selectable_dataclips( + job_a.id, + "query=+", + 10, + 0 + ) - assert next_cron_run_dataclip_id == nil + assert result.next_cron_run_dataclip_id == nil end - test "does not return next cron run for cron-triggered workflows with failed runs after successful ones" do + test "webhook trigger: returns nil regardless of step history" do project = insert(:project) - # Create a cron-triggered workflow - cron_trigger = build(:trigger, type: :cron, cron_expression: "0 0 * * *") + webhook_trigger = build(:trigger, type: :webhook) job = build(:job) workflow = build(:workflow, project: project) - |> with_trigger(cron_trigger) + |> with_trigger(webhook_trigger) |> with_job(job) - |> with_edge({cron_trigger, job}) + |> with_edge({webhook_trigger, job}) |> insert() |> with_snapshot() - # Create a successful step with output dataclip (older) - input_dataclip1 = - insert(:dataclip, project: project, body: %{"input" => "data1"}) - - output_dataclip1 = - insert(:dataclip, - project: project, - body: %{"output" => "result1"}, - type: :step_result - ) + job_record = hd(workflow.jobs) insert(:step, - job: workflow.jobs |> List.first(), - input_dataclip: input_dataclip1, - output_dataclip: output_dataclip1, + job: job_record, + input_dataclip: insert(:dataclip, project: project, body: %{}), + output_dataclip: + insert(:dataclip, + project: project, + body: %{"out" => true}, + type: :step_result + ), exit_reason: "success", - # 1 hour ago - finished_at: DateTime.utc_now() |> DateTime.add(-3600, :second) - ) - - # Create a failed step (more recent) - input_dataclip2 = - insert(:dataclip, project: project, body: %{"input" => "data2"}) - - insert(:step, - job: workflow.jobs |> List.first(), - input_dataclip: input_dataclip2, - output_dataclip: nil, - 
exit_reason: "failed", finished_at: DateTime.utc_now() ) - job_id = workflow.jobs |> List.first() |> Map.get(:id) - {:ok, result} = - NewManualRun.search_selectable_dataclips(job_id, "query=+", 10, 0) - - # Should still return the successful run's output as next cron run - # because last_successful_step_for_job looks for the most recent successful step - assert %{ - dataclips: dataclips, - next_cron_run_dataclip_id: next_cron_run_dataclip_id - } = result + NewManualRun.search_selectable_dataclips( + job_record.id, + "query=+", + 10, + 0 + ) - assert next_cron_run_dataclip_id == output_dataclip1.id - assert List.first(dataclips).id == output_dataclip1.id + assert result.next_cron_run_dataclip_id == nil end end end