diff --git a/attack_surface_approximation/arguments_fuzzing/fuzzer.py b/attack_surface_approximation/arguments_fuzzing/fuzzer.py index e622fe0..8f9320d 100644 --- a/attack_surface_approximation/arguments_fuzzing/fuzzer.py +++ b/attack_surface_approximation/arguments_fuzzing/fuzzer.py @@ -56,11 +56,15 @@ def __generate_baseline_hashes(self) -> typing.Generator[str, None, None]: for argument in arguments: analysis_result = self.analysis.analyze(argument) - yield analysis_result.bbs_hash + if analysis_result.bbs_hash is not None: + yield analysis_result.bbs_hash def __check_if_argument_is_valid( self, argument: ArgumentsPair, result: QBDIAnalysis - ) -> None: + ) -> bool: + if result.bbs_hash is None: + return False + if ( argument.get_roles_based_on_analysis(result, self.baseline_hashes) and result.bbs_hash not in self.old_hashes # noqa: W503 @@ -91,7 +95,8 @@ def get_valid_argument( # generates a different hash than the baseline ones, it will be detected # as a false flag because of the sequence generation: --flag first, --flag # afterwards. - self.old_hashes.append(result.bbs_hash) + if result.bbs_hash is not None: + self.old_hashes.append(result.bbs_hash) self.arguments_generator.update_last_analysis_result(result) diff --git a/attack_surface_approximation/arguments_fuzzing/fuzzing_sequence_generator.py b/attack_surface_approximation/arguments_fuzzing/fuzzing_sequence_generator.py index 5a5be19..90a1efd 100644 --- a/attack_surface_approximation/arguments_fuzzing/fuzzing_sequence_generator.py +++ b/attack_surface_approximation/arguments_fuzzing/fuzzing_sequence_generator.py @@ -78,14 +78,15 @@ def generate_fuzzing_arguments( ) -> ArgumentsGenerator: arg = FileArgument(self.canary_filename) yield arg - if ArgumentRole.FILE_ENABLER not in arg.get_roles_based_on_analysis( - self.last_analysis_result, bbs_hashes_baseline - ): - for argument in self.arguments: - yield ArgumentPlusFileArgument(argument, self.canary_filename) yield ArgumentArgument("-") for argument in self.arguments: yield ArgumentArgument(argument) yield ArgumentStringArgument(argument, self.canary_string) + + if ArgumentRole.FILE_ENABLER not in arg.get_roles_based_on_analysis( + self.last_analysis_result, bbs_hashes_baseline + ): + for argument in self.arguments: + yield ArgumentPlusFileArgument(argument, self.canary_filename) diff --git a/attack_surface_approximation/arguments_fuzzing/qbdi_analysis.py b/attack_surface_approximation/arguments_fuzzing/qbdi_analysis.py index 4b9b402..46e257c 100644 --- a/attack_surface_approximation/arguments_fuzzing/qbdi_analysis.py +++ b/attack_surface_approximation/arguments_fuzzing/qbdi_analysis.py @@ -121,11 +121,23 @@ def __create_container(self) -> None: f"sudo chmod 555 {self.__configuration.CONTAINER_EXECUTABLE}" ) + # Ensure the results directory is writable by everyone self.__container.exec_run( + f"mkdir -p {self.__configuration.CONTAINER_RESULTS_FOLDER}" + ) + self.__container.exec_run( + f"sudo chmod 777 {self.__configuration.CONTAINER_RESULTS_FOLDER}" + ) + + self.__container.exec_run( + "rm -f CMakeCache.txt libqbdi_tracer.so", + workdir=self.__configuration.CONTAINER_SO_FOLDER, + ) + cmake_result = self.__container.exec_run( "cmake .", workdir=self.__configuration.CONTAINER_SO_FOLDER, ) - self.__container.exec_run( + make_result = self.__container.exec_run( "make", workdir=self.__configuration.CONTAINER_SO_FOLDER, ) @@ -153,11 +165,11 @@ def __build_analyze_command( stringified_arguments = argument.to_str() stdin_avoidance_command = "echo '\n' |" if timeout_retry else "" - return ( # TODO: {self.__configuration.CONTAINER_EXECUTABLE} + return ( f"timeout {self.timeout} sh -c " f"'{stdin_avoidance_command} LD_BIND_NOW=1 " "LD_PRELOAD=./libqbdi_tracer.so " - "uname " + f"{self.__configuration.CONTAINER_EXECUTABLE} " f"{stringified_arguments}'" ) @@ -187,7 +199,12 @@ def __run_analysis( raw_result = self.__build_and_run_analyze_command( argument, timeout_retry ) - print(raw_result.output) # TODO: remove + + # Ensure the result file is readable by the host user + argument_identifier = argument.to_hex_id() + self.__container.exec_run( + f"chmod 666 {os.path.join(self.__configuration.CONTAINER_RESULTS_FOLDER, argument_identifier)}" + ) result_filename = self.__get_analysis_result_filename(argument) bbs_count, bbs_hash, uses_file = self.__parse_raw_output( diff --git a/attack_surface_approximation/arguments_fuzzing/qbdi_analysis_scripts/qbdi_preload_template.c b/attack_surface_approximation/arguments_fuzzing/qbdi_analysis_scripts/qbdi_preload_template.c index fa04b9d..291135e 100644 --- a/attack_surface_approximation/arguments_fuzzing/qbdi_analysis_scripts/qbdi_preload_template.c +++ b/attack_surface_approximation/arguments_fuzzing/qbdi_analysis_scripts/qbdi_preload_template.c @@ -17,6 +17,7 @@ #define BLOCKS_USED_IN_HASH 10000 #define MAX_ARGS_LENGTH 100 #define OUTPUT_FOLDER "results/" +#define HASH_BUF_SIZE (BLOCKS_USED_IN_HASH * 10) // Enough for 10000 hex ints /* Structures */ @@ -81,7 +82,7 @@ char *encode_command_line(const unsigned char *command, size_t length) { static VMAction show_basic_block_callback(VMInstanceRef vm, const VMState* vmState, GPRState* gprState, FPRState* fprState, void* data) { size_t start_address, end_address; int abstract_address; - char parent_segment = -1, i; + int parent_segment = -1, i; // Check if the program reached main if (!start_trace) return QBDI_CONTINUE; @@ -102,6 +103,10 @@ static VMAction show_basic_block_callback(VMInstanceRef vm, const VMState* vmSta } } + // Safety check: if parent segment not found, skip this block + if (parent_segment == -1) + return QBDI_CONTINUE; + // Compute the abstract address start_address -= segments[parent_segment].start; abstract_address = (parent_segment << 24) + start_address; @@ -173,10 +178,14 @@ int qbdipreload_on_main(int argc, char **argv) { // Copy the arguments for (i = 1; i < argc; i++) { + if (strlen(command_line) + strlen(argv[i]) + 2 >= MAX_ARGS_LENGTH) + break; strcat(command_line, argv[i]); strcat(command_line, " "); } - command_line[strlen(command_line) - 1] = '\0'; + if (argc > 1 && strlen(command_line) > 0) { + command_line[strlen(command_line) - 1] = '\0'; + } return QBDIPRELOAD_NOT_HANDLED; } @@ -184,7 +193,7 @@ int qbdipreload_on_main(int argc, char **argv) { void get_segments() { qbdi_MemoryMap *maps; size_t maps_count; - int i; + int i, j = 0; // Get the memory maps maps = qbdi_getCurrentProcessMaps(false, &maps_count); @@ -202,8 +211,9 @@ void get_segments() { // Store the segments for (i = 0; i < maps_count; i++) { if (maps[i].permission >= QBDI_PF_EXEC && maps[i].end < MIN_MAPPED_ADDRESS) { - segments[i].start = maps[i].start; - segments[i].end = maps[i].end; + segments[j].start = maps[i].start; + segments[j].end = maps[i].end; + j++; } } } @@ -225,16 +235,19 @@ int qbdipreload_on_run(VMInstanceRef vm, rword start, rword stop) { int qbdipreload_on_exit(int status) { FILE *output_file; - char hashed[2 * BLOCKS_USED_IN_HASH * sizeof(int)] = {'\0'}; - char current_hash[2 * sizeof(int)]; + char *hashed = malloc(HASH_BUF_SIZE); + char current_hash[16]; char output_filename[2 * MAX_ARGS_LENGTH + sizeof(OUTPUT_FOLDER) + 1] = {'\0'}; int *p; int i = 0; - char uses_canaries_str; + + if (!hashed) return QBDIPRELOAD_NO_ERROR; + memset(hashed, 0, HASH_BUF_SIZE); // Create the string to be hashed for (p = (int*)utarray_front(blocks); p != NULL && i < BLOCKS_USED_IN_HASH; p = (int*)utarray_next(blocks, p), i++) { - sprintf(current_hash, "%x", *p); + int written = sprintf(current_hash, "%x", *p); + if (strlen(hashed) + written >= HASH_BUF_SIZE - 1) break; strcat(hashed, current_hash); } @@ -242,7 +255,11 @@ int qbdipreload_on_exit(int status) { strcat(output_filename, OUTPUT_FOLDER); strcat(output_filename, encode_command_line(command_line, strlen(command_line))); output_file = fopen(output_filename, "w"); - fprintf(output_file, "%d %ld %d", utarray_len(blocks), hash(hashed), uses_canaries); + if (output_file) { + fprintf(output_file, "%d %lu %d", utarray_len(blocks), hash(hashed), uses_canaries); + fclose(output_file); + } + free(hashed); return QBDIPRELOAD_NO_ERROR; } \ No newline at end of file diff --git a/attack_surface_approximation/cli.py b/attack_surface_approximation/cli.py index 1d0984c..8cc45e7 100644 --- a/attack_surface_approximation/cli.py +++ b/attack_surface_approximation/cli.py @@ -54,6 +54,10 @@ def cli() -> None: ), ) def generate(heuristic: str, output: str, top: int, elf: str = None) -> None: + if heuristic == "binary_pattern_matching" and elf is None: + print("[ERROR] The 'binary_pattern_matching' heuristic requires an ELF file. Please provide one using the --elf option.") + return + generator = ArgumentsGenerator() generator.generate(heuristic, elf) arguments_count = generator.dump(output, top_count=top) @@ -63,6 +67,11 @@ def generate(heuristic: str, output: str, top: int, elf: str = None) -> None: ) +def run_detection(elf: str) -> typing.List[InputStreams]: + detector = InputStreamsDetector(elf) + return detector.detect_all() + + @cli.command( help="Statically detect what input streams are used by an executable." ) @@ -73,13 +82,11 @@ def generate(heuristic: str, output: str, top: int, elf: str = None) -> None: help="ELF Executable", ) def detect(elf: str) -> None: - detector = InputStreamsDetector(elf) - streams = detector.detect_all() - + streams = run_detection(elf) print_detected_streams(streams) -def print_detected_streams(streams: InputStreams) -> None: +def print_detected_streams(streams: typing.List[InputStreams]) -> None: if not any(streams): print_no_detected_stream() else: @@ -90,13 +97,22 @@ def print_no_detected_stream() -> None: print("No input mechanism was detected for the given program.") -def print_multiple_detected_streams(streams: dict) -> None: +def print_multiple_detected_streams(streams: typing.List[InputStreams]) -> None: print("Several input mechanisms were detected for the given program:\n") table = build_detected_streams_table(streams) print(table) +def run_fuzzing(elf: str, dictionary: str) -> typing.List[ArgumentsPair]: + generator = ArgumentsGenerator() + generator.load(dictionary) + possible_arguments = generator.get_arguments() + + fuzzer = ArgumentsFuzzer(elf, possible_arguments) + return fuzzer.get_all_valid_arguments() + + @cli.command(help="Fuzz the arguments of an executable.") @click.option( "--elf", @@ -111,13 +127,7 @@ def print_multiple_detected_streams(streams: dict) -> None: help="Arguments dictionary", ) def fuzz(elf: str, dictionary: str) -> None: - generator = ArgumentsGenerator() - generator.load(dictionary) - possible_arguments = generator.get_arguments() - - fuzzer = ArgumentsFuzzer(elf, possible_arguments) - actual_arguments = fuzzer.get_all_valid_arguments() - + actual_arguments = run_fuzzing(elf, dictionary) print_arguments(actual_arguments) @@ -157,7 +167,7 @@ def build_arguments_table(arguments: typing.List[ArgumentsPair]) -> Table: return table -def build_detected_streams_table(streams: dict) -> Table: +def build_detected_streams_table(streams: typing.List[InputStreams]) -> Table: table = Table() table.add_column("Stream") @@ -183,11 +193,13 @@ def build_detected_streams_table(streams: dict) -> Table: required=True, help="Arguments dictionary", ) -@click.pass_context -def analyze(ctx: click.Context, elf: str, dictionary: str) -> None: - ctx.invoke(detect, elf=elf) +def analyze(elf: str, dictionary: str) -> None: + streams = run_detection(elf) + actual_arguments = run_fuzzing(elf, dictionary) + + print_detected_streams(streams) print("") - ctx.invoke(fuzz, elf=elf, dictionary=dictionary) + print_arguments(actual_arguments) def main() -> None: diff --git a/attack_surface_approximation/dictionaries_generators/heuristics/man_parsing.py b/attack_surface_approximation/dictionaries_generators/heuristics/man_parsing.py index 54c9591..7f56505 100644 --- a/attack_surface_approximation/dictionaries_generators/heuristics/man_parsing.py +++ b/attack_surface_approximation/dictionaries_generators/heuristics/man_parsing.py @@ -21,17 +21,17 @@ def __get_arguments_from_manual( filter_func: typing.Callable, unescape: typing.Callable = None, ) -> typing.Generator[str, None, None]: - with gzip.open(filename, "rt") as manual: - try: + try: + with gzip.open(filename, "rt") as manual: content = manual.read() - except UnicodeDecodeError: - return + except (UnicodeDecodeError, FileNotFoundError, OSError): + return - if unescape: - content = unescape(content) + if unescape: + content = unescape(content) - arguments = filter_func(content) - yield from arguments + arguments = filter_func(content) + yield from arguments def generate(_: str = None) -> typing.List[str]: diff --git a/pyproject.toml b/pyproject.toml index d0d6b62..ea2c127 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,19 +5,20 @@ authors = ["OpenCRS"] version = "0.1.0" [tool.poetry.dependencies] -python = "^3.12" -pycparser = "^2.21" -pyelftools = "^0.28" -docker = "^6.1.2" -rich = "^12.5.1" -click = "^8.1.3" +python = "==3.12.7" +pycparser = "==2.23" +pyelftools = "==0.29" +docker = "==7.1.0" +rich = "==12.6.0" +click = "==8.3.3" +commons = {path = "../commons", develop = false} [tool.poetry.dev-dependencies] -black = "^22.6.0" -isort = "^5.10.1" -pylint = "^2.14.4" -pyproject-flake8 = "^0.0.1-alpha.5" -flake8-annotations = "^2.9.1" +black = "==26.3.1" +isort = "==5.13.2" +pylint = "==2.17.7" +pyproject-flake8 = "==0.0.1a5" +flake8-annotations = "==2.9.1" [tool.poetry.scripts] attack_surface_approximation = "attack_surface_approximation.cli:main"