Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 89 additions & 21 deletions sqs/sqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,60 @@ import (

// Client all things SQS
type Client struct {
queueURL string
client *sqs.SQS
queueURL string
client *sqs.SQS
visibilityTimeout int64
maxMessages int64
waitTimeSeconds int64
}

// NewClient creates a SQS client.
func NewClient(awsAccessKeyID string, awsSecretAccessKey string, queueURL string) (*Client, error) {
// Option configures the SQS client.
type Option func(*Client)

// WithVisibilityTimeout sets the visibility timeout in seconds for received messages.
// Valid range is 0–43200 (12 hours). Default is 120 seconds.
func WithVisibilityTimeout(seconds int64) Option {
return func(c *Client) {
if seconds < 0 {
seconds = 0
}
if seconds > 43200 {
seconds = 43200
}
c.visibilityTimeout = seconds
}
}

// WithMaxMessages sets the maximum number of messages to receive per poll.
// Valid range is 1-10. Default is 10.
func WithMaxMessages(n int64) Option {
return func(c *Client) {
if n < 1 {
n = 1
}
if n > 10 {
n = 10
}
c.maxMessages = n
}
}

// WithWaitTimeSeconds sets the long-poll wait time in seconds.
// Valid range is 0–20. Default is 20 seconds.
func WithWaitTimeSeconds(seconds int64) Option {
return func(c *Client) {
if seconds < 0 {
seconds = 0
}
if seconds > 20 {
seconds = 20
}
c.waitTimeSeconds = seconds
}
}

// NewClient creates a SQS client. Options can override defaults.
func NewClient(awsAccessKeyID string, awsSecretAccessKey string, queueURL string, opts ...Option) (*Client, error) {

sess, err := session.NewSession()
if err != nil {
Expand All @@ -27,31 +75,51 @@ func NewClient(awsAccessKeyID string, awsSecretAccessKey string, queueURL string
Region: aws.String("us-east-1"),
}

return &Client{
queueURL: queueURL,
client: sqs.New(sess, awsConfig),
}, nil
c := &Client{
queueURL: queueURL,
client: sqs.New(sess, awsConfig),
visibilityTimeout: 120,
maxMessages: 10,
waitTimeSeconds: 20,
}

for _, opt := range opts {
opt(c)
}

return c, nil
}

// Receive receive a message from the queue.
// Receive receives a single message from the queue.
// It polls SQS with MaxNumberOfMessages=1 so no other messages are hidden.
func (c *Client) Receive() (msg *sqs.Message, err error) {
var out *sqs.ReceiveMessageOutput
out, err = c.client.ReceiveMessage(&sqs.ReceiveMessageInput{
msgs, err := c.receiveMessages(1)
if err != nil {
return nil, err
}
if len(msgs) == 0 {
return nil, nil
}
return msgs[0], nil
}

// ReceiveBatch receives up to MaxMessages messages from the queue.
func (c *Client) ReceiveBatch() ([]*sqs.Message, error) {
return c.receiveMessages(c.maxMessages)
}

func (c *Client) receiveMessages(maxMessages int64) ([]*sqs.Message, error) {
out, err := c.client.ReceiveMessage(&sqs.ReceiveMessageInput{
QueueUrl: aws.String(c.queueURL),
MaxNumberOfMessages: aws.Int64(1),
VisibilityTimeout: aws.Int64(36000),
WaitTimeSeconds: aws.Int64(20),
MaxNumberOfMessages: aws.Int64(maxMessages),
VisibilityTimeout: aws.Int64(c.visibilityTimeout),
WaitTimeSeconds: aws.Int64(c.waitTimeSeconds),
})
if err != nil {
err = errors.Wrap(err, "receiving sqs message failed")
return
return nil, errors.Wrap(err, "receiving sqs message failed")
}

if len(out.Messages) <= 0 {
return nil, nil
}
msg = out.Messages[0]
return
return out.Messages, nil
}

// Delete deletes a message from the queue.
Expand Down