Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 17 additions & 5 deletions docs/core/map.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,14 @@ MapResult<OrderResult> result = future.get();
| Method | Description |
|--------|-------------|
| `getResult(i)` | Result at index `i`, or `null` if that item failed |
| `getError(i)` | `ErrorObject` at index `i`, or `null` if that item succeeded |
| `getError(i)` | `MapError` at index `i`, or `null` if that item succeeded |
| `getItem(i)` | The `MapResultItem` at index `i` with status, result, and error |
| `allSucceeded()` | `true` if every item succeeded |
| `size()` | Number of items in the result |
| `items()` | All result items as an unmodifiable list |
| `results()` | All results as an unmodifiable list (nulls for failed items) |
| `succeeded()` | Only the non-null (successful) results |
| `failed()` | Only the non-null `ErrorObject`s |
| `failed()` | Only the non-null `MapError`s |
| `completionReason()` | Why the operation completed (`ALL_COMPLETED`, `MIN_SUCCESSFUL_REACHED`, `FAILURE_TOLERANCE_EXCEEDED`) |

### MapResultItem
Expand All @@ -59,7 +59,17 @@ Each `MapResultItem<T>` contains:
|-------|-------------|
| `status()` | `SUCCEEDED`, `FAILED`, or `NOT_STARTED` |
| `result()` | The result value, or `null` if failed/not started |
| `error()` | The error details as `ErrorObject`, or `null` if succeeded/not started |
| `error()` | The error details as `MapError`, or `null` if succeeded/not started |

### MapError

Failed items store error details as `MapError`, a serializable record that survives checkpoint-and-replay cycles:

| Field | Description |
|-------|-------------|
| `errorType()` | Fully qualified exception class name (e.g., `java.lang.RuntimeException`) |
| `errorMessage()` | The exception message |
| `stackTrace()` | Stack trace frames as a list of strings, or `null` |

### Error Isolation

Expand Down Expand Up @@ -87,9 +97,11 @@ var config = MapConfig.builder()
.build();

var result = ctx.map("process-orders", items, OrderResult.class,
(orderId, index, childCtx) -> process(childCtx, orderId), config);
(orderId, index, childCtx) -> process(orderId, childCtx), config);
```

`MapConfig` also supports a custom `serDes` for serialization via `.serDes(customSerDes)`. By default, the context's serializer is used. `maxConcurrency` must be at least 1 if set.

#### Concurrency Limiting

`maxConcurrency` controls how many items execute concurrently. When set, items beyond the limit are queued and started as earlier items complete. Default is `null` (unlimited).
Expand Down Expand Up @@ -158,7 +170,7 @@ The function passed to `map()` is a `MapFunction<I, O>`:
```java
@FunctionalInterface
public interface MapFunction<I, O> {
O apply(I item, int index, DurableContext context) throws Exception;
O apply(I item, int index, DurableContext context);
}
```

Expand Down
14 changes: 11 additions & 3 deletions docs/design.md
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,12 @@ context.step("name", Type.class, supplier,
│ - StepOperation<T> │ │ - Queues requests │
│ - WaitOperation │ │ - Batches API calls (750KB) │
│ - WaitForConditionOperation │ │ │
│ - execute() / get() │ │ - Notifies via callback │
└──────────────────────────────┘ └──────────────────────────────┘
│ - ConcurrencyOperation<T> │ │ - Notifies via callback │
│ - MapOperation<I,O> │ └──────────────────────────────┘
│ - ParallelOperation │
│ - ChildContextOperation<T> │
│ - execute() / get() │
└──────────────────────────────┘
┌──────────────────────────────┐
Expand Down Expand Up @@ -235,7 +239,11 @@ software.amazon.lambda.durable
│ ├── InvokeOperation<T> # Invoke logic
│ ├── CallbackOperation<T> # Callback logic
│ ├── WaitOperation # Wait logic
│ └── WaitForConditionOperation<T> # Polling condition logic
│ ├── WaitForConditionOperation<T> # Polling condition logic
│ ├── ConcurrencyOperation<T> # Shared base for map/parallel
│ ├── MapOperation<I,O> # Map operation logic
│ ├── ParallelOperation # Parallel operation logic
│ └── ChildContextOperation<T> # Per-item child context execution
├── logging/
│ ├── DurableLogger # Context-aware logger wrapper (MDC-based)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import java.util.HashSet;
import java.util.List;
import org.junit.jupiter.api.Test;
import software.amazon.lambda.durable.model.CompletionReason;
import software.amazon.lambda.durable.model.ConcurrencyCompletionStatus;
import software.amazon.lambda.durable.model.ExecutionStatus;
import software.amazon.lambda.durable.testing.LocalDurableTestRunner;

Expand Down Expand Up @@ -54,7 +54,7 @@ void mapWithEmptyCollection_returnsEmptyMapResult() {

assertEquals(0, result.size());
assertTrue(result.allSucceeded());
assertEquals(CompletionReason.ALL_COMPLETED, result.completionReason());
assertEquals(ConcurrencyCompletionStatus.ALL_COMPLETED, result.completionReason());

return "done";
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.jupiter.api.Test;
import software.amazon.lambda.durable.model.CompletionReason;
import software.amazon.lambda.durable.model.ConcurrencyCompletionStatus;
import software.amazon.lambda.durable.model.ExecutionStatus;
import software.amazon.lambda.durable.model.MapResultItem;
import software.amazon.lambda.durable.testing.LocalDurableTestRunner;
Expand Down Expand Up @@ -81,7 +81,7 @@ void testMapPartialFailure_failedItemDoesNotPreventOthers() {
assertNull(result.getError(0));
assertNull(result.getError(2));

assertEquals(CompletionReason.ALL_COMPLETED, result.completionReason());
assertEquals(ConcurrencyCompletionStatus.ALL_COMPLETED, result.completionReason());

return "done";
});
Expand Down Expand Up @@ -252,7 +252,7 @@ void testMapWithToleratedFailureCount_earlyTermination() {
},
config);

assertEquals(CompletionReason.FAILURE_TOLERANCE_EXCEEDED, result.completionReason());
assertEquals(ConcurrencyCompletionStatus.FAILURE_TOLERANCE_EXCEEDED, result.completionReason());
assertFalse(result.allSucceeded());
assertEquals(5, result.size());
assertEquals("OK", result.getResult(0));
Expand All @@ -279,7 +279,7 @@ void testMapWithMinSuccessful_earlyTermination() {
var result = context.map(
"min-successful", items, String.class, (item, index, ctx) -> item.toUpperCase(), config);

assertEquals(CompletionReason.MIN_SUCCESSFUL_REACHED, result.completionReason());
assertEquals(ConcurrencyCompletionStatus.MIN_SUCCESSFUL_REACHED, result.completionReason());
assertEquals(5, result.size());
assertEquals("A", result.getResult(0));
assertEquals("B", result.getResult(1));
Expand Down Expand Up @@ -419,7 +419,7 @@ void testMapUnlimitedConcurrencyWithToleratedFailureCount() {
},
config);

assertEquals(CompletionReason.FAILURE_TOLERANCE_EXCEEDED, result.completionReason());
assertEquals(ConcurrencyCompletionStatus.FAILURE_TOLERANCE_EXCEEDED, result.completionReason());
assertFalse(result.allSucceeded());
return "done";
});
Expand All @@ -442,7 +442,7 @@ void testMapReplayWithFailedBranches() {
return item.toUpperCase();
});

// Errors survive replay since they are stored as ErrorObject (not raw Throwable)
// Errors survive replay since they are stored as MapError (not raw Throwable)
assertEquals("OK", result.getResult(0));
assertEquals("OK2", result.getResult(2));
return "done";
Expand Down Expand Up @@ -531,7 +531,7 @@ void testMapWithAllSuccessfulCompletionConfig_stopsOnFirstFailure() {
},
config);

assertEquals(CompletionReason.FAILURE_TOLERANCE_EXCEEDED, result.completionReason());
assertEquals(ConcurrencyCompletionStatus.FAILURE_TOLERANCE_EXCEEDED, result.completionReason());
assertEquals("OK1", result.getResult(0));
assertNotNull(result.getError(1));
// Items after the failure should be NOT_STARTED
Expand Down Expand Up @@ -622,14 +622,51 @@ void testMapWithToleratedFailurePercentage() {
},
config);

assertEquals(CompletionReason.FAILURE_TOLERANCE_EXCEEDED, result.completionReason());
assertEquals(ConcurrencyCompletionStatus.FAILURE_TOLERANCE_EXCEEDED, result.completionReason());
return "done";
});

var result = runner.runUntilComplete("test");
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
}

@Test
void testMapWithToleratedFailurePercentage_replay() {
var executionCount = new AtomicInteger(0);

var runner = LocalDurableTestRunner.create(String.class, (input, context) -> {
var items = List.of("ok1", "FAIL1", "ok2", "FAIL2", "ok3", "FAIL3", "ok4");
var config = MapConfig.builder()
.completionConfig(CompletionConfig.toleratedFailurePercentage(0.3))
.build();
var result = context.map(
"pct-fail-replay",
items,
String.class,
(item, index, ctx) -> {
executionCount.incrementAndGet();
if (item.startsWith("FAIL")) {
throw new RuntimeException("failed: " + item);
}
return item.toUpperCase();
},
config);

assertEquals(ConcurrencyCompletionStatus.FAILURE_TOLERANCE_EXCEEDED, result.completionReason());
return "done";
});

var result1 = runner.runUntilComplete("test");
assertEquals(ExecutionStatus.SUCCEEDED, result1.getStatus());
var firstRunCount = executionCount.get();

// Replay — with unlimited concurrency, children replay simultaneously.
// Verify completionReason is consistent and no re-execution occurs.
var result2 = runner.run("test");
assertEquals(ExecutionStatus.SUCCEEDED, result2.getStatus());
assertEquals(firstRunCount, executionCount.get(), "Map functions should not re-execute on replay");
}

@Test
void testMapAsyncWithWaitInsideBranches() {
var runner = LocalDurableTestRunner.create(String.class, (input, context) -> {
Expand Down Expand Up @@ -747,7 +784,7 @@ void testMapWithMinSuccessful_replay() {
},
config);

assertEquals(CompletionReason.MIN_SUCCESSFUL_REACHED, result.completionReason());
assertEquals(ConcurrencyCompletionStatus.MIN_SUCCESSFUL_REACHED, result.completionReason());
assertEquals("A", result.getResult(0));
assertEquals("B", result.getResult(1));
return "done";
Expand Down Expand Up @@ -826,4 +863,24 @@ void testMapWithLargeResult_replayChildren() {
assertEquals(ExecutionStatus.SUCCEEDED, result2.getStatus());
assertEquals(firstRunCount, executionCount.get(), "Map functions should not re-execute on replay");
}

@Test
void testMapWithNullResults() {
var runner = LocalDurableTestRunner.create(String.class, (input, context) -> {
var items = List.of("a", "b", "c");
var result = context.map("null-map", items, String.class, (item, index, ctx) -> null);

assertTrue(result.allSucceeded());
assertEquals(3, result.size());
for (int i = 0; i < result.size(); i++) {
assertEquals(MapResultItem.Status.SUCCEEDED, result.getItem(i).status());
assertNull(result.getResult(i));
assertNull(result.getError(i));
}
return "done";
});

var result = runner.runUntilComplete("test");
assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
public enum ConcurrencyCompletionStatus {
ALL_COMPLETED,
MIN_SUCCESSFUL_REACHED,
MIN_SUCCESSFUL_NOT_REACHED,
FAILURE_TOLERANCE_EXCEEDED;

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
*
* <p>Holds ordered results from a map operation. Each index corresponds to the input item at the same position. Each
* item is represented as a {@link MapResultItem} containing its status, result, and error. Includes the
* {@link CompletionReason} indicating why the operation completed.
* {@link ConcurrencyCompletionStatus} indicating why the operation completed.
*
* <p>Errors are stored as {@link MapError} rather than raw Throwable, so they survive serialization across
* checkpoint-and-replay cycles without requiring AWS SDK-specific Jackson modules.
Expand All @@ -19,17 +19,17 @@
* @param completionReason why the operation completed
* @param <T> the result type of each item
*/
public record MapResult<T>(List<MapResultItem<T>> items, CompletionReason completionReason) {
public record MapResult<T>(List<MapResultItem<T>> items, ConcurrencyCompletionStatus completionReason) {

/** Compact constructor that applies defensive copy and defaults. */
public MapResult {
items = items != null ? List.copyOf(items) : Collections.emptyList();
completionReason = completionReason != null ? completionReason : CompletionReason.ALL_COMPLETED;
completionReason = completionReason != null ? completionReason : ConcurrencyCompletionStatus.ALL_COMPLETED;
}

/** Returns an empty MapResult with no items. */
public static <T> MapResult<T> empty() {
return new MapResult<>(Collections.emptyList(), CompletionReason.ALL_COMPLETED);
return new MapResult<>(Collections.emptyList(), ConcurrencyCompletionStatus.ALL_COMPLETED);
}

/** Returns the result item at the given index. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,9 +175,8 @@ private void checkpointSuccess(T result) {
}

var serialized = serializeResult(result);
var serializedBytes = serialized.getBytes(StandardCharsets.UTF_8);

if (serializedBytes.length < LARGE_RESULT_THRESHOLD) {
if (serialized == null || serialized.getBytes(StandardCharsets.UTF_8).length < LARGE_RESULT_THRESHOLD) {
sendOperationUpdate(
OperationUpdate.builder().action(OperationAction.SUCCEED).payload(serialized));
} else {
Expand Down
Loading
Loading