diff --git a/sqs/sqs.go b/sqs/sqs.go index 210a141..8bc1af9 100644 --- a/sqs/sqs.go +++ b/sqs/sqs.go @@ -10,12 +10,60 @@ import ( // Client all things SQS type Client struct { - queueURL string - client *sqs.SQS + queueURL string + client *sqs.SQS + visibilityTimeout int64 + maxMessages int64 + waitTimeSeconds int64 } -// NewClient creates a SQS client. -func NewClient(awsAccessKeyID string, awsSecretAccessKey string, queueURL string) (*Client, error) { +// Option configures the SQS client. +type Option func(*Client) + +// WithVisibilityTimeout sets the visibility timeout in seconds for received messages. +// Valid range is 0–43200 (12 hours). Default is 120 seconds. +func WithVisibilityTimeout(seconds int64) Option { + return func(c *Client) { + if seconds < 0 { + seconds = 0 + } + if seconds > 43200 { + seconds = 43200 + } + c.visibilityTimeout = seconds + } +} + +// WithMaxMessages sets the maximum number of messages to receive per poll. +// Valid range is 1-10. Default is 10. +func WithMaxMessages(n int64) Option { + return func(c *Client) { + if n < 1 { + n = 1 + } + if n > 10 { + n = 10 + } + c.maxMessages = n + } +} + +// WithWaitTimeSeconds sets the long-poll wait time in seconds. +// Valid range is 0–20. Default is 20 seconds. +func WithWaitTimeSeconds(seconds int64) Option { + return func(c *Client) { + if seconds < 0 { + seconds = 0 + } + if seconds > 20 { + seconds = 20 + } + c.waitTimeSeconds = seconds + } +} + +// NewClient creates a SQS client. Options can override defaults. +func NewClient(awsAccessKeyID string, awsSecretAccessKey string, queueURL string, opts ...Option) (*Client, error) { sess, err := session.NewSession() if err != nil { @@ -27,31 +75,51 @@ func NewClient(awsAccessKeyID string, awsSecretAccessKey string, queueURL string Region: aws.String("us-east-1"), } - return &Client{ - queueURL: queueURL, - client: sqs.New(sess, awsConfig), - }, nil + c := &Client{ + queueURL: queueURL, + client: sqs.New(sess, awsConfig), + visibilityTimeout: 120, + maxMessages: 10, + waitTimeSeconds: 20, + } + + for _, opt := range opts { + opt(c) + } + + return c, nil } -// Receive receive a message from the queue. +// Receive receives a single message from the queue. +// It polls SQS with MaxNumberOfMessages=1 so no other messages are hidden. func (c *Client) Receive() (msg *sqs.Message, err error) { - var out *sqs.ReceiveMessageOutput - out, err = c.client.ReceiveMessage(&sqs.ReceiveMessageInput{ + msgs, err := c.receiveMessages(1) + if err != nil { + return nil, err + } + if len(msgs) == 0 { + return nil, nil + } + return msgs[0], nil +} + +// ReceiveBatch receives up to MaxMessages messages from the queue. +func (c *Client) ReceiveBatch() ([]*sqs.Message, error) { + return c.receiveMessages(c.maxMessages) +} + +func (c *Client) receiveMessages(maxMessages int64) ([]*sqs.Message, error) { + out, err := c.client.ReceiveMessage(&sqs.ReceiveMessageInput{ QueueUrl: aws.String(c.queueURL), - MaxNumberOfMessages: aws.Int64(1), - VisibilityTimeout: aws.Int64(36000), - WaitTimeSeconds: aws.Int64(20), + MaxNumberOfMessages: aws.Int64(maxMessages), + VisibilityTimeout: aws.Int64(c.visibilityTimeout), + WaitTimeSeconds: aws.Int64(c.waitTimeSeconds), }) if err != nil { - err = errors.Wrap(err, "receiving sqs message failed") - return + return nil, errors.Wrap(err, "receiving sqs message failed") } - if len(out.Messages) <= 0 { - return nil, nil - } - msg = out.Messages[0] - return + return out.Messages, nil } // Delete deletes a message from the queue.