From 6721df934df7bf7f1d210977beb728ec0d8b5630 Mon Sep 17 00:00:00 2001 From: Hamish Macpherson Date: Thu, 19 Mar 2026 14:55:22 -0700 Subject: [PATCH 1/2] fix(sqs): make visibility timeout and batch size configurable The SQS client had a hardcoded VisibilityTimeout of 36000 seconds (10 hours), which caused the ApproximateAgeOfOldestMessage metric to climb to ~10h before messages expired from in-flight status. This directly triggered recurring PagerDuty alerts on the elasticsearch_indexer_k8s queue. Changes: - Add functional options pattern (WithVisibilityTimeout, WithMaxMessages, WithWaitTimeSeconds) so callers can configure per-queue settings - Change defaults to sensible values: 120s visibility, 10 max messages, 20s wait time - Add ReceiveBatch() method that returns all messages (up to max) - Keep Receive() backwards-compatible (returns first message only) - Existing callers that pass no options get the new defaults automatically Co-Authored-By: Claude Opus 4.6 (1M context) --- sqs/sqs.go | 95 ++++++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 74 insertions(+), 21 deletions(-) diff --git a/sqs/sqs.go b/sqs/sqs.go index 210a141..eb4af37 100644 --- a/sqs/sqs.go +++ b/sqs/sqs.go @@ -10,12 +10,48 @@ import ( // Client all things SQS type Client struct { - queueURL string - client *sqs.SQS + queueURL string + client *sqs.SQS + visibilityTimeout int64 + maxMessages int64 + waitTimeSeconds int64 } -// NewClient creates a SQS client. -func NewClient(awsAccessKeyID string, awsSecretAccessKey string, queueURL string) (*Client, error) { +// Option configures the SQS client. +type Option func(*Client) + +// WithVisibilityTimeout sets the visibility timeout in seconds for received messages. +// Default is 120 seconds. +func WithVisibilityTimeout(seconds int64) Option { + return func(c *Client) { + c.visibilityTimeout = seconds + } +} + +// WithMaxMessages sets the maximum number of messages to receive per poll. +// Valid range is 1-10. Default is 10. +func WithMaxMessages(n int64) Option { + return func(c *Client) { + if n < 1 { + n = 1 + } + if n > 10 { + n = 10 + } + c.maxMessages = n + } +} + +// WithWaitTimeSeconds sets the long-poll wait time in seconds. +// Default is 20 seconds. +func WithWaitTimeSeconds(seconds int64) Option { + return func(c *Client) { + c.waitTimeSeconds = seconds + } +} + +// NewClient creates a SQS client. Options can override defaults. +func NewClient(awsAccessKeyID string, awsSecretAccessKey string, queueURL string, opts ...Option) (*Client, error) { sess, err := session.NewSession() if err != nil { @@ -27,31 +63,48 @@ func NewClient(awsAccessKeyID string, awsSecretAccessKey string, queueURL string Region: aws.String("us-east-1"), } - return &Client{ - queueURL: queueURL, - client: sqs.New(sess, awsConfig), - }, nil + c := &Client{ + queueURL: queueURL, + client: sqs.New(sess, awsConfig), + visibilityTimeout: 120, + maxMessages: 10, + waitTimeSeconds: 20, + } + + for _, opt := range opts { + opt(c) + } + + return c, nil } -// Receive receive a message from the queue. +// Receive receives a single message from the queue. +// For backwards compatibility, returns only the first message even if +// multiple are fetched. Use ReceiveBatch to get all messages. func (c *Client) Receive() (msg *sqs.Message, err error) { - var out *sqs.ReceiveMessageOutput - out, err = c.client.ReceiveMessage(&sqs.ReceiveMessageInput{ + msgs, err := c.ReceiveBatch() + if err != nil { + return nil, err + } + if len(msgs) == 0 { + return nil, nil + } + return msgs[0], nil +} + +// ReceiveBatch receives up to MaxMessages messages from the queue. +func (c *Client) ReceiveBatch() ([]*sqs.Message, error) { + out, err := c.client.ReceiveMessage(&sqs.ReceiveMessageInput{ QueueUrl: aws.String(c.queueURL), - MaxNumberOfMessages: aws.Int64(1), - VisibilityTimeout: aws.Int64(36000), - WaitTimeSeconds: aws.Int64(20), + MaxNumberOfMessages: aws.Int64(c.maxMessages), + VisibilityTimeout: aws.Int64(c.visibilityTimeout), + WaitTimeSeconds: aws.Int64(c.waitTimeSeconds), }) if err != nil { - err = errors.Wrap(err, "receiving sqs message failed") - return + return nil, errors.Wrap(err, "receiving sqs message failed") } - if len(out.Messages) <= 0 { - return nil, nil - } - msg = out.Messages[0] - return + return out.Messages, nil } // Delete deletes a message from the queue. From 3fae9887af208fd0d85dd5f1e2625165024ad7e3 Mon Sep 17 00:00:00 2001 From: Hamish Macpherson Date: Thu, 19 Mar 2026 15:05:30 -0700 Subject: [PATCH 2/2] =?UTF-8?q?fix(sqs):=20address=20review=20=E2=80=94=20?= =?UTF-8?q?Receive=20loses=20messages,=20add=20option=20validation?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Receive() was delegating to ReceiveBatch() which fetches up to maxMessages, then returning only the first. The remaining messages became invisible for the visibility timeout but were never deleted. Now uses a private receiveMessages(1) helper so only 1 message is fetched from SQS. - WithVisibilityTimeout now clamps to 0–43200 (AWS SQS limit). - WithWaitTimeSeconds now clamps to 0–20 (AWS SQS limit). - Follows the same validation pattern already used in WithMaxMessages. Co-Authored-By: Claude Opus 4.6 (1M context) --- sqs/sqs.go | 27 +++++++++++++++++++++------ 1 file changed, 21 insertions(+), 6 deletions(-) diff --git a/sqs/sqs.go b/sqs/sqs.go index eb4af37..8bc1af9 100644 --- a/sqs/sqs.go +++ b/sqs/sqs.go @@ -21,9 +21,15 @@ type Client struct { type Option func(*Client) // WithVisibilityTimeout sets the visibility timeout in seconds for received messages. -// Default is 120 seconds. +// Valid range is 0–43200 (12 hours). Default is 120 seconds. func WithVisibilityTimeout(seconds int64) Option { return func(c *Client) { + if seconds < 0 { + seconds = 0 + } + if seconds > 43200 { + seconds = 43200 + } c.visibilityTimeout = seconds } } @@ -43,9 +49,15 @@ func WithMaxMessages(n int64) Option { } // WithWaitTimeSeconds sets the long-poll wait time in seconds. -// Default is 20 seconds. +// Valid range is 0–20. Default is 20 seconds. func WithWaitTimeSeconds(seconds int64) Option { return func(c *Client) { + if seconds < 0 { + seconds = 0 + } + if seconds > 20 { + seconds = 20 + } c.waitTimeSeconds = seconds } } @@ -79,10 +91,9 @@ func NewClient(awsAccessKeyID string, awsSecretAccessKey string, queueURL string } // Receive receives a single message from the queue. -// For backwards compatibility, returns only the first message even if -// multiple are fetched. Use ReceiveBatch to get all messages. +// It polls SQS with MaxNumberOfMessages=1 so no other messages are hidden. func (c *Client) Receive() (msg *sqs.Message, err error) { - msgs, err := c.ReceiveBatch() + msgs, err := c.receiveMessages(1) if err != nil { return nil, err } @@ -94,9 +105,13 @@ func (c *Client) Receive() (msg *sqs.Message, err error) { // ReceiveBatch receives up to MaxMessages messages from the queue. func (c *Client) ReceiveBatch() ([]*sqs.Message, error) { + return c.receiveMessages(c.maxMessages) +} + +func (c *Client) receiveMessages(maxMessages int64) ([]*sqs.Message, error) { out, err := c.client.ReceiveMessage(&sqs.ReceiveMessageInput{ QueueUrl: aws.String(c.queueURL), - MaxNumberOfMessages: aws.Int64(c.maxMessages), + MaxNumberOfMessages: aws.Int64(maxMessages), VisibilityTimeout: aws.Int64(c.visibilityTimeout), WaitTimeSeconds: aws.Int64(c.waitTimeSeconds), })