From 523c55b7f0a9ba408f0c22b2f8e02bc0eedf0693 Mon Sep 17 00:00:00 2001 From: deacon Date: Mon, 16 Mar 2026 10:53:48 -0400 Subject: [PATCH] feat: support dict-style steps with per-step metadata in adversary profiles Extends adversary atomic_ordering to accept dict-style steps alongside legacy string ability IDs. Each dict step can specify executor_facts per platform for fact injection into command templates. Updates the atomic planner to use step_idx for correct ordering of repeated abilities, and the planning service to generate links with metadata-injected facts. --- AdversarySCHEMA.yml | 32 +++++++ app/objects/c_adversary.py | 25 ++++-- app/objects/secondclass/c_link.py | 4 +- app/planners/atomic.py | 22 +++-- app/service/planning_svc.py | 137 +++++++++++++++++++++++++----- 5 files changed, 182 insertions(+), 38 deletions(-) create mode 100644 AdversarySCHEMA.yml diff --git a/AdversarySCHEMA.yml b/AdversarySCHEMA.yml new file mode 100644 index 000000000..33dd7ac3a --- /dev/null +++ b/AdversarySCHEMA.yml @@ -0,0 +1,32 @@ +# Adversary Profile Schema - Extended Format +# +# atomic_ordering accepts both legacy string ability IDs and dict-style +# steps with per-step metadata. The two formats can be mixed freely. +# +# Dict-style step fields: +# ability_id (required) - UUID of the ability to execute +# metadata (optional) - per-step configuration +# executor_facts - map of platform -> list of {trait, value} dicts +# used to inject facts into command templates + +adversary_id: 95cad57a-8e4f-4ba0-87cf-03d8a7cad0a0 +name: Example Adversary +description: Demonstrates the extended adversary profile schema +objective: 495a9828-cab1-44dd-a0ca-66e58177d8cc +atomic_ordering: + # Dict-style step with per-platform executor facts + - ability_id: 36eecb80-ede3-442b-8774-956e906aff02 + metadata: + executor_facts: + linux: + - trait: time.sleep + value: '10' + # Same ability can appear multiple times with different metadata + - ability_id: 36eecb80-ede3-442b-8774-956e906aff02 + metadata: + executor_facts: + linux: + - trait: time.sleep + value: '3' + # Legacy string format still works + # - diff --git a/app/objects/c_adversary.py b/app/objects/c_adversary.py index bcc85fd42..3eaf4064e 100644 --- a/app/objects/c_adversary.py +++ b/app/objects/c_adversary.py @@ -17,7 +17,9 @@ class Meta: adversary_id = ma.fields.String() name = ma.fields.String() description = ma.fields.String() - atomic_ordering = ma.fields.List(ma.fields.String()) + atomic_ordering = ma.fields.List( + ma.fields.Raw(), # Accepts either str (ability_id) or dict (step with metadata) + ) objective = ma.fields.String() tags = ma.fields.List(ma.fields.String(), allow_none=True) has_repeatable_abilities = ma.fields.Boolean(dump_only=True) @@ -59,16 +61,18 @@ class Adversary(FirstClassObjectInterface, BaseObject): def unique(self): return self.hash('%s' % self.adversary_id) - def __init__(self, name='', adversary_id='', description='', atomic_ordering=(), objective='', tags=None, plugin=''): + def __init__(self, name='', adversary_id='', description='', atomic_ordering=(), objective='', tags=None, + plugin='', metadata=None, **_): super().__init__() self.adversary_id = adversary_id if adversary_id else str(uuid.uuid4()) self.name = name self.description = description - self.atomic_ordering = atomic_ordering + self.atomic_ordering = list(atomic_ordering or []) self.objective = objective or DEFAULT_OBJECTIVE_ID self.tags = set(tags) if tags else set() self.has_repeatable_abilities = False self.plugin = plugin + self.metadata = metadata or {} def store(self, ram): existing = self.retrieve(ram['adversaries'], self.unique) @@ -85,9 +89,11 @@ def store(self, ram): return existing def verify(self, log, abilities, objectives): - for ability_id in self.atomic_ordering: - if not next((ability for ability in abilities if ability.ability_id == ability_id), None): - log.warning('Ability referenced in adversary %s but not found: %s', self.adversary_id, ability_id) + for step in self.atomic_ordering: + ability_id = step if isinstance(step, str) else step.get('ability_id') + if not any(ability.ability_id == ability_id for ability in abilities): + log.warning('Ability referenced in adversary %s but not found: %s', + self.adversary_id, ability_id) if not self.objective: self.objective = DEFAULT_OBJECTIVE_ID @@ -108,4 +114,9 @@ async def which_plugin(self): return self.plugin def check_repeatable_abilities(self, ability_list): - return any(ab.repeatable for ab_id in self.atomic_ordering for ab in ability_list if ab.ability_id == ab_id) + for step in self.atomic_ordering: + ability_id = step if isinstance(step, str) else step.get('ability_id') + for ab in ability_list: + if ab.ability_id == ability_id and ab.repeatable: + return True + return False diff --git a/app/objects/secondclass/c_link.py b/app/objects/secondclass/c_link.py index 7afd4dc7e..a25d28ba8 100644 --- a/app/objects/secondclass/c_link.py +++ b/app/objects/secondclass/c_link.py @@ -49,6 +49,7 @@ class Meta: output = ma.fields.String() deadman = ma.fields.Boolean() agent_reported_time = ma.fields.DateTime(format=BaseObject.TIME_FORMAT, load_default=None) + metadata = ma.fields.Dict(keys=ma.fields.String(), values=ma.fields.Raw(), allow_none=True) @ma.pre_load() def fix_ability(self, link, **_): @@ -160,7 +161,7 @@ def is_global_variable(cls, variable): return variable in cls.RESERVED def __init__(self, command='', plaintext_command='', paw='', ability=None, executor=None, status=-3, score=0, jitter=0, cleanup=0, id='', - pin=0, host=None, deadman=False, used=None, relationships=None, agent_reported_time=None): + pin=0, host=None, deadman=False, used=None, relationships=None, agent_reported_time=None, metadata=None): super().__init__() self.id = str(id) self.command = command @@ -186,6 +187,7 @@ def __init__(self, command='', plaintext_command='', paw='', ability=None, execu self.output = False self.deadman = deadman self.agent_reported_time = agent_reported_time + self.metadata = metadata or {} def __eq__(self, other): if isinstance(other, Link): diff --git a/app/planners/atomic.py b/app/planners/atomic.py index 2db0bd0bd..1a9eeee93 100644 --- a/app/planners/atomic.py +++ b/app/planners/atomic.py @@ -32,11 +32,19 @@ async def _get_links(self, agent=None): return await self.planning_svc.get_links(operation=self.operation, agent=agent) # Given list of links, returns the link that appears first in the adversary's atomic ordering. - async def _get_next_atomic_link(self, links): - abil_id_to_link = dict() - for link in links: - abil_id_to_link[link.ability.ability_id] = link + async def _get_next_atomic_link(self, possible_links): + # Try to match based on explicit step_idx (set by _generate_new_links for dict-style steps) + link_lookup = {link.step_idx: link for link in possible_links if hasattr(link, 'step_idx')} + for idx, _ in enumerate(self.operation.adversary.atomic_ordering): + if idx in link_lookup: + return link_lookup[idx] + + # Fallback to ability_id matching (legacy string-style ordering) + abil_id_to_link = {link.ability.ability_id: link for link in possible_links} candidate_ids = set(abil_id_to_link.keys()) - for ab_id in self.operation.adversary.atomic_ordering: - if ab_id in candidate_ids: - return abil_id_to_link[ab_id] + for step in self.operation.adversary.atomic_ordering: + ability_id = step if isinstance(step, str) else step.get('ability_id') + if ability_id in candidate_ids: + return abil_id_to_link[ability_id] + + return None diff --git a/app/service/planning_svc.py b/app/service/planning_svc.py index 2996490dd..35d87aaf4 100644 --- a/app/service/planning_svc.py +++ b/app/service/planning_svc.py @@ -1,3 +1,4 @@ +from app.objects.secondclass.c_fact import Fact from app.objects.secondclass.c_link import Link from app.service.interfaces.i_planning_svc import PlanningServiceInterface from app.utility.base_planning_svc import BasePlanningService @@ -142,7 +143,8 @@ async def get_links(self, operation, buckets=None, agent=None, trim=True): For an operation and agent combination, create links (that can be executed). When no agent is supplied, links for all agents - are returned. + are returned. Supports both legacy string ability IDs and + dict-style steps with per-step metadata in atomic_ordering. :param operation: Operation to generate links for :type operation: Operation @@ -158,25 +160,56 @@ async def get_links(self, operation, buckets=None, agent=None, trim=True): :type trim: bool, optional :return: a list of links sorted by score and atomic ordering """ - ao = operation.adversary.atomic_ordering - abilities = await self.get_service('data_svc') \ - .locate('abilities', match=dict(ability_id=tuple(ao))) + raw_steps = operation.adversary.atomic_ordering + step_entries = [] + unique_ids = set() + + for idx, step in enumerate(raw_steps): + if isinstance(step, str): + ability_id = step + metadata = {} + else: + ability_id = step.get('ability_id') + metadata = step.get('metadata', {}) + + step_entries.append({ + 'step_idx': idx, + 'ability_id': ability_id, + 'metadata': metadata, + }) + unique_ids.add(ability_id) + + all_abilities = await self.get_service('data_svc').locate( + 'abilities', match=dict(ability_id=tuple(unique_ids)) + ) + ability_map = {a.ability_id: a for a in all_abilities} + + steps_with_abilities = [] + for step in step_entries: + ability = ability_map.get(step['ability_id']) + if not ability: + continue + steps_with_abilities.append({ + 'step_idx': step['step_idx'], + 'ability': ability, + 'metadata': step['metadata'], + }) + if buckets: - # buckets specified - get all links for given buckets, - # (still in underlying atomic adversary order) - t = [] - for bucket in buckets: - t.extend([ab for ab in abilities for b in ab.buckets if b == bucket]) - abilities = t + steps_with_abilities = [ + s for s in steps_with_abilities + if any(bucket in s['ability'].buckets for bucket in buckets) + ] + links = [] if agent: - links.extend(await self.generate_and_trim_links(agent, operation, abilities, trim)) + links.extend(await self.generate_and_trim_links(agent, operation, steps_with_abilities, trim)) else: agent_links = [] for agent in operation.agents: - agent_links.append(await self.generate_and_trim_links(agent, operation, abilities, trim)) + agent_links.append(await self.generate_and_trim_links(agent, operation, steps_with_abilities, trim)) links = await self._remove_links_of_duplicate_singletons(agent_links) - self.log.debug('Generated %s usable links' % (len(links))) + self.log.debug('Generated %s usable links', len(links)) return await self.sort_links(links) async def get_cleanup_links(self, operation, agent=None): @@ -332,33 +365,91 @@ async def _check_and_generate_cleanup_links(self, agent, operation): link_status=operation.link_status()) return agent_cleanup_links - async def _generate_new_links(self, operation, agent, abilities, link_status): - """Generate links with given status + async def _generate_new_links(self, operation, agent, steps_with_abilities, link_status): + """Generate links with given status, supporting per-step metadata. + + Each entry in steps_with_abilities is a dict with keys: + step_idx - ordinal position in the adversary profile + ability - resolved Ability object + metadata - dict of per-step metadata (may contain executor_facts) :param operation: Operation to generate links on :type operation: Operation :param agent: Agent to generate links on :type agent: Agent - :param agent: Abilities to generate links for - :type agent: list(Ability) + :param steps_with_abilities: Steps with resolved abilities and metadata + :type steps_with_abilities: list(dict) :param link_status: Link status, referencing link state dict :type link_status: int :return: Links for agent :rtype: list(Link) """ links = [] - for ability in await agent.capabilities(abilities): + for step in steps_with_abilities: + idx = step['step_idx'] + ability = step['ability'] + step_meta = step.get('metadata', {}) + + supported_abilities = await agent.capabilities([ability]) + if ability.ability_id not in [a.ability_id for a in supported_abilities]: + continue + executor = await agent.get_preferred_executor(ability) if not executor: continue + await self._call_ability_plugin_hooks(ability, executor) - if executor.command: - link = Link.load(dict(command=self.encode_string(executor.test), paw=agent.paw, score=0, - ability=ability, executor=executor, status=link_status, - jitter=self.jitter(operation.jitter))) - links.append(link) + + # Collect per-step executor_facts if present + fact_dicts = [] + exec_facts = step_meta.get('executor_facts', {}).get(executor.platform, []) + for f in exec_facts: + try: + fact_dicts.append({f['trait'].strip(): f['value'].strip('" ')}) + except (KeyError, AttributeError) as e: + self.log.warning('Skipping malformed fact in step %s: %s (%s)', idx, f, e) + + link = Link.load(dict( + paw=agent.paw, + score=0, + ability=ability, + executor=executor, + status=link_status, + jitter=self.jitter(operation.jitter), + )) + link.step_idx = idx + + if fact_dicts: + for fd in fact_dicts: + for trait, value in fd.items(): + link.used.append(Fact(trait=trait, value=value, source=operation.id)) + injected = self._inject_facts(executor.test, fact_dicts) + else: + injected, _, used = await self._build_single_test_variant(executor.test, [], executor.name) + for f in used: + link.used.append(f) + + link.command = self.encode_string(injected) + links.append(link) + return links + @staticmethod + def _inject_facts(template, fact_dicts): + """Replace #{trait} placeholders in the template using fact dicts. + + :param template: Command template with #{trait} placeholders + :type template: str + :param fact_dicts: List of {trait: value} dicts + :type fact_dicts: list(dict) + :return: Template with placeholders replaced + :rtype: str + """ + for entry in fact_dicts: + for trait, value in entry.items(): + template = template.replace(f'#{{{trait}}}', value) + return template + async def _generate_cleanup_links(self, operation, agent, link_status): """Generate cleanup links with given status