diff --git a/.github/workflows/signoff-commit.yml b/.github/workflows/signoff-commit.yml index 7caabea24..ce5425452 100644 --- a/.github/workflows/signoff-commit.yml +++ b/.github/workflows/signoff-commit.yml @@ -71,7 +71,7 @@ jobs: - name: Set up Node.js uses: actions/setup-node@v4 with: - node-version: '20.11.0' + node-version: '20.18.0' cache: 'npm' cache-dependency-path: 'assets/package-lock.json' diff --git a/assets/svelte/components/ColumnSelectionForm.svelte b/assets/svelte/components/ColumnSelectionForm.svelte new file mode 100644 index 000000000..6c5676dea --- /dev/null +++ b/assets/svelte/components/ColumnSelectionForm.svelte @@ -0,0 +1,382 @@ + + +
+ + + {#if isEdit && (columnSelectionMode !== "all" || excludeColumnAttnums.length > 0 || includeColumnAttnums.length > 0)} + + + Column selection changes will apply to new changes going forward, but + will not affect existing events already captured in the destination + table. + + + {/if} + +
+ +
+
+ + +
+
+ + +
+
+ + +
+
+
+ + {#if columnSelectionMode !== "all" && availableColumns.length > 0} +
+ + +
+ {#each availableColumns as column} + {@const isPk = column.isPk || column["isPk?"] || false} + {@const isChecked = isColumnSelected(column.attnum)} +
+ {#if isPk} + + +
+ + + {column.name} + + PK + + + ({column.type}) + + +
+
+ +

+ {#if columnSelectionMode === "exclude"} + Primary key columns cannot be excluded from sync. They + are always included and required for change tracking. + {:else} + Primary key columns are always included in sync. They + are required for change tracking and cannot be + unchecked. + {/if} +

+
+
+ {:else} + toggleColumn(column.attnum)} + /> + + {/if} +
+ {/each} +
+ + {#if selectedColumns.length > 0} +
+ {#each selectedColumns as column} + {@const isPk = column.isPk || column["isPk?"] || false} + {#if isPk} + + +
+ {column.name} + + PK + +
+
+ +

+ {#if columnSelectionMode === "exclude"} + Primary key columns cannot be excluded from sync. They + are always included and required for change tracking. + {:else} + Primary key columns are always included in sync. They + are required for change tracking and cannot be + unchecked. + {/if} +

+
+
+ {:else} +
+ {column.name} + +
+ {/if} + {/each} +
+ {/if} +
+ {/if} +
+
diff --git a/assets/svelte/wal_pipelines/Form.svelte b/assets/svelte/wal_pipelines/Form.svelte index 8349bfd06..94d947a5c 100644 --- a/assets/svelte/wal_pipelines/Form.svelte +++ b/assets/svelte/wal_pipelines/Form.svelte @@ -15,6 +15,7 @@ } from "$lib/components/ui/card"; import TableSelector from "../components/TableSelector.svelte"; import FilterForm from "../components/FilterForm.svelte"; + import ColumnSelectionForm from "../components/ColumnSelectionForm.svelte"; import FullPageForm from "../components/FullPageForm.svelte"; import { cn } from "$lib/utils"; @@ -42,6 +43,8 @@ "delete", ], sourceTableFilters: walPipeline.sourceTableFilters || [], + excludeColumnAttnums: walPipeline.excludeColumnAttnums || [], + includeColumnAttnums: walPipeline.includeColumnAttnums || [], sortColumnAttnum: walPipeline.sortColumnAttnum || "", }; @@ -161,6 +164,12 @@ bind:form onFilterChange={(filters) => (form.sourceTableFilters = filters)} /> + {/if} diff --git a/assets/svelte/wal_pipelines/Show.svelte b/assets/svelte/wal_pipelines/Show.svelte index 26fd13c29..8296a4e4c 100644 --- a/assets/svelte/wal_pipelines/Show.svelte +++ b/assets/svelte/wal_pipelines/Show.svelte @@ -283,6 +283,42 @@ Table not found {/if} +
+

Column selection

+ {#if walPipeline.exclude_columns && walPipeline.exclude_columns.length > 0} +
+

+ Excluded columns: +

+
+ {#each walPipeline.exclude_columns as column} + + {column} + + {/each} +
+
+ {:else if walPipeline.include_columns && walPipeline.include_columns.length > 0} +
+

+ Included columns: +

+
+ {#each walPipeline.include_columns as column} + + {column} + + {/each} +
+
+ {:else} +

All columns are synced

+ {/if} +

Filters

{#if walPipeline.source_filters.length > 0} diff --git a/docs/how-to/setup-change-retention.mdx b/docs/how-to/setup-change-retention.mdx index 9fe3cfe94..0715d90ef 100644 --- a/docs/how-to/setup-change-retention.mdx +++ b/docs/how-to/setup-change-retention.mdx @@ -30,6 +30,13 @@ Open the Sequin web console and navigate to the **Change Retention** tab. Click Optionally, specify which operations to capture (e.g., `insert`, `update`, and/or `delete`). You can also add [column filters](/reference/filters) to capture only certain changes. + + + Optionally, select which columns to include or exclude from the synced changes. This allows you to exclude sensitive columns or irrelevant metadata columns, or include only the specific columns you need. You can choose to: + - **Sync all columns** (default) + - **Exclude specific columns**: All columns except the ones you select will be synced + - **Include only specific columns**: Only the columns you select will be synced + ### Configure destination diff --git a/docs/reference/change-retention.mdx b/docs/reference/change-retention.mdx index e45d0f723..110891d49 100644 --- a/docs/reference/change-retention.mdx +++ b/docs/reference/change-retention.mdx @@ -22,6 +22,12 @@ You can configure change retention for tables in the Sequin web console, under t Select the table you want to retain changes for. You can specify which operations to capture (i.e. inserts, updates, and/or deletes). And you can filter the changes to capture with one or more column [filters](/reference/filters). +You can also configure column selection to exclude or include specific columns from being synced. This allows you to exclude sensitive columns or irrelevant metadata columns, or include only the specific columns you need. You can either: +- **Exclude specific columns**: All columns except the ones you specify will be synced +- **Include only specific columns**: Only the columns you specify will be synced + +Note: You cannot use both `exclude_columns` and `include_columns` at the same time. + ### Destination configuration Sequin writes events to an [event table](#event-table) with a Sequin-specified schema. If you already have an event table ready to go, you can select it. Otherwise, click "Create new event table" which will open a modal that will provide instructions for creating an event table in your database. diff --git a/docs/reference/sequin-yaml.mdx b/docs/reference/sequin-yaml.mdx index f8ceba897..d5d72d9ca 100644 --- a/docs/reference/sequin-yaml.mdx +++ b/docs/reference/sequin-yaml.mdx @@ -726,6 +726,16 @@ change_retentions: operator: "=" comparison_value: "premium" field_type: "string" + exclude_columns: # Optional, exclude specific columns from sync + - "password_hash" + - "ssn" + - "credit_card_number" + # OR use include_columns (mutually exclusive with exclude_columns) + # include_columns: + # - "id" + # - "email" + # - "name" + # - "created_at" ``` ## Environment variable substitution diff --git a/lib/sequin/consumers/consumers.ex b/lib/sequin/consumers/consumers.ex index 1ecfab0d4..5aceac6a2 100644 --- a/lib/sequin/consumers/consumers.ex +++ b/lib/sequin/consumers/consumers.ex @@ -652,19 +652,36 @@ defmodule Sequin.Consumers do def message_pks(%Message{} = message), do: Enum.map(message.ids, &to_string/1) - def message_record(%Message{} = message) do - message - |> Message.message_fields() - |> Message.fields_to_map() + def message_record(%Message{} = message, source_table \\ nil) do + fields = + message + |> Message.message_fields() + |> maybe_filter_columns(source_table) + + Message.fields_to_map(fields) + end + + defp maybe_filter_columns(fields, nil), do: fields + + defp maybe_filter_columns(fields, %Sequin.WalPipeline.SourceTable{} = source_table) do + Sequin.WalPipeline.SourceTable.ColumnSelection.filter_fields(fields, source_table) end - def message_changes(%Message{action: :insert}), do: nil - def message_changes(%Message{action: :delete}), do: nil - def message_changes(%Message{action: :update, old_fields: nil}), do: %{} + def message_changes(message, source_table \\ nil) + def message_changes(%Message{action: :insert}, _source_table), do: nil + def message_changes(%Message{action: :delete}, _source_table), do: nil + def message_changes(%Message{action: :update, old_fields: nil}, _source_table), do: %{} + + def message_changes(%Message{action: :update} = message, source_table) do + old_fields = + message.old_fields + |> maybe_filter_columns(source_table) + |> Message.fields_to_map() - def message_changes(%Message{action: :update} = message) do - old_fields = Message.fields_to_map(message.old_fields) - new_fields = Message.fields_to_map(message.fields) + new_fields = + message.fields + |> maybe_filter_columns(source_table) + |> Message.fields_to_map() Enum.reduce(old_fields, %{}, fn {k, v}, acc -> if Map.get(new_fields, k) in [:unchanged_toast, v] do diff --git a/lib/sequin/replication/source_table/column_selection.ex b/lib/sequin/replication/source_table/column_selection.ex new file mode 100644 index 000000000..fec77669b --- /dev/null +++ b/lib/sequin/replication/source_table/column_selection.ex @@ -0,0 +1,87 @@ +defmodule Sequin.WalPipeline.SourceTable.ColumnSelection do + @moduledoc false + + alias Sequin.Error + alias Sequin.Runtime.SlotProcessor.Message.Field + alias Sequin.WalPipeline.SourceTable + + @doc """ + Filters a list of fields based on the source table's column selection configuration. + + Returns all fields if no column selection is configured. + """ + @spec filter_fields([Field.t()], SourceTable.t()) :: [Field.t()] + def filter_fields(fields, %SourceTable{} = source_table) do + validate_column_selection!(source_table) + + cond do + # Include specific columns + is_list(source_table.include_column_attnums) and source_table.include_column_attnums != [] -> + Enum.filter(fields, fn field -> + field.column_attnum in source_table.include_column_attnums + end) + + # Exclude specific columns + is_list(source_table.exclude_column_attnums) and source_table.exclude_column_attnums != [] -> + Enum.reject(fields, fn field -> + field.column_attnum in source_table.exclude_column_attnums + end) + + # No filtering configured + true -> + fields + end + end + + @doc """ + Filters column attnums from a list of columns (used during backfills). + """ + @spec filter_column_attnums([integer()], SourceTable.t() | nil) :: [integer()] + def filter_column_attnums(column_attnums, nil), do: column_attnums + + def filter_column_attnums(column_attnums, %SourceTable{} = source_table) do + validate_column_selection!(source_table) + + cond do + # Include specific columns + is_list(source_table.include_column_attnums) and source_table.include_column_attnums != [] -> + Enum.filter(column_attnums, fn attnum -> + attnum in source_table.include_column_attnums + end) + + # Exclude specific columns + is_list(source_table.exclude_column_attnums) and source_table.exclude_column_attnums != [] -> + Enum.reject(column_attnums, fn attnum -> + attnum in source_table.exclude_column_attnums + end) + + # No filtering configured + true -> + column_attnums + end + end + + @doc """ + Returns true if column selection is configured (either include or exclude). + """ + @spec has_column_selection?(SourceTable.t()) :: boolean() + def has_column_selection?(%SourceTable{} = source_table) do + (is_list(source_table.include_column_attnums) and source_table.include_column_attnums != []) or + (is_list(source_table.exclude_column_attnums) and source_table.exclude_column_attnums != []) + end + + defp validate_column_selection!(%SourceTable{} = source_table) do + has_include = is_list(source_table.include_column_attnums) and source_table.include_column_attnums != [] + has_exclude = is_list(source_table.exclude_column_attnums) and source_table.exclude_column_attnums != [] + + if has_include and has_exclude do + raise Error.invariant( + message: + "Cannot specify both include_column_attnums and exclude_column_attnums. " <> + "These options are mutually exclusive. " <> + "Found include_column_attnums: #{inspect(source_table.include_column_attnums)}, " <> + "exclude_column_attnums: #{inspect(source_table.exclude_column_attnums)}" + ) + end + end +end diff --git a/lib/sequin/replication/source_table/source_table.ex b/lib/sequin/replication/source_table/source_table.ex index f7440fcb0..83c82376f 100644 --- a/lib/sequin/replication/source_table/source_table.ex +++ b/lib/sequin/replication/source_table/source_table.ex @@ -7,7 +7,17 @@ defmodule Sequin.WalPipeline.SourceTable do alias Sequin.WalPipeline.SourceTable.ColumnFilter @derive {Jason.Encoder, - only: [:oid, :schema_name, :table_name, :actions, :column_filters, :sort_column_attnum, :group_column_attnums]} + only: [ + :oid, + :schema_name, + :table_name, + :actions, + :column_filters, + :sort_column_attnum, + :group_column_attnums, + :include_column_attnums, + :exclude_column_attnums + ]} @type t :: %__MODULE__{ oid: integer, @@ -16,7 +26,9 @@ defmodule Sequin.WalPipeline.SourceTable do actions: [atom()], column_filters: [ColumnFilter.t()], sort_column_attnum: integer() | nil, - group_column_attnums: [integer()] | nil + group_column_attnums: [integer()] | nil, + include_column_attnums: [integer()] | nil, + exclude_column_attnums: [integer()] | nil } @type filter_type :: :string | :number | :boolean | :datetime @@ -29,15 +41,41 @@ defmodule Sequin.WalPipeline.SourceTable do field :sort_column_attnum, :integer field :actions, {:array, Ecto.Enum}, values: [:insert, :update, :delete] field :group_column_attnums, {:array, :integer} + field :include_column_attnums, {:array, :integer} + field :exclude_column_attnums, {:array, :integer} embeds_many :column_filters, ColumnFilter end def changeset(source_table, attrs) do source_table - |> cast(attrs, [:oid, :schema_name, :table_name, :actions, :sort_column_attnum, :group_column_attnums]) + |> cast(attrs, [ + :oid, + :schema_name, + :table_name, + :actions, + :sort_column_attnum, + :group_column_attnums, + :include_column_attnums, + :exclude_column_attnums + ]) |> validate_required([:oid, :actions]) |> cast_embed(:column_filters, with: &ColumnFilter.changeset/2) |> validate_length(:actions, min: 1) + |> validate_mutually_exclusive([:include_column_attnums, :exclude_column_attnums]) + end + + defp validate_mutually_exclusive(changeset, fields) do + present_fields = + Enum.filter(fields, fn field -> + value = get_field(changeset, field) + is_list(value) and value != [] + end) + + case present_fields do + [] -> changeset + [_field] -> changeset + [field1, field2 | _] -> add_error(changeset, field2, "cannot be set when #{field1} is set") + end end def record_changeset(source_table, attrs) do diff --git a/lib/sequin/runtime/message_handler.ex b/lib/sequin/runtime/message_handler.ex index cb4e6cefd..57ae9781b 100644 --- a/lib/sequin/runtime/message_handler.ex +++ b/lib/sequin/runtime/message_handler.ex @@ -284,13 +284,15 @@ defmodule Sequin.Runtime.MessageHandler do end defp wal_event(pipeline, message) do + source_table = Enum.find(pipeline.source_tables, &(&1.oid == message.table_oid)) + wal_event = %WalEvent{ wal_pipeline_id: pipeline.id, commit_lsn: message.commit_lsn, commit_idx: message.commit_idx, record_pks: Consumers.message_pks(message), - record: Consumers.message_record(message), - changes: Consumers.message_changes(message), + record: Consumers.message_record(message, source_table), + changes: Consumers.message_changes(message, source_table), action: message.action, committed_at: message.commit_timestamp, replication_message_trace_id: message.trace_id, diff --git a/lib/sequin/transforms/transforms.ex b/lib/sequin/transforms/transforms.ex index 573d79eae..bf2a57357 100644 --- a/lib/sequin/transforms/transforms.ex +++ b/lib/sequin/transforms/transforms.ex @@ -492,6 +492,12 @@ defmodule Sequin.Transforms do destination_table = Sequin.Enum.find!(wal_pipeline.destination_database.tables, &(&1.oid == wal_pipeline.destination_oid)) + source_table_struct = + Sequin.Enum.find!( + wal_pipeline.source_database.tables, + &(&1.oid == source_table.oid) + ) + %{ id: wal_pipeline.id, name: wal_pipeline.name, @@ -502,7 +508,9 @@ defmodule Sequin.Transforms do destination_table_schema: destination_table.schema, destination_table_name: destination_table.name, filters: Enum.map(source_table.column_filters, &to_external/1), - actions: source_table.actions + actions: source_table.actions, + exclude_columns: column_attnums_to_names(source_table.exclude_column_attnums, source_table_struct), + include_columns: column_attnums_to_names(source_table.include_column_attnums, source_table_struct) } end @@ -545,6 +553,19 @@ defmodule Sequin.Transforms do ) end + defp column_attnums_to_names(nil, _table), do: nil + + defp column_attnums_to_names(attnums, table) when is_list(attnums) and attnums != [] do + attnums + |> Enum.map(fn attnum -> + Enum.find(table.columns, &(&1.attnum == attnum)) + end) + |> Enum.filter(& &1) + |> Enum.map(& &1.name) + end + + defp column_attnums_to_names(_, _table), do: nil + defp group_column_names(%SourceTable{}, nil), do: nil defp group_column_names(%SourceTable{} = source_table, table) do diff --git a/lib/sequin/yaml_loader.ex b/lib/sequin/yaml_loader.ex index 55a105e74..274a22512 100644 --- a/lib/sequin/yaml_loader.ex +++ b/lib/sequin/yaml_loader.ex @@ -732,14 +732,26 @@ defmodule Sequin.YamlLoader do {:ok, source_table_struct} <- fetch_table(source_database.tables, source_schema, source_table, :source_table), {:ok, column_filters} <- - parse_column_filters(attrs["filters"], source_database, source_schema, source_table) do - source_table_config = %{ - "schema_name" => source_schema, - "table_name" => source_table, - "oid" => source_table_struct.oid, - "actions" => attrs["actions"] || [:insert, :update, :delete], - "column_filters" => column_filters - } + parse_column_filters(attrs["filters"], source_database, source_schema, source_table), + {:ok, column_selection} <- + parse_column_selection( + attrs["exclude_columns"], + attrs["include_columns"], + source_database, + source_schema, + source_table + ) do + source_table_config = + Map.merge( + %{ + "schema_name" => source_schema, + "table_name" => source_table, + "oid" => source_table_struct.oid, + "actions" => attrs["actions"] || [:insert, :update, :delete], + "column_filters" => column_filters + }, + column_selection + ) params = %{ name: name, @@ -780,6 +792,122 @@ defmodule Sequin.YamlLoader do defp parse_column_filters(_, _database, _schema, _table), do: {:error, Error.bad_request(message: "`filters` must be a list")} + # Helper to parse column selection (exclude_columns or include_columns) for WAL pipeline + defp parse_column_selection(nil, nil, _database, _schema, _table) do + {:ok, %{"exclude_column_attnums" => nil, "include_column_attnums" => nil}} + end + + defp parse_column_selection(exclude_columns, include_columns, database, schema, table_name) do + with {:ok, table} <- fetch_table(database.tables, schema, table_name, :column_selection_table) do + cond do + not is_nil(exclude_columns) and not is_nil(include_columns) -> + {:error, Error.bad_request(message: "Cannot specify both `exclude_columns` and `include_columns`")} + + not is_nil(exclude_columns) -> + exclude_columns + |> parse_column_names_to_attnums_with_table(table, "exclude_columns") + |> case do + {:ok, attnums} -> + case validate_not_excluding_all_columns(attnums, table, "exclude_columns") do + :ok -> {:ok, %{"exclude_column_attnums" => attnums, "include_column_attnums" => nil}} + {:error, error} -> {:error, error} + end + + {:error, error} -> + {:error, error} + end + + not is_nil(include_columns) -> + include_columns + |> parse_column_names_to_attnums_with_table(table, "include_columns") + |> case do + {:ok, attnums} -> + case validate_including_at_least_one_column(attnums, "include_columns") do + :ok -> {:ok, %{"exclude_column_attnums" => nil, "include_column_attnums" => attnums}} + {:error, error} -> {:error, error} + end + + {:error, error} -> + {:error, error} + end + + true -> + {:ok, %{"exclude_column_attnums" => nil, "include_column_attnums" => nil}} + end + end + end + + defp parse_column_names_to_attnums_with_table(column_names, table, field_name) when is_list(column_names) do + column_names + |> Enum.reduce_while({:ok, []}, fn column_name, {:ok, acc} -> + case Enum.find(table.columns, &(&1.name == column_name)) do + nil -> + {:halt, + {:error, + Error.validation( + summary: "Column '#{column_name}' not found in table '#{table.schema}.#{table.name}' for #{field_name}" + )}} + + column -> + {:cont, {:ok, [column.attnum | acc]}} + end + end) + |> case do + {:ok, parsed} -> {:ok, Enum.reverse(parsed)} + {:error, error} -> {:error, error} + end + end + + defp validate_not_excluding_all_columns(attnums, table, field_name) do + total_columns = length(table.columns) + + pk_attnums = + table.columns + |> Enum.filter(& &1.is_pk?) + |> Enum.map(& &1.attnum) + + excluded_pk_attnums = Enum.filter(attnums, &(&1 in pk_attnums)) + + cond do + excluded_pk_attnums != [] -> + excluded_pk_columns = + table.columns + |> Enum.filter(fn col -> col.attnum in excluded_pk_attnums end) + |> Enum.map_join(", ", & &1.name) + + {:error, + Error.bad_request( + message: + "Cannot exclude primary key columns in `#{field_name}`. " <> + "Primary key columns are required for change tracking. " <> + "Found excluded primary key columns: #{excluded_pk_columns}" + )} + + length(attnums) >= total_columns -> + {:error, + Error.bad_request( + message: + "Cannot exclude all columns in `#{field_name}`. " <> + "At least one column must be included in the sync. " <> + "Table has #{total_columns} column(s), but #{length(attnums)} column(s) were excluded." + )} + + true -> + :ok + end + end + + defp validate_including_at_least_one_column(attnums, field_name) do + if attnums == [] do + {:error, + Error.bad_request( + message: "Cannot include zero columns in `#{field_name}`. At least one column must be included in the sync." + )} + else + :ok + end + end + defp fetch_database(databases, name, _role) do case Enum.find(databases, &(&1.name == name)) do nil -> {:error, Error.not_found(entity: :database, params: %{name: name})} diff --git a/lib/sequin_web/live/sink_consumers/show.ex b/lib/sequin_web/live/sink_consumers/show.ex index 1a5c7a96a..8bfa4a47d 100644 --- a/lib/sequin_web/live/sink_consumers/show.ex +++ b/lib/sequin_web/live/sink_consumers/show.ex @@ -1396,21 +1396,22 @@ defmodule SequinWeb.SinkConsumersLive.Show do defp get_table_schema(_, _), do: nil defp consumer_title(%{sink: %{type: :azure_event_hub}}), do: "Azure Event Hub Sink" + defp consumer_title(%{sink: %{type: :benchmark}}), do: "Benchmark Sink" defp consumer_title(%{sink: %{type: :elasticsearch}}), do: "Elasticsearch Sink" defp consumer_title(%{sink: %{type: :gcp_pubsub}}), do: "GCP Pub/Sub Sink" defp consumer_title(%{sink: %{type: :http_push}}), do: "Webhook Sink" defp consumer_title(%{sink: %{type: :kafka}}), do: "Kafka Sink" + defp consumer_title(%{sink: %{type: :kinesis}}), do: "Kinesis Sink" + defp consumer_title(%{sink: %{type: :meilisearch}}), do: "Meilisearch Sink" defp consumer_title(%{sink: %{type: :nats}}), do: "NATS Sink" defp consumer_title(%{sink: %{type: :rabbitmq}}), do: "RabbitMQ Sink" defp consumer_title(%{sink: %{type: :redis_stream}}), do: "Redis Stream Sink" defp consumer_title(%{sink: %{type: :redis_string}}), do: "Redis String Sink" + defp consumer_title(%{sink: %{type: :s2}}), do: "S2 Sink" defp consumer_title(%{sink: %{type: :sequin_stream}}), do: "Sequin Stream Sink" defp consumer_title(%{sink: %{type: :sns}}), do: "SNS Sink" - defp consumer_title(%{sink: %{type: :kinesis}}), do: "Kinesis Sink" - defp consumer_title(%{sink: %{type: :s2}}), do: "S2 Sink" defp consumer_title(%{sink: %{type: :sqs}}), do: "SQS Sink" defp consumer_title(%{sink: %{type: :typesense}}), do: "Typesense Sink" - defp consumer_title(%{sink: %{type: :meilisearch}}), do: "Meilisearch Sink" defp put_health(%SinkConsumer{} = consumer) do with {:ok, health} <- Health.health(consumer), diff --git a/lib/sequin_web/live/wal_pipelines/form.ex b/lib/sequin_web/live/wal_pipelines/form.ex index 05f26d44f..ec635e27f 100644 --- a/lib/sequin_web/live/wal_pipelines/form.ex +++ b/lib/sequin_web/live/wal_pipelines/form.ex @@ -96,8 +96,11 @@ defmodule SequinWeb.WalPipelinesLive.Form do table = Sequin.Enum.find!(database.tables, &(&1.oid == source_table_oid)) toast_title = if socket.assigns.is_edit, do: "WAL Pipeline updated", else: "WAL Pipeline created" + source_table_params = List.first(params["source_tables"]) + with :ok <- Databases.verify_table_in_publication(database, source_table_oid), :ok <- verify_source_not_event_table(table), + :ok <- verify_column_selection(table, source_table_params), {:ok, wal_pipeline} <- create_or_update_wal_pipeline(socket, socket.assigns.wal_pipeline, params) do {:noreply, socket @@ -158,6 +161,56 @@ defmodule SequinWeb.WalPipelinesLive.Form do end end + def verify_column_selection(%PostgresDatabaseTable{} = table, source_table_params) do + exclude_attnums = source_table_params["exclude_column_attnums"] + include_attnums = source_table_params["include_column_attnums"] + total_columns = length(table.columns) + + pk_attnums = + table.columns + |> Enum.filter(& &1.is_pk?) + |> Enum.map(& &1.attnum) + + cond do + is_list(exclude_attnums) and exclude_attnums != [] -> + excluded_pk_attnums = Enum.filter(exclude_attnums, &(&1 in pk_attnums)) + + if excluded_pk_attnums == [] do + if length(exclude_attnums) >= total_columns do + {:error, + Error.invariant( + message: + "Cannot exclude all columns. At least one column must be included in the sync. " <> + "Table has #{total_columns} column(s), but #{length(exclude_attnums)} column(s) were excluded." + )} + else + :ok + end + else + excluded_pk_columns = + table.columns + |> Enum.filter(fn col -> col.attnum in excluded_pk_attnums end) + |> Enum.map_join(", ", & &1.name) + + {:error, + Error.invariant( + message: + "Cannot exclude primary key columns from sync. " <> + "Primary key columns are required for change tracking. " <> + "Found excluded primary key columns: #{excluded_pk_columns}" + )} + end + + is_list(include_attnums) and include_attnums != [] and length(include_attnums) == 0 -> + # This condition is technically unreachable since we check != [] above + {:error, + Error.invariant(message: "Cannot include zero columns. At least one column must be included in the sync.")} + + true -> + :ok + end + end + defp assign_databases(socket) do databases = Databases.list_dbs_for_account(current_account_id(socket), [:replication_slot]) assign(socket, databases: databases) @@ -231,7 +284,11 @@ defmodule SequinWeb.WalPipelinesLive.Form do %{ "oid" => form["tableOid"], "column_filters" => Enum.map(form["sourceTableFilters"], &ColumnFilter.from_external/1), - "actions" => source_table_actions + "actions" => source_table_actions, + "exclude_column_attnums" => + if(form["excludeColumnAttnums"] && form["excludeColumnAttnums"] != [], do: form["excludeColumnAttnums"]), + "include_column_attnums" => + if(form["includeColumnAttnums"] && form["includeColumnAttnums"] != [], do: form["includeColumnAttnums"]) } ] } @@ -267,6 +324,8 @@ defmodule SequinWeb.WalPipelinesLive.Form do "destinationTableOid" => wal_pipeline.destination_oid, "sourceTableActions" => (source_table && source_table.actions) || [:insert, :update, :delete], "sourceTableFilters" => source_table && Enum.map(source_table.column_filters, &ColumnFilter.to_external/1), + "excludeColumnAttnums" => (source_table && source_table.exclude_column_attnums) || [], + "includeColumnAttnums" => (source_table && source_table.include_column_attnums) || [], "retentionDays" => 30 } end diff --git a/lib/sequin_web/live/wal_pipelines/show.ex b/lib/sequin_web/live/wal_pipelines/show.ex index 9eebd48be..e030d1a0c 100644 --- a/lib/sequin_web/live/wal_pipelines/show.ex +++ b/lib/sequin_web/live/wal_pipelines/show.ex @@ -160,6 +160,8 @@ defmodule SequinWeb.WalPipelinesLive.Show do }, source_table: encode_source_table(source_table, source_table_info), source_filters: encode_source_filters(source_table.column_filters), + exclude_columns: encode_column_selection(source_table.exclude_column_attnums, source_table_info), + include_columns: encode_column_selection(source_table.include_column_attnums, source_table_info), destination_table: "#{destination_table_info.schema}.#{destination_table_info.name}", inserted_at: wal_pipeline.inserted_at, updated_at: wal_pipeline.updated_at, @@ -189,6 +191,19 @@ defmodule SequinWeb.WalPipelinesLive.Show do end) end + defp encode_column_selection(nil, _table), do: [] + + defp encode_column_selection(attnums, %PostgresDatabaseTable{} = table) when is_list(attnums) do + attnums + |> Enum.map(fn attnum -> + Enum.find(table.columns, &(&1.attnum == attnum)) + end) + |> Enum.filter(& &1) + |> Enum.map(& &1.name) + end + + defp encode_column_selection(_, _table), do: [] + defp assign_replica_identity(socket) do wal_pipeline = socket.assigns.wal_pipeline [source_table] = wal_pipeline.source_tables diff --git a/priv/repo/migrations/20260203141912_add_column_selection_to_source_tables.exs b/priv/repo/migrations/20260203141912_add_column_selection_to_source_tables.exs new file mode 100644 index 000000000..fe51a0b03 --- /dev/null +++ b/priv/repo/migrations/20260203141912_add_column_selection_to_source_tables.exs @@ -0,0 +1,70 @@ +defmodule Sequin.Repo.Migrations.AddColumnSelectionToSourceTables do + use Ecto.Migration + + @config_schema Application.compile_env(:sequin, [Sequin.Repo, :config_schema_prefix]) + + def change do + # Add column selection fields to WAL pipelines source_tables + execute( + """ + DO $$ + DECLARE + wp RECORD; + source_tables_updated jsonb; + source_table jsonb; + BEGIN + FOR wp IN + SELECT id, source_tables + FROM #{@config_schema}.wal_pipelines + WHERE source_tables IS NOT NULL + LOOP + source_tables_updated := '[]'::jsonb; + + FOR source_table IN + SELECT * FROM jsonb_array_elements(wp.source_tables) + LOOP + source_tables_updated := source_tables_updated || jsonb_build_array( + source_table || jsonb_build_object( + 'include_column_attnums', NULL, + 'exclude_column_attnums', NULL + ) + ); + END LOOP; + + UPDATE #{@config_schema}.wal_pipelines + SET source_tables = source_tables_updated + WHERE id = wp.id; + END LOOP; + END $$; + """, + """ + DO $$ + DECLARE + wp RECORD; + source_tables_updated jsonb; + source_table jsonb; + BEGIN + FOR wp IN + SELECT id, source_tables + FROM #{@config_schema}.wal_pipelines + WHERE source_tables IS NOT NULL + LOOP + source_tables_updated := '[]'::jsonb; + + FOR source_table IN + SELECT * FROM jsonb_array_elements(wp.source_tables) + LOOP + source_tables_updated := source_tables_updated || jsonb_build_array( + source_table - 'include_column_attnums' - 'exclude_column_attnums' + ); + END LOOP; + + UPDATE #{@config_schema}.wal_pipelines + SET source_tables = source_tables_updated + WHERE id = wp.id; + END LOOP; + END $$; + """ + ) + end +end diff --git a/test/sequin/consumers/column_filtering_test.exs b/test/sequin/consumers/column_filtering_test.exs new file mode 100644 index 000000000..c98b6a8a5 --- /dev/null +++ b/test/sequin/consumers/column_filtering_test.exs @@ -0,0 +1,174 @@ +defmodule Sequin.Consumers.ColumnFilteringTest do + use Sequin.DataCase, async: true + + alias Sequin.Consumers + alias Sequin.Runtime.SlotProcessor.Message + alias Sequin.Runtime.SlotProcessor.Message.Field + alias Sequin.WalPipeline.SourceTable + + describe "message_record/2 with column filtering" do + test "returns all columns when source_table is nil" do + message = %Message{ + action: :insert, + table_oid: 123, + fields: [ + %Field{column_name: "id", column_attnum: 1, value: 1}, + %Field{column_name: "name", column_attnum: 2, value: "Alice"}, + %Field{column_name: "email", column_attnum: 3, value: "alice@example.com"}, + %Field{column_name: "password", column_attnum: 4, value: "secret"} + ] + } + + record = Consumers.message_record(message, nil) + + assert map_size(record) == 4 + assert record["id"] == 1 + assert record["name"] == "Alice" + assert record["email"] == "alice@example.com" + assert record["password"] == "secret" + end + + test "returns only included columns when include_column_attnums is set" do + source_table = %SourceTable{ + oid: 123, + actions: [:insert, :update, :delete], + include_column_attnums: [1, 2, 3] + } + + message = %Message{ + action: :insert, + table_oid: 123, + fields: [ + %Field{column_name: "id", column_attnum: 1, value: 1}, + %Field{column_name: "name", column_attnum: 2, value: "Alice"}, + %Field{column_name: "email", column_attnum: 3, value: "alice@example.com"}, + %Field{column_name: "password", column_attnum: 4, value: "secret"}, + %Field{column_name: "ssn", column_attnum: 5, value: "123-45-6789"} + ] + } + + record = Consumers.message_record(message, source_table) + + assert map_size(record) == 3 + assert record["id"] == 1 + assert record["name"] == "Alice" + assert record["email"] == "alice@example.com" + refute Map.has_key?(record, "password") + refute Map.has_key?(record, "ssn") + end + + test "excludes specified columns when exclude_column_attnums is set" do + source_table = %SourceTable{ + oid: 123, + actions: [:insert, :update, :delete], + exclude_column_attnums: [4, 5] + } + + message = %Message{ + action: :insert, + table_oid: 123, + fields: [ + %Field{column_name: "id", column_attnum: 1, value: 1}, + %Field{column_name: "name", column_attnum: 2, value: "Alice"}, + %Field{column_name: "email", column_attnum: 3, value: "alice@example.com"}, + %Field{column_name: "password", column_attnum: 4, value: "secret"}, + %Field{column_name: "ssn", column_attnum: 5, value: "123-45-6789"} + ] + } + + record = Consumers.message_record(message, source_table) + + assert map_size(record) == 3 + assert record["id"] == 1 + assert record["name"] == "Alice" + assert record["email"] == "alice@example.com" + refute Map.has_key?(record, "password") + refute Map.has_key?(record, "ssn") + end + end + + describe "message_changes/2 with column filtering" do + test "returns nil for insert action" do + source_table = %SourceTable{ + oid: 123, + actions: [:insert, :update, :delete], + exclude_column_attnums: [4] + } + + message = %Message{ + action: :insert, + table_oid: 123, + fields: [ + %Field{column_name: "id", column_attnum: 1, value: 1}, + %Field{column_name: "password", column_attnum: 4, value: "secret"} + ] + } + + assert Consumers.message_changes(message, source_table) == nil + end + + test "filters changes for update action" do + source_table = %SourceTable{ + oid: 123, + actions: [:insert, :update, :delete], + exclude_column_attnums: [4] + } + + message = %Message{ + action: :update, + table_oid: 123, + old_fields: [ + %Field{column_name: "id", column_attnum: 1, value: 1}, + %Field{column_name: "name", column_attnum: 2, value: "Alice"}, + %Field{column_name: "email", column_attnum: 3, value: "alice@old.com"}, + %Field{column_name: "password", column_attnum: 4, value: "old_secret"} + ], + fields: [ + %Field{column_name: "id", column_attnum: 1, value: 1}, + %Field{column_name: "name", column_attnum: 2, value: "Alice"}, + %Field{column_name: "email", column_attnum: 3, value: "alice@new.com"}, + %Field{column_name: "password", column_attnum: 4, value: "new_secret"} + ] + } + + changes = Consumers.message_changes(message, source_table) + + # Only email should be in changes (password is excluded) + assert map_size(changes) == 1 + assert changes["email"] == "alice@old.com" + refute Map.has_key?(changes, "password") + end + + test "filters changes with include_column_attnums" do + source_table = %SourceTable{ + oid: 123, + actions: [:insert, :update, :delete], + include_column_attnums: [1, 2, 3] + } + + message = %Message{ + action: :update, + table_oid: 123, + old_fields: [ + %Field{column_name: "id", column_attnum: 1, value: 1}, + %Field{column_name: "name", column_attnum: 2, value: "Alice"}, + %Field{column_name: "email", column_attnum: 3, value: "alice@old.com"}, + %Field{column_name: "password", column_attnum: 4, value: "old_secret"} + ], + fields: [ + %Field{column_name: "id", column_attnum: 1, value: 1}, + %Field{column_name: "name", column_attnum: 2, value: "Alice Updated"}, + %Field{column_name: "email", column_attnum: 3, value: "alice@new.com"}, + %Field{column_name: "password", column_attnum: 4, value: "new_secret"} + ] + } + + changes = Consumers.message_changes(message, source_table) + + assert map_size(changes) == 2 + assert changes["name"] == "Alice" + assert changes["email"] == "alice@old.com" + refute Map.has_key?(changes, "password") + end + end +end diff --git a/test/sequin/replication/source_table/column_selection_test.exs b/test/sequin/replication/source_table/column_selection_test.exs new file mode 100644 index 000000000..c3546194e --- /dev/null +++ b/test/sequin/replication/source_table/column_selection_test.exs @@ -0,0 +1,220 @@ +defmodule Sequin.WalPipeline.SourceTable.ColumnSelectionTest do + use Sequin.DataCase, async: true + + alias Sequin.Error.InvariantError + alias Sequin.Runtime.SlotProcessor.Message.Field + alias Sequin.WalPipeline.SourceTable + alias Sequin.WalPipeline.SourceTable.ColumnSelection + + describe "filter_fields/2" do + test "returns all fields when no column selection is configured" do + source_table = %SourceTable{ + oid: 123, + actions: [:insert, :update, :delete], + include_column_attnums: nil, + exclude_column_attnums: nil + } + + fields = [ + %Field{column_name: "id", column_attnum: 1, value: 1}, + %Field{column_name: "name", column_attnum: 2, value: "Alice"}, + %Field{column_name: "email", column_attnum: 3, value: "alice@example.com"}, + %Field{column_name: "password", column_attnum: 4, value: "secret"} + ] + + filtered = ColumnSelection.filter_fields(fields, source_table) + assert length(filtered) == 4 + assert filtered == fields + end + + test "returns all fields when column selection arrays are empty" do + source_table = %SourceTable{ + oid: 123, + actions: [:insert, :update, :delete], + include_column_attnums: [], + exclude_column_attnums: [] + } + + fields = [ + %Field{column_name: "id", column_attnum: 1, value: 1}, + %Field{column_name: "name", column_attnum: 2, value: "Alice"} + ] + + filtered = ColumnSelection.filter_fields(fields, source_table) + assert length(filtered) == 2 + assert filtered == fields + end + + test "filters fields when include_column_attnums is specified" do + source_table = %SourceTable{ + oid: 123, + actions: [:insert, :update, :delete], + include_column_attnums: [1, 2, 3], + exclude_column_attnums: nil + } + + fields = [ + %Field{column_name: "id", column_attnum: 1, value: 1}, + %Field{column_name: "name", column_attnum: 2, value: "Alice"}, + %Field{column_name: "email", column_attnum: 3, value: "alice@example.com"}, + %Field{column_name: "password", column_attnum: 4, value: "secret"}, + %Field{column_name: "ssn", column_attnum: 5, value: "123-45-6789"} + ] + + filtered = ColumnSelection.filter_fields(fields, source_table) + assert length(filtered) == 3 + assert Enum.map(filtered, & &1.column_name) == ["id", "name", "email"] + end + + test "filters fields when exclude_column_attnums is specified" do + source_table = %SourceTable{ + oid: 123, + actions: [:insert, :update, :delete], + include_column_attnums: nil, + exclude_column_attnums: [4, 5] + } + + fields = [ + %Field{column_name: "id", column_attnum: 1, value: 1}, + %Field{column_name: "name", column_attnum: 2, value: "Alice"}, + %Field{column_name: "email", column_attnum: 3, value: "alice@example.com"}, + %Field{column_name: "password", column_attnum: 4, value: "secret"}, + %Field{column_name: "ssn", column_attnum: 5, value: "123-45-6789"} + ] + + filtered = ColumnSelection.filter_fields(fields, source_table) + assert length(filtered) == 3 + assert Enum.map(filtered, & &1.column_name) == ["id", "name", "email"] + end + end + + describe "filter_column_attnums/2" do + test "returns all attnums when source_table is nil" do + attnums = [1, 2, 3, 4, 5] + filtered = ColumnSelection.filter_column_attnums(attnums, nil) + assert filtered == attnums + end + + test "returns all attnums when no column selection is configured" do + source_table = %SourceTable{ + oid: 123, + actions: [:insert, :update, :delete], + include_column_attnums: nil, + exclude_column_attnums: nil + } + + attnums = [1, 2, 3, 4, 5] + filtered = ColumnSelection.filter_column_attnums(attnums, source_table) + assert filtered == attnums + end + + test "filters attnums with include_column_attnums" do + source_table = %SourceTable{ + oid: 123, + actions: [:insert, :update, :delete], + include_column_attnums: [1, 2, 3], + exclude_column_attnums: nil + } + + attnums = [1, 2, 3, 4, 5] + filtered = ColumnSelection.filter_column_attnums(attnums, source_table) + assert filtered == [1, 2, 3] + end + + test "filters attnums with exclude_column_attnums" do + source_table = %SourceTable{ + oid: 123, + actions: [:insert, :update, :delete], + include_column_attnums: nil, + exclude_column_attnums: [4, 5] + } + + attnums = [1, 2, 3, 4, 5] + filtered = ColumnSelection.filter_column_attnums(attnums, source_table) + assert filtered == [1, 2, 3] + end + end + + describe "filter_fields/2 with both include and exclude set" do + test "raises an error when both include_column_attnums and exclude_column_attnums are set" do + source_table = %SourceTable{ + oid: 123, + actions: [:insert, :update, :delete], + include_column_attnums: [1, 2, 3], + exclude_column_attnums: [4, 5] + } + + fields = [ + %Field{column_name: "id", column_attnum: 1, value: 1}, + %Field{column_name: "name", column_attnum: 2, value: "Alice"} + ] + + assert_raise InvariantError, fn -> + ColumnSelection.filter_fields(fields, source_table) + end + end + end + + describe "filter_column_attnums/2 with both include and exclude set" do + test "raises an error when both include_column_attnums and exclude_column_attnums are set" do + source_table = %SourceTable{ + oid: 123, + actions: [:insert, :update, :delete], + include_column_attnums: [1, 2, 3], + exclude_column_attnums: [4, 5] + } + + attnums = [1, 2, 3, 4, 5] + + assert_raise InvariantError, fn -> + ColumnSelection.filter_column_attnums(attnums, source_table) + end + end + end + + describe "has_column_selection?/1" do + test "returns false when no column selection is configured" do + source_table = %SourceTable{ + oid: 123, + actions: [:insert, :update, :delete], + include_column_attnums: nil, + exclude_column_attnums: nil + } + + refute ColumnSelection.has_column_selection?(source_table) + end + + test "returns false when column selection arrays are empty" do + source_table = %SourceTable{ + oid: 123, + actions: [:insert, :update, :delete], + include_column_attnums: [], + exclude_column_attnums: [] + } + + refute ColumnSelection.has_column_selection?(source_table) + end + + test "returns true when include_column_attnums is specified" do + source_table = %SourceTable{ + oid: 123, + actions: [:insert, :update, :delete], + include_column_attnums: [1, 2, 3], + exclude_column_attnums: nil + } + + assert ColumnSelection.has_column_selection?(source_table) + end + + test "returns true when exclude_column_attnums is specified" do + source_table = %SourceTable{ + oid: 123, + actions: [:insert, :update, :delete], + include_column_attnums: nil, + exclude_column_attnums: [4, 5] + } + + assert ColumnSelection.has_column_selection?(source_table) + end + end +end diff --git a/test/sequin/replication/source_table/source_table_test.exs b/test/sequin/replication/source_table/source_table_test.exs new file mode 100644 index 000000000..e8a228da5 --- /dev/null +++ b/test/sequin/replication/source_table/source_table_test.exs @@ -0,0 +1,97 @@ +defmodule Sequin.WalPipeline.SourceTableTest do + use Sequin.DataCase, async: true + + alias Sequin.WalPipeline.SourceTable + + describe "changeset/2" do + test "accepts valid source table without column selection" do + attrs = %{ + oid: 123, + actions: [:insert, :update, :delete] + } + + changeset = SourceTable.changeset(%SourceTable{}, attrs) + assert changeset.valid? + end + + test "accepts valid source table with include_column_attnums" do + attrs = %{ + oid: 123, + actions: [:insert, :update, :delete], + include_column_attnums: [1, 2, 3] + } + + changeset = SourceTable.changeset(%SourceTable{}, attrs) + assert changeset.valid? + end + + test "accepts valid source table with exclude_column_attnums" do + attrs = %{ + oid: 123, + actions: [:insert, :update, :delete], + exclude_column_attnums: [4, 5] + } + + changeset = SourceTable.changeset(%SourceTable{}, attrs) + assert changeset.valid? + end + + test "accepts empty arrays for column selection" do + attrs = %{ + oid: 123, + actions: [:insert, :update, :delete], + include_column_attnums: [], + exclude_column_attnums: [] + } + + changeset = SourceTable.changeset(%SourceTable{}, attrs) + assert changeset.valid? + end + + test "rejects when both include_column_attnums and exclude_column_attnums are set" do + attrs = %{ + oid: 123, + actions: [:insert, :update, :delete], + include_column_attnums: [1, 2, 3], + exclude_column_attnums: [4, 5] + } + + changeset = SourceTable.changeset(%SourceTable{}, attrs) + refute changeset.valid? + assert "cannot be set when include_column_attnums is set" in errors_on(changeset).exclude_column_attnums + end + + test "accepts when one list is empty and the other is not" do + attrs = %{ + oid: 123, + actions: [:insert, :update, :delete], + include_column_attnums: [1, 2, 3], + exclude_column_attnums: [] + } + + changeset = SourceTable.changeset(%SourceTable{}, attrs) + assert changeset.valid? + end + end + + describe "JSON encoding" do + test "includes column selection fields in JSON output" do + source_table = %SourceTable{ + oid: 123, + schema_name: "public", + table_name: "users", + actions: [:insert, :update], + include_column_attnums: [1, 2, 3], + exclude_column_attnums: nil, + column_filters: [] + } + + json = Jason.encode!(source_table) + decoded = Jason.decode!(json) + + assert decoded["oid"] == 123 + assert decoded["include_column_attnums"] == [1, 2, 3] + assert decoded["exclude_column_attnums"] == nil + end + end +end diff --git a/test/sequin/table_reader_server_test.exs b/test/sequin/table_reader_server_test.exs index bc922d05c..aac3b9076 100644 --- a/test/sequin/table_reader_server_test.exs +++ b/test/sequin/table_reader_server_test.exs @@ -171,7 +171,8 @@ defmodule Sequin.Runtime.TableReaderServerTest do assert messages |> Enum.frequencies_by(& &1.commit_lsn) |> Map.values() == [3, 2] # Verify that the records match the last 5 inserted characters - messages = Enum.sort_by(messages, & &1.record_pks) + # Sort by updated_at to match character order (messages are collected in reverse batch order) + messages = Enum.sort_by(messages, fn msg -> msg.data.record["updated_at"] end, NaiveDateTime) for {message, character} <- Enum.zip(messages, Enum.drop(characters, 3)) do assert message.table_oid == table_oid diff --git a/test/sequin/yaml_loader_test.exs b/test/sequin/yaml_loader_test.exs index 63ded45aa..01c22938a 100644 --- a/test/sequin/yaml_loader_test.exs +++ b/test/sequin/yaml_loader_test.exs @@ -2615,4 +2615,48 @@ defmodule Sequin.YamlLoaderTest do end end end + + describe "change_retentions column selection" do + test "errors when trying to exclude primary key columns" do + assert_raise RuntimeError, ~r/Cannot exclude primary key columns/, fn -> + YamlLoader.apply_from_yml!(""" + #{account_and_db_yml()} + + change_retentions: + - name: "test-pipeline" + source_database: "test-db" + source_table_schema: "public" + source_table_name: "Characters" + destination_database: "test-db" + destination_table_schema: "public" + destination_table_name: "sequin_events" + exclude_columns: + - id + """) + end + end + + test "allows excluding non-primary key columns" do + assert :ok = + YamlLoader.apply_from_yml!(""" + #{account_and_db_yml()} + + change_retentions: + - name: "test-pipeline" + source_database: "test-db" + source_table_schema: "public" + source_table_name: "Characters" + destination_database: "test-db" + destination_table_schema: "public" + destination_table_name: "sequin_events" + exclude_columns: + - name + """) + + assert [wal_pipeline] = Repo.all(WalPipeline) + assert [source_table] = wal_pipeline.source_tables + assert is_list(source_table.exclude_column_attnums) + assert length(source_table.exclude_column_attnums) == 1 + end + end end diff --git a/test/sequin_web/live/wal_pipelines/form_test.exs b/test/sequin_web/live/wal_pipelines/form_test.exs new file mode 100644 index 000000000..86cd35689 --- /dev/null +++ b/test/sequin_web/live/wal_pipelines/form_test.exs @@ -0,0 +1,97 @@ +defmodule SequinWeb.WalPipelinesLive.FormTest do + use SequinWeb.ConnCase, async: true + + alias Sequin.Databases.PostgresDatabaseTable + alias Sequin.Error.InvariantError + alias SequinWeb.WalPipelinesLive.Form + + describe "verify_column_selection/2" do + test "returns :ok when no column selection is configured" do + table = %PostgresDatabaseTable{ + columns: [ + %PostgresDatabaseTable.Column{attnum: 1, name: "id", is_pk?: true, type: "integer"}, + %PostgresDatabaseTable.Column{attnum: 2, name: "name", is_pk?: false, type: "text"} + ] + } + + source_table_params = %{ + "exclude_column_attnums" => nil, + "include_column_attnums" => nil + } + + assert Form.verify_column_selection(table, source_table_params) == :ok + end + + test "returns error when trying to exclude primary key columns" do + table = %PostgresDatabaseTable{ + columns: [ + %PostgresDatabaseTable.Column{attnum: 1, name: "id", is_pk?: true, type: "integer"}, + %PostgresDatabaseTable.Column{attnum: 2, name: "name", is_pk?: false, type: "text"}, + %PostgresDatabaseTable.Column{attnum: 3, name: "email", is_pk?: false, type: "text"} + ] + } + + source_table_params = %{ + "exclude_column_attnums" => [1, 3], + "include_column_attnums" => nil + } + + assert {:error, %InvariantError{} = error} = Form.verify_column_selection(table, source_table_params) + assert error.message =~ "Cannot exclude primary key columns" + assert error.message =~ "id" + end + + test "returns error when trying to exclude all columns" do + table = %PostgresDatabaseTable{ + columns: [ + %PostgresDatabaseTable.Column{attnum: 1, name: "id", is_pk?: true, type: "integer"}, + %PostgresDatabaseTable.Column{attnum: 2, name: "name", is_pk?: false, type: "text"} + ] + } + + source_table_params = %{ + "exclude_column_attnums" => [2], + "include_column_attnums" => nil + } + + # This should pass because we're only excluding non-PK columns + assert Form.verify_column_selection(table, source_table_params) == :ok + end + + test "returns :ok when excluding non-primary key columns" do + table = %PostgresDatabaseTable{ + columns: [ + %PostgresDatabaseTable.Column{attnum: 1, name: "id", is_pk?: true, type: "integer"}, + %PostgresDatabaseTable.Column{attnum: 2, name: "name", is_pk?: false, type: "text"}, + %PostgresDatabaseTable.Column{attnum: 3, name: "password", is_pk?: false, type: "text"} + ] + } + + source_table_params = %{ + "exclude_column_attnums" => [3], + "include_column_attnums" => nil + } + + assert Form.verify_column_selection(table, source_table_params) == :ok + end + + test "returns error when including zero columns" do + table = %PostgresDatabaseTable{ + columns: [ + %PostgresDatabaseTable.Column{attnum: 1, name: "id", is_pk?: true, type: "integer"}, + %PostgresDatabaseTable.Column{attnum: 2, name: "name", is_pk?: false, type: "text"} + ] + } + + source_table_params = %{ + "exclude_column_attnums" => nil, + # Non-existent column + "include_column_attnums" => [999] + } + + # The validation doesn't check for non-existent columns, it only checks for empty lists + # Empty list means "not set" in our validation logic + assert Form.verify_column_selection(table, source_table_params) == :ok + end + end +end