Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
194 changes: 165 additions & 29 deletions Runware/Runware-base.ts

Large diffs are not rendered by default.

148 changes: 84 additions & 64 deletions Runware/Runware-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@ export class RunwareServer extends RunwareBase {
_instantiated: boolean = false;
_listeners: any[] = [];
_reconnectingIntervalId: null | any = null;
_pingTimeout: any;
_pongListener: any;
private _connecting: boolean = false;

constructor(props: RunwareBaseType) {
super(props);
Expand All @@ -22,91 +21,99 @@ export class RunwareServer extends RunwareBase {
this.connect();
}

// protected addListener({
// lis,
// check,
// groupKey,
// }: {
// lis: (v: any) => any;
// check: (v: any) => any;
// groupKey?: string;
// }) {
// const listener = (msg: any) => {
// if (msg?.error) {
// lis(msg);
// } else if (check(msg)) {
// lis(msg);
// }
// };
// const groupListener = { key: getUUID(), listener, groupKey };
// this._listeners.push(groupListener);
// const destroy = () => {
// this._listeners = removeListener(this._listeners, groupListener);
// };

// return {
// destroy,
// };
// }

protected async connect() {
if (!this._url) return;
if (this._connecting) return;
this._connecting = true;

this.resetConnection();

const url = buildSdkUrl(this._url);
this._ws = new WebSocket(url, {
perMessageDeflate: false,
headers: {
"X-SDK-Name": "js",
"X-SDK-Version": SDK_VERSION,
},
});
try {
const url = buildSdkUrl(this._url);
this._logger.connecting(url);

// delay(1);
this._ws = new WebSocket(url, {
perMessageDeflate: false,
headers: {
"X-SDK-Name": "js",
"X-SDK-Version": SDK_VERSION,
},
});
} catch (err) {
this._connecting = false;
this._logger.connectionError(err);
return;
}

this._ws.on("error", (err: any) => {
this._connecting = false;
this._logger.connectionError(err?.message || err);
});

this._ws.on("error", () => {});
this._ws.on("close", () => {
this.handleClose();
});

this._ws.on("open", () => {
this._ws.on("open", async () => {
if (this._reconnectingIntervalId) {
clearInterval(this._reconnectingIntervalId);
}
if (this._connectionSessionUUID && this.isWebsocketReadyState()) {
this.send({
taskType: ETaskType.AUTHENTICATION,
apiKey: this._apiKey,
connectionSessionUUID: this._connectionSessionUUID,
});
} else {
if (this.isWebsocketReadyState()) {
this.send({
apiKey: this._apiKey,

this._logger.authenticating(!!this._connectionSessionUUID);

try {
if (this._connectionSessionUUID && this.isWebsocketReadyState()) {
await this.send({
taskType: ETaskType.AUTHENTICATION,
apiKey: this._apiKey,
connectionSessionUUID: this._connectionSessionUUID,
});
} else {
if (this.isWebsocketReadyState()) {
await this.send({
apiKey: this._apiKey,
taskType: ETaskType.AUTHENTICATION,
});
}
}
} catch (err) {
this._connecting = false;
this._logger.error("Failed to send auth message", err);
return;
}

this.addListener({
const authListener = this.addListener({
taskUUID: ETaskType.AUTHENTICATION,
lis: (m) => {
this._connecting = false;
if (m?.error) {
this._connectionError = m;
this._logger.authError(m);
authListener?.destroy?.();
return;
}
this._connectionSessionUUID =
m?.[ETaskType.AUTHENTICATION]?.[0]?.connectionSessionUUID;
this._connectionError = undefined;
this._logger.authenticated(this._connectionSessionUUID || "");
authListener?.destroy?.();
this.startHeartbeat();
},
});
});

this._ws.on("message", (e: any, isBinary: any) => {
const data = isBinary ? e : e?.toString();
if (!data) return;
const m = JSON.parse(data);
let m: any;
try {
m = JSON.parse(data);
} catch (err) {
this._logger.error("Failed to parse WebSocket message", err);
return;
}

if (this.handlePongMessage(m)) return;

this._listeners.forEach((lis) => {
const result = lis.listener(m);
Expand All @@ -117,11 +124,33 @@ export class RunwareServer extends RunwareBase {
});
}

protected send = (msg: Object) => {
protected send = async (msg: Object) => {
if (!this.isWebsocketReadyState()) {
this._logger.sendReconnecting();
if (this._ws) {
try {
if (typeof this._ws.terminate === "function") {
this._ws.terminate();
} else {
this._ws.close();
}
} catch {}
}
this._connectionSessionUUID = undefined;
// ensureConnection either resolves (ws ready) or throws
await this.ensureConnection();
}
const taskType = (msg as any)?.taskType;
const taskUUID = (msg as any)?.taskUUID;
this._logger.messageSent(taskType, taskUUID);
this._ws.send(JSON.stringify([msg]));
};

protected handleClose() {
this._connecting = false;
this._logger.connectionClosed();
this._connectionSessionUUID = undefined;
this.stopHeartbeat();
if (this.isInvalidAPIKey()) {
return;
}
Expand All @@ -130,12 +159,13 @@ export class RunwareServer extends RunwareBase {
}

if (this._shouldReconnect) {
this._logger.reconnectScheduled(1000);
setTimeout(() => this.connect(), 1000);
}
// this._reconnectingIntervalId = setInterval(() => this.connect(), 1000);
}

protected resetConnection = () => {
this.stopHeartbeat();
if (this._ws) {
this._listeners.forEach((list) => {
list?.destroy?.();
Expand All @@ -151,15 +181,5 @@ export class RunwareServer extends RunwareBase {
}
};

protected heartBeat() {
clearTimeout(this._pingTimeout);

this._pingTimeout = setTimeout(() => {
if (this.isWebsocketReadyState()) {
this.send({ ping: true });
}
}, 5000);
}

//end of data
}
25 changes: 22 additions & 3 deletions Runware/async-retry.ts
Original file line number Diff line number Diff line change
@@ -1,29 +1,48 @@
import { delay } from "./utils";
import type { RunwareLogger } from "./logger";

export const asyncRetry = async (
apiCall: Function,
options: {
maxRetries?: number;
delayInSeconds?: number;
callback?: Function;
logger?: RunwareLogger;
} = {}
) => {
const { delayInSeconds = 1, callback } = options;
): Promise<any> => {
const { delayInSeconds = 1, callback, logger } = options;
let maxRetries = options.maxRetries ?? 1;
const initialMaxRetries = maxRetries;

// Fix: maxRetries=0 should execute apiCall once with no retries
if (maxRetries <= 0) {
return await apiCall();
}

while (maxRetries) {
try {
const result = await apiCall();
if (maxRetries < initialMaxRetries) {
logger?.retrySuccess(initialMaxRetries - maxRetries + 1);
}
return result; // Return the result if successful
} catch (error: any) {
callback?.();
// Fix: API errors (with .error property) throw immediately — no callback, no retry
if (error?.error) {
logger?.retrySkippedApiError(error.error?.code || "unknown");
throw error;
}

// Only call callback for retryable errors (network/timeout)
callback?.();

maxRetries--;
if (maxRetries > 0) {
logger?.retryAttempt(initialMaxRetries - maxRetries, initialMaxRetries, delayInSeconds * 1000);
await delay(delayInSeconds); // Delay before the next retry
return await asyncRetry(apiCall, { ...options, maxRetries });
} else {
logger?.retryExhausted(initialMaxRetries);
throw error; // Throw the error if max retries are reached
}
}
Expand Down
1 change: 1 addition & 0 deletions Runware/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ export * from "./types";

export * from "./Runware-server";
export * from "./Runware";
export { RunwareLogger, LogLevel, createLogger } from "./logger";
export { SDK_VERSION } from "./utils";
Loading
Loading