Skip to content

Commit c58aa16

Browse files
authored
Merge pull request #85 from kesku/fix/deep-research-streaming
fix: enable streaming for sonar-deep-research
2 parents 9b4218e + dfe9a26 commit c58aa16

2 files changed

Lines changed: 109 additions & 10 deletions

File tree

src/index.test.ts

Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -473,14 +473,35 @@ describe("Perplexity MCP Server", () => {
473473
const models = ["sonar-pro", "sonar-deep-research", "sonar-reasoning-pro"];
474474

475475
for (const model of models) {
476-
const mockResponse = {
477-
choices: [{ message: { content: `Response from ${model}` } }],
478-
};
479-
480-
global.fetch = vi.fn().mockResolvedValue({
481-
ok: true,
482-
json: async () => mockResponse,
483-
} as Response);
476+
if (model === "sonar-deep-research") {
477+
// sonar-deep-research uses streaming, so provide an SSE mock
478+
const sseData = [
479+
`data: ${JSON.stringify({ choices: [{ delta: { content: "Response " } }] })}\n\n`,
480+
`data: ${JSON.stringify({ choices: [{ delta: { content: `from ${model}` } }] })}\n\n`,
481+
`data: [DONE]\n\n`,
482+
].join("");
483+
484+
const stream = new ReadableStream({
485+
start(controller) {
486+
controller.enqueue(new TextEncoder().encode(sseData));
487+
controller.close();
488+
},
489+
});
490+
491+
global.fetch = vi.fn().mockResolvedValue({
492+
ok: true,
493+
body: stream,
494+
} as unknown as Response);
495+
} else {
496+
const mockResponse = {
497+
choices: [{ message: { content: `Response from ${model}` } }],
498+
};
499+
500+
global.fetch = vi.fn().mockResolvedValue({
501+
ok: true,
502+
json: async () => mockResponse,
503+
} as Response);
504+
}
484505

485506
const messages = [{ role: "user", content: "test" }];
486507
const result = await performChatCompletion(messages, model);

src/server.ts

Lines changed: 80 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,77 @@ export function stripThinkingTokens(content: string): string {
6060
return content.replace(/<think>[\s\S]*?<\/think>/g, '').trim();
6161
}
6262

63+
/**
 * Reads a Server-Sent Events response body to completion and assembles the
 * streamed chunks into a single `ChatCompletionResponse`.
 *
 * Each `data:` line is parsed as JSON. `choices[0].delta.content` fragments are
 * concatenated in arrival order; for `id`, `model`, `created`, `citations` and
 * `usage` the last truthy value seen in any chunk wins. The `[DONE]` sentinel,
 * lines that do not start with `data:`, and payloads that are not valid JSON
 * are ignored.
 *
 * The assembled response always contains exactly one choice with
 * `finish_reason: "stop"` and `index: 0`, regardless of what the stream
 * itself reported.
 *
 * @param response - Fetch response whose body is the event stream.
 * @returns The assembled response after validation by
 *   `ChatCompletionResponseSchema.parse`.
 * @throws Error if `response.body` is null. Validation errors raised by
 *   `ChatCompletionResponseSchema.parse` and read errors from the stream
 *   propagate to the caller.
 */
export async function consumeSSEStream(response: Response): Promise<ChatCompletionResponse> {
  const body = response.body;
  if (!body) {
    throw new Error("Response body is null");
  }

  const reader = (body as ReadableStream<Uint8Array>).getReader();
  const decoder = new TextDecoder();

  const contentParts: string[] = [];
  let citations: string[] | undefined;
  let usage: ChatCompletionResponse["usage"] | undefined;
  let id: string | undefined;
  let model: string | undefined;
  let created: number | undefined;

  // Folds one complete line of the stream into the accumulators above.
  const handleLine = (line: string): void => {
    const trimmed = line.trim();
    if (!trimmed.startsWith("data:")) return;

    const data = trimmed.slice("data:".length).trim();
    if (data === "[DONE]") return;

    try {
      const parsed = JSON.parse(data);

      if (parsed.id) id = parsed.id;
      if (parsed.model) model = parsed.model;
      if (parsed.created) created = parsed.created;
      if (parsed.citations) citations = parsed.citations;
      if (parsed.usage) usage = parsed.usage;

      const delta = parsed.choices?.[0]?.delta;
      if (delta?.content) {
        contentParts.push(delta.content);
      }
    } catch {
      // Skip malformed JSON chunks (e.g. keep-alive pings)
    }
  };

  let buffer = "";
  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;

      buffer += decoder.decode(value, { stream: true });

      const lines = buffer.split("\n");
      // Keep the last potentially incomplete line in the buffer
      buffer = lines.pop() ?? "";

      for (const line of lines) {
        handleLine(line);
      }
    }

    // Flush bytes the decoder is still holding, then process the final line:
    // a stream that ends without a trailing newline would otherwise lose its
    // last event.
    buffer += decoder.decode();
    if (buffer) {
      handleLine(buffer);
    }
  } finally {
    // Release the lock on the body whether reading finished or threw.
    reader.releaseLock();
  }

  const assembled: ChatCompletionResponse = {
    choices: [
      {
        message: { content: contentParts.join("") },
        finish_reason: "stop",
        index: 0,
      },
    ],
    ...(citations && { citations }),
    ...(usage && { usage }),
    ...(id && { id }),
    ...(model && { model }),
    ...(created && { created }),
  };

  return ChatCompletionResponseSchema.parse(assembled);
}
133+
63134
export async function performChatCompletion(
64135
messages: Message[],
65136
model: string = "sonar-pro",
@@ -74,10 +145,13 @@ export async function performChatCompletion(
74145
// Read timeout fresh each time to respect env var changes
75146
const TIMEOUT_MS = parseInt(process.env.PERPLEXITY_TIMEOUT_MS || "300000", 10);
76147

148+
const useStreaming = model === "sonar-deep-research";
149+
77150
const url = new URL(`${PERPLEXITY_BASE_URL}/chat/completions`);
78151
const body: Record<string, unknown> = {
79152
model: model,
80153
messages: messages,
154+
...(useStreaming && { stream: true }),
81155
...(options?.search_recency_filter && { search_recency_filter: options.search_recency_filter }),
82156
...(options?.search_domain_filter && { search_domain_filter: options.search_domain_filter }),
83157
...(options?.search_context_size && { web_search_options: { search_context_size: options.search_context_size } }),
@@ -125,8 +199,12 @@ export async function performChatCompletion(
125199

126200
let data: ChatCompletionResponse;
127201
try {
128-
const json = await response.json();
129-
data = ChatCompletionResponseSchema.parse(json);
202+
if (useStreaming) {
203+
data = await consumeSSEStream(response);
204+
} else {
205+
const json = await response.json();
206+
data = ChatCompletionResponseSchema.parse(json);
207+
}
130208
} catch (error) {
131209
if (error instanceof z.ZodError) {
132210
const issues = error.issues;

0 commit comments

Comments
 (0)