Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 149 additions & 0 deletions events/dynamodb_helpers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
// Copyright 2024 Amazon.com, Inc. or its affiliates. All Rights Reserved.

package events

import (
"encoding/json"
"fmt"
)

// UnmarshalStreamImage unmarshals a stream image (a map of DynamoDBAttributeValue
// keyed by attribute name, as found in DynamoDBStreamRecord.Keys, NewImage and
// OldImage) into the provided destination value.
//
// The destination is decoded using the standard encoding/json package, so
// `json` struct tags work as expected. This sidesteps the long-standing
// limitation that DynamoDBAttributeValue is not directly compatible with the
// dynamodbattribute helpers shipped by the AWS SDK (see issue #58).
//
// Typical usage:
//
// type MyItem struct {
// ID string `json:"id"`
// Name string `json:"name"`
// }
//
// var item MyItem
// if err := events.UnmarshalStreamImage(record.Change.NewImage, &item); err != nil {
// return err
// }
//
// Note that this helper does NOT honor `dynamodbav` tags used by the AWS SDK's
// dynamodbattribute package. For SDK-tag-aware decoding, use ToDynamoDBJSON to
// emit canonical DynamoDB JSON and feed it to your SDK of choice.
func UnmarshalStreamImage(image map[string]DynamoDBAttributeValue, out interface{}) error {
flat := make(map[string]interface{}, len(image))
for k, v := range image {
raw, err := flattenAttributeValue(v)
if err != nil {
return fmt.Errorf("UnmarshalStreamImage: %q: %w", k, err)
}
flat[k] = raw
}

encoded, err := json.Marshal(flat)
if err != nil {
return fmt.Errorf("UnmarshalStreamImage: encode: %w", err)
}
if err := json.Unmarshal(encoded, out); err != nil {
return fmt.Errorf("UnmarshalStreamImage: decode: %w", err)
}
return nil
}

// ToDynamoDBJSON returns the canonical DynamoDB JSON wire form of an attribute
// value (for example {"S":"hello"} or {"N":"123"}). The returned bytes are
// directly compatible with json.Unmarshal-ing into the AttributeValue type
// defined by either aws-sdk-go (service/dynamodb.AttributeValue) or
// aws-sdk-go-v2 (service/dynamodb/types.AttributeValueMemberX), via the
// standard encoding/json package.
//
// This avoids forcing aws-lambda-go to take a hard dependency on either SDK
// version while still giving callers a stable bridge into SDK types.
func (av DynamoDBAttributeValue) ToDynamoDBJSON() ([]byte, error) {
return av.MarshalJSON()
}

// ToDynamoDBJSONMap returns the canonical DynamoDB JSON wire form of an
// attribute-value map, suitable for json.Unmarshal-ing into
// map[string]*dynamodb.AttributeValue (SDK v1) or
// map[string]types.AttributeValue (SDK v2).
func ToDynamoDBJSONMap(image map[string]DynamoDBAttributeValue) ([]byte, error) {
return json.Marshal(image)
}

// ToDynamoDBJSON returns the canonical DynamoDB JSON wire form of this stream
// record's Keys, NewImage and OldImage, structured as a top-level object with
// those three keys (any of which may be omitted when empty). This is a
// convenience for callers who want a single round-trip into SDK types:
//
// raw, _ := record.Change.ToDynamoDBJSON()
// // json.Unmarshal(raw, &someSDKShape)
func (r DynamoDBStreamRecord) ToDynamoDBJSON() ([]byte, error) {
envelope := struct {
Keys map[string]DynamoDBAttributeValue `json:"Keys,omitempty"`
NewImage map[string]DynamoDBAttributeValue `json:"NewImage,omitempty"`
OldImage map[string]DynamoDBAttributeValue `json:"OldImage,omitempty"`
}{
Keys: r.Keys,
NewImage: r.NewImage,
OldImage: r.OldImage,
}
return json.Marshal(envelope)
}

// flattenAttributeValue converts a DynamoDBAttributeValue into a plain Go
// value (string, float64, bool, []byte, []interface{}, map[string]interface{}
// or nil) so it can be re-encoded as ordinary JSON for downstream
// json.Unmarshal calls. Numbers are decoded with json.Number to preserve
// precision when the destination uses json.Number / json.Decoder.UseNumber.
func flattenAttributeValue(av DynamoDBAttributeValue) (interface{}, error) {
switch av.DataType() {
case DataTypeNull:
return nil, nil
case DataTypeString:
return av.String(), nil
case DataTypeNumber:
return json.Number(av.Number()), nil
case DataTypeBoolean:
return av.Boolean(), nil
case DataTypeBinary:
// Mirror DynamoDB's wire shape: binaries are base64 strings on the wire.
return av.Binary(), nil
case DataTypeStringSet:
return av.StringSet(), nil
case DataTypeNumberSet:
ns := av.NumberSet()
out := make([]json.Number, len(ns))
for i, n := range ns {
out[i] = json.Number(n)
}
return out, nil
case DataTypeBinarySet:
return av.BinarySet(), nil
case DataTypeList:
list := av.List()
out := make([]interface{}, len(list))
for i, item := range list {
v, err := flattenAttributeValue(item)
if err != nil {
return nil, err
}
out[i] = v
}
return out, nil
case DataTypeMap:
m := av.Map()
out := make(map[string]interface{}, len(m))
for k, item := range m {
v, err := flattenAttributeValue(item)
if err != nil {
return nil, err
}
out[k] = v
}
return out, nil
default:
return nil, fmt.Errorf("unsupported DynamoDB data type %v", av.DataType())
}
}
176 changes: 176 additions & 0 deletions events/dynamodb_helpers_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
// Copyright 2024 Amazon.com, Inc. or its affiliates. All Rights Reserved.

package events

import (
"encoding/json"
"io/ioutil"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestUnmarshalStreamImage_ScalarFields(t *testing.T) {
image := map[string]DynamoDBAttributeValue{
"id": NewStringAttribute("abc-123"),
"count": NewNumberAttribute("42"),
"score": NewNumberAttribute("3.14"),
"active": NewBooleanAttribute(true),
"tags": NewStringSetAttribute([]string{"alpha", "beta"}),
}

type item struct {
ID string `json:"id"`
Count int `json:"count"`
Score float64 `json:"score"`
Active bool `json:"active"`
Tags []string `json:"tags"`
}

var got item
require.NoError(t, UnmarshalStreamImage(image, &got))
assert.Equal(t, "abc-123", got.ID)
assert.Equal(t, 42, got.Count)
assert.InDelta(t, 3.14, got.Score, 1e-9)
assert.True(t, got.Active)
assert.ElementsMatch(t, []string{"alpha", "beta"}, got.Tags)
}

func TestUnmarshalStreamImage_NestedMapAndList(t *testing.T) {
image := map[string]DynamoDBAttributeValue{
"user": NewMapAttribute(map[string]DynamoDBAttributeValue{
"name": NewStringAttribute("Joe"),
"age": NewNumberAttribute("35"),
}),
"items": NewListAttribute([]DynamoDBAttributeValue{
NewStringAttribute("Cookies"),
NewStringAttribute("Coffee"),
}),
}

type user struct {
Name string `json:"name"`
Age int `json:"age"`
}
type item struct {
User user `json:"user"`
Items []string `json:"items"`
}

var got item
require.NoError(t, UnmarshalStreamImage(image, &got))
assert.Equal(t, "Joe", got.User.Name)
assert.Equal(t, 35, got.User.Age)
assert.Equal(t, []string{"Cookies", "Coffee"}, got.Items)
}

func TestUnmarshalStreamImage_NullAttribute(t *testing.T) {
image := map[string]DynamoDBAttributeValue{
"deleted_at": NewNullAttribute(),
"name": NewStringAttribute("ada"),
}

type item struct {
Name string `json:"name"`
DeletedAt *string `json:"deleted_at"`
}

var got item
require.NoError(t, UnmarshalStreamImage(image, &got))
assert.Equal(t, "ada", got.Name)
assert.Nil(t, got.DeletedAt)
}

func TestUnmarshalStreamImage_FromTestdata(t *testing.T) {
raw, err := ioutil.ReadFile("./testdata/dynamodb-event.json")
require.NoError(t, err)

var evt DynamoDBEvent
require.NoError(t, json.Unmarshal(raw, &evt))
require.NotEmpty(t, evt.Records)

first := evt.Records[0].Change.NewImage
type partial struct {
Val string `json:"val"`
Key string `json:"key"`
}
var got partial
require.NoError(t, UnmarshalStreamImage(first, &got))
assert.Equal(t, "data", got.Val)
assert.Equal(t, "binary", got.Key)
}

func TestToDynamoDBJSON_AttributeValueRoundTrip(t *testing.T) {
av := NewStringAttribute("hello")
raw, err := av.ToDynamoDBJSON()
require.NoError(t, err)

var decoded DynamoDBAttributeValue
require.NoError(t, json.Unmarshal(raw, &decoded))
assert.Equal(t, "hello", decoded.String())
}

func TestToDynamoDBJSONMap_PreservesShape(t *testing.T) {
image := map[string]DynamoDBAttributeValue{
"id": NewStringAttribute("k1"),
"qty": NewNumberAttribute("7"),
}

raw, err := ToDynamoDBJSONMap(image)
require.NoError(t, err)

var decoded map[string]DynamoDBAttributeValue
require.NoError(t, json.Unmarshal(raw, &decoded))
assert.Equal(t, "k1", decoded["id"].String())
assert.Equal(t, "7", decoded["qty"].Number())
}

func TestStreamRecord_ToDynamoDBJSON_OmitsEmpty(t *testing.T) {
rec := DynamoDBStreamRecord{
Keys: map[string]DynamoDBAttributeValue{
"pk": NewStringAttribute("k1"),
},
}

raw, err := rec.ToDynamoDBJSON()
require.NoError(t, err)

var envelope map[string]json.RawMessage
require.NoError(t, json.Unmarshal(raw, &envelope))
_, hasKeys := envelope["Keys"]
_, hasNew := envelope["NewImage"]
_, hasOld := envelope["OldImage"]
assert.True(t, hasKeys)
assert.False(t, hasNew)
assert.False(t, hasOld)
}

func TestStreamRecord_ToDynamoDBJSON_AllSections(t *testing.T) {
rec := DynamoDBStreamRecord{
Keys: map[string]DynamoDBAttributeValue{
"pk": NewStringAttribute("k1"),
},
NewImage: map[string]DynamoDBAttributeValue{
"pk": NewStringAttribute("k1"),
"qty": NewNumberAttribute("7"),
},
OldImage: map[string]DynamoDBAttributeValue{
"pk": NewStringAttribute("k1"),
"qty": NewNumberAttribute("3"),
},
}

raw, err := rec.ToDynamoDBJSON()
require.NoError(t, err)

var envelope struct {
Keys map[string]DynamoDBAttributeValue `json:"Keys"`
NewImage map[string]DynamoDBAttributeValue `json:"NewImage"`
OldImage map[string]DynamoDBAttributeValue `json:"OldImage"`
}
require.NoError(t, json.Unmarshal(raw, &envelope))
assert.Equal(t, "k1", envelope.Keys["pk"].String())
assert.Equal(t, "7", envelope.NewImage["qty"].Number())
assert.Equal(t, "3", envelope.OldImage["qty"].Number())
}
Loading