Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 66 additions & 65 deletions worker_plan/worker_plan_internal/diagnostics/premortem.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,15 @@ class PremortemAnalysis(BaseModel):
assumptions_to_kill: List[AssumptionItem] = Field(description="A list of 3 new, critical, underlying assumptions to test immediately.")
failure_modes: List[FailureModeItem] = Field(description="A list containing exactly 3 distinct failure failure_modes, one for each archetype.")

class ArchetypeNarrative(BaseModel):
    """Structured output schema for one archetype-focused premortem LLM call.

    Minimal per-archetype schema: IDs and cross-references are assigned by the
    program, not the LLM. Each call produces exactly one assumption plus one
    failure scenario; the caller converts these into AssumptionItem /
    FailureModeItem records and assigns assumption_id / failure_mode_index itself.

    NOTE: the Field description strings double as generation instructions for the
    structured-LLM layer (llm.as_structured_llm(ArchetypeNarrative)); edit them
    with the same care as a prompt.
    """
    # Archetype label; the LLM may adapt the suggested category to the project.
    archetype: str = Field(description="The failure archetype category most relevant to this project and scenario (e.g. 'Process/Financial', 'Technical/Logistical', 'Market/Human', or a more specific variant).")
    # The single critical assumption underlying this failure scenario.
    assumption: str = Field(description="One critical assumption the project is making that, if false, would cause this failure.")
    # Immediate, concrete test for the assumption above.
    test_now: str = Field(description="One concrete action to immediately test if this assumption holds.")
    # Short headline for the failure scenario.
    failure_title: str = Field(description="A short, compelling title for this failure scenario (e.g. 'The Gridlock Gamble').")
    # Narrative body: causes, chain of events, impact.
    failure_story: str = Field(description="A detailed narrative of how this failure unfolds. Explain causes, chain of events, and impact.")
    # Early-warning indicators (2-4 expected, per the description below).
    warning_signs: List[str] = Field(description="2-4 observable signals that this failure is beginning to occur.")

PREMORTEM_SYSTEM_PROMPT = """
Persona: You are a senior project analyst. Your primary goal is to write compelling, detailed, and distinct failure stories that are also operationally actionable.

Expand Down Expand Up @@ -119,92 +128,84 @@ def execute(cls, llm_executor: LLMExecutor, speed_vs_detail: SpeedVsDetailEnum,
logger.debug(f"User Prompt:\n{user_prompt}")
system_prompt = PREMORTEM_SYSTEM_PROMPT.strip()

accumulated_chat_message_list = [
ChatMessage(
role=MessageRole.SYSTEM,
content=system_prompt,
)
]

user_prompt_list = [
user_prompt,
"Generate 3 new assumptions that are thematically different from the previous ones. Start assumption_id at A4.",
"Generate 3 new assumptions that are thematically different from the previous ones and covers different archetypes. Start assumption_id at A7.",
]
# Archetype suggestions guide the LLM; the LLM writes the actual archetype name
# in ArchetypeNarrative.archetype, adapting it to the specific project if needed.
archetype_suggestions = ["Process/Financial", "Technical/Logistical", "Market/Human"]
num_rounds = 1 if speed_vs_detail == SpeedVsDetailEnum.FAST_BUT_SKIP_DETAILS else 3
if speed_vs_detail == SpeedVsDetailEnum.FAST_BUT_SKIP_DETAILS:
user_prompt_list = user_prompt_list[:1]
logger.info("Running in FAST_BUT_SKIP_DETAILS mode. Omitting some assumptions.")
logger.info("Running in FAST_BUT_SKIP_DETAILS mode. 1 round × 3 archetypes = 3 calls.")
else:
logger.info("Running in ALL_DETAILS_BUT_SLOW mode. Processing all assumptions.")
logger.info("Running in ALL_DETAILS_BUT_SLOW mode. 3 rounds × 3 archetypes = 9 calls.")

responses: list[PremortemAnalysis] = []
assumptions_to_kill: list[AssumptionItem] = []
failure_modes: list[FailureModeItem] = []
metadata_list: list[dict] = []
for user_prompt_index, user_prompt_item in enumerate(user_prompt_list):
logger.info(f"Processing user_prompt_index: {user_prompt_index+1} of {len(user_prompt_list)}")
chat_message_list = accumulated_chat_message_list.copy()
chat_message_list.append(
ChatMessage(
role=MessageRole.USER,
content=user_prompt_item,

call_index = 0
first_call = True
for round_index in range(num_rounds):
for archetype_suggestion in archetype_suggestions:
call_index += 1
assumption_id = f"A{len(assumptions_to_kill) + 1}"
failure_mode_index = len(failure_modes) + 1
logger.info(f"Call {call_index}: round={round_index+1}, suggestion={archetype_suggestion!r}")

archetype_user_prompt = (
f"{user_prompt}\n\n"
f"Suggested archetype: {archetype_suggestion}\n"
f"Write one assumption and one failure scenario. "
f"Adapt the archetype name if a more specific label fits the project."
)
)
chat_message_list = [
ChatMessage(role=MessageRole.SYSTEM, content=system_prompt),
ChatMessage(role=MessageRole.USER, content=archetype_user_prompt),
]

def execute_function(llm: LLM) -> dict:
sllm = llm.as_structured_llm(PremortemAnalysis)
sllm = llm.as_structured_llm(ArchetypeNarrative)
start_time = time.perf_counter()

chat_response = sllm.chat(chat_message_list)
pydantic_response = chat_response.raw

narrative = require_raw(chat_response, ArchetypeNarrative)
end_time = time.perf_counter()
duration = int(ceil(end_time - start_time))

metadata = dict(llm.metadata)
metadata["llm_classname"] = llm.class_name()
metadata["duration"] = duration

return {
"pydantic_response": pydantic_response,
"metadata": metadata,
"duration": duration
}
return {"narrative": narrative, "metadata": metadata}

try:
result = llm_executor.run(execute_function)
except PipelineStopRequested:
# Re-raise PipelineStopRequested without wrapping it
raise
except Exception as e:
logger.debug(f"LLM chat interaction failed: {e}")
logger.error("LLM chat interaction failed.", exc_info=True)
if user_prompt_index == 0:
logger.error("The first user prompt failed. This is a critical error. Please check the system prompt and user prompt.")
raise ValueError("LLM chat interaction failed.") from e
else:
logger.error(f"User prompt {user_prompt_index+1} failed. Continuing with next user prompt.")
continue

assistant_content_raw: dict = result["pydantic_response"].model_dump()
# Compact JSON without newlines and spaces, since it's going to be parsed by the LLM. Pretty printing wastes input tokens for the LLM.
assistant_content: str = json.dumps(assistant_content_raw, separators=(',', ':'))

chat_message_list.append(
ChatMessage(
role=MessageRole.ASSISTANT,
content=assistant_content,
)
)

responses.append(result["pydantic_response"])
logger.error(f"Call {call_index} failed: {e}", exc_info=True)
if first_call:
raise ValueError(f"First archetype call failed: {e}") from e
logger.warning(f"Skipping suggestion {archetype_suggestion!r} due to failure.")
continue

first_call = False
narrative: ArchetypeNarrative = result["narrative"]
metadata_list.append(result["metadata"])
accumulated_chat_message_list = chat_message_list.copy()

# Use the last response as the primary result
assumptions_to_kill: list[AssumptionItem] = []
failure_modes: list[FailureModeItem] = []
for response in responses:
assumptions_to_kill.extend(response.assumptions_to_kill)
failure_modes.extend(response.failure_modes)
# Code assigns IDs and cross-references — the LLM only provides narrative text.
# The actual archetype name comes from narrative.archetype (LLM adapts to the project).
assumption = AssumptionItem(
assumption_id=assumption_id,
statement=narrative.assumption,
test_now=narrative.test_now,
falsifier=f"Result of: {narrative.test_now} — reveals the assumption does not hold.",
)
failure_mode = FailureModeItem(
failure_mode_index=failure_mode_index,
root_cause_assumption_id=assumption_id,
failure_mode_archetype=narrative.archetype,
failure_mode_title=narrative.failure_title,
risk_analysis=narrative.failure_story,
early_warning_signs=narrative.warning_signs,
)
assumptions_to_kill.append(assumption)
failure_modes.append(failure_mode)

final_response = PremortemAnalysis(
assumptions_to_kill=assumptions_to_kill,
Expand Down Expand Up @@ -286,7 +287,7 @@ def _calculate_risk_level_brief(likelihood: Optional[int], impact: Optional[int]
def _calculate_risk_level_verbose(likelihood: Optional[int], impact: Optional[int]) -> str:
"""Calculates a qualitative risk level from likelihood and impact scores."""
if likelihood is None or impact is None:
return f"Likelihood {likelihood}/5, Impact {impact}/5"
return "Not Scored"

score = likelihood * impact
if score >= 15:
Expand Down
Loading