Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions .changeset/async-serialization.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
"@workflow/core": patch
"@workflow/world": patch
"@workflow/cli": patch
"@workflow/web": patch
"@workflow/world-testing": patch
---

Make serialization functions async with Encryptor interface

All 8 dehydrate/hydrate functions in the serialization layer are now async and accept an `Encryptor` parameter for future encryption support. Adds `Encryptor`, `EncryptionContext`, and `KeyMaterial` interfaces to `@workflow/world`. This is a no-op refactor — the encryptor parameter is unused in this change.
30 changes: 21 additions & 9 deletions packages/cli/src/lib/inspect/output.ts
Original file line number Diff line number Diff line change
Expand Up @@ -534,7 +534,9 @@ export const listRuns = async (world: World, opts: InspectCLIOptions = {}) => {
},
resolveData,
});
const runsWithHydratedIO = runs.data.map(hydrateResourceIO);
const runsWithHydratedIO = await Promise.all(
runs.data.map(async (run) => hydrateResourceIO(run, world))
);
showJson({ ...runs, data: runsWithHydratedIO });
return;
} catch (error) {
Expand Down Expand Up @@ -572,7 +574,9 @@ export const listRuns = async (world: World, opts: InspectCLIOptions = {}) => {
}
},
displayPage: async (runs) => {
const runsWithHydratedIO = runs.map(hydrateResourceIO);
const runsWithHydratedIO = await Promise.all(
runs.map(async (run) => hydrateResourceIO(run, world))
);
logger.log(showTable(runsWithHydratedIO, props, opts));
},
});
Expand All @@ -588,7 +592,9 @@ export const getRecentRun = async (
pagination: { limit: 1, sortOrder: opts.sort || 'desc' },
resolveData: 'none', // Don't need data for just getting the ID
});
runs.data = runs.data.map(hydrateResourceIO);
runs.data = await Promise.all(
runs.data.map(async (run) => hydrateResourceIO(run, world))
);
return runs.data[0];
} catch (error) {
if (handleApiError(error, opts.backend)) {
Expand All @@ -608,7 +614,7 @@ export const showRun = async (
}
try {
const run = await world.runs.get(runId, { resolveData: 'all' });
const runWithHydratedIO = hydrateResourceIO(run);
const runWithHydratedIO = await hydrateResourceIO(run, world);
if (opts.json) {
showJson(runWithHydratedIO);
return;
Expand Down Expand Up @@ -711,7 +717,9 @@ export const listSteps = async (
}
},
displayPage: async (steps) => {
const stepsWithHydratedIO = steps.map(hydrateResourceIO);
const stepsWithHydratedIO = await Promise.all(
steps.map(async (step) => hydrateResourceIO(step, world))
);
logger.log(showTable(stepsWithHydratedIO, props, opts));
showInspectInfoBox('step');
},
Expand All @@ -735,7 +743,7 @@ export const showStep = async (
const step = await world.steps.get(opts.runId, stepId, {
resolveData: 'all',
});
const stepWithHydratedIO = hydrateResourceIO(step);
const stepWithHydratedIO = await hydrateResourceIO(step, world);
if (opts.json) {
showJson(stepWithHydratedIO);
return;
Expand Down Expand Up @@ -950,7 +958,9 @@ export const listHooks = async (world: World, opts: InspectCLIOptions = {}) => {
},
resolveData,
});
const hydratedHooks = hooks.data.map(hydrateResourceIO);
const hydratedHooks = await Promise.all(
hooks.data.map(async (hook) => hydrateResourceIO(hook, world))
);
showJson({ ...hooks, data: hydratedHooks });
return;
} catch (error) {
Expand Down Expand Up @@ -994,7 +1004,9 @@ export const listHooks = async (world: World, opts: InspectCLIOptions = {}) => {
}
},
displayPage: async (hooks) => {
const hydratedHooks = hooks.map(hydrateResourceIO);
const hydratedHooks = await Promise.all(
hooks.map(async (hook) => hydrateResourceIO(hook, world))
);
logger.log(showTable(hydratedHooks, HOOK_LISTED_PROPS, opts));
showInspectInfoBox('hook');
},
Expand All @@ -1013,7 +1025,7 @@ export const showHook = async (
const hook = await world.hooks.get(hookId, {
resolveData: 'all',
});
const hydratedHook = hydrateResourceIO(hook);
const hydratedHook = await hydrateResourceIO(hook, world);
if (opts.json) {
showJson(hydratedHook);
return;
Expand Down
14 changes: 11 additions & 3 deletions packages/core/src/observability.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { inspect } from 'node:util';
import { WORKFLOW_DESERIALIZE, WORKFLOW_SERIALIZE } from '@workflow/serde';
import type { Encryptor } from '@workflow/world';
import { describe, expect, it } from 'vitest';
import { registerSerializationClass } from './class-serialization.js';
import {
Expand All @@ -15,6 +16,8 @@ import {
} from './observability.js';
import { dehydrateStepReturnValue } from './serialization.js';

const mockEncryptor: Encryptor = {};

describe('ClassInstanceRef', () => {
describe('constructor and properties', () => {
it('should create instance with correct properties', () => {
Expand Down Expand Up @@ -332,10 +335,15 @@ describe('hydrateResourceIO with custom class instances', () => {
(TestPoint as any).classId = 'test//TestPoint';
registerSerializationClass('test//TestPoint', TestPoint);

it('should convert Instance type to ClassInstanceRef in step output', () => {
it('should convert Instance type to ClassInstanceRef in step output', async () => {
// Simulate serialized step data with a custom class instance
const point = new TestPoint(3, 4);
const serialized = dehydrateStepReturnValue(point, [], 'wrun_test');
const serialized = await dehydrateStepReturnValue(
point,
'wrun_test',
{},
[]
);

// Create a step resource with serialized output
const step = {
Expand All @@ -346,7 +354,7 @@ describe('hydrateResourceIO with custom class instances', () => {

// Hydrate the step - this should convert Instance to ClassInstanceRef
// because the class is not registered in the o11y context (streamPrintRevivers)
const hydrated = hydrateResourceIO(step);
const hydrated = await hydrateResourceIO(step, mockEncryptor);

// The output should be a ClassInstanceRef
expect(isClassInstanceRef(hydrated.output)).toBe(true);
Expand Down
73 changes: 45 additions & 28 deletions packages/core/src/observability.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import { inspect } from 'node:util';
import { parseClassName } from '@workflow/utils/parse-name';
import type { Encryptor } from '@workflow/world';
import { unflatten } from 'devalue';
import { runtimeLogger } from './logger.js';
import {
Expand Down Expand Up @@ -254,20 +255,22 @@ const hydrateLegacyData = (data: any[]): unknown => {
return unflatten(data, getObservabilityRevivers());
};

const hydrateStepIO = <
const hydrateStepIO = async <
T extends { stepId?: string; input?: any; output?: any; runId?: string },
>(
step: T
): T => {
step: T,
encryptor: Encryptor
): Promise<T> => {
let hydratedInput = step.input;
let hydratedOutput = step.output;

// Hydrate input - handle both binary (specVersion 2) and legacy (specVersion 1) formats
if (isBinaryFormat(step.input) && step.input.byteLength > 0) {
hydratedInput = hydrateStepArguments(
hydratedInput = await hydrateStepArguments(
step.input,
[],
step.runId as string,
encryptor,
[],
globalThis,
streamPrintRevivers
);
Expand All @@ -277,8 +280,10 @@ const hydrateStepIO = <

// Hydrate output - handle both binary (specVersion 2) and legacy (specVersion 1) formats
if (isBinaryFormat(step.output)) {
hydratedOutput = hydrateStepReturnValue(
hydratedOutput = await hydrateStepReturnValue(
step.output,
step.runId as string,
encryptor,
globalThis,
streamPrintRevivers
);
Expand All @@ -293,18 +298,21 @@ const hydrateStepIO = <
};
};

const hydrateWorkflowIO = <
const hydrateWorkflowIO = async <
T extends { runId?: string; input?: any; output?: any },
>(
workflow: T
): T => {
workflow: T,
encryptor: Encryptor
): Promise<T> => {
let hydratedInput = workflow.input;
let hydratedOutput = workflow.output;

// Hydrate input - handle both binary (specVersion 2) and legacy (specVersion 1) formats
if (isBinaryFormat(workflow.input) && workflow.input.byteLength > 0) {
hydratedInput = hydrateWorkflowArguments(
hydratedInput = await hydrateWorkflowArguments(
workflow.input,
workflow.runId as string,
encryptor,
globalThis,
streamPrintRevivers
);
Expand All @@ -314,10 +322,11 @@ const hydrateWorkflowIO = <

// Hydrate output - handle both binary (specVersion 2) and legacy (specVersion 1) formats
if (isBinaryFormat(workflow.output)) {
hydratedOutput = hydrateWorkflowReturnValue(
hydratedOutput = await hydrateWorkflowReturnValue(
workflow.output,
[],
workflow.runId as string,
encryptor,
[],
globalThis,
streamPrintRevivers
);
Expand All @@ -332,11 +341,12 @@ const hydrateWorkflowIO = <
};
};

const hydrateEventData = <
const hydrateEventData = async <
T extends { eventId?: string; eventData?: any; runId?: string },
>(
event: T
): T => {
event: T,
encryptor: Encryptor
): Promise<T> => {
if (!event.eventData) {
return event;
}
Expand All @@ -348,8 +358,10 @@ const hydrateEventData = <
if ('result' in eventData && typeof eventData.result === 'object') {
// Handle both binary (specVersion 2) and legacy (specVersion 1) formats
if (isBinaryFormat(eventData.result)) {
eventData.result = hydrateStepReturnValue(
eventData.result = await hydrateStepReturnValue(
eventData.result,
event.runId as string,
encryptor,
globalThis,
streamPrintRevivers
);
Expand All @@ -369,18 +381,22 @@ const hydrateEventData = <
};
};

const hydrateHookMetadata = <T extends { hookId?: string; metadata?: any }>(
hook: T
): T => {
const hydrateHookMetadata = async <
T extends { hookId?: string; metadata?: any },
>(
hook: T,
encryptor: Encryptor
): Promise<T> => {
let hydratedMetadata = hook.metadata;

if (hook.metadata && 'runId' in hook) {
// Handle both binary (specVersion 2) and legacy (specVersion 1) formats
if (isBinaryFormat(hook.metadata)) {
hydratedMetadata = hydrateStepArguments(
hydratedMetadata = await hydrateStepArguments(
hook.metadata,
[],
hook.runId as string,
encryptor,
[],
globalThis,
streamPrintRevivers
);
Expand All @@ -395,7 +411,7 @@ const hydrateHookMetadata = <T extends { hookId?: string; metadata?: any }>(
};
};

export const hydrateResourceIO = <
export const hydrateResourceIO = async <
T extends {
stepId?: string;
hookId?: string;
Expand All @@ -407,20 +423,21 @@ export const hydrateResourceIO = <
executionContext?: any;
},
>(
resource: T
): T => {
resource: T,
encryptor: Encryptor
): Promise<T> => {
if (!resource) {
return resource;
}
let hydrated: T;
if ('stepId' in resource) {
hydrated = hydrateStepIO(resource);
hydrated = await hydrateStepIO(resource, encryptor);
} else if ('hookId' in resource) {
hydrated = hydrateHookMetadata(resource);
hydrated = await hydrateHookMetadata(resource, encryptor);
} else if ('eventId' in resource) {
hydrated = hydrateEventData(resource);
hydrated = await hydrateEventData(resource, encryptor);
} else {
hydrated = hydrateWorkflowIO(resource);
hydrated = await hydrateWorkflowIO(resource, encryptor);
}
if ('executionContext' in hydrated) {
const { executionContext, ...rest } = hydrated;
Expand Down
5 changes: 5 additions & 0 deletions packages/core/src/private.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
* Utils used by the bundler when transforming code
*/

import type { Encryptor } from '@workflow/world';
import type { EventsConsumer } from './events-consumer.js';
import type { QueueItem } from './global.js';
import type { Serializable } from './schemas.js';
Expand Down Expand Up @@ -49,4 +50,8 @@ export interface WorkflowOrchestratorContext {
onWorkflowError: (error: Error) => void;
generateUlid: () => string;
generateNanoid: () => string;
/** The workflow run ID */
runId: string;
/** Encryptor for serialization (optional encryption support) */
encryptor: Encryptor;
}
22 changes: 7 additions & 15 deletions packages/core/src/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,11 @@ export {
export {
cancelRun,
listStreams,
type ReadStreamOptions,
type RecreateRunOptions,
readStream,
recreateRunFromExisting,
reenqueueRun,
type ReadStreamOptions,
type RecreateRunOptions,
type StopSleepOptions,
type StopSleepResult,
wakeUpRun,
Expand Down Expand Up @@ -219,19 +219,11 @@ export function workflowEntrypoint(
events.push(result.event!);
}

const result = await trace(
'workflow.replay',
{},
async (replaySpan) => {
replaySpan?.setAttributes({
...Attribute.WorkflowEventsCount(events.length),
});
return await runWorkflow(
workflowCode,
workflowRun,
events
);
}
const result = await runWorkflow(
workflowCode,
workflowRun,
events,
world
);

// Complete the workflow run via event (event-sourced architecture)
Expand Down
Loading
Loading