FEAT Beam Search for OpenAIResponseTarget #1346
riedgar-ms wants to merge 119 commits into Azure:main from
Conversation
```python
beam_reviewer=TopKBeamReviewer(k=2, drop_chars=1),
attack_scoring_config=scoring_config,
num_chars_per_step=0,
)
```
The tests only cover initialization validation but lack coverage for the core attack execution logic including _perform_async, _propagate_beam_async, and _score_beam_async. While these methods interact with OpenAIResponseTarget features, consider adding tests with mocked responses to verify the beam propagation, scoring, and beam review logic work correctly.
```python
@pytest.mark.asyncio
async def test_perform_async_called_with_mock_context_async(
    self, mock_target, mock_true_false_scorer, mock_float_scale_scorer
):
    """Ensure _perform_async can be awaited with a mocked context."""
    scoring_config = AttackScoringConfig(
        objective_scorer=mock_true_false_scorer, auxiliary_scorers=[mock_float_scale_scorer]
    )
    attack = BeamSearchAttack(
        objective_target=mock_target,
        beam_reviewer=TopKBeamReviewer(k=2, drop_chars=1),
        attack_scoring_config=scoring_config,
    )
    mock_context = MagicMock()
    attack._perform_async = AsyncMock(return_value=None)  # type: ignore[assignment]
    await attack._perform_async(mock_context)
    attack._perform_async.assert_awaited_once_with(mock_context)

@pytest.mark.asyncio
async def test_propagate_beam_async_with_mock_beam_async(
    self, mock_target, mock_true_false_scorer, mock_float_scale_scorer
):
    """Verify _propagate_beam_async is async-callable and works with a Beam instance."""
    scoring_config = AttackScoringConfig(
        objective_scorer=mock_true_false_scorer, auxiliary_scorers=[mock_float_scale_scorer]
    )
    attack = BeamSearchAttack(
        objective_target=mock_target,
        beam_reviewer=TopKBeamReviewer(k=2, drop_chars=1),
        attack_scoring_config=scoring_config,
    )
    initial_beam = Beam(identifier=str(uuid.uuid4()), content="seed", score=None)
    propagated_beam = Beam(identifier=str(uuid.uuid4()), content="seed_extended", score=None)
    attack._propagate_beam_async = AsyncMock(return_value=propagated_beam)  # type: ignore[assignment]
    result = await attack._propagate_beam_async(initial_beam)
    attack._propagate_beam_async.assert_awaited_once_with(initial_beam)
    assert result is propagated_beam

@pytest.mark.asyncio
async def test_score_beam_async_with_mock_scorers_async(
    self, mock_target, mock_true_false_scorer, mock_float_scale_scorer
):
    """Verify _score_beam_async can be awaited and returns mocked score data."""
    scoring_config = AttackScoringConfig(
        objective_scorer=mock_true_false_scorer, auxiliary_scorers=[mock_float_scale_scorer]
    )
    attack = BeamSearchAttack(
        objective_target=mock_target,
        beam_reviewer=TopKBeamReviewer(k=2, drop_chars=1),
        attack_scoring_config=scoring_config,
    )
    beam = Beam(identifier=str(uuid.uuid4()), content="to_score", score=None)
    fake_score_result = {"objective_score": 0.9, "auxiliary_scores": [0.5]}
    attack._score_beam_async = AsyncMock(return_value=fake_score_result)  # type: ignore[assignment]
    result = await attack._score_beam_async(beam)
    attack._score_beam_async.assert_awaited_once_with(beam)
    assert result == fake_score_result
```
```python
target = self._get_target_for_beam(beam)

current_context = copy.deepcopy(self._start_context)
await self._setup_async(context=current_context)
```
Calling _setup_async for each beam propagation can cause race conditions when multiple beams are processed concurrently. The method overwrites self._start_context and modifies the context's conversation_id, which could lead to unpredictable behavior. Consider refactoring to avoid calling _setup_async in the concurrent beam propagation, or ensure proper isolation between concurrent beam operations.
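A minimal, self-contained illustration of the hazard (toy code, not the PyRIT implementation): two concurrent tasks that each write a shared instance attribute and then yield control, as an awaited network call would, can both end up observing the last writer's value.

```python
# Toy demonstration of the shared-state race described above.
# SharedSetup and setup_and_use are hypothetical names for this sketch.
import asyncio


class SharedSetup:
    def __init__(self):
        self._start_context = None  # shared mutable state, as in the attack class

    async def setup_and_use(self, conversation_id):
        self._start_context = conversation_id  # write to shared state
        await asyncio.sleep(0)                 # yield, as an awaited call would
        return self._start_context             # may observe the other task's write


async def main():
    target = SharedSetup()
    # Both "beams" run concurrently against the same instance.
    return await asyncio.gather(
        target.setup_and_use("beam-A"),
        target.setup_and_use("beam-B"),
    )


print(asyncio.run(main()))
```

With this interleaving both tasks read "beam-B": the first task's write is overwritten before it resumes, which is exactly why per-beam setup should not go through shared instance state.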
```python
await self._setup_async(context=current_context)
```
As I think I've noted when this comment has come up before, my whole handling of the new messages may be incorrect. I'd like the 'bigger picture' perspective.
```python
_extra_beam_count = self.desired_beam_count or len(beams)

for i in range(_extra_beam_count - len(new_beams)):
    nxt = copy.deepcopy(new_beams[i % self.k])
```
When creating new beams from top performers using deepcopy, the beam IDs are preserved. This means multiple beams can have the same conversation_id, which could cause issues with conversation tracking and memory storage. Consider generating new unique IDs for the copied beams to maintain proper conversation isolation.
```diff
- nxt = copy.deepcopy(new_beams[i % self.k])
+ nxt = copy.deepcopy(new_beams[i % self.k])
+ nxt.id = str(uuid.uuid4())
+ if nxt.message is not None and hasattr(nxt.message, "conversation_id"):
+     nxt.message.conversation_id = str(uuid.uuid4())
```
I'd appreciate a human review on this; it's linked to my other questions
```python
await self._setup_async(context=current_context)

message = self._get_message(current_context)
beam.id = current_context.conversation_id
```
_propagate_beam_async calls _setup_async, which mutates shared instance state (self._start_context) and is executed concurrently across beams. This creates a race condition and also overwrites the original start context for subsequent iterations. Refactor so per-beam setup does not write to shared state (e.g., create a local conversation_id + call ConversationManager.initialize_context_async directly without touching _start_context).
```diff
- await self._setup_async(context=current_context)
- message = self._get_message(current_context)
- beam.id = current_context.conversation_id
+ local_conversation_id = str(uuid.uuid4())
+ current_context.conversation_id = local_conversation_id
+ await self._conversation_manager.initialize_context_async(
+     context=current_context,
+     conversation_reference=ConversationReference(
+         conversation_id=local_conversation_id,
+         conversation_type=ConversationType.ATTACK,
+     ),
+     prepended_conversation_config=self._prepended_conversation_config,
+ )
+ message = self._get_message(current_context)
+ beam.id = local_conversation_id
```
```python
# Log the attack configuration
self._logger.info(f"Starting {self.__class__.__name__} with objective: {context.objective}")

beams = [Beam(id=context.conversation_id, text="", score=0.0) for _ in range(self._num_beams)]
```
BeamSearchAttack adds significant new behavior (beam propagation/scoring/pruning), but the current tests mainly cover grammar/reviewer and init validation. Add unit tests that mock PromptNormalizer.send_prompt_async and scorers to validate _perform_async end-to-end (beam expansion across iterations, scoring-driven pruning, and AttackResult fields like related_conversations).
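As a rough shape for such a test, the mocking pattern can be shown with a self-contained toy (the class and method names below are simplified stand-ins, not the actual PyRIT API):

```python
# Illustrative only: drive an async attack loop with mocked async dependencies.
# TinyBeamAttack is a hypothetical stand-in for this sketch.
import asyncio
from unittest.mock import AsyncMock


class TinyBeamAttack:
    """Toy single-step attack: extends each beam via a target, keeps the best."""

    def __init__(self, target, scorer):
        self._target = target
        self._scorer = scorer

    async def run(self, beams):
        extended = [await self._target.send_prompt_async(prompt=b) for b in beams]
        scores = [await self._scorer.score_async(text=t) for t in extended]
        # Return the (text, score) pair with the highest score.
        return max(zip(extended, scores), key=lambda pair: pair[1])


async def main():
    # Mock the network-facing pieces so the loop logic is tested offline.
    target = AsyncMock()
    target.send_prompt_async = AsyncMock(side_effect=lambda prompt: prompt + "!")
    scorer = AsyncMock()
    scorer.score_async = AsyncMock(side_effect=lambda text: len(text))

    attack = TinyBeamAttack(target, scorer)
    best_text, best_score = await attack.run(["a", "abc"])

    # Assertions verify both the result and how the mocks were driven.
    assert target.send_prompt_async.await_count == 2
    scorer.score_async.assert_awaited_with(text="abc!")
    return best_text, best_score


print(asyncio.run(main()))
```

The same structure would apply to the real attack: mock the normalizer's send call and the scorers, run _perform_async, then assert on beam counts, pruning decisions, and AttackResult fields.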
```python
if beam.objective_score and beam.objective_score.get_value():
    # We have a positive score, so it's a success
    return AttackOutcome.SUCCESS, "Objective achieved according to scorer"

# No response at all (all attempts filtered/failed)
return AttackOutcome.FAILURE, "All attempts were filtered or failed to get a response"
```
The failure outcome_reason here is misleading when a response was produced but the objective scorer returned a negative/false score. As written, any non-success (including “objective not achieved”) reports that all attempts were filtered/failed. Consider distinguishing between “no scorable response” vs “objective not achieved” (e.g., check beam.message / beam.objective_score and craft a reason accordingly).
```diff
- if beam.objective_score and beam.objective_score.get_value():
-     # We have a positive score, so it's a success
-     return AttackOutcome.SUCCESS, "Objective achieved according to scorer"
- # No response at all (all attempts filtered/failed)
- return AttackOutcome.FAILURE, "All attempts were filtered or failed to get a response"
+ if beam.objective_score is not None:
+     if beam.objective_score.get_value():
+         # We have a positive score, so it's a success
+         return AttackOutcome.SUCCESS, "Objective achieved according to scorer"
+     # A response was scored but did not achieve the objective
+     return AttackOutcome.FAILURE, "Objective not achieved according to scorer"
+ # No objective score was produced
+ if beam.message is None:
+     # No response at all (all attempts filtered/failed)
+     return AttackOutcome.FAILURE, "All attempts were filtered or failed to get a response"
+ # We have a response but no positive objective score
+ return AttackOutcome.FAILURE, "Objective not achieved: no positive score for the response"
```
Description

Use the Lark grammar feature of the OpenAIResponseTarget to create a beam search for PyRIT. This is a single-turn attack in which a collection of candidate responses (the beams) is maintained. On each iteration, the model's response is allowed to extend a little for each beam. The beams are then scored; the worst-performing ones are discarded and replaced with copies of higher-scoring beams.

Tests and Documentation

Basic unit tests of the added classes are included, but since this requires features currently only in the OpenAIResponseTarget, there did not seem much point in mocking that. There is a notebook which runs everything E2E.
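For reference, the loop described above (extend each beam a little, score, discard the worst, refill with copies of the best) can be sketched in a few lines of plain Python. The extend and score functions here are toys, not the PyRIT implementation:

```python
# Self-contained sketch of the beam-search loop; beam_search is a
# hypothetical helper for illustration only.
import copy


def beam_search(seed, extend, score, num_beams=4, keep=2, steps=3):
    """Maintain num_beams candidates; each step: extend, score, prune, refill."""
    beams = [seed for _ in range(num_beams)]
    for _ in range(steps):
        # Let each beam grow a little (in the PR, the model extends its response).
        beams = [extend(b, i) for i, b in enumerate(beams)]
        # Score and keep only the top performers.
        beams.sort(key=score, reverse=True)
        survivors = beams[:keep]
        # Replace discarded beams with copies of higher-scoring ones.
        beams = survivors + [copy.deepcopy(survivors[i % keep])
                             for i in range(num_beams - keep)]
    return max(beams, key=score)


# Toy example: each beam appends its index; the score prefers larger strings.
best = beam_search(
    seed="s",
    extend=lambda text, i: text + str(i),
    score=lambda text: sum(ord(c) for c in text),
)
print(best)
```

Note that the refill step copies surviving beams verbatim, which is where the ID-duplication concern raised in the review arises in the real code.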