Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import software.amazon.lambda.durable.DurableFuture;
import software.amazon.lambda.durable.DurableHandler;
import software.amazon.lambda.durable.ParallelConfig;
import software.amazon.lambda.durable.model.ParallelResult;

/**
* Example demonstrating parallel branch execution with the Durable Execution SDK.
Expand Down Expand Up @@ -38,8 +39,9 @@ public Output handleRequest(Input input, DurableContext context) {
var config = ParallelConfig.builder().build();

var futures = new ArrayList<DurableFuture<String>>(items.size());
var parallel = context.parallel("process-items", config);

try (var parallel = context.parallel("process-items", config)) {
try (parallel) {
for (var item : items) {
var future = parallel.branch("process-" + item, String.class, branchCtx -> {
branchCtx.getLogger().info("Processing item: {}", item);
Expand All @@ -49,7 +51,12 @@ public Output handleRequest(Input input, DurableContext context) {
}
} // join() called here via AutoCloseable

logger.info("All branches complete, collecting results");
ParallelResult parallelResult = parallel.get();
logger.info(
"Parallel complete: total={}, succeeded={}, failed={}",
parallelResult.getTotalBranches(),
parallelResult.getSucceededBranches(),
parallelResult.getFailedBranches());

var results = futures.stream().map(DurableFuture::get).toList();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import software.amazon.lambda.durable.DurableHandler;
import software.amazon.lambda.durable.ParallelConfig;
import software.amazon.lambda.durable.StepConfig;
import software.amazon.lambda.durable.model.ParallelResult;
import software.amazon.lambda.durable.retry.RetryStrategies;

/**
Expand All @@ -24,22 +25,24 @@
public class ParallelFailureToleranceExample
extends DurableHandler<ParallelFailureToleranceExample.Input, ParallelFailureToleranceExample.Output> {

public record Input(List<String> services, int toleratedFailures) {}
public record Input(List<String> services, int toleratedFailures, int minSuccessful) {}

public record Output(List<String> succeeded, List<String> failed) {}
public record Output(int succeeded, int failed) {}

@Override
public Output handleRequest(Input input, DurableContext context) {
var logger = context.getLogger();
logger.info("Starting parallel execution with toleratedFailureCount={}", input.toleratedFailures());

var config = ParallelConfig.builder()
.minSuccessful(input.minSuccessful())
.toleratedFailureCount(input.toleratedFailures())
.build();

var futures = new ArrayList<DurableFuture<String>>(input.services().size());
var parallel = context.parallel("call-services", config);

try (var parallel = context.parallel("call-services", config)) {
try (parallel) {
for (var service : input.services()) {
var future = parallel.branch("call-" + service, String.class, branchCtx -> {
return branchCtx.step(
Expand All @@ -59,20 +62,17 @@ public Output handleRequest(Input input, DurableContext context) {
}
}

var succeeded = new ArrayList<String>();
var failed = new ArrayList<String>();
ParallelResult parallelResult = parallel.get();
logger.info(
"Parallel complete: succeeded={}, failed={}, status={}",
parallelResult.getSucceededBranches(),
parallelResult.getFailedBranches(),
parallelResult.getCompletionStatus().isSucceeded() ? "succeeded" : "failed");

for (int i = 0; i < futures.size(); i++) {
try {
var result = futures.get(i).get();
succeeded.add(result);
} catch (Exception e) {
failed.add(input.services().get(i));
logger.info("Branch failed for service {}: {}", input.services().get(i), e.getMessage());
}
}
var succeeded = parallelResult.getSucceededBranches();
var failed = parallelResult.getFailedBranches();

logger.info("Completed: {} succeeded, {} failed", succeeded.size(), failed.size());
logger.info("Completed: {} succeeded, {} failed", succeeded, failed);
return new Output(succeeded, failed);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import software.amazon.lambda.durable.DurableFuture;
import software.amazon.lambda.durable.DurableHandler;
import software.amazon.lambda.durable.ParallelConfig;
import software.amazon.lambda.durable.model.ParallelResult;

/**
* Example demonstrating parallel branches where some branches include wait operations.
Expand All @@ -29,7 +30,7 @@ public class ParallelWithWaitExample

public record Input(String userId, String message) {}

public record Output(List<String> deliveries) {}
public record Output(List<String> deliveries, int success, int faiure) {}

@Override
public Output handleRequest(Input input, DurableContext context) {
Expand All @@ -38,8 +39,9 @@ public Output handleRequest(Input input, DurableContext context) {

var config = ParallelConfig.builder().build();
var futures = new ArrayList<DurableFuture<String>>(3);
var parallel = context.parallel("notify", config);

try (var parallel = context.parallel("notify", config)) {
try (parallel) {

// Branch 1: email — no wait, deliver immediately
futures.add(parallel.branch("email", String.class, ctx -> {
Expand All @@ -60,10 +62,12 @@ public Output handleRequest(Input input, DurableContext context) {
}));
}

ParallelResult result = parallel.get();

var deliveries = futures.stream().map(DurableFuture::get).toList();
logger.info("All {} notifications delivered", deliveries.size());
// Test replay
context.wait("wait for finalization", Duration.ofSeconds(5));
return new Output(deliveries);
return new Output(deliveries, result.getSucceededBranches(), result.getFailedBranches());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,32 +17,28 @@ void succeedsWhenFailuresAreWithinTolerance() {
var runner = LocalDurableTestRunner.create(ParallelFailureToleranceExample.Input.class, handler);

// 2 good services, 1 bad — toleratedFailureCount=1 so the parallel op still succeeds
var input = new ParallelFailureToleranceExample.Input(List.of("svc-a", "bad-svc-b", "svc-c"), 1);
var input = new ParallelFailureToleranceExample.Input(List.of("svc-a", "bad-svc-b", "svc-c"), 1, -1);
var result = runner.runUntilComplete(input);

assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());

var output = result.getResult(ParallelFailureToleranceExample.Output.class);
assertEquals(2, output.succeeded().size());
assertEquals(1, output.failed().size());
assertTrue(output.succeeded().contains("ok:svc-a"));
assertTrue(output.succeeded().contains("ok:svc-c"));
assertTrue(output.failed().contains("bad-svc-b"));
assertEquals(2, output.succeeded());
assertEquals(1, output.failed());
}

@Test
void succeedsWhenAllBranchesSucceed() {
var handler = new ParallelFailureToleranceExample();
var runner = LocalDurableTestRunner.create(ParallelFailureToleranceExample.Input.class, handler);

var input = new ParallelFailureToleranceExample.Input(List.of("svc-a", "svc-b", "svc-c"), 2);
var input = new ParallelFailureToleranceExample.Input(List.of("svc-a", "svc-b", "svc-c"), 2, -1);
var result = runner.runUntilComplete(input);

assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());

var output = result.getResult(ParallelFailureToleranceExample.Output.class);
assertEquals(3, output.succeeded().size());
assertTrue(output.failed().isEmpty());
assertEquals(3, output.succeeded());
}

@Test
Expand All @@ -51,13 +47,13 @@ void failsWhenFailuresExceedTolerance() {
var runner = LocalDurableTestRunner.create(ParallelFailureToleranceExample.Input.class, handler);

// 2 bad services, toleratedFailureCount=1 — second failure exceeds tolerance
var input = new ParallelFailureToleranceExample.Input(List.of("svc-a", "bad-svc-b", "bad-svc-c"), 1);
var input = new ParallelFailureToleranceExample.Input(List.of("svc-a", "bad-svc-b", "bad-svc-c"), 1, 2);
var result = runner.runUntilComplete(input);

assertEquals(ExecutionStatus.SUCCEEDED, result.getStatus());

var output = result.getResult(ParallelFailureToleranceExample.Output.class);
assertEquals(2, output.failed().size());
assertEquals(1, output.succeeded().size());
assertEquals(2, output.failed());
assertEquals(1, output.succeeded());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,6 @@ void completesAfterManuallyAdvancingWaits() {

var output = result.getResult(ParallelWithWaitExample.Output.class);
assertEquals(List.of("email:world", "sms:world", "push:world"), output.deliveries());
assertEquals(3, output.success());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,25 @@
package software.amazon.lambda.durable;

import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import software.amazon.lambda.durable.model.ParallelResult;
import software.amazon.lambda.durable.operation.ParallelOperation;

/** User-facing context for managing parallel branch execution within a durable function. */
public class ParallelContext implements AutoCloseable {
public class ParallelContext implements AutoCloseable, DurableFuture<ParallelResult> {

private final ParallelOperation<?> parallelOperation;
private final ParallelOperation parallelOperation;
private final DurableContext durableContext;
private boolean joined;
private final AtomicBoolean joined = new AtomicBoolean(false);

/**
* Creates a new ParallelContext.
*
* @param parallelOperation the underlying parallel operation managing concurrency
* @param durableContext the durable context for creating child operations
*/
public ParallelContext(ParallelOperation<?> parallelOperation, DurableContext durableContext) {
public ParallelContext(ParallelOperation parallelOperation, DurableContext durableContext) {
this.parallelOperation = Objects.requireNonNull(parallelOperation, "parallelOperation cannot be null");
this.durableContext = Objects.requireNonNull(durableContext, "durableContext cannot be null");
}
Expand Down Expand Up @@ -49,7 +51,7 @@ public <T> DurableFuture<T> branch(String name, Class<T> resultType, Function<Du
* @throws IllegalStateException if called after {@link #join()}
*/
public <T> DurableFuture<T> branch(String name, TypeToken<T> resultType, Function<DurableContext, T> func) {
if (joined) {
if (joined.get()) {
throw new IllegalStateException("Cannot add branches after join() has been called");
}
return parallelOperation.addItem(
Expand All @@ -66,11 +68,23 @@ public <T> DurableFuture<T> branch(String name, TypeToken<T> resultType, Functio
* @throws software.amazon.lambda.durable.exception.ConcurrencyExecutionException if failure threshold exceeded
*/
public void join() {
if (joined) {
if (!joined.compareAndSet(false, true)) {
return;
}
joined = true;
parallelOperation.get();
parallelOperation.join();
}

/**
* Blocks until the parallel operation completes and returns the {@link ParallelResult}.
*
* <p>Calling {@code get()} implicitly calls {@code join()} if it has not been called yet.
*
* @return the {@link ParallelResult} summarising branch outcomes
*/
@Override
public ParallelResult get() {
joined.set(true);
return parallelOperation.get();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -558,9 +558,8 @@ public ParallelContext parallel(String name, ParallelConfig config) {
Objects.requireNonNull(config, "config cannot be null");
var operationId = nextOperationId();

var parallelOp = new ParallelOperation<>(
var parallelOp = new ParallelOperation(
OperationIdentifier.of(operationId, name, OperationType.CONTEXT, OperationSubType.PARALLEL),
TypeToken.get(Void.class),
getDurableConfig().getSerDes(),
this,
config.maxConcurrency(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package software.amazon.lambda.durable.model;

/**
* Summary result of a parallel operation.
*
* <p>Captures the aggregate outcome of a parallel execution: how many branches were registered, how many succeeded, how
* many failed, and why the operation completed.
*/
public class ParallelResult {

private final int totalBranches;
private final int succeededBranches;
private final int failedBranches;
private final ConcurrencyCompletionStatus completionStatus;

public ParallelResult(
int totalBranches,
int succeededBranches,
int failedBranches,
ConcurrencyCompletionStatus completionStatus) {
this.totalBranches = totalBranches;
this.succeededBranches = succeededBranches;
this.failedBranches = failedBranches;
this.completionStatus = completionStatus;
}

/** Returns the total number of branches registered before {@code join()} was called. */
public int getTotalBranches() {
return totalBranches;
}

/** Returns the number of branches that completed without throwing. */
public int getSucceededBranches() {
return succeededBranches;
}

/** Returns the number of branches that threw an exception. */
public int getFailedBranches() {
return failedBranches;
}

/** Returns the status indicating why the parallel operation completed. */
public ConcurrencyCompletionStatus getCompletionStatus() {
return completionStatus;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public abstract class ConcurrencyOperation<T> extends BaseDurableOperation<T> {
private final Set<String> completedOperations = Collections.synchronizedSet(new HashSet<String>());
private OperationIdGenerator operationIdGenerator;
private final DurableContextImpl rootContext;
private ConcurrencyCompletionStatus completionStatus;

protected ConcurrencyOperation(
OperationIdentifier operationIdentifier,
Expand Down Expand Up @@ -203,9 +204,9 @@ public void onItemComplete(ChildContextOperation<?> child) {
}
runningCount.decrementAndGet();

var status = canComplete();
if (status != null) {
handleComplete(status);
this.completionStatus = canComplete();
if (this.completionStatus != null) {
handleComplete(this.completionStatus);
} else {
executeNextItemIfAllowed();
}
Expand Down Expand Up @@ -245,17 +246,13 @@ private void handleComplete(ConcurrencyCompletionStatus status) {
* Blocks the calling thread until the concurrency operation reaches a terminal state. Validates item count, handles
* zero-branch case, then delegates to {@code waitForOperationCompletion()} from BaseDurableOperation.
*/
protected void join() {
public void join() {
validateItemCount();
isJoined.set(true);
if (childOperations.isEmpty()) {
return;
}

synchronized (this) {
var status = canComplete();
if (status != null) {
handleComplete(status);
this.completionStatus = canComplete();
if (this.completionStatus != null) {
handleComplete(this.completionStatus);
}
}

Expand All @@ -274,6 +271,10 @@ protected int getTotalItems() {
return childOperations.size();
}

protected ConcurrencyCompletionStatus getCompletionStatus() {
return completionStatus;
}

protected List<ChildContextOperation<?>> getChildOperations() {
return Collections.unmodifiableList(childOperations);
}
Expand Down
Loading
Loading