Merged
3 changes: 2 additions & 1 deletion AGENTS.md
@@ -201,12 +201,13 @@ void testAgainstRealLambda() {
| Class | Responsibility |
|-------|----------------|
| `DurableHandler<I,O>` | Lambda entry point, extend this |
| `DurableContext` | User API: `step()`, `wait()`, `map()` |
| `DurableContext` | User API: `step()`, `wait()`, `map()`, `waitForCondition()` |
| `DurableExecutor` | Orchestrates execution lifecycle |
| `ExecutionManager` | Thread coordination, state management |
| `CheckpointBatcher` | Batches checkpoint API calls (750KB limit) |
| `StepOperation` | Executes steps with retry logic |
| `WaitOperation` | Handles wait checkpointing |
| `WaitForConditionOperation` | Polls a condition function with configurable backoff |
| `MapOperation` | Applies a function across items concurrently via child contexts |
| `BaseConcurrentOperation` | Shared base for map/parallel: concurrency limiting, completion evaluation |

2 changes: 2 additions & 0 deletions README.md
@@ -32,6 +32,7 @@ Your durable function extends `DurableHandler<I, O>` and implements `handleReque
- `ctx.invoke()` – Invoke another Lambda function and wait for the result
- `ctx.runInChildContext()` – Run an isolated child context with its own checkpoint log
- `ctx.map()` – Apply a function to each item in a collection concurrently
- `ctx.waitForCondition()` – Poll a condition function until it signals done, suspending between polls

## Quick Start

@@ -94,6 +95,7 @@ See [Deploy Lambda durable functions with Infrastructure as Code](https://docs.a
- [<u>Invoke</u>](docs/core/invoke.md) - Call other durable functions
- [<u>Child Contexts</u>](docs/core/child-contexts.md) - Organize complex workflows into isolated units
- [<u>Map</u>](docs/core/map.md) - Apply a function across a collection concurrently
- [<u>Wait for Condition</u>](docs/core/wait-for-condition.md) - Poll a condition until it's met, with configurable backoff

**Examples**

1 change: 1 addition & 0 deletions docs/advanced/error-handling.md
@@ -17,6 +17,7 @@ DurableExecutionException - General durable exception
├── CallbackException - General callback exception
│ ├── CallbackFailedException - External system sent an error response to the callback. Handle the error or propagate failure
│ └── CallbackTimeoutException - Callback exceeded its timeout duration. Handle the error or propagate the failure
├── WaitForConditionException - waitForCondition exceeded max polling attempts or failed. Catch to implement fallback logic.
[Review comment (Contributor): With a subclass WaitForConditionFailedException?]

└── ChildContextFailedException - Child context failed and the original exception could not be reconstructed
```

130 changes: 130 additions & 0 deletions docs/core/wait-for-condition.md
@@ -0,0 +1,130 @@
## waitForCondition() – Poll Until a Condition is Met

`waitForCondition` repeatedly calls a check function until it signals done. Between polls, the Lambda suspends without consuming compute. State is checkpointed after each check, so progress survives interruptions.

```java
// Poll an order status until it ships
var status = ctx.waitForCondition(
    "wait-for-shipment",
    String.class,
    (currentStatus, stepCtx) -> {
        var latest = orderService.getStatus(orderId);
        return "SHIPPED".equals(latest)
            ? WaitForConditionResult.stopPolling(latest)
            : WaitForConditionResult.continuePolling(latest);
    },
    "PENDING");
```

The check function receives the current state and a `StepContext`, and returns a `WaitForConditionResult`:
- `WaitForConditionResult.stopPolling(value)` — condition met, return `value` as the final result
- `WaitForConditionResult.continuePolling(value)` — keep polling, pass `value` to the next check

The `initialState` parameter (`"PENDING"` above) is passed to the first check invocation.
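
The way state threads from one check to the next can be illustrated without the SDK. The following is a minimal, self-contained sketch: `PollResult`, `poll`, and the simulated status sequence are hypothetical stand-ins, not the library's types, and the loop omits the checkpointing and suspension that the real operation performs between polls.

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

class PollingLoopSketch {
    // Hypothetical stand-in for WaitForConditionResult; not the SDK type.
    record PollResult<T>(T state, boolean done) {
        static <T> PollResult<T> stop(T value) { return new PollResult<>(value, true); }
        static <T> PollResult<T> keepPolling(T value) { return new PollResult<>(value, false); }
    }

    // Calls the check function repeatedly, feeding each returned state into the next call.
    static <T> T poll(T initialState, Function<T, PollResult<T>> check, int maxAttempts) {
        T state = initialState;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            PollResult<T> result = check.apply(state);
            state = result.state();
            if (result.done()) {
                return state;
            }
            // A durable runtime would checkpoint `state` and suspend here before the next poll.
        }
        throw new IllegalStateException("Condition not met after " + maxAttempts + " attempts");
    }

    public static void main(String[] args) {
        // Simulated sequence of statuses returned by a hypothetical order service.
        Iterator<String> statuses = List.of("PENDING", "PACKED", "SHIPPED").iterator();
        String result = poll("PENDING", current -> {
            String latest = statuses.next();
            return "SHIPPED".equals(latest)
                ? PollResult.stop(latest)
                : PollResult.keepPolling(latest);
        }, 5);
        System.out.println(result);
    }
}
```

In this sketch the first check receives the initial state, and every later check receives whatever the previous check passed to `keepPolling`.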

## waitForConditionAsync() – Non-Blocking Polling

`waitForConditionAsync()` starts polling but returns a `DurableFuture<T>` immediately, allowing other operations to run concurrently.

```java
DurableFuture<String> shipmentFuture = ctx.waitForConditionAsync(
    "wait-for-shipment",
    String.class,
    (status, stepCtx) -> {
        var latest = orderService.getStatus(orderId);
        return "SHIPPED".equals(latest)
            ? WaitForConditionResult.stopPolling(latest)
            : WaitForConditionResult.continuePolling(latest);
    },
    "PENDING");
// [Review comment (Contributor): I'm not sure if initialState is still useful to users. It looks like users can always pass their initial state directly into the lambda without a parameter.]

// Do other work while polling runs
var invoice = ctx.step("generate-invoice", String.class, stepCtx -> generateInvoice(orderId));

// Block until the condition is met
var shipmentStatus = shipmentFuture.get();
```

## Wait Strategies

The wait strategy controls the delay between polls. By default, `waitForCondition` uses exponential backoff (60 max attempts, 5s initial delay, 300s max delay, 1.5x backoff rate, FULL jitter).

Use `WaitStrategies` to configure a different strategy:

```java
// Fixed 30-second delay, up to 10 attempts
var config = WaitForConditionConfig.<String>builder()
    .waitStrategy(WaitStrategies.fixedDelay(10, Duration.ofSeconds(30)))
    .build();

var result = ctx.waitForCondition("poll-status", String.class, checkFunc, "PENDING", config);
```

```java
// Custom exponential backoff
var config = WaitForConditionConfig.<String>builder()
    .waitStrategy(WaitStrategies.exponentialBackoff(
        20,                     // max attempts
        Duration.ofSeconds(2),  // initial delay
        Duration.ofSeconds(60), // max delay
        2.0,                    // backoff rate
        JitterStrategy.HALF))   // jitter
    .build();
```

| Factory Method | Description |
|----------------|-------------|
| `WaitStrategies.defaultStrategy()` | Exponential backoff: 60 attempts, 5s initial, 300s max, 1.5x rate, FULL jitter |
| `WaitStrategies.exponentialBackoff(...)` | Custom exponential backoff with configurable parameters |
| `WaitStrategies.fixedDelay(maxAttempts, delay)` | Constant delay between polls |
| `WaitStrategies.Presets.DEFAULT` | Same as `defaultStrategy()`, as a static constant |
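
To get a feel for the schedule these parameters produce, the sketch below computes un-jittered delays using the common formula `min(maxDelay, initialDelay * rate^(attempt - 1))` and applies full jitter as a uniform draw between zero and that delay. This is an assumption about how such strategies usually work; the source does not state the SDK's exact formula or attempt indexing, and `BackoffScheduleSketch` is a hypothetical helper, not part of the library.

```java
import java.time.Duration;
import java.util.Random;

class BackoffScheduleSketch {
    // Un-jittered delay for a 1-based attempt number (the indexing is an assumption).
    static Duration baseDelay(int attempt, Duration initial, Duration max, double rate) {
        double millis = initial.toMillis() * Math.pow(rate, attempt - 1);
        return Duration.ofMillis((long) Math.min(millis, (double) max.toMillis()));
    }

    // Full jitter as commonly defined: a uniform random delay in [0, base).
    static Duration withFullJitter(Duration base, Random random) {
        return Duration.ofMillis((long) (random.nextDouble() * base.toMillis()));
    }

    public static void main(String[] args) {
        Duration initial = Duration.ofSeconds(5);
        Duration max = Duration.ofSeconds(300);
        Random random = new Random();
        // Print the documented default parameters' schedule for the first few attempts.
        for (int attempt = 1; attempt <= 15; attempt++) {
            Duration base = baseDelay(attempt, initial, max, 1.5);
            System.out.printf("attempt %d: base=%dms jittered=%dms%n",
                attempt, base.toMillis(), withFullJitter(base, random).toMillis());
        }
    }
}
```

Under these assumptions the base delay grows from 5 seconds and stays at the 300-second cap once the exponential term exceeds it.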

## Configuration

`WaitForConditionConfig` holds optional parameters. All fields have sensible defaults, so you only need it when customizing:

```java
var config = WaitForConditionConfig.<String>builder()
    .waitStrategy(WaitStrategies.fixedDelay(10, Duration.ofSeconds(5)))
    .serDes(new CustomSerDes())
    .build();
```

| Option | Default | Description |
|--------|---------|-------------|
| `waitStrategy()` | Exponential backoff (see above) | Controls delay between polls and max attempts |
| `serDes()` | Handler default | Custom serialization for checkpointing state |

## Error Handling

| Exception | When Thrown |
|-----------|-------------|
| `WaitForConditionException` | Max attempts exceeded (thrown by the wait strategy) |
| `SerDesException` | Checkpointed state fails to deserialize on replay |
| User's exception | Check function throws; propagated to the caller of `waitForCondition()` or through `DurableFuture.get()` |

```java
try {
    var result = ctx.waitForCondition("poll", String.class, checkFunc, "initial");
} catch (WaitForConditionException e) {
    // Max attempts exceeded: condition was never met
} catch (IllegalStateException e) {
    // Check function threw this: handle accordingly
}
```

## Custom Wait Strategies

You can write a custom strategy by implementing `WaitForConditionWaitStrategy<T>`:

```java
WaitForConditionWaitStrategy<String> customStrategy = (state, attempt) -> {
    // Vary delay based on state
    if ("ALMOST_READY".equals(state)) {
        return Duration.ofSeconds(2);  // Poll faster when close
    }
    return Duration.ofSeconds(30);     // Otherwise poll slowly
};
```

The strategy receives the current state and attempt number, and returns a `Duration`. Throw `WaitForConditionException` to stop polling with an error.
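
Because a custom strategy decides both the delay and when to give up, it also has to enforce its own attempt limit. The sketch below shows one way to layer a cap over a state-based delay. It is self-contained and uses hypothetical stand-ins: `DelayStrategy` mirrors the `(state, attempt) -> Duration` shape described above, `MaxAttemptsExceeded` stands in for `WaitForConditionException`, and the choice to give up when `attempt >= maxAttempts` is an assumption about attempt numbering.

```java
import java.time.Duration;

class CappedStrategySketch {
    // Hypothetical stand-in for the (state, attempt) -> Duration strategy shape.
    @FunctionalInterface
    interface DelayStrategy<T> {
        Duration nextDelay(T state, int attempt);
    }

    // Hypothetical stand-in for WaitForConditionException.
    static class MaxAttemptsExceeded extends RuntimeException {
        MaxAttemptsExceeded(String message) {
            super(message);
        }
    }

    // Wraps a strategy so it throws once the attempt limit is reached (assumed 1-based attempts).
    static <T> DelayStrategy<T> capped(int maxAttempts, DelayStrategy<T> delegate) {
        return (state, attempt) -> {
            if (attempt >= maxAttempts) {
                throw new MaxAttemptsExceeded("Gave up after " + attempt + " attempts");
            }
            return delegate.nextDelay(state, attempt);
        };
    }

    public static void main(String[] args) {
        // Poll faster when the state indicates the condition is close to being met.
        DelayStrategy<String> byState = (state, attempt) ->
            "ALMOST_READY".equals(state) ? Duration.ofSeconds(2) : Duration.ofSeconds(30);
        DelayStrategy<String> strategy = capped(3, byState);

        System.out.println(strategy.nextDelay("PENDING", 1));
        System.out.println(strategy.nextDelay("ALMOST_READY", 2));
        try {
            strategy.nextDelay("ALMOST_READY", 3);
        } catch (MaxAttemptsExceeded e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Keeping the cap in a wrapper lets the delay logic stay focused on the state, and the same wrapper can be reused across strategies.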
17 changes: 12 additions & 5 deletions docs/design.md
@@ -189,15 +189,16 @@ context.step("name", Type.class, supplier,
│ - User-facing API │ │ - State (ops, token) │
│ - step(), stepAsync(), etc │ │ - Thread coordination │
│ - wait(), waitAsync() │ │ - Checkpoint batching │
│ - Operation ID counter │ │ - Checkpoint response handling │
└──────────────────────────────┘ │ - Polling │
└─────────────────────────────────┘
│ - waitForCondition() │ │ - Checkpoint response handling │
│ - Operation ID counter │ │ - Polling │
└──────────────────────────────┘ └─────────────────────────────────┘
│ │
▼ ▼
┌──────────────────────────────┐ ┌──────────────────────────────┐
│ Operations │ │ CheckpointBatcher │
│ - StepOperation<T> │ │ - Queues requests │
│ - WaitOperation │ │ - Batches API calls (750KB) │
│ - WaitForConditionOperation │ │ │
│ - execute() / get() │ │ - Notifies via callback │
└──────────────────────────────┘ └──────────────────────────────┘
@@ -233,7 +234,8 @@ software.amazon.lambda.durable
│ ├── StepOperation<T> # Step logic
│ ├── InvokeOperation<T> # Invoke logic
│ ├── CallbackOperation<T> # Callback logic
│ └── WaitOperation # Wait logic
│ ├── WaitOperation # Wait logic
│ └── WaitForConditionOperation<T> # Polling condition logic
├── logging/
│ ├── DurableLogger # Context-aware logger wrapper (MDC-based)
@@ -243,7 +245,9 @@ software.amazon.lambda.durable
│ ├── RetryStrategy # Interface
│ ├── RetryStrategies # Presets
│ ├── RetryDecision # shouldRetry + delay
│ └── JitterStrategy # Jitter options
│ ├── JitterStrategy # Jitter options
│ ├── WaitForConditionWaitStrategy # Polling delay interface
│ └── WaitStrategies # Polling strategy factory + Presets
├── client/
│ ├── DurableExecutionClient # Interface
@@ -264,6 +268,7 @@ software.amazon.lambda.durable
├── NonDeterministicExecutionException
├── StepFailedException
├── StepInterruptedException
├── WaitForConditionException
└── SerDesException
```

@@ -356,6 +361,7 @@ sequenceDiagram
DurableExecutionException (base)
├── StepFailedException # Step failed after all retries
├── StepInterruptedException # Step interrupted (AT_MOST_ONCE)
├── WaitForConditionException # Polling exceeded max attempts
├── NonDeterministicExecutionException # Replay mismatch
└── SerDesException # Serialization error

@@ -366,6 +372,7 @@ SuspendExecutionException # Internal: triggers suspension (not user-facin
|-----------|---------|----------|
| `StepFailedException` | Step throws after exhausting retries | Catch in handler or let fail |
| `StepInterruptedException` | AT_MOST_ONCE step interrupted mid-execution | Treat as failure |
| `WaitForConditionException` | waitForCondition exceeded max polling attempts | Catch in handler or let fail |
| `NonDeterministicExecutionException` | Replay finds different operation than expected | Bug in handler (non-deterministic code) |
| `SerDesException` | Jackson fails to serialize/deserialize | Fix data model or custom SerDes |
