Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 67 additions & 0 deletions ts/packages/agentRpc/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import { createRpc } from "./rpc.js";
import { ChannelProvider } from "./common.js";
import { getObjectProperty, uint8ArrayToBase64 } from "@typeagent/common-utils";
import { AgentInterfaceFunctionName } from "./server.js";
import { randomUUID } from "crypto";

/**
* Race a promise against an AbortSignal. If the signal fires before the
Expand Down Expand Up @@ -186,6 +187,15 @@ export async function createAgentRpcClient(
) {
const channel = channelProvider.createChannel(`agent:${name}`);
const contextMap = createObjectMap<SessionContext<ShimContext>>();
// Tracks port registration handles returned by sessionContext.registerPort
// so the out-of-process agent can release them via the regId we sent back.
const registrationHandles = new Map<string, { release: () => void }>();
Comment thread
TalZaccai marked this conversation as resolved.
// Reverse index: which regIds belong to which agent context. Lets us
// release everything an out-of-process agent registered if it closes
// its context without calling releasePort for each one (crash, bug,
// forgetful agent). Without this, the agent could leak release closures
// for the lifetime of the RPC client.
const regIdsByContext = new Map<number, Set<string>>();
function getContextParam(
context: SessionContext<ShimContext>,
): ContextParams {
Expand All @@ -194,6 +204,7 @@ export async function createAgentRpcClient(
hasInstanceStorage: context.instanceStorage !== undefined,
hasSessionStorage: context.sessionStorage !== undefined,
agentContextId: context.agentContext?.contextId,
sessionContextId: context.sessionContextId,
};
}

Expand Down Expand Up @@ -326,6 +337,42 @@ export async function createAgentRpcClient(
const context = contextMap.get(param.contextId);
context.setLocalHostPort(param.port);
},
registerPort: async (param: {
contextId: number;
role: string;
port: number;
}) => {
const context = contextMap.get(param.contextId);
const handle = context.registerPort(param.role, param.port);
const regId = randomUUID();
registrationHandles.set(regId, handle);
let regIds = regIdsByContext.get(param.contextId);
if (regIds === undefined) {
regIds = new Set<string>();
regIdsByContext.set(param.contextId, regIds);
}
regIds.add(regId);
return { regId };
},
releasePort: async (param: { regId: string; contextId?: number }) => {
const handle = registrationHandles.get(param.regId);
if (handle === undefined) {
return;
}
// Defense-in-depth: only honor releasePort when the calling
// context actually owns the regId. Without this, a buggy or
// compromised out-of-process agent could pass another
// agent's regId and cancel its registration.
if (param.contextId !== undefined) {
const owned = regIdsByContext.get(param.contextId);
if (owned === undefined || !owned.has(param.regId)) {
return;
}
owned.delete(param.regId);
}
registrationHandles.delete(param.regId);
handle.release();
},
indexes: async (param: { contextId: number; type: string }) => {
const context = contextMap.get(param.contextId);
return context.indexes(param.type as any);
Expand Down Expand Up @@ -746,6 +793,26 @@ export async function createAgentRpcClient(
const invokeCloseAgentContext = result.closeAgentContext;
result.closeAgentContext = async (context: SessionContext<ShimContext>) => {
const result = await invokeCloseAgentContext?.(context);
const contextId = contextMap.getId(context);
// Backstop: release any port handles the out-of-process agent
// failed to release explicitly (crash, bug, or just forgot). Mirrors
// the dispatcher-side releaseAllForSession backstop so handles
// can't outlive the context they're scoped to.
const regIds = regIdsByContext.get(contextId);
if (regIds !== undefined) {
for (const regId of regIds) {
const handle = registrationHandles.get(regId);
if (handle !== undefined) {
registrationHandles.delete(regId);
try {
handle.release();
} catch {
// Best-effort cleanup; swallow.
}
}
}
regIdsByContext.delete(contextId);
}
contextMap.close(context);
// Clean up the options RPC channel once this agent context is closed.
// Options are agent-scoped (created once per initializeAgentContext call)
Expand Down
33 changes: 33 additions & 0 deletions ts/packages/agentRpc/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,7 @@ export function createAgentRpcServer(
contextId: number,
hasInstanceStorage: boolean,
hasSessionStorage: boolean,
sessionContextId: string,
context: any,
): SessionContext<any> {
const dynamicAgentRpcServer = new Map<string, () => void>();
Expand All @@ -450,6 +451,7 @@ export function createAgentRpcServer(
instanceStorage: hasInstanceStorage
? getStorage(contextId, false)
: undefined,
sessionContextId,
notify: (
event: AppAgentEvent,
message: string | DisplayContent,
Expand Down Expand Up @@ -529,6 +531,32 @@ export function createAgentRpcServer(
.invoke("setLocalHostPort", { contextId, port })
.catch();
},
registerPort(role: string, port: number) {
// Fire-and-forget the invoke; resolve the regId lazily so
// release() waits for the round-trip if it gets called
// before the registration response arrives.
const regIdPromise: Promise<string> = rpc
.invoke("registerPort", { contextId, role, port })
.then((r: { regId: string }) => r.regId);
regIdPromise.catch(() => {
// Swallow registration failures here — they're logged
// on the dispatcher side via the registrar; throwing
// synchronously from registerPort would force every
// agent caller to add try/catch around the bind path.
});
return {
release: () => {
void regIdPromise
.then((regId) =>
rpc.invoke("releasePort", {
regId,
contextId,
}),
)
.catch();
},
};
},
addDynamicAgent: async (
name: string,
manifest: AppAgentManifest,
Expand Down Expand Up @@ -638,6 +666,7 @@ export function createAgentRpcServer(
contextId,
hasInstanceStorage,
hasSessionStorage,
sessionContextId,
agentContextId,
} = param;
if (contextId === undefined) {
Expand All @@ -651,6 +680,9 @@ export function createAgentRpcServer(
if (hasSessionStorage === undefined) {
throw new Error("Invalid context param: missing hasSessionStorage");
}
if (sessionContextId === undefined) {
throw new Error("Invalid context param: missing sessionContextId");
}

const agentContext =
agentContextId !== undefined
Expand All @@ -661,6 +693,7 @@ export function createAgentRpcServer(
contextId,
hasInstanceStorage,
hasSessionStorage,
sessionContextId,
agentContext,
);
}
Expand Down
10 changes: 10 additions & 0 deletions ts/packages/agentRpc/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,15 @@ export type AgentContextInvokeFunctions = {
contextId: number;
port: number;
}) => Promise<void>;
registerPort: (param: {
contextId: number;
role: string;
port: number;
}) => Promise<{ regId: string }>;
releasePort: (param: {
regId: string;
contextId?: number;
}) => Promise<void>;
indexes: (param: { contextId: number; type: string }) => Promise<any>;
reloadAgentSchema: (param: { contextId: number }) => Promise<void>;
popupQuestion: (param: {
Expand Down Expand Up @@ -273,6 +282,7 @@ export type ContextParams = {
hasInstanceStorage: boolean;
hasSessionStorage: boolean;
agentContextId: number | undefined;
sessionContextId: string;
};

export type ActionContextParams = ContextParams & {
Expand Down
52 changes: 48 additions & 4 deletions ts/packages/agentSdk/src/agentInterface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,15 @@ export interface SessionContext<T = unknown> {
readonly sessionStorage: Storage | undefined;
readonly instanceStorage: Storage | undefined; // storage that are preserved across sessions

/**
* Opaque identifier for the agent's current session-context lifetime.
* Re-generated each time the agent is (re-)initialized; stable for the
* lifetime of this `SessionContext` instance. Provided so out-of-process
* agents (via agent-rpc) can scope port registrations to their session;
* most in-process agents do not need to read this directly.
*/
readonly sessionContextId: string;

notify(
event: AppAgentEvent,
message: string | DisplayContent,
Expand Down Expand Up @@ -302,12 +311,47 @@ export interface SessionContext<T = unknown> {
// The dispatcher will call getDynamicSchema/getDynamicGrammar to get the updated content.
reloadAgentSchema(): Promise<void>;

// Experimental: get the shared local host port
getSharedLocalHostPort(agentName: string): Promise<number>;

// Experimental: update this agent's bound local host port (used after OS port assignment)
/**
* Register a port this agent has just bound (typically with
* `bind(0)` so the OS picks a free ephemeral port). The dispatcher
* records the `(agent, role, port)` tuple in its `PortRegistrar`
* so other in-process agents and out-of-process clients (Chrome
* extension, VS Code extension, etc.) can discover it.
*
* `role` is a free-form, agent-defined string scoping the
* registration within this agent — e.g. `"default"`, `"ws-bridge"`,
* `"http-debug"`. Each agent owns its role namespace and should
* export role constants for in-process and out-of-process callers
* to import; the SDK and discovery layer treat roles as opaque
* strings so adding a role to one agent never requires coordinated
* changes elsewhere. Use distinct roles when an agent exposes
* multiple listeners.
*
* Returns a `release()` callback the agent should invoke when the
* listener is torn down. Forgetting to release is non-fatal — the
* dispatcher releases all of an agent's allocations as a backstop
* when the agent's session context closes — but explicit release
* keeps lookups accurate while the agent is still alive.
*/
registerPort(role: string, port: number): { release: () => void };
Comment thread
TalZaccai marked this conversation as resolved.

/**
* @deprecated Use {@link registerPort} with a meaningful role
* instead. Kept for backwards compatibility with agents that
* predate the multi-role registrar; routes through the registrar
* with `role="default"`.
*/
setLocalHostPort(port: number): void;

/**
* @deprecated Use {@link lookupPort} (out-of-process discovery
* channel) or, for in-process cross-agent lookups, ask the target
* agent directly. Kept for backwards compatibility; routes through
* the registrar with `role="default"` and enforces the existing
* `manifest.sharedLocalView` permission check.
*/
getSharedLocalHostPort(agentName: string): Promise<number>;

// Experimental: get the available indexes
indexes(type: "image" | "email" | "website" | "all"): Promise<any[]>;

Expand Down
4 changes: 4 additions & 0 deletions ts/packages/agentServer/client/jest.config.cjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

module.exports = require("../../../jest.config.js");
14 changes: 11 additions & 3 deletions ts/packages/agentServer/client/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
"author": "Microsoft",
"type": "module",
"exports": {
".": "./dist/index.js"
".": "./dist/index.js",
"./discovery": "./dist/discovery.js"
},
"files": [
"dist",
Expand All @@ -21,9 +22,12 @@
"scripts": {
"build": "npm run tsc",
"clean": "rimraf --glob dist *.tsbuildinfo *.done.build.log",
"jest-esm": "node --no-warnings --experimental-vm-modules ./node_modules/jest/bin/jest.js",
"prettier": "prettier --check . --ignore-path ../../../.prettierignore",
"prettier:fix": "prettier --write . --ignore-path ../../../.prettierignore",
"tsc": "tsc -b"
"test": "npm run test:local",
"test:local": "pnpm run jest-esm --testPathPattern=\".*[.]spec[.]js\"",
"tsc": "tsc -b src test"
},
"dependencies": {
"@typeagent/agent-rpc": "workspace:*",
Expand All @@ -34,9 +38,13 @@
},
"devDependencies": {
"@types/debug": "^4.1.12",
"@types/jest": "^29.5.7",
"@types/ws": "^8.5.10",
"jest": "^29.7.0",
"prettier": "^3.5.3",
"rimraf": "^6.0.1",
"typescript": "~5.4.5"
"typescript": "~5.4.5",
"websocket-channel-server": "workspace:*",
"ws": "^8.17.1"
}
}
9 changes: 5 additions & 4 deletions ts/packages/agentServer/client/src/agentServerClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import registerDebug from "debug";
import {
AgentServerInvokeFunctions,
AgentServerChannelName,
AGENT_SERVER_DEFAULT_PORT,
DispatcherConnectOptions,
ConversationInfo,
JoinConversationResult,
Expand Down Expand Up @@ -435,7 +436,7 @@ async function waitForServer(
}

export async function ensureAgentServer(
port: number = 8999,
port: number = AGENT_SERVER_DEFAULT_PORT,
hidden: boolean = false,
idleTimeout: number = 0,
): Promise<void> {
Expand All @@ -459,7 +460,7 @@ export async function ensureAgentServer(

export async function ensureAndConnectDispatcher(
clientIO: ClientIO,
port: number = 8999,
port: number = AGENT_SERVER_DEFAULT_PORT,
options?: DispatcherConnectOptions,
onDisconnect?: () => void,
hidden: boolean = false,
Expand All @@ -471,7 +472,7 @@ export async function ensureAndConnectDispatcher(

export async function ensureAndConnectConversation(
clientIO: ClientIO,
port: number = 8999,
port: number = AGENT_SERVER_DEFAULT_PORT,
options?: DispatcherConnectOptions,
onDisconnect?: () => void,
hidden: boolean = false,
Expand All @@ -488,7 +489,7 @@ export async function ensureAndConnectConversation(
}

export async function stopAgentServer(
port: number = 8999,
port: number = AGENT_SERVER_DEFAULT_PORT,
force: boolean = false,
): Promise<void> {
const url = `ws://localhost:${port}`;
Expand Down
Loading
Loading