Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions front/plugins/fritzbox/fritzbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,16 +166,16 @@ def check_guest_wifi_status(fc, guest_service_num):
return guest_info


def create_guest_wifi_device(fc, host):
def create_guest_wifi_device(fc):
"""
Create a synthetic device entry for guest WiFi.
Derives a deterministic fake MAC from the Fritz!Box hardware MAC address.
Falls back to the configured host string if the MAC cannot be retrieved.
Falls back to a fixed sentinel string if the MAC cannot be retrieved.
Returns: Device dictionary
"""
try:
fritzbox_mac = fc.call_action('DeviceInfo:1', 'GetInfo').get('NewMACAddress', '')
guest_mac = string_to_fake_mac(normalize_mac(fritzbox_mac) if fritzbox_mac else host)
guest_mac = string_to_fake_mac(normalize_mac(fritzbox_mac) if fritzbox_mac else 'FRITZBOX_GUEST')
Comment on lines 177 to +178
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Harden fallback when MAC is present-but-invalid/blank.

At Line 178, fallback only checks raw truthiness. Values like whitespace can normalize to an empty string and produce a hash seed of "" instead of the sentinel fallback.

Proposed fix
-        fritzbox_mac = fc.call_action('DeviceInfo:1', 'GetInfo').get('NewMACAddress', '')
-        guest_mac = string_to_fake_mac(normalize_mac(fritzbox_mac) if fritzbox_mac else 'FRITZBOX_GUEST')
+        fritzbox_mac = fc.call_action('DeviceInfo:1', 'GetInfo').get('NewMACAddress', '')
+        normalized_mac = normalize_mac(fritzbox_mac) if fritzbox_mac else ''
+        guest_mac = string_to_fake_mac(normalized_mac if normalized_mac else 'FRITZBOX_GUEST')
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@front/plugins/fritzbox/fritzbox.py` around lines 177 - 178, The current
fallback uses raw truthiness of fritzbox_mac which allows values like whitespace
or an invalid MAC to slip through; update the guest_mac creation by first
calling normalize_mac(fritzbox_mac) (or '' if fritzbox_mac is None), then check
the normalized result (e.g., truthy after strip) and only use it if valid,
otherwise use the sentinel 'FRITZBOX_GUEST' as the seed passed into
string_to_fake_mac; modify the code around fritzbox_mac and guest_mac so
string_to_fake_mac receives either the normalized MAC or the sentinel, not the
original raw value.


device = {
'mac_address': guest_mac,
Expand Down Expand Up @@ -224,7 +224,7 @@ def main():
if report_guest:
guest_status = check_guest_wifi_status(fc, guest_service)
if guest_status['active']:
guest_device = create_guest_wifi_device(fc, host)
guest_device = create_guest_wifi_device(fc)
if guest_device:
device_data.append(guest_device)

Expand Down
7 changes: 7 additions & 0 deletions server/scan/session_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@
from messaging.in_app import update_unread_notifications_count
from const import NULL_EQUIVALENTS_SQL

# Predicate used in every negative-event INSERT to skip forced-online devices.
# Centralised here so all three event paths stay in sync.
_SQL_NOT_FORCED_ONLINE = "LOWER(COALESCE(devForceStatus, '')) != 'online'"


# Make sure log level is initialized correctly
Logger(get_setting_value("LOG_LEVEL"))
Expand Down Expand Up @@ -179,6 +183,7 @@ def insert_events(db):
WHERE devAlertDown != 0
AND devCanSleep = 0
AND devPresentLastScan = 1
AND {_SQL_NOT_FORCED_ONLINE}
AND NOT EXISTS (SELECT 1 FROM CurrentScan
WHERE devMac = scanMac
) """)
Expand All @@ -194,6 +199,7 @@ def insert_events(db):
AND devCanSleep = 1
AND devIsSleeping = 0
AND devPresentLastScan = 0
AND {_SQL_NOT_FORCED_ONLINE}
AND NOT EXISTS (SELECT 1 FROM CurrentScan
WHERE devMac = scanMac)
AND NOT EXISTS (SELECT 1 FROM Events
Expand Down Expand Up @@ -229,6 +235,7 @@ def insert_events(db):
FROM Devices
WHERE devAlertDown = 0
AND devPresentLastScan = 1
AND {_SQL_NOT_FORCED_ONLINE}
AND NOT EXISTS (SELECT 1 FROM CurrentScan
WHERE devMac = scanMac
) """)
Expand Down
10 changes: 7 additions & 3 deletions test/db_test_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ def insert_device(
can_sleep: int = 0,
last_connection: str | None = None,
last_ip: str = "192.168.1.1",
force_status: str | None = None,
) -> None:
"""
Insert a minimal Devices row.
Expand All @@ -189,16 +190,19 @@ def insert_device(
ISO-8601 UTC string; defaults to 60 minutes ago when omitted.
last_ip:
Value stored in devLastIP.
force_status:
Value for devForceStatus (``'online'``, ``'offline'``, or ``None``/
``'dont_force'``).
"""
cur.execute(
"""
INSERT INTO Devices
(devMac, devAlertDown, devPresentLastScan, devCanSleep,
devLastConnection, devLastIP, devIsArchived, devIsNew)
VALUES (?, ?, ?, ?, ?, ?, 0, 0)
devLastConnection, devLastIP, devIsArchived, devIsNew, devForceStatus)
VALUES (?, ?, ?, ?, ?, ?, 0, 0, ?)
""",
(mac, alert_down, present_last_scan, can_sleep,
last_connection or minutes_ago(60), last_ip),
last_connection or minutes_ago(60), last_ip, force_status),
)


Expand Down
49 changes: 20 additions & 29 deletions test/plugins/test_fritzbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@
created during tests.
"""

import hashlib
import sys
import os
from unittest.mock import patch, MagicMock

from utils.crypto_utils import string_to_fake_mac

import pytest

# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -59,19 +60,12 @@ def _make_host_entry(mac="AA:BB:CC:DD:EE:FF", ip="192.168.1.10",
@pytest.fixture
def mock_fritz_hosts():
"""
Patches fritzconnection.lib.fritzhosts in sys.modules so that
fritzbox.get_connected_devices() uses a controllable FritzHosts mock.
Yields the FritzHosts *instance* (what FritzHosts(fc) returns).
Patches fritzbox.FritzHosts so that get_connected_devices() uses a
controllable mock. Yields the FritzHosts *instance* (what FritzHosts(fc)
returns).
"""
hosts_instance = MagicMock()
fritz_hosts_module = MagicMock()
fritz_hosts_module.FritzHosts = MagicMock(return_value=hosts_instance)

with patch.dict("sys.modules", {
"fritzconnection": MagicMock(),
"fritzconnection.lib": MagicMock(),
"fritzconnection.lib.fritzhosts": fritz_hosts_module,
}):
with patch("fritzbox.FritzHosts", return_value=hosts_instance):
yield hosts_instance


Expand Down Expand Up @@ -229,12 +223,15 @@ def test_returns_device_dict(self):
assert device["active_status"] == "Active"
assert device["interface_type"] == "Access Point"
assert device["ip_address"] == ""
# MAC must match string_to_fake_mac output (fa:ce: prefix)
assert device["mac_address"].startswith("fa:ce:")

def test_guest_mac_has_locally_administered_bit(self):
"""First byte must be 0x02 — locally-administered, unicast."""
"""The locally-administered bit (0x02) must be set in the first byte.
string_to_fake_mac uses the 'fa:ce:' prefix; 0xFA & 0x02 == 0x02."""
device = fritzbox.create_guest_wifi_device(self._fc_with_mac("AA:BB:CC:DD:EE:FF"))
first_byte = int(device["mac_address"].split(":")[0], 16)
assert first_byte == 0x02
assert first_byte & 0x02 != 0

def test_guest_mac_format_is_valid(self):
"""MAC must be 6 colon-separated lowercase hex pairs."""
Expand All @@ -258,11 +255,11 @@ def test_different_fritzbox_macs_produce_different_guest_macs(self):
assert mac_a != mac_b

def test_no_fritzbox_mac_uses_fallback(self):
"""When DeviceInfo returns no MAC, fall back to 02:00:00:00:00:01."""
"""When DeviceInfo returns no MAC, fall back to a sentinel-derived MAC."""
fc = MagicMock()
fc.call_action.return_value = {"NewMACAddress": ""}
device = fritzbox.create_guest_wifi_device(fc)
assert device["mac_address"] == "02:00:00:00:00:01"
assert device["mac_address"] == string_to_fake_mac("FRITZBOX_GUEST")

def test_device_info_exception_returns_none(self):
"""If DeviceInfo call raises, create_guest_wifi_device must return None."""
Expand All @@ -274,12 +271,11 @@ def test_device_info_exception_returns_none(self):
def test_known_mac_produces_known_guest_mac(self):
"""
Regression anchor: for a fixed Fritz!Box MAC, the expected guest MAC
is precomputed here independently. If the hashing logic in
fritzbox.py changes, this test fails immediately.
is derived via string_to_fake_mac(normalize_mac(...)). If the hashing
logic in fritzbox.py or string_to_fake_mac changes, this test fails.
"""
fritzbox_mac = "aa:bb:cc:dd:ee:ff" # normalize_mac output of "AA:BB:CC:DD:EE:FF"
digest = hashlib.md5(f"GUEST:{fritzbox_mac}".encode()).digest()
expected = "02:" + ":".join(f"{b:02x}" for b in digest[:5])
fritzbox_mac = normalize_mac("AA:BB:CC:DD:EE:FF")
expected = string_to_fake_mac(fritzbox_mac)

device = fritzbox.create_guest_wifi_device(self._fc_with_mac("AA:BB:CC:DD:EE:FF"))
assert device["mac_address"] == expected
Expand All @@ -296,10 +292,8 @@ def test_successful_connection(self):
fc_instance.modelname = "FRITZ!Box 7590"
fc_instance.system_version = "7.57"
fc_class = MagicMock(return_value=fc_instance)
fc_module = MagicMock()
fc_module.FritzConnection = fc_class

with patch.dict("sys.modules", {"fritzconnection": fc_module}):
with patch("fritzbox.FritzConnection", fc_class):
result = fritzbox.get_fritzbox_connection("fritz.box", 49443, "admin", "pass", True)

assert result is fc_instance
Expand All @@ -308,16 +302,13 @@ def test_successful_connection(self):
)

def test_import_error_returns_none(self):
with patch.dict("sys.modules", {"fritzconnection": None}):
with patch("fritzbox.FritzConnection", side_effect=ImportError("fritzconnection not found")):
result = fritzbox.get_fritzbox_connection("fritz.box", 49443, "admin", "pass", True)

assert result is None

def test_connection_exception_returns_none(self):
fc_module = MagicMock()
fc_module.FritzConnection.side_effect = Exception("Connection refused")

with patch.dict("sys.modules", {"fritzconnection": fc_module}):
with patch("fritzbox.FritzConnection", side_effect=Exception("Connection refused")):
result = fritzbox.get_fritzbox_connection("fritz.box", 49443, "admin", "pass", True)

assert result is None
Expand Down
119 changes: 119 additions & 0 deletions test/scan/test_down_sleep_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -444,3 +444,122 @@ def test_online_history_down_count_excludes_sleeping(self):
assert count == 1, (
f"Expected 1 down device (sleeping device must not be counted), got {count}"
)


# ---------------------------------------------------------------------------
# Layer 1c: insert_events() — forced-online device suppression
#
# Devices with devForceStatus='online' are always considered present by the
# operator. Generating 'Device Down' or 'Disconnected' events for them causes
# spurious flapping detection (devFlapping counts these events in DevicesView).
#
# Affected queries in insert_events():
# 1a Device Down (non-sleeping) — DevicesView query
# 1b Device Down (sleep-expired) — DevicesView query
# 3 Disconnected — Devices table query
# ---------------------------------------------------------------------------

class TestInsertEventsForceOnline:
"""
Regression tests: forced-online devices must never generate
'Device Down' or 'Disconnected' events.
"""

def test_forced_online_no_device_down_event(self):
"""
devForceStatus='online', devAlertDown=1, absent from CurrentScan.
Must NOT produce a 'Device Down' event (regression: used to fire and
cause devFlapping=1 after the threshold was reached).
"""
conn = _make_db()
cur = conn.cursor()
_insert_device(cur, "ff:00:00:00:00:01", alert_down=1, present_last_scan=1,
force_status="online")
conn.commit()

insert_events(DummyDB(conn))

assert "ff:00:00:00:00:01" not in _down_event_macs(cur), (
"forced-online device must never generate a 'Device Down' event"
)

def test_forced_online_sleep_expired_no_device_down_event(self):
"""
devForceStatus='online', devCanSleep=1, sleep window expired.
Must NOT produce a 'Device Down' event via the sleep-expired path.
"""
conn = _make_db(sleep_minutes=30)
cur = conn.cursor()
_insert_device(cur, "ff:00:00:00:00:02", alert_down=1, present_last_scan=0,
can_sleep=1, last_connection=_minutes_ago(45),
force_status="online")
conn.commit()

insert_events(DummyDB(conn))

assert "ff:00:00:00:00:02" not in _down_event_macs(cur), (
"forced-online sleeping device must not get 'Device Down' after sleep expires"
)

def test_forced_online_no_disconnected_event(self):
"""
devForceStatus='online', devAlertDown=0 (Disconnected path), absent.
Must NOT produce a 'Disconnected' event.
"""
conn = _make_db()
cur = conn.cursor()
_insert_device(cur, "ff:00:00:00:00:03", alert_down=0, present_last_scan=1,
force_status="online")
conn.commit()

insert_events(DummyDB(conn))

cur.execute(
"SELECT COUNT(*) AS cnt FROM Events "
"WHERE eveMac = 'ff:00:00:00:00:03' AND eveEventType = 'Disconnected'"
)
assert cur.fetchone()["cnt"] == 0, (
"forced-online device must never generate a 'Disconnected' event"
)

def test_forced_online_uppercase_no_device_down_event(self):
"""devForceStatus='ONLINE' (uppercase) must also be suppressed."""
conn = _make_db()
cur = conn.cursor()
_insert_device(cur, "ff:00:00:00:00:04", alert_down=1, present_last_scan=1,
force_status="ONLINE")
conn.commit()

insert_events(DummyDB(conn))

assert "ff:00:00:00:00:04" not in _down_event_macs(cur), (
"forced-online device (uppercase) must never generate a 'Device Down' event"
)

def test_dont_force_still_fires_device_down(self):
"""devForceStatus='dont_force' must behave normally — event fires."""
conn = _make_db()
cur = conn.cursor()
_insert_device(cur, "ff:00:00:00:00:05", alert_down=1, present_last_scan=1,
force_status="dont_force")
conn.commit()

insert_events(DummyDB(conn))

assert "ff:00:00:00:00:05" in _down_event_macs(cur), (
"dont_force device must still generate 'Device Down' when absent"
)

def test_forced_offline_still_fires_device_down(self):
"""devForceStatus='offline' suppresses nothing — event fires."""
conn = _make_db()
cur = conn.cursor()
_insert_device(cur, "ff:00:00:00:00:06", alert_down=1, present_last_scan=1,
force_status="offline")
conn.commit()

insert_events(DummyDB(conn))

assert "ff:00:00:00:00:06" in _down_event_macs(cur), (
"forced-offline device must still generate 'Device Down' when absent"
)
Loading