Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 92 additions & 35 deletions services/job_chat/job_chat.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,19 @@ def __init__(self, config: Optional[ChatConfig] = None):
raise ValueError("API key must be provided")
self.client = Anthropic(api_key=self.api_key)

@staticmethod
def _unescape_json_string(text):
"""Unescape JSON string escape sequences (e.g. \\n -> newline, \\" -> quote).

When suggest_code is true, Claude generates text inside a JSON string value,
so newlines and quotes arrive escaped. This converts them back to actual
characters so streamed markdown renders properly.
"""
try:
return json.loads(f'"{text}"')
except (json.JSONDecodeError, ValueError):
return text

def generate(
self,
content: str,
Expand Down Expand Up @@ -132,7 +145,7 @@ def generate(
download_adaptor_docs=download_adaptor_docs,
refresh_rag=refresh_rag
)
prompt.append({"role": "assistant", "content": '{\n "text_answer": "'})
prompt.append({"role": "assistant", "content": '{\n "code_edits": ['})

else:
system_message, prompt, retrieved_knowledge = build_old_prompt(
Expand All @@ -148,32 +161,35 @@ def generate(
with sentry_sdk.start_span(description="anthropic_api_call"):
if stream:
logger.info("Making streaming API call")
text_complete = False
text_started = False
sent_length = 0
accumulated_response = ""

original_code = context.get("expression") if context and isinstance(context, dict) else None

with self.client.messages.stream(
max_tokens=self.config.max_tokens,
messages=prompt,
model=self.config.model,
system=system_message
) as stream_obj:
for event in stream_obj:
accumulated_response, text_complete, sent_length = self.process_stream_event(
accumulated_response, text_started, sent_length = self.process_stream_event(
event,
accumulated_response,
suggest_code,
text_complete,
text_started,
sent_length,
stream_manager
stream_manager,
original_code
)
message = stream_obj.get_final_message()

# Flush any remaining buffered content when suggest_code is true
if suggest_code and not text_complete:
# Flush any remaining buffered text
if suggest_code and text_started:
if sent_length < len(accumulated_response):
remaining = accumulated_response[sent_length:]
stream_manager.send_text(remaining)
stream_manager.send_text(self._unescape_json_string(remaining))

else:
logger.info("Making non-streaming API call")
Expand All @@ -197,7 +213,7 @@ def generate(
response = "\n\n".join(response_parts)

if suggest_code is True:
response = '{\n "text_answer": "' + response # Add back the prefilled opening brace
response = '{\n "code_edits": [' + response # Add back the prefilled opening

if suggest_code is True:
# Parse JSON response and apply code edits
Expand Down Expand Up @@ -237,53 +253,94 @@ def generate(
diff=diff
)

@staticmethod
def _resolve_code_edits(original_code, code_edits):
"""Apply code edits to original code without error correction.

Best-effort resolution for streaming preview — skips edits that
fail to apply rather than making a second LLM call.
"""
if not original_code or not code_edits:
return None

current_code = original_code
applied = False

for edit in code_edits:
action = edit.get("action")
if action == "rewrite":
new_code = edit.get("new_code")
if new_code:
current_code = new_code
applied = True
elif action == "replace":
old_code = edit.get("old_code")
new_code = edit.get("new_code")
if old_code and new_code is not None and old_code in current_code:
current_code = current_code.replace(old_code, new_code, 1)
applied = True

return current_code if applied else None

def process_stream_event(
self,
event,
accumulated_response,
suggest_code,
text_complete,
text_started,
sent_length,
stream_manager
stream_manager,
original_code=None
):
"""
Process a single stream event from the Anthropic API.

With suggest_code, code_edits are generated first (buffered silently),
then a changes event is sent, and text_answer streams to the client.
"""
if event.type == "content_block_delta":
if event.delta.type == "text_delta":
text_chunk = event.delta.text
accumulated_response += text_chunk

if suggest_code and not text_complete:
delimiter = '",\n "code_edits"'
if suggest_code and not text_started:
# Code edits phase: buffer silently until text_answer starts
delimiter = ',\n "text_answer": "'

# Stream chunks until we hit the end pattern (which might be split across chunks)
if delimiter in accumulated_response:
# Get only the text part and send any remaining unsent text
text_only = accumulated_response.split(delimiter)[0]
remaining_text = text_only[sent_length:]
if remaining_text:
stream_manager.send_text(remaining_text)

# Send "Writing code..." thinking block
stream_manager.send_thinking("Writing code...")

text_complete = True
else:
# Buffer to avoid sending partial delimiter
# Only send content that we know won't be part of the delimiter
buffer_size = len(delimiter) - 1
safe_to_send_until = len(accumulated_response) - buffer_size

if safe_to_send_until > sent_length:
safe_text = accumulated_response[sent_length:safe_to_send_until]
stream_manager.send_text(safe_text)
sent_length = safe_to_send_until
# Extract code_edits JSON and resolve into final code
code_edits_raw = '[' + accumulated_response.split(delimiter)[0]
try:
code_edits = json.loads(code_edits_raw)
resolved_code = self._resolve_code_edits(original_code, code_edits)
if resolved_code:
stream_manager.send_changes({"code": resolved_code})
else:
stream_manager.send_changes({"code_edits": code_edits})
except json.JSONDecodeError:
logger.warning(f"Failed to parse code_edits: {code_edits_raw[:200]}")

# Mark where text content starts in the accumulated buffer
text_offset = accumulated_response.find(delimiter) + len(delimiter)
sent_length = text_offset
text_started = True

if suggest_code and text_started:
# Text phase: stream with buffer to handle split escape sequences
# Use 2-char buffer since longest escape is 2 chars (e.g. \n, \")
buffer_size = 2
safe_to_send_until = len(accumulated_response) - buffer_size

if safe_to_send_until > sent_length:
safe_text = accumulated_response[sent_length:safe_to_send_until]
stream_manager.send_text(self._unescape_json_string(safe_text))
sent_length = safe_to_send_until

elif not suggest_code:
# Normal streaming for non-code suggestions
stream_manager.send_text(text_chunk)

return accumulated_response, text_complete, sent_length
return accumulated_response, text_started, sent_length

def parse_and_apply_edits(self, response: str, content: str, original_code: Optional[str] = None) -> tuple[str, Optional[str], Optional[Dict[str, Any]]]:
"""Parse JSON response and apply code edits to original code."""
Expand Down
12 changes: 6 additions & 6 deletions services/job_chat/prompt.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,12 +166,12 @@
You must respond in JSON format with two fields:

{
"text_answer": "Your conversational response here",
"code_edits": []
"code_edits": [],
"text_answer": "Your conversational response here"
}

Use "text_answer" for all explanations, guidance, and conversation.
Use "code_edits" only when you need to modify the user's existing code or provide new code suggestions.
Use "code_edits" when you need to modify the user's existing code or provide new code suggestions.
Use "text_answer" for all explanations, guidance, and conversation.
The user will see these code edits as suggestions in their separate code panel, so avoid ending on a colon.

Code edit actions:
Expand Down Expand Up @@ -210,12 +210,12 @@

Example:
{
"text_answer": "I'll add error handling after your GET request",
"code_edits": [{
"action": "replace",
"old_code": "get('/patients');",
"new_code": "get('/patients');\\nfn(state => {\\n if (!state.data) {\\n throw new Error(\\\"No data received\\\");\\n }\\n return state;\\n});"
}]
}],
"text_answer": "I'll add error handling after your GET request"
}

ALWAYS use \\n instead of actual newlines:
Expand Down
11 changes: 11 additions & 0 deletions services/streaming_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,17 @@ def send_text(self, text_chunk: str) -> None:
"delta": {"type": "text_delta", "text": text_chunk},
})

    def send_changes(self, changes_data: dict[str, Any]) -> None:
        """
        Send structured changes (code edits or workflow YAML) as a custom
        SSE event so the client can render them before text streams.

        Args:
            changes_data: Payload for the 'changes' event — presumably
                serialized by _emit_event; TODO confirm it must be
                JSON-serializable.
        """
        # Lazily open the stream so callers can emit changes first.
        if not self.stream_started:
            self.start_stream()

        # Close any open content blocks so the custom 'changes' event is
        # not interleaved inside a text/thinking block.
        self._close_open_blocks()
        self._emit_event('changes', changes_data)

def end_stream(self, stop_reason: str = "end_turn") -> None:
"""
End the stream by closing all open blocks and sending final events.
Expand Down
12 changes: 6 additions & 6 deletions services/workflow_chat/gen_project_prompts.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,8 @@ prompts:

The output should be:
{{
"text": "Your reasoning (max ~4 sentences).",
"yaml": "name: Daily CommCare to Satusehat Encounter Sync\njobs:\n Fetch-visits-from-CommCare:\n name: Fetch visits from CommCare\n adaptor: \"@openfn/language-commcare@latest\"\n body: \"// Add operations here\"\n Create-FHIR-Encounter-for-visitors-with-IHS-number:\n name: Create FHIR Encounter for visitors with IHS number\n adaptor: \"@openfn/language-satusehat@latest\"\n body: \"// Add operations here\"\n Lookup-IHS-number-in-Satusehat:\n name: Lookup IHS number in Satusehat\n adaptor: \"@openfn/language-satusehat@latest\"\n body: \"// Add operations here\"\n Create-FHIR-Encounter-after-IHS-lookup:\n name: Create FHIR Encounter after IHS lookup\n adaptor: \"@openfn/language-satusehat@latest\"\n body: \"// Add operations here\"\ntriggers:\n cron:\n type: cron\n cron_expression: 0 0 * * *\n enabled: false\nedges:\n cron->Fetch-visits-from-CommCare:\n source_trigger: cron\n target_job: Fetch-visits-from-CommCare\n condition_type: always\n enabled: true\n Fetch-visits-from-CommCare->Create-FHIR-Encounter-for-visitors-with-IHS-number:\n source_job: Fetch-visits-from-CommCare\n target_job: Create-FHIR-Encounter-for-visitors-with-IHS-number\n condition_type: on_job_success\n enabled: true\n Fetch-visits-from-CommCare->Lookup-IHS-number-in-Satusehat:\n source_job: Fetch-visits-from-CommCare\n target_job: Lookup-IHS-number-in-Satusehat\n condition_type: on_job_success\n enabled: true\n Lookup-IHS-number-in-Satusehat->Create-FHIR-Encounter-after-IHS-lookup:\n source_job: Lookup-IHS-number-in-Satusehat\n target_job: Create-FHIR-Encounter-after-IHS-lookup\n condition_type: on_job_success\n enabled: true"
"yaml": "name: Daily CommCare to Satusehat Encounter Sync\njobs:\n Fetch-visits-from-CommCare:\n name: Fetch visits from CommCare\n adaptor: \"@openfn/language-commcare@latest\"\n body: \"// Add operations here\"\n Create-FHIR-Encounter-for-visitors-with-IHS-number:\n name: Create FHIR Encounter for visitors with IHS number\n adaptor: \"@openfn/language-satusehat@latest\"\n body: \"// Add operations here\"\n Lookup-IHS-number-in-Satusehat:\n name: Lookup IHS number in Satusehat\n adaptor: \"@openfn/language-satusehat@latest\"\n body: \"// Add operations here\"\n Create-FHIR-Encounter-after-IHS-lookup:\n name: Create FHIR Encounter after IHS lookup\n adaptor: \"@openfn/language-satusehat@latest\"\n body: \"// Add operations here\"\ntriggers:\n cron:\n type: cron\n cron_expression: 0 0 * * *\n enabled: false\nedges:\n cron->Fetch-visits-from-CommCare:\n source_trigger: cron\n target_job: Fetch-visits-from-CommCare\n condition_type: always\n enabled: true\n Fetch-visits-from-CommCare->Create-FHIR-Encounter-for-visitors-with-IHS-number:\n source_job: Fetch-visits-from-CommCare\n target_job: Create-FHIR-Encounter-for-visitors-with-IHS-number\n condition_type: on_job_success\n enabled: true\n Fetch-visits-from-CommCare->Lookup-IHS-number-in-Satusehat:\n source_job: Fetch-visits-from-CommCare\n target_job: Lookup-IHS-number-in-Satusehat\n condition_type: on_job_success\n enabled: true\n Lookup-IHS-number-in-Satusehat->Create-FHIR-Encounter-after-IHS-lookup:\n source_job: Lookup-IHS-number-in-Satusehat\n target_job: Create-FHIR-Encounter-after-IHS-lookup\n condition_type: on_job_success\n enabled: true",
"text": "Your reasoning (max ~4 sentences)."
}}

## Do NOT fill in job code
Expand All @@ -167,8 +167,8 @@ prompts:

## Output Format

You must respond in JSON format with two fields: "text" and "yaml".
"text" for all explanation, and "yaml" for the YAML block.
You must respond in JSON format with two fields: "yaml" and "text".
"yaml" for the YAML block, and "text" for all explanation.

unstructured_output_format: |
## Read-only Mode
Expand All @@ -189,9 +189,9 @@ prompts:

## Output Format

You must respond in JSON format with two fields: "text" and "yaml".
The "text" field contains your complete answer with any YAML in triple-backticked code blocks.
You must respond in JSON format with two fields: "yaml" and "text".
The "yaml" field should always be null.
The "text" field contains your complete answer with any YAML in triple-backticked code blocks.

normal_mode_intro: |
Your task is to talk to a client with the goal of converting their description of a workflow into an OpenFn workflow YAML.
Expand Down
Loading