From a576de6375baab9ccba3aeda1f68079150a850df Mon Sep 17 00:00:00 2001 From: "Elias W. BA" Date: Mon, 9 Mar 2026 23:13:51 +0000 Subject: [PATCH] feat: reverse streaming order so changes arrive before text Reverses the JSON field order in both job_chat and workflow_chat so that structured data (code_edits / YAML) is generated first and text explanation second. This allows the client to apply changes to the canvas or editor before the text explanation finishes streaming. Key changes: - Reverse assistant prefill: code_edits/yaml generated before text_answer/text - Add send_changes() to StreamManager for custom SSE "changes" event - job_chat: resolve code_edits into final code before sending changes event - workflow_chat: send parsed YAML in changes event before text streams - Add _unescape_json_string() to fix markdown rendering during streaming - Update prompt examples to match reversed field order --- services/job_chat/job_chat.py | 127 +++++++++++++----- services/job_chat/prompt.py | 12 +- services/streaming_util.py | 11 ++ .../workflow_chat/gen_project_prompts.yaml | 12 +- services/workflow_chat/workflow_chat.py | 84 +++++++----- 5 files changed, 167 insertions(+), 79 deletions(-) diff --git a/services/job_chat/job_chat.py b/services/job_chat/job_chat.py index 3b1d711b..baaf2baa 100644 --- a/services/job_chat/job_chat.py +++ b/services/job_chat/job_chat.py @@ -97,6 +97,19 @@ def __init__(self, config: Optional[ChatConfig] = None): raise ValueError("API key must be provided") self.client = Anthropic(api_key=self.api_key) + @staticmethod + def _unescape_json_string(text): + """Unescape JSON string escape sequences (e.g. \\n -> newline, \\" -> quote). + + When suggest_code is true, Claude generates text inside a JSON string value, + so newlines and quotes arrive escaped. This converts them back to actual + characters so streamed markdown renders properly. + """ + try: + return json.loads(f'"{text}"') + except (json.JSONDecodeError, ValueError): + return text + def generate( self, content: str, @@ -132,7 +145,7 @@ def generate( download_adaptor_docs=download_adaptor_docs, refresh_rag=refresh_rag ) - prompt.append({"role": "assistant", "content": '{\n "text_answer": "'}) + prompt.append({"role": "assistant", "content": '{\n "code_edits": ['}) else: system_message, prompt, retrieved_knowledge = build_old_prompt( @@ -148,10 +161,12 @@ def generate( with sentry_sdk.start_span(description="anthropic_api_call"): if stream: logger.info("Making streaming API call") - text_complete = False + text_started = False sent_length = 0 accumulated_response = "" + original_code = context.get("expression") if context and isinstance(context, dict) else None + with self.client.messages.stream( max_tokens=self.config.max_tokens, messages=prompt, @@ -159,21 +174,22 @@ def generate( system=system_message ) as stream_obj: for event in stream_obj: - accumulated_response, text_complete, sent_length = self.process_stream_event( + accumulated_response, text_started, sent_length = self.process_stream_event( event, accumulated_response, suggest_code, - text_complete, + text_started, sent_length, - stream_manager + stream_manager, + original_code ) message = stream_obj.get_final_message() - # Flush any remaining buffered content when suggest_code is true - if suggest_code and not text_complete: + # Flush any remaining buffered text + if suggest_code and text_started: if sent_length < len(accumulated_response): remaining = accumulated_response[sent_length:] - stream_manager.send_text(remaining) + stream_manager.send_text(self._unescape_json_string(remaining)) else: logger.info("Making non-streaming API call") @@ -197,7 +213,7 @@ def generate( response = "\n\n".join(response_parts) if suggest_code is True: - response = '{\n "text_answer": "' + response # Add back the prefilled opening brace + response = '{\n "code_edits": [' + response # Add back the prefilled opening if suggest_code is True: # Parse JSON response and apply code edits @@ -237,53 +253,94 @@ def generate( diff=diff ) + @staticmethod + def _resolve_code_edits(original_code, code_edits): + """Apply code edits to original code without error correction. + + Best-effort resolution for streaming preview — skips edits that + fail to apply rather than making a second LLM call. + """ + if not original_code or not code_edits: + return None + + current_code = original_code + applied = False + + for edit in code_edits: + action = edit.get("action") + if action == "rewrite": + new_code = edit.get("new_code") + if new_code: + current_code = new_code + applied = True + elif action == "replace": + old_code = edit.get("old_code") + new_code = edit.get("new_code") + if old_code and new_code is not None and old_code in current_code: + current_code = current_code.replace(old_code, new_code, 1) + applied = True + + return current_code if applied else None + def process_stream_event( self, event, accumulated_response, suggest_code, - text_complete, + text_started, sent_length, - stream_manager + stream_manager, + original_code=None ): """ Process a single stream event from the Anthropic API. + + With suggest_code, code_edits are generated first (buffered silently), + then a changes event is sent, and text_answer streams to the client. """ if event.type == "content_block_delta": if event.delta.type == "text_delta": text_chunk = event.delta.text accumulated_response += text_chunk - if suggest_code and not text_complete: - delimiter = '",\n "code_edits"' + if suggest_code and not text_started: + # Code edits phase: buffer silently until text_answer starts + delimiter = ',\n "text_answer": "' - # Stream chunks until we hit the end pattern (which might be split across chunks) if delimiter in accumulated_response: - # Get only the text part and send any remaining unsent text - text_only = accumulated_response.split(delimiter)[0] - remaining_text = text_only[sent_length:] - if remaining_text: - stream_manager.send_text(remaining_text) - - # Send "Writing code..." thinking block - stream_manager.send_thinking("Writing code...") - - text_complete = True - else: - # Buffer to avoid sending partial delimiter - # Only send content that we know won't be part of the delimiter - buffer_size = len(delimiter) - 1 - safe_to_send_until = len(accumulated_response) - buffer_size - - if safe_to_send_until > sent_length: - safe_text = accumulated_response[sent_length:safe_to_send_until] - stream_manager.send_text(safe_text) - sent_length = safe_to_send_until + # Extract code_edits JSON and resolve into final code + code_edits_raw = '[' + accumulated_response.split(delimiter)[0] + try: + code_edits = json.loads(code_edits_raw) + resolved_code = self._resolve_code_edits(original_code, code_edits) + if resolved_code: + stream_manager.send_changes({"code": resolved_code}) + else: + stream_manager.send_changes({"code_edits": code_edits}) + except json.JSONDecodeError: + logger.warning(f"Failed to parse code_edits: {code_edits_raw[:200]}") + + # Mark where text content starts in the accumulated buffer + text_offset = accumulated_response.find(delimiter) + len(delimiter) + sent_length = text_offset + text_started = True + + if suggest_code and text_started: + # Text phase: stream with buffer to handle split escape sequences + # Use 2-char buffer since longest escape is 2 chars (e.g. \n, \") + buffer_size = 2 + safe_to_send_until = len(accumulated_response) - buffer_size + + if safe_to_send_until > sent_length: + safe_text = accumulated_response[sent_length:safe_to_send_until] + stream_manager.send_text(self._unescape_json_string(safe_text)) + sent_length = safe_to_send_until + elif not suggest_code: # Normal streaming for non-code suggestions stream_manager.send_text(text_chunk) - return accumulated_response, text_complete, sent_length + return accumulated_response, text_started, sent_length def parse_and_apply_edits(self, response: str, content: str, original_code: Optional[str] = None) -> tuple[str, Optional[str], Optional[Dict[str, Any]]]: """Parse JSON response and apply code edits to original code.""" diff --git a/services/job_chat/prompt.py b/services/job_chat/prompt.py index 31cad10b..0cbd09e9 100644 --- a/services/job_chat/prompt.py +++ b/services/job_chat/prompt.py @@ -166,12 +166,12 @@ You must respond in JSON format with two fields: { - "text_answer": "Your conversational response here", - "code_edits": [] + "code_edits": [], + "text_answer": "Your conversational response here" } -Use "text_answer" for all explanations, guidance, and conversation. -Use "code_edits" only when you need to modify the user's existing code or provide new code suggestions. +Use "code_edits" when you need to modify the user's existing code or provide new code suggestions. +Use "text_answer" for all explanations, guidance, and conversation. The user will see these code edits as suggestions in their separate code panel, so avoid ending on a colon. Code edit actions: @@ -210,12 +210,12 @@ Example: { - "text_answer": "I'll add error handling after your GET request", "code_edits": [{ "action": "replace", "old_code": "get('/patients');", "new_code": "get('/patients');\\nfn(state => {\\n if (!state.data) {\\n throw new Error(\\\"No data received\\\");\\n }\\n return state;\\n});" - }] + }], + "text_answer": "I'll add error handling after your GET request" } ALWAYS use \\n instead of actual newlines: diff --git a/services/streaming_util.py b/services/streaming_util.py index 14c92bd9..79de7d73 100644 --- a/services/streaming_util.py +++ b/services/streaming_util.py @@ -171,6 +171,17 @@ def send_text(self, text_chunk: str) -> None: "delta": {"type": "text_delta", "text": text_chunk}, }) + def send_changes(self, changes_data: dict[str, Any]) -> None: + """ + Send structured changes (code edits or workflow YAML) as a custom + SSE event so the client can render them before text streams. + """ + if not self.stream_started: + self.start_stream() + + self._close_open_blocks() + self._emit_event('changes', changes_data) + def end_stream(self, stop_reason: str = "end_turn") -> None: """ End the stream by closing all open blocks and sending final events. diff --git a/services/workflow_chat/gen_project_prompts.yaml b/services/workflow_chat/gen_project_prompts.yaml index e264c76d..90d94546 100644 --- a/services/workflow_chat/gen_project_prompts.yaml +++ b/services/workflow_chat/gen_project_prompts.yaml @@ -156,8 +156,8 @@ prompts: The output should be: {{ - "text": "Your reasoning (max ~4 sentences).", - "yaml": "name: Daily CommCare to Satusehat Encounter Sync\njobs:\n Fetch-visits-from-CommCare:\n name: Fetch visits from CommCare\n adaptor: \"@openfn/language-commcare@latest\"\n body: \"// Add operations here\"\n Create-FHIR-Encounter-for-visitors-with-IHS-number:\n name: Create FHIR Encounter for visitors with IHS number\n adaptor: \"@openfn/language-satusehat@latest\"\n body: \"// Add operations here\"\n Lookup-IHS-number-in-Satusehat:\n name: Lookup IHS number in Satusehat\n adaptor: \"@openfn/language-satusehat@latest\"\n body: \"// Add operations here\"\n Create-FHIR-Encounter-after-IHS-lookup:\n name: Create FHIR Encounter after IHS lookup\n adaptor: \"@openfn/language-satusehat@latest\"\n body: \"// Add operations here\"\ntriggers:\n cron:\n type: cron\n cron_expression: 0 0 * * *\n enabled: false\nedges:\n cron->Fetch-visits-from-CommCare:\n source_trigger: cron\n target_job: Fetch-visits-from-CommCare\n condition_type: always\n enabled: true\n Fetch-visits-from-CommCare->Create-FHIR-Encounter-for-visitors-with-IHS-number:\n source_job: Fetch-visits-from-CommCare\n target_job: Create-FHIR-Encounter-for-visitors-with-IHS-number\n condition_type: on_job_success\n enabled: true\n Fetch-visits-from-CommCare->Lookup-IHS-number-in-Satusehat:\n source_job: Fetch-visits-from-CommCare\n target_job: Lookup-IHS-number-in-Satusehat\n condition_type: on_job_success\n enabled: true\n Lookup-IHS-number-in-Satusehat->Create-FHIR-Encounter-after-IHS-lookup:\n source_job: Lookup-IHS-number-in-Satusehat\n target_job: Create-FHIR-Encounter-after-IHS-lookup\n condition_type: on_job_success\n enabled: true" + "yaml": "name: Daily CommCare to Satusehat Encounter Sync\njobs:\n Fetch-visits-from-CommCare:\n name: Fetch visits from CommCare\n adaptor: \"@openfn/language-commcare@latest\"\n body: \"// Add operations here\"\n Create-FHIR-Encounter-for-visitors-with-IHS-number:\n name: Create FHIR Encounter for visitors with IHS number\n adaptor: \"@openfn/language-satusehat@latest\"\n body: \"// Add operations here\"\n Lookup-IHS-number-in-Satusehat:\n name: Lookup IHS number in Satusehat\n adaptor: \"@openfn/language-satusehat@latest\"\n body: \"// Add operations here\"\n Create-FHIR-Encounter-after-IHS-lookup:\n name: Create FHIR Encounter after IHS lookup\n adaptor: \"@openfn/language-satusehat@latest\"\n body: \"// Add operations here\"\ntriggers:\n cron:\n type: cron\n cron_expression: 0 0 * * *\n enabled: false\nedges:\n cron->Fetch-visits-from-CommCare:\n source_trigger: cron\n target_job: Fetch-visits-from-CommCare\n condition_type: always\n enabled: true\n Fetch-visits-from-CommCare->Create-FHIR-Encounter-for-visitors-with-IHS-number:\n source_job: Fetch-visits-from-CommCare\n target_job: Create-FHIR-Encounter-for-visitors-with-IHS-number\n condition_type: on_job_success\n enabled: true\n Fetch-visits-from-CommCare->Lookup-IHS-number-in-Satusehat:\n source_job: Fetch-visits-from-CommCare\n target_job: Lookup-IHS-number-in-Satusehat\n condition_type: on_job_success\n enabled: true\n Lookup-IHS-number-in-Satusehat->Create-FHIR-Encounter-after-IHS-lookup:\n source_job: Lookup-IHS-number-in-Satusehat\n target_job: Create-FHIR-Encounter-after-IHS-lookup\n condition_type: on_job_success\n enabled: true", + "text": "Your reasoning (max ~4 sentences)." }} ## Do NOT fill in job code @@ -167,8 +167,8 @@ prompts: ## Output Format - You must respond in JSON format with two fields: "text" and "yaml". - "text" for all explanation, and "yaml" for the YAML block. + You must respond in JSON format with two fields: "yaml" and "text". + "yaml" for the YAML block, and "text" for all explanation. unstructured_output_format: | ## Read-only Mode @@ -189,9 +189,9 @@ prompts: ## Output Format - You must respond in JSON format with two fields: "text" and "yaml". - The "text" field contains your complete answer with any YAML in triple-backticked code blocks. + You must respond in JSON format with two fields: "yaml" and "text". The "yaml" field should always be null. + The "text" field contains your complete answer with any YAML in triple-backticked code blocks. normal_mode_intro: | Your task is to talk to a client with the goal of converting their description of a workflow into an OpenFn workflow YAML. diff --git a/services/workflow_chat/workflow_chat.py b/services/workflow_chat/workflow_chat.py index 505e8fd6..3ce324aa 100644 --- a/services/workflow_chat/workflow_chat.py +++ b/services/workflow_chat/workflow_chat.py @@ -99,6 +99,18 @@ def __init__(self, config: Optional[ChatConfig] = None): raise ValueError("API key must be provided") self.client = Anthropic(api_key=self.api_key) + @staticmethod + def _unescape_json_string(text): + """Unescape JSON string escape sequences (e.g. \\n -> newline, \\" -> quote). + + When generating inside a JSON string value, newlines and quotes arrive + escaped. This converts them back so streamed markdown renders properly. + """ + try: + return json.loads(f'"{text}"') + except (json.JSONDecodeError, ValueError): + return text + def generate( self, content: str = None, @@ -140,8 +152,8 @@ def generate( read_only=read_only ) - # Add prefilled opening brace for JSON response - prompt.append({"role": "assistant", "content": '{\n "text": "'}) + # Add prefilled opening brace for JSON response (yaml first, text second) + prompt.append({"role": "assistant", "content": '{\n "yaml": "'}) accumulated_usage = { "cache_creation_input_tokens": 0, @@ -157,7 +169,7 @@ def generate( logger.info("Making streaming API call") stream_manager.send_thinking("Thinking...") - text_complete = False + text_started = False sent_length = 0 accumulated_response = "" @@ -168,20 +180,20 @@ def generate( system=system_message ) as stream_obj: for event in stream_obj: - accumulated_response, text_complete, sent_length = self.process_stream_event( + accumulated_response, text_started, sent_length = self.process_stream_event( event, accumulated_response, - text_complete, + text_started, sent_length, stream_manager ) message = stream_obj.get_final_message() - # Flush any remaining buffered content - if not text_complete: + # Flush any remaining buffered text + if text_started: if sent_length < len(accumulated_response): remaining = accumulated_response[sent_length:] - stream_manager.send_text(remaining) + stream_manager.send_text(self._unescape_json_string(remaining)) else: logger.info("Making non-streaming API call") @@ -205,8 +217,8 @@ def generate( response = "\n\n".join(response_parts) - # Add back the prefilled opening brace - response = '{\n "text": "' + response + # Add back the prefilled opening + response = '{\n "yaml": "' + response with sentry_sdk.start_span(description="parse_and_format_yaml"): @@ -473,38 +485,46 @@ def restore_components(self, yaml_data, preserved_values=None): else: edge_data["id"] = str(uuid.uuid4()) - def process_stream_event(self, event, accumulated_response, text_complete, sent_length, stream_manager): + def process_stream_event(self, event, accumulated_response, text_started, sent_length, stream_manager): """ Process a single stream event from the Anthropic API. + + YAML is generated first (buffered silently), then a changes event + is sent, and the text explanation streams to the client. """ if event.type == "content_block_delta": if event.delta.type == "text_delta": text_chunk = event.delta.text accumulated_response += text_chunk - if not text_complete: - delimiter = '",\n "yaml":' + if not text_started: + # YAML phase: buffer silently until text starts + delimiter = '",\n "text": "' - # Stream chunks until we hit the YAML part if delimiter in accumulated_response: - # Get only the text part and send any remaining unsent text - text_only = accumulated_response.split(delimiter)[0] - remaining_text = text_only[sent_length:] - if remaining_text: - stream_manager.send_text(remaining_text) - - text_complete = True - else: - # Buffer to avoid sending partial delimiter - # Only send content that we know won't be part of the delimiter - buffer_size = len(delimiter) - 1 - safe_to_send_until = len(accumulated_response) - buffer_size - - if safe_to_send_until > sent_length: - safe_text = accumulated_response[sent_length:safe_to_send_until] - stream_manager.send_text(safe_text) - sent_length = safe_to_send_until - return accumulated_response, text_complete, sent_length + # Extract YAML content and send as changes event + # yaml_raw is the string content between the prefilled opening " and delimiter " + yaml_raw = accumulated_response.split(delimiter)[0] + yaml_value = self._unescape_json_string(yaml_raw) + if yaml_value: + stream_manager.send_changes({"yaml": yaml_value}) + + # Mark where text content starts + text_offset = accumulated_response.find(delimiter) + len(delimiter) + sent_length = text_offset + text_started = True + + if text_started: + # Text phase: stream with buffer for split escape sequences + buffer_size = 2 + safe_to_send_until = len(accumulated_response) - buffer_size + + if safe_to_send_until > sent_length: + safe_text = accumulated_response[sent_length:safe_to_send_until] + stream_manager.send_text(self._unescape_json_string(safe_text)) + sent_length = safe_to_send_until + + return accumulated_response, text_started, sent_length def main(data_dict: dict) -> dict: