Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 17 additions & 13 deletions app/api/v2/managers/ability_api_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@
import uuid
import os
import yaml

from typing import Any

from app.api.v2.managers.base_api_manager import BaseApiManager
from app.api.v2.responses import JsonHttpBadRequest
from app.objects.c_ability import AbilitySchema
Expand All @@ -14,7 +12,7 @@
class AbilityApiManager(BaseApiManager):
def __init__(self, data_svc, file_svc):
super().__init__(data_svc=data_svc, file_svc=file_svc)

async def create_on_disk_object(self, data: dict, access: dict, ram_key: str, id_property: str, obj_class: type):
self._validate_ability_data(create=True, data=data)
obj_id = data.get('id')
Expand All @@ -23,7 +21,7 @@ async def create_on_disk_object(self, data: dict, access: dict, ram_key: str, id
await self._save_and_reload_object(file_path, data, obj_class, allowed)
await self._data_svc.create_or_update_everything_adversary()
return next(self.find_objects(ram_key, {id_property: obj_id}))

async def replace_on_disk_object(self, obj: Any, data: dict, ram_key: str, id_property: str):
self._validate_ability_data(create=True, data=data)
obj_id = getattr(obj, id_property)
Expand All @@ -33,11 +31,11 @@ async def replace_on_disk_object(self, obj: Any, data: dict, ram_key: str, id_pr
file_path = self._create_ability_filepath(data.get('tactic'), obj_id)
await self._save_and_reload_object(file_path, data, type(obj), obj.access)
return next(self.find_objects(ram_key, {id_property: obj_id}))

async def remove_object_from_disk_by_id(self, identifier: str, ram_key: str):
await super().remove_object_from_disk_by_id(identifier, ram_key)
await self._data_svc.create_or_update_everything_adversary()

async def update_on_disk_object(self, obj: Any, data: dict, ram_key: str, id_property: str, obj_class: type):
obj_id = getattr(obj, id_property)
file_path = await self._get_existing_object_file_path(obj_id, ram_key)
Expand All @@ -49,11 +47,10 @@ async def update_on_disk_object(self, obj: Any, data: dict, ram_key: str, id_pro
file_path = self._create_ability_filepath(data.get('tactic'), obj_id)
await self._save_and_reload_object(file_path, existing_obj_data, obj_class, obj.access)
return next(self.find_objects(ram_key, {id_property: obj_id}))

def _validate_ability_data(self, create: bool, data: dict):
# Correct ability_id key for ability file saving.
data['id'] = data.pop('ability_id', '')

# If a new ability is being created, ensure required fields present.
if create:
# Set ability ID if undefined
Expand All @@ -70,20 +67,27 @@ def _validate_ability_data(self, create: bool, data: dict):
if 'id' in data and not validator.match(data['id']):
raise JsonHttpBadRequest(f'Invalid ability ID {data["id"]}. IDs can only contain '
'alphanumeric characters, hyphens, and underscores.')

# Validate tactic, used for directory creation, lower case if present
if 'tactic' in data:
if not validator.match(data['tactic']):
raise JsonHttpBadRequest(f'Invalid ability tactic {data["tactic"]}. Tactics can only contain '
'alphanumeric characters, hyphens, and underscores.')
data['tactic'] = data['tactic'].lower()

# PATCH: Validate technique_name to block HTML/script injection.
# Allowlist covers all legitimate ATT&CK technique name characters:
# letters, digits, spaces, hyphens, underscores, parentheses,
# slashes, dots, commas, colons, and ampersands.
# Excludes < > " ' ; which are not present in any ATT&CK technique name.
technique_name_validator = re.compile(r'^[a-zA-Z0-9\s\-_()/.,&:]+$')
if data.get('technique_name') and not technique_name_validator.match(data['technique_name']):
raise JsonHttpBadRequest(f'Invalid technique_name "{data["technique_name"]}". technique_name can only '
'contain alphanumeric characters, spaces, hyphens, underscores, '
'parentheses, slashes, dots, commas, colons, and ampersands.')
if 'executors' in data and not data.get('executors'):
raise JsonHttpBadRequest(f'Cannot create ability {data["id"]}: at least one executor required')

if 'name' in data and not data.get('name'):
raise JsonHttpBadRequest(f'Cannot create ability {data["id"]} due to missing name')

def _create_ability_filepath(self, tactic: str, obj_id: str):
tactic_dir = os.path.join('data', 'abilities', tactic)
if not os.path.exists(tactic_dir):
Expand All @@ -94,4 +98,4 @@ async def _save_and_reload_object(self, file_path: str, data: dict, obj_type: ty
await self._file_svc.save_file(file_path, yaml.dump([data], encoding='utf-8', sort_keys=False),
'', encrypt=False)
await self._data_svc.remove('abilities', dict(ability_id=data['id']))
await self._data_svc.load_ability_file(file_path, access)
await self._data_svc.load_ability_file(file_path, access)
Loading