Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 117 additions & 15 deletions app/api/v2/managers/ability_api_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,21 @@
from app.api.v2.managers.base_api_manager import BaseApiManager
from app.api.v2.responses import JsonHttpBadRequest
from app.objects.c_ability import AbilitySchema
from app.service.file_svc import FileSvc
from app.utility.base_world import BaseWorld


class AbilityApiManager(BaseApiManager):
_EXECUTOR_LABEL_PATTERN = re.compile(r'^[a-zA-Z0-9_.-]+$')

def __init__(self, data_svc, file_svc):
super().__init__(data_svc=data_svc, file_svc=file_svc)

async def create_on_disk_object(self, data: dict, access: dict, ram_key: str, id_property: str, obj_class: type):
self._validate_ability_data(create=True, data=data)
obj_id = data.get('id')
obj_id = data['id']
if self.find_object(ram_key, {id_property: obj_id}):
raise JsonHttpBadRequest(f'Ability with given id already exists: {obj_id}')
file_path = self._create_ability_filepath(data.get('tactic'), obj_id)
allowed = self._get_allowed_from_access(access)
await self._save_and_reload_object(file_path, data, obj_class, allowed)
Expand Down Expand Up @@ -51,39 +56,136 @@
return next(self.find_objects(ram_key, {id_property: obj_id}))

def _validate_ability_data(self, create: bool, data: dict):
# Correct ability_id key for ability file saving.
data['id'] = data.pop('ability_id', '')
# Normalize ability ID: prefer explicit 'ability_id' if provided, otherwise preserve any existing 'id'.
ability_id = None
if 'ability_id' in data:
ability_id = data.pop('ability_id')
elif 'id' in data:
ability_id = data.get('id')

# Sanitize supplied IDs before assigning them internally. If no ID is supplied during creation,
# generate one instead.
if ability_id in (None, '') and create:
data['id'] = str(uuid.uuid4())
else:
data['id'] = BaseApiManager._sanitize_id(ability_id)

# If a new ability is being created, ensure required fields present.
if create:
# Set ability ID if undefined
if not data['id']:
data['id'] = str(uuid.uuid4())
if not data.get('name'):
raise JsonHttpBadRequest(f'Cannot create ability {data["id"]} due to missing name')
if 'tactic' not in data:
if not data.get('tactic'):
raise JsonHttpBadRequest(f'Cannot create ability {data["id"]} due to missing tactic')
if not data.get('executors'):
if not (data.get('executors') or data.get('platforms')):
raise JsonHttpBadRequest(f'Cannot create ability {data["id"]}: at least one executor required')
# Validate ID, used for file creation
validator = re.compile(r'^[a-zA-Z0-9-_]+$')
if 'id' in data and not validator.match(data['id']):
raise JsonHttpBadRequest(f'Invalid ability ID {data["id"]}. IDs can only contain '
'alphanumeric characters, hyphens, and underscores.')

# Validate tactic, used for directory creation, lower case if present
validator = re.compile(r'^[a-zA-Z0-9-_]+$')
if 'tactic' in data:
if not validator.match(data['tactic']):
if not isinstance(data['tactic'], str) or not validator.match(data['tactic']):
raise JsonHttpBadRequest(f'Invalid ability tactic {data["tactic"]}. Tactics can only contain '
'alphanumeric characters, hyphens, and underscores.')
data['tactic'] = data['tactic'].lower()

if 'executors' in data and not data.get('executors'):
if 'executors' in data and not data.get('executors') and 'platforms' not in data:
raise JsonHttpBadRequest(f'Cannot create ability {data["id"]}: at least one executor required')

if 'name' in data and not data.get('name'):
raise JsonHttpBadRequest(f'Cannot create ability {data["id"]} due to missing name')

self._validate_ability_privilege(data)
self._validate_ability_executors(data)

def _validate_ability_privilege(self, data: dict):
if 'privilege' not in data:
return

privilege = data.get('privilege')
if privilege is None or privilege == '':
return
if not isinstance(privilege, str):
raise JsonHttpBadRequest(f'Invalid ability privilege {privilege}. Privilege must be one of: '
'User, Elevated.')

allowed_privileges = {privilege.name for privilege in BaseWorld.Privileges}
if privilege not in allowed_privileges:
raise JsonHttpBadRequest(f'Invalid ability privilege {privilege}. Privilege must be one of: '
'User, Elevated.')

def _validate_ability_executors(self, data: dict):
if data.get('executors') is not None:
self._validate_executor_list(data['executors'])
if data.get('platforms') is not None:
self._validate_platform_executor_map(data['platforms'])

def _validate_executor_list(self, executors):
if not isinstance(executors, list):
raise JsonHttpBadRequest('Invalid ability executors. Executors must be a list.')

for index, executor in enumerate(executors):
if not isinstance(executor, dict):
raise JsonHttpBadRequest(f'Invalid ability executor at index {index}. Executor must be a dictionary.')
self._validate_executor_label(executor.get('name'), f'executor[{index}].name')
self._validate_executor_label(executor.get('platform'), f'executor[{index}].platform')
self._validate_payloads(executor.get('payloads'), f'executor[{index}].payloads')

def _validate_platform_executor_map(self, platforms):

Check failure on line 132 in app/api/v2/managers/ability_api_manager.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this function to reduce its Cognitive Complexity from 19 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=mitre_caldera&issues=AZ3-7OTkPTni1fHIUofH&open=AZ3-7OTkPTni1fHIUofH&pullRequest=3342
if not isinstance(platforms, dict):
raise JsonHttpBadRequest('Invalid ability platforms. Platforms must be a dictionary.')

for platform_names, platform_executors in platforms.items():
for platform_name in self._split_and_validate_labels(platform_names, 'platform'):
if not isinstance(platform_executors, dict):
raise JsonHttpBadRequest(f'Invalid ability platform {platform_name}. Platform executors must be '
'a dictionary.')
for executor_names, executor in platform_executors.items():
for executor_name in self._split_and_validate_labels(executor_names, 'executor'):
if not isinstance(executor, dict):
raise JsonHttpBadRequest(f'Invalid ability executor {executor_name} for platform '
f'{platform_name}. Executor must be a dictionary.')
self._validate_payloads(executor.get('payloads'), f'platforms.{platform_name}.payloads')

@classmethod
def _split_and_validate_labels(cls, value, field_name):
if not isinstance(value, str):
raise JsonHttpBadRequest(f'Invalid ability {field_name} {value}. {field_name.capitalize()} names must be '
'strings.')

labels = [label.strip() for label in value.split(',')]
if not labels or any(not label for label in labels):
raise JsonHttpBadRequest(f'Invalid ability {field_name} {value}. {field_name.capitalize()} names cannot '
'be empty.')

for label in labels:
cls._validate_executor_label(label, field_name)
return labels

@classmethod
def _validate_executor_label(cls, value, field_name):
if not isinstance(value, str) or not value:
raise JsonHttpBadRequest(f'Invalid ability {field_name}. Executor and platform names must be non-empty '
'strings.')
if not cls._EXECUTOR_LABEL_PATTERN.match(value):
raise JsonHttpBadRequest(f'Invalid ability {field_name} {value}. Executor and platform names can only '
'contain alphanumeric characters, periods, hyphens, and underscores.')

@staticmethod
def _validate_payloads(payloads, field_name):
if payloads is None:
return
if not isinstance(payloads, list):
raise JsonHttpBadRequest(f'Invalid ability {field_name}. Payloads must be a list.')

for payload in payloads:
if not isinstance(payload, str):
raise JsonHttpBadRequest(f'Invalid ability payload {payload}. Payload names must be strings.')
safe_filename = FileSvc._validate_filename(payload)
if BaseWorld.is_uuid4(payload) and safe_filename:
continue
if not safe_filename:
raise JsonHttpBadRequest(f'Invalid ability payload {payload}. Payload names cannot contain path '
'separators, traversal sequences, null bytes, or unsafe characters.')

def _create_ability_filepath(self, tactic: str, obj_id: str):
tactic_dir = os.path.join('data', 'abilities', tactic)
if not os.path.exists(tactic_dir):
Expand Down
69 changes: 66 additions & 3 deletions app/objects/c_ability.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,80 @@ class Meta:
delete_payload = ma.fields.Bool(load_default=None)

@ma.pre_load
def fix_id(self, data, **_):
def normalize_ability_file_fields(self, data, **_):
"""
Ensures that ability file fields are formatted correctly for processing
"""
if not isinstance(data, dict):
return data
if 'id' in data:
data['ability_id'] = data.pop('id')
if isinstance(data.get('technique'), dict):
technique = data.pop('technique')
data.setdefault('technique_id', technique.get('attack_id'))
data.setdefault('technique_name', technique.get('name'))
if 'platforms' in data and 'executors' not in data:
data['executors'] = self._platforms_to_executor_list(data.pop('platforms'))
if self._has_legacy_requirements(data.get('requirements')):
data['requirements'] = self._legacy_requirements_to_list(data['requirements'])
return data

@ma.post_load
def build_ability(self, data, **kwargs):
if 'technique' in data:
data['technique_name'] = data.pop('technique')
return None if kwargs.get('partial') is True else Ability(**data)

@staticmethod
def _platforms_to_executor_list(platforms):
"""
Translates legacy platform-structured YAML into caldera executor format
"""
executors = []
if not isinstance(platforms, dict):
raise ma.ValidationError('Platforms must be a dictionary.', 'platforms')
for platform_names, platform_executors in platforms.items():
if not isinstance(platform_executors, dict):
raise ma.ValidationError('Platform executors must be a dictionary.', 'platforms')

platform_list = [name.strip() for name in str(platform_names).split(',')]

for executor_names, executor_data in platform_executors.items():
if not isinstance(executor_data, dict):
raise ma.ValidationError('Executor data must be a dictionary.', 'platforms')
executor = dict(executor_data) # make a dict of the data and fix up below
if isinstance(executor.get('cleanup'), str):
# cleanup actions should be in a list
executor['cleanup'] = [executor['cleanup']]
if isinstance(executor.get('parsers'), dict):
executor['parsers'] = [
{'module': module, 'parserconfigs': parserconfigs}
for module, parserconfigs in executor['parsers'].items()
]

executor_list = [name.strip() for name in str(executor_names).split(',')]
executors.extend(
{**executor, 'platform': platform_name, 'name': executor_name}
for platform_name in platform_list
for executor_name in executor_list
)
return executors

@staticmethod
def _has_legacy_requirements(requirements):
return (
isinstance(requirements, list)
and requirements
and isinstance(requirements[0], dict)
and 'relationship_match' not in requirements[0]
)

@staticmethod
def _legacy_requirements_to_list(requirements):
converted = []
for requirement in requirements:
for module, relationship_match in requirement.items():
converted.append({'module': module, 'relationship_match': relationship_match})
return converted


class Ability(FirstClassObjectInterface, BaseObject):

Expand Down
12 changes: 12 additions & 0 deletions app/objects/secondclass/c_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,18 @@ class ExecutorSchema(ma.Schema):
variations = ma.fields.List(ma.fields.Nested(VariationSchema()))
additional_info = ma.fields.Dict(keys=ma.fields.String(), values=ma.fields.String())

@ma.validates_schema
def validate_required_executor_fields(self, data, **kwargs):
if kwargs.get('partial') is True:
return

errors = {}
for field_name in ('name', 'platform'):
if not data.get(field_name):
errors[field_name] = ['Missing data for required field.']
if errors:
raise ma.ValidationError(errors)

@ma.post_load
def build_executor(self, data, **_):
return Executor(**data)
Expand Down
Loading
Loading