Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions lib/sequin/consumers/sink_consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ defmodule Sequin.Consumers.SinkConsumer do
|> cast_embed(:source, required: true)
|> cast_embed(:source_tables)
|> put_defaults()
|> validate_type()
|> validate_message_grouping()
|> validate_enrichment()
|> validate_required([:name, :status, :replication_slot_id, :batch_size])
Expand All @@ -226,6 +227,16 @@ defmodule Sequin.Consumers.SinkConsumer do
|> Sequin.Changeset.annotations_check_constraint()
end

defp validate_type(changeset) do
sink = get_field(changeset, :sink)

if sink && sink.type == :benchmark && Application.get_env(:sequin, :env) == :prod do
add_error(changeset, :type, "invalid type: #{inspect(sink.type)}")
else
changeset
end
end

defp validate_message_grouping(changeset) do
message_grouping = get_field(changeset, :message_grouping)
source_tables = get_field(changeset, :source_tables) || []
Expand Down
25 changes: 23 additions & 2 deletions lib/sequin/runtime/sqs_pipeline.ex
Original file line number Diff line number Diff line change
Expand Up @@ -54,18 +54,20 @@ defmodule Sequin.Runtime.SqsPipeline do
def handle_batch(:default, messages, %{batch_key: queue_url}, context) do
%{
consumer: %SinkConsumer{} = consumer,
sqs_client: sqs_client,
test_pid: test_pid
} = context

setup_allowances(test_pid)

# Credentials may have expired if we are using task role
context = maybe_refresh_client(context)

sqs_messages =
Enum.map(messages, fn %{data: data} ->
build_sqs_message(consumer, data)
end)

case SQS.send_messages(sqs_client, queue_url, sqs_messages) do
case SQS.send_messages(context.sqs_client, queue_url, sqs_messages) do
:ok ->
{:ok, messages, context}

Expand Down Expand Up @@ -94,6 +96,25 @@ defmodule Sequin.Runtime.SqsPipeline do
end
end

defp maybe_refresh_client(%{consumer: %SinkConsumer{} = consumer} = context) do
# Only refresh for task role credentials, as they expire
# Explicit credentials keep using the same client
if consumer.sink.use_task_role do
case SqsSink.aws_client(consumer.sink) do
{:ok, fresh_client} ->
Map.put(context, :sqs_client, fresh_client)

{:error, reason} ->
# Log but continue (may be transient)
Logger.warning("Failed to refresh AWS client for task role: #{inspect(reason)}")
context
end
else
# Not using task roles, no refresh needed
context
end
end

defp setup_allowances(nil), do: :ok

defp setup_allowances(test_pid) do
Expand Down
1 change: 1 addition & 0 deletions lib/sequin_web/live/sink_consumers/show.ex
Original file line number Diff line number Diff line change
Expand Up @@ -1411,6 +1411,7 @@ defmodule SequinWeb.SinkConsumersLive.Show do
defp consumer_title(%{sink: %{type: :sqs}}), do: "SQS Sink"
defp consumer_title(%{sink: %{type: :typesense}}), do: "Typesense Sink"
defp consumer_title(%{sink: %{type: :meilisearch}}), do: "Meilisearch Sink"
defp consumer_title(%{sink: %{type: :benchmark}}), do: "Benchmark Sink"

defp put_health(%SinkConsumer{} = consumer) do
with {:ok, health} <- Health.health(consumer),
Expand Down
Loading