From 69806cd3c9ed210b4606755fad4b79eb2be52b73 Mon Sep 17 00:00:00 2001 From: petruki <31597636+petruki@users.noreply.github.com> Date: Sat, 21 Feb 2026 11:45:51 -0800 Subject: [PATCH] feat: added Regex Match config options for limit and blacklist --- README.md | 6 ++- switcher_client/client.py | 6 ++- switcher_client/lib/globals/global_context.py | 8 ++++ .../lib/utils/timed_match/timed_match.py | 5 +- tests/test_switcher_local.py | 46 +++++++++++++++++-- 5 files changed, 59 insertions(+), 12 deletions(-) diff --git a/README.md b/README.md index fbf461b..0617123 100644 --- a/README.md +++ b/README.md @@ -135,6 +135,8 @@ Client.build_context( snapshot_auto_update_interval=3, silent_mode='5m', throttle_max_workers=2, + regex_max_black_list=10, + regex_max_time_limit=100, cert_path='./certs/ca.pem' ) ) @@ -153,8 +155,8 @@ switcher = Client.get_switcher() | `snapshot_auto_update_interval` | `int` | Auto-update interval in seconds (0 = disabled) | `0` | | `silent_mode` | `str` | Silent mode retry time (e.g., '5m' for 5 minutes) | `None` | | `throttle_max_workers` | `int` | Max workers for throttling feature checks | `None` | -| `regex_max_black_list` | `int` | 🚧 TODO - Max cached entries for failed regex | `50` | -| `regex_max_time_limit` | `int` | 🚧 TODO - Regex execution time limit (ms) | `3000` | +| `regex_max_black_list` | `int` | Max cached entries for failed regex | `100` | +| `regex_max_time_limit` | `int` | Regex execution time limit (ms) | `3000` | | `cert_path` | `str` | 🚧 TODO - Path to custom certificate for API connections | `None` | #### Security Features diff --git a/switcher_client/client.py b/switcher_client/client.py index b8ffe1e..ad09397 100644 --- a/switcher_client/client.py +++ b/switcher_client/client.py @@ -11,6 +11,8 @@ from .switcher import Switcher class SwitcherOptions: + REGEX_MAX_BLACK_LIST = 'regex_max_black_list' + REGEX_MAX_TIME_LIMIT = 'regex_max_time_limit' SNAPSHOT_AUTO_UPDATE_INTERVAL = 'snapshot_auto_update_interval' SILENT_MODE = 'silent_mode' @@ -53,7 +55,9 @@ def build_context( def _build_options(options: ContextOptions): options_handler = { SwitcherOptions.SNAPSHOT_AUTO_UPDATE_INTERVAL: lambda: Client.schedule_snapshot_auto_update(), - SwitcherOptions.SILENT_MODE: lambda: Client._init_silent_mode(get(options.silent_mode, '')) + SwitcherOptions.SILENT_MODE: lambda: Client._init_silent_mode(get(options.silent_mode, '')), + SwitcherOptions.REGEX_MAX_BLACK_LIST: lambda: TimedMatch.set_max_blacklisted(options.regex_max_black_list), + SwitcherOptions.REGEX_MAX_TIME_LIMIT: lambda: TimedMatch.set_max_time_limit(options.regex_max_time_limit) } for option_key, handler in options_handler.items(): diff --git a/switcher_client/lib/globals/global_context.py b/switcher_client/lib/globals/global_context.py index d256557..9e065ca 100644 --- a/switcher_client/lib/globals/global_context.py +++ b/switcher_client/lib/globals/global_context.py @@ -4,6 +4,8 @@ DEFAULT_LOCAL = False DEFAULT_LOGGER = False DEFAULT_FREEZE = False +DEFAULT_REGEX_MAX_BLACKLISTED = 100 +DEFAULT_REGEX_MAX_TIME_LIMIT = 3000 class ContextOptions: """ @@ -14,12 +16,16 @@ class ContextOptions: :param snapshot_auto_update_interval: The interval in milliseconds to auto-update the snapshot. If not set, it will not auto-update the snapshot :param silent_mode: When defined it will switch to local during the specified time before it switches back to remote, e.g. 5s (s: seconds - m: minutes - h: hours) :param throttle_max_workers: The maximum number of workers to use for background refresh when throttle is enabled. If None, the default value is based on the number of CPUs. Default is None + :param regex_max_black_list: The maximum number of blacklisted regex inputs. If not set, it will use the default value of 100 + :param regex_max_time_limit: The maximum time limit in milliseconds for regex matching. If not set, it will use the default value of 3000 ms """ def __init__(self, local = DEFAULT_LOCAL, logger = DEFAULT_LOGGER, freeze = DEFAULT_FREEZE, + regex_max_black_list = DEFAULT_REGEX_MAX_BLACKLISTED, + regex_max_time_limit = DEFAULT_REGEX_MAX_TIME_LIMIT, snapshot_location: Optional[str] = None, snapshot_auto_update_interval: Optional[int] = None, silent_mode: Optional[str] = None, @@ -31,6 +37,8 @@ def __init__(self, self.snapshot_auto_update_interval = snapshot_auto_update_interval self.silent_mode = silent_mode self.throttle_max_workers = throttle_max_workers + self.regex_max_black_list = regex_max_black_list + self.regex_max_time_limit = regex_max_time_limit class Context: def __init__(self, diff --git a/switcher_client/lib/utils/timed_match/timed_match.py b/switcher_client/lib/utils/timed_match/timed_match.py index be164b0..01e597c 100644 --- a/switcher_client/lib/utils/timed_match/timed_match.py +++ b/switcher_client/lib/utils/timed_match/timed_match.py @@ -4,12 +4,9 @@ from typing import List, Optional, Any from dataclasses import dataclass +from ...globals.global_context import DEFAULT_REGEX_MAX_BLACKLISTED, DEFAULT_REGEX_MAX_TIME_LIMIT from .worker import TaskType, WorkerResult, WorkerTask, persistent_regex_worker -# Default constants -DEFAULT_REGEX_MAX_TIME_LIMIT = 3000 # 3 seconds in milliseconds -DEFAULT_REGEX_MAX_BLACKLISTED = 100 - @dataclass class Blacklist: """Represents a blacklisted regex pattern and input combination.""" diff --git a/tests/test_switcher_local.py b/tests/test_switcher_local.py index 60b75ee..4899457 100644 --- a/tests/test_switcher_local.py +++ b/tests/test_switcher_local.py @@ -1,8 +1,10 @@ import json +from time import time + from switcher_client.client import Client, ContextOptions from switcher_client.errors import LocalCriteriaError -from switcher_client.lib.utils.timed_match.timed_match import TimedMatch +from switcher_client.lib.utils.timed_match.timed_match import DEFAULT_REGEX_MAX_BLACKLISTED, DEFAULT_REGEX_MAX_TIME_LIMIT async_error = None @@ -76,8 +78,7 @@ def test_local_with_strategy_no_matching_input(): """ Should return disabled when no matching input is provided for the strategy """ # given - TimedMatch.set_max_time_limit(100) - given_context('tests/snapshots') + given_context(snapshot_location='tests/snapshots', regex_max_time_limit=100) Client.load_snapshot() switcher = Client.get_switcher() @@ -90,6 +91,37 @@ def test_local_with_strategy_no_matching_input(): # teardown Client.clear_resources() +def test_local_with_strategy_redos_input(): + """ Should return disabled when ReDoS input is provided for the strategy """ + + # given + regex_input = 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' + given_context(snapshot_location='tests/snapshots', + regex_max_black_list=1, + regex_max_time_limit=100) + Client.load_snapshot() + + switcher = Client.get_switcher() + + # test + current_time_ms = int(time() * 1000) + assert switcher \ + .check_regex(regex_input) \ + .is_on('FF2FOR2024') is False + elapsed_time_ms = int(time() * 1000) - current_time_ms + assert elapsed_time_ms < 150 # Within TimedMatch time limit (100ms) + margin + + # test blacklisted input + current_time_ms = int(time() * 1000) + assert switcher \ + .check_regex(regex_input) \ + .is_on('FF2FOR2024') is False + elapsed_time_ms = int(time() * 1000) - current_time_ms + assert elapsed_time_ms < 50 # Should be fast on second attempt due to blacklist + + # teardown + Client.clear_resources() + def test_local_domain_disabled(): """ Should return disabled when domain is deactivated """ @@ -192,13 +224,17 @@ def test_local_no_snapshot(): # Helpers -def given_context(snapshot_location: str, environment: str = 'default') -> None: +def given_context(snapshot_location: str, environment: str = 'default', + regex_max_black_list = DEFAULT_REGEX_MAX_BLACKLISTED, + regex_max_time_limit = DEFAULT_REGEX_MAX_TIME_LIMIT): Client.build_context( domain='Playground', environment=environment, options=ContextOptions( local=True, logger=True, - snapshot_location=snapshot_location + snapshot_location=snapshot_location, + regex_max_black_list=regex_max_black_list, + regex_max_time_limit=regex_max_time_limit ) ) \ No newline at end of file