diff --git a/README.md b/README.md index fc7f2b4..e26e8e7 100644 --- a/README.md +++ b/README.md @@ -26,6 +26,7 @@ Prerequisites: * [Encryption](src/Encryption) - End-to-end encryption with Temporal payload codecs. * [EnvConfig](src/EnvConfig) - Load client configuration from TOML files with programmatic overrides * [Mutex](src/Mutex) - How to implement a mutex as a workflow. Demonstrates how to avoid race conditions or parallel mutually exclusive operations on the same resource. +* [NexusCancellation](src/NexusCancellation) - Demonstrates how to cancel a running Nexus operation from a caller workflow. * [NexusContextPropagation](src/NexusContextPropagation) - Context propagation through Nexus services. * [NexusMultiArg](src/NexusMultiArg) - Nexus service implementation calling a workflow with multiple arguments. * [NexusSimple](src/NexusSimple) - Simple Nexus service implementation. diff --git a/TemporalioSamples.sln b/TemporalioSamples.sln index 44388c5..0189a4b 100644 --- a/TemporalioSamples.sln +++ b/TemporalioSamples.sln @@ -103,6 +103,10 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.Updatable EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.Dsl", "src\Dsl\TemporalioSamples.Dsl.csproj", "{AF077751-E4B9-4696-93CB-74653F0BB6C4}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TemporalioSamples.NexusCancellation", "src\NexusCancellation\TemporalioSamples.NexusCancellation.csproj", "{6D0BE4C4-9C4F-4A3D-78F1-B0B761568559}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "NexusCancellation", "NexusCancellation", "{7123C63D-3158-4C9A-8EAD-6D4F1295BC04}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -617,6 +621,18 @@ Global {AF077751-E4B9-4696-93CB-74653F0BB6C4}.Release|x64.Build.0 = Release|Any CPU {AF077751-E4B9-4696-93CB-74653F0BB6C4}.Release|x86.ActiveCfg = Release|Any CPU {AF077751-E4B9-4696-93CB-74653F0BB6C4}.Release|x86.Build.0 = Release|Any CPU + {6D0BE4C4-9C4F-4A3D-78F1-B0B761568559}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {6D0BE4C4-9C4F-4A3D-78F1-B0B761568559}.Debug|Any CPU.Build.0 = Debug|Any CPU + {6D0BE4C4-9C4F-4A3D-78F1-B0B761568559}.Debug|x64.ActiveCfg = Debug|Any CPU + {6D0BE4C4-9C4F-4A3D-78F1-B0B761568559}.Debug|x64.Build.0 = Debug|Any CPU + {6D0BE4C4-9C4F-4A3D-78F1-B0B761568559}.Debug|x86.ActiveCfg = Debug|Any CPU + {6D0BE4C4-9C4F-4A3D-78F1-B0B761568559}.Debug|x86.Build.0 = Debug|Any CPU + {6D0BE4C4-9C4F-4A3D-78F1-B0B761568559}.Release|Any CPU.ActiveCfg = Release|Any CPU + {6D0BE4C4-9C4F-4A3D-78F1-B0B761568559}.Release|Any CPU.Build.0 = Release|Any CPU + {6D0BE4C4-9C4F-4A3D-78F1-B0B761568559}.Release|x64.ActiveCfg = Release|Any CPU + {6D0BE4C4-9C4F-4A3D-78F1-B0B761568559}.Release|x64.Build.0 = Release|Any CPU + {6D0BE4C4-9C4F-4A3D-78F1-B0B761568559}.Release|x86.ActiveCfg = Release|Any CPU + {6D0BE4C4-9C4F-4A3D-78F1-B0B761568559}.Release|x86.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -668,5 +684,7 @@ Global {B37B3E98-4B04-48B8-9017-F0EDEDC7BD98} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC} {5D02DFEA-DC08-4B7B-8E26-EDAC1942D347} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC} {AF077751-E4B9-4696-93CB-74653F0BB6C4} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC} + {6D0BE4C4-9C4F-4A3D-78F1-B0B761568559} = {7123C63D-3158-4C9A-8EAD-6D4F1295BC04} + {7123C63D-3158-4C9A-8EAD-6D4F1295BC04} = {1A647B41-53D0-4638-AE5A-6630BAAE45FC} EndGlobalSection EndGlobal diff --git a/src/NexusCancellation/Caller/HelloCallerWorkflow.workflow.cs b/src/NexusCancellation/Caller/HelloCallerWorkflow.workflow.cs new file mode 100644 index 0000000..c0ae571 --- /dev/null +++ b/src/NexusCancellation/Caller/HelloCallerWorkflow.workflow.cs @@ -0,0 +1,64 @@ +namespace TemporalioSamples.NexusCancellation.Caller; + +using Microsoft.Extensions.Logging; +using Temporalio.Exceptions; +using Temporalio.Workflows; + +[Workflow] +public class HelloCallerWorkflow +{ + private static readonly IHelloService.HelloLanguage[] Languages = + [IHelloService.HelloLanguage.En, IHelloService.HelloLanguage.Fr, IHelloService.HelloLanguage.De, + IHelloService.HelloLanguage.Es, IHelloService.HelloLanguage.Tr]; + + [WorkflowRun] + public async Task RunAsync(string name) + { + using var cts = CancellationTokenSource.CreateLinkedTokenSource(Workflow.CancellationToken); + var client = Workflow.CreateNexusClient(IHelloService.EndpointName); + + // Concurrently execute an operation per language. + var tasks = Languages.Select(lang => + client.ExecuteNexusOperationAsync( + svc => svc.SayHello(new IHelloService.HelloInput(name, lang)), + new NexusOperationOptions + { + // We set the CancellationType to WaitCancellationRequested, which means the caller waits + // for the request to be received by the handler before proceeding with the cancellation. + // + // The default CancellationType is WaitCancellationCompleted, where the caller would wait + // until the operation is completed. + CancellationType = NexusOperationCancellationType.WaitCancellationRequested, + CancellationToken = cts.Token, + })).ToList(); + + var firstTask = await Workflow.WhenAnyAsync(tasks); + + Workflow.Logger.LogInformation("First operation completed, cancelling remaining operations"); + + // Now that the first operation has won the race, we are going to cancel the other operations. +#pragma warning disable CA1849, VSTHRD103 // CancelAsync() is non-deterministic in workflows. + cts.Cancel(); +#pragma warning restore CA1849, VSTHRD103 + + // Wait for all tasks to resolve. Once the workflow completes, the server will stop trying to cancel any of + // the operations that have not yet received cancellation, letting them run to completion. We are using the + // CancellationType of WaitCancellationRequested so these tasks will return as soon as the operation has received + // the cancellation request. + foreach (var task in tasks) + { + try + { + await task; + } + // Only throw an error if an operation errored out not due to cancellation. + catch (Exception ex) when (TemporalException.IsCanceledException(ex)) + { + Workflow.Logger.LogInformation("Operation was cancelled"); + } + } + + var result = await firstTask; + return result?.Message ?? throw new ApplicationFailureException("No successful result"); + } +} \ No newline at end of file diff --git a/src/NexusCancellation/Handler/HelloHandlerWorkflow.workflow.cs b/src/NexusCancellation/Handler/HelloHandlerWorkflow.workflow.cs new file mode 100644 index 0000000..0d12881 --- /dev/null +++ b/src/NexusCancellation/Handler/HelloHandlerWorkflow.workflow.cs @@ -0,0 +1,52 @@ +namespace TemporalioSamples.NexusCancellation.Handler; + +using Microsoft.Extensions.Logging; +using Temporalio.Exceptions; +using Temporalio.Workflows; + +[Workflow] +public class HelloHandlerWorkflow +{ + [WorkflowRun] + public async Task RunAsync(IHelloService.HelloInput input) + { + Workflow.Logger.LogInformation( + "HelloHandlerWorkflow started for {Name} in {Language}", + input.Name, + input.Language); + + // Sleep for a random duration to simulate some work + var duration = TimeSpan.FromSeconds(Workflow.Random.Next(5)); + + try + { + await Workflow.DelayAsync(duration); + } + catch (Exception ex) when (TemporalException.IsCanceledException(ex)) + { + // Simulate cleanup work after cancellation is requested. + // Use CancellationToken.None to create a "disconnected" context. + var cleanupDuration = TimeSpan.FromSeconds(Workflow.Random.Next(5)); + await Workflow.DelayAsync(cleanupDuration, CancellationToken.None); + + Workflow.Logger.LogInformation( + "HelloHandlerWorkflow for {Name} in {Language} was cancelled after {Duration} of work, performed {CleanupDuration} of cleanup", + input.Name, + input.Language, + duration, + cleanupDuration); + throw; // Re-throw the cancellation after cleanup + } + + return input.Language switch + { + IHelloService.HelloLanguage.En => new($"Hello {input.Name} 👋"), + IHelloService.HelloLanguage.Fr => new($"Bonjour {input.Name} 👋"), + IHelloService.HelloLanguage.De => new($"Hallo {input.Name} 👋"), + IHelloService.HelloLanguage.Es => new($"¡Hola! {input.Name} 👋"), + IHelloService.HelloLanguage.Tr => new($"Merhaba {input.Name} 👋"), + _ => throw new ApplicationFailureException( + $"Unsupported language: {input.Language}", errorType: "UNSUPPORTED_LANGUAGE"), + }; + } +} diff --git a/src/NexusCancellation/Handler/HelloService.cs b/src/NexusCancellation/Handler/HelloService.cs new file mode 100644 index 0000000..69fbd1f --- /dev/null +++ b/src/NexusCancellation/Handler/HelloService.cs @@ -0,0 +1,21 @@ +namespace TemporalioSamples.NexusCancellation.Handler; + +using NexusRpc.Handlers; +using Temporalio.Nexus; + +[NexusServiceHandler(typeof(IHelloService))] +public class HelloService +{ + [NexusOperationHandler] + public IOperationHandler SayHello() => + // This Nexus service operation is backed by a workflow run + WorkflowRunOperationHandler.FromHandleFactory( + (WorkflowRunOperationContext context, IHelloService.HelloInput input) => + context.StartWorkflowAsync( + (HelloHandlerWorkflow wf) => wf.RunAsync(input), + // Workflow IDs should typically be business meaningful IDs and are used to + // dedupe workflow starts. For this example, we're using the request ID + // allocated by Temporal when the caller workflow schedules the operation, + // this ID is guaranteed to be stable across retries of this operation. + new() { Id = context.HandlerContext.RequestId })); +} \ No newline at end of file diff --git a/src/NexusCancellation/IHelloService.cs b/src/NexusCancellation/IHelloService.cs new file mode 100644 index 0000000..0374cde --- /dev/null +++ b/src/NexusCancellation/IHelloService.cs @@ -0,0 +1,25 @@ +namespace TemporalioSamples.NexusCancellation; + +using NexusRpc; + +[NexusService] +public interface IHelloService +{ + static readonly string EndpointName = "nexus-cancellation-endpoint"; + + [NexusOperation] + HelloOutput SayHello(HelloInput input); + + public record HelloInput(string Name, HelloLanguage Language); + + public record HelloOutput(string Message); + + public enum HelloLanguage + { + En, + Fr, + De, + Es, + Tr, + } +} diff --git a/src/NexusCancellation/Program.cs b/src/NexusCancellation/Program.cs new file mode 100644 index 0000000..95eff15 --- /dev/null +++ b/src/NexusCancellation/Program.cs @@ -0,0 +1,94 @@ +using Microsoft.Extensions.Logging; +using Temporalio.Client; +using Temporalio.Common.EnvConfig; +using Temporalio.Worker; +using TemporalioSamples.NexusCancellation.Caller; +using TemporalioSamples.NexusCancellation.Handler; + +using var loggerFactory = LoggerFactory.Create(builder => + builder. + AddSimpleConsole(options => options.TimestampFormat = "[HH:mm:ss] "). + SetMinimumLevel(LogLevel.Information)); +var logger = loggerFactory.CreateLogger(); + +// Cancellation token cancelled on ctrl+c +using var tokenSource = new CancellationTokenSource(); +Console.CancelKeyPress += (_, eventArgs) => +{ + tokenSource.Cancel(); + eventArgs.Cancel = true; +}; + +Task ConnectClientAsync(string temporalNamespace) +{ + var connectOptions = ClientEnvConfig.LoadClientConnectOptions(); + connectOptions.TargetHost ??= "localhost:7233"; + connectOptions.Namespace = temporalNamespace; + connectOptions.LoggerFactory = loggerFactory; + return TemporalClient.ConnectAsync(connectOptions); +} + +async Task RunHandlerWorkerAsync() +{ + // Run worker until cancelled + logger.LogInformation("Running handler worker"); + using var worker = new TemporalWorker( + await ConnectClientAsync("nexus-cancellation-handler-namespace"), + new TemporalWorkerOptions(taskQueue: "nexus-cancellation-handler-sample"). + AddNexusService(new HelloService()). + AddWorkflow()); + try + { + await worker.ExecuteAsync(tokenSource.Token); + } + catch (OperationCanceledException) + { + logger.LogInformation("Handler worker cancelled"); + } +} + +async Task RunCallerWorkerAsync() +{ + // Run worker until cancelled + logger.LogInformation("Running caller worker"); + using var worker = new TemporalWorker( + await ConnectClientAsync("nexus-cancellation-caller-namespace"), + new TemporalWorkerOptions(taskQueue: "nexus-cancellation-caller-sample") + .AddWorkflow()); + try + { + await worker.ExecuteAsync(tokenSource.Token); + } + catch (OperationCanceledException) + { + logger.LogInformation("Caller worker cancelled"); + } +} + +async Task ExecuteCallerWorkflowAsync() +{ + logger.LogInformation("Executing caller hello workflow"); + + var client = await ConnectClientAsync("nexus-cancellation-caller-namespace"); + + var result = await client.ExecuteWorkflowAsync( + (HelloCallerWorkflow wf) => wf.RunAsync("Temporal"), + new(id: "nexus-cancellation-hello-id", taskQueue: "nexus-cancellation-caller-sample")); + logger.LogInformation("Workflow result: {Result}", result); +} + +switch (args.ElementAtOrDefault(0)) +{ + case "handler-worker": + await RunHandlerWorkerAsync(); + break; + case "caller-worker": + await RunCallerWorkerAsync(); + break; + case "caller-workflow": + await ExecuteCallerWorkflowAsync(); + break; + default: + throw new ArgumentException( + "Must pass 'handler-worker', 'caller-worker', or 'caller-workflow' as the single argument"); +} \ No newline at end of file diff --git a/src/NexusCancellation/README.md b/src/NexusCancellation/README.md new file mode 100644 index 0000000..adfaba1 --- /dev/null +++ b/src/NexusCancellation/README.md @@ -0,0 +1,96 @@ +# Nexus Cancellation + +This sample demonstrates how to cancel a Nexus operation from a caller workflow. It uses a CancellationType of `WaitCancellationRequested`, which allows the caller workflow to return after the handler workflow has received the cancellation. + +### Instructions + +To run, first see [README.md](../../README.md) for prerequisites such as starting the Temporal server. + +Run the following to create both namespaces and an endpoint: + +``` +temporal operator namespace create --namespace nexus-cancellation-handler-namespace +temporal operator namespace create --namespace nexus-cancellation-caller-namespace + +temporal operator nexus endpoint create \ + --name nexus-cancellation-endpoint \ + --target-namespace nexus-cancellation-handler-namespace \ + --target-task-queue nexus-cancellation-handler-sample +``` + +In one terminal, run the handler worker from this directory: + +``` +dotnet run handler-worker +``` + +In a second terminal, run the caller worker from this directory: + +``` +dotnet run caller-worker +``` + +In a third terminal, run the caller workflow from this directory: + +``` +dotnet run caller-workflow +``` + +### Output + +#### Caller Worker Output + +The caller worker output shows when the first operation completes followed by the cancellation of the other operations. + +``` +[23:21:04] info: Program[0] + Running caller worker +[23:21:11] info: Temporalio.Workflow:HelloCallerWorkflow[0] + First operation completed, cancelling remaining operations +[23:21:11] info: Temporalio.Workflow:HelloCallerWorkflow[0] + Operation was cancelled +[23:21:11] info: Temporalio.Workflow:HelloCallerWorkflow[0] + Operation was cancelled +[23:21:11] info: Temporalio.Workflow:HelloCallerWorkflow[0] + Operation was cancelled +``` + +#### Handler Worker Output + +The handler worker output shows which operations were canceled. + +``` +[23:20:59] info: Program[0] + Running handler worker +[23:21:10] info: Temporalio.Workflow:HelloHandlerWorkflow[0] + HelloHandlerWorkflow started for Temporal in Es +[23:21:10] info: Temporalio.Workflow:HelloHandlerWorkflow[0] + HelloHandlerWorkflow started for Temporal in En +[23:21:10] info: Temporalio.Workflow:HelloHandlerWorkflow[0] + HelloHandlerWorkflow started for Temporal in Fr +[23:21:10] info: Temporalio.Workflow:HelloHandlerWorkflow[0] + HelloHandlerWorkflow started for Temporal in Tr +[23:21:10] info: Temporalio.Workflow:HelloHandlerWorkflow[0] + HelloHandlerWorkflow started for Temporal in De +[23:21:14] info: Temporalio.Workflow:HelloHandlerWorkflow[0] + HelloHandlerWorkflow for Temporal in Fr was cancelled after 00:00:04 of work, performed 00:00:03 of cleanup +[23:21:14] info: Temporalio.Workflow:HelloHandlerWorkflow[0] + HelloHandlerWorkflow for Temporal in En was cancelled after 00:00:04 of work, performed 00:00:03 of cleanup +[23:21:14] info: Temporalio.Workflow:HelloHandlerWorkflow[0] + HelloHandlerWorkflow for Temporal in Es was cancelled after 00:00:04 of work, performed 00:00:03 of cleanup +``` + +#### Workflow Result + +The caller workflow output shows the result of the first completed operation. + +``` +[23:21:09] info: Program[0] + Executing caller hello workflow +[23:21:11] info: Program[0] + Workflow result: Hallo Temporal 👋 +``` + +#### Note on Timing + +As this sample uses the CancellationType of `WaitCancellationRequested` you can see that the caller workflow result logs before the cleanup work finishes on the cancelled operations. In the timing above, the caller workflow result logs at `23:21:11`, the cancellations are also logged at `23:21:11` and the handler workflows complete their 3 seconds of cleanup work and log at `23:21:14`. \ No newline at end of file diff --git a/src/NexusCancellation/TemporalioSamples.NexusCancellation.csproj b/src/NexusCancellation/TemporalioSamples.NexusCancellation.csproj new file mode 100644 index 0000000..a1bcd48 --- /dev/null +++ b/src/NexusCancellation/TemporalioSamples.NexusCancellation.csproj @@ -0,0 +1,11 @@ + + + + Exe + + + + + + + diff --git a/tests/NexusCancellation/HelloCallerWorkflowTests.cs b/tests/NexusCancellation/HelloCallerWorkflowTests.cs new file mode 100644 index 0000000..14d66c0 --- /dev/null +++ b/tests/NexusCancellation/HelloCallerWorkflowTests.cs @@ -0,0 +1,59 @@ +namespace TemporalioSamples.Tests.NexusCancellation; + +using Temporalio.Client; +using Temporalio.Worker; +using TemporalioSamples.NexusCancellation; +using TemporalioSamples.NexusCancellation.Caller; +using TemporalioSamples.NexusCancellation.Handler; +using Xunit; +using Xunit.Abstractions; + +public class HelloCallerWorkflowTests : WorkflowEnvironmentTestBase +{ + private static readonly string[] ExpectedGreetings = + [ + "Hello Temporal 👋", + "Bonjour Temporal 👋", + "Hallo Temporal 👋", + "¡Hola! Temporal 👋", + "Merhaba Temporal 👋", + ]; + + public HelloCallerWorkflowTests(ITestOutputHelper output, WorkflowEnvironment env) + : base(output, env) + { + } + + [Fact] + public async Task RunAsync_ReturnsFirstCompletedGreeting() + { + // Create endpoint + var handlerTaskQueue = $"tq-{Guid.NewGuid()}"; + await Env.TestEnv.CreateNexusEndpointAsync(IHelloService.EndpointName, handlerTaskQueue); + + // Run handler worker + using var handlerWorker = new TemporalWorker( + Client, + new TemporalWorkerOptions(handlerTaskQueue). + AddNexusService(new HelloService()). + AddWorkflow()); + await handlerWorker.ExecuteAsync(async () => + { + // Run caller worker + using var callerWorker = new TemporalWorker( + Client, + new TemporalWorkerOptions($"tq-{Guid.NewGuid()}"). + AddWorkflow()); + await callerWorker.ExecuteAsync(async () => + { + // Run workflow, confirm it returns a valid greeting + var result = await Client.ExecuteWorkflowAsync( + (HelloCallerWorkflow wf) => wf.RunAsync("Temporal"), + new(id: $"wf-{Guid.NewGuid()}", taskQueue: callerWorker.Options.TaskQueue!)); + + // Should return one of the valid greetings (whichever completes first) + Assert.Contains(result, ExpectedGreetings); + }); + }); + } +} diff --git a/tests/TemporalioSamples.Tests.csproj b/tests/TemporalioSamples.Tests.csproj index 2fa5780..9490d8a 100644 --- a/tests/TemporalioSamples.Tests.csproj +++ b/tests/TemporalioSamples.Tests.csproj @@ -27,6 +27,7 @@ +