Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions src/hubble_satnet_decoder/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""hubble-satnet-decoder — Hubble PHY preamble detector, FSK decoder, spectrogram."""

from .chipset import get_chipset_stats, reset_chipset_stats
from . import constants
from .constants import (
CHANNEL_SPACING,
DEVICE_CHANNEL_SPACING,
Expand All @@ -11,6 +12,17 @@
SYNTH_RES,
configure,
)


def enable_collision_logging(enabled: bool = True) -> None:
"""Enable or disable collision detection logging.

When enabled, prints [COLLISION] messages when secondary peaks
are detected during symbol demodulation (indicates potential
packet collisions or interference).
"""
constants.LOG_COLLISIONS = enabled

from .decoder import decode_signal
from .detector import detect_preambles
from .spectrogram import compute_spec_chunk
Expand All @@ -27,6 +39,7 @@
"configure",
"decode_signal",
"detect_preambles",
"enable_collision_logging",
"get_chipset_stats",
"reset_chipset_stats",
]
3 changes: 3 additions & 0 deletions src/hubble_satnet_decoder/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@
# Verbose flag (override for diagnostics)
VERBOSE: bool = False

# Log collision detections (secondary peaks during demodulation)
LOG_COLLISIONS: bool = False

# ---------------------------------------------------------------------------
# Visualisation spectrogram defaults (used by compute_spec_chunk)
# ---------------------------------------------------------------------------
Expand Down
30 changes: 27 additions & 3 deletions src/hubble_satnet_decoder/decoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

from . import constants
from .chipset import cs_inc, get_last_attempt, identify_chipset
from .demod import build_chan_mask, demod_one_symbol, interp_peak
from .demod import build_chan_mask, demod_one_symbol, demod_one_symbol_with_collision, interp_peak
from .detector import detect_preambles
from .whitening import data_de_scrambling

Expand Down Expand Up @@ -278,16 +278,24 @@ def _decode_v1(signal, start_sample, sps):
# Demodulate header (6 symbols, same channel)
chan_mask = build_chan_mask(F0, synth_res_val)
header_syms = []
header_collisions = 0
half_sym = nsym // 2
for h in range(constants.NUM_HEADER_SYMS):
sym_abs_idx = constants.PREAMBLE_LEN + h
s0 = start_sample + sym_abs_idx * slot + half_sym
if s0 + nsym > len(signal):
return None, None
fsk_bin, _, _ = demod_one_symbol(
fsk_bin, _, _, collision = demod_one_symbol_with_collision(
signal[s0:s0 + nsym], F0, synth_res_val, chan_mask,
)
header_syms.append(fsk_bin)
if collision:
header_collisions += 1
if constants.LOG_COLLISIONS:
print(f"[COLLISION] Header symbol {h}: collision detected at F0={F0:.1f} Hz")

if constants.LOG_COLLISIONS and header_collisions > 0:
print(f"[COLLISION] Header total: {header_collisions}/{constants.NUM_HEADER_SYMS} symbols with collisions")

_last_attempt["header_syms"] = list(header_syms)

Expand Down Expand Up @@ -321,6 +329,7 @@ def _decode_v1(signal, start_sample, sps):
hopping_seq = constants.HOPPING_SEQS[hop_seq_idx]
_last_attempt.update(
header_n_corr=int(header_n_corr),
header_collisions=header_collisions,
channel_num=channel_num, hop_seq_idx=hop_seq_idx,
pkt_len_idx=pkt_len_idx, num_pdu_symbols=num_pdu_symbols,
)
Expand All @@ -340,6 +349,7 @@ def _decode_v1(signal, start_sample, sps):
f0_cur = F0
mask = build_chan_mask(F0, synth_res_val)
pdu_syms = []
pdu_collisions = 0

for p_idx in range(num_pdu_symbols):
sym_abs_idx = constants.PREAMBLE_LEN + constants.NUM_HEADER_SYMS + p_idx
Expand All @@ -355,10 +365,14 @@ def _decode_v1(signal, start_sample, sps):
mask = build_chan_mask(f0_cur, synth_res_val)
cur_ch = nxt

fsk_bin, _, _ = demod_one_symbol(
fsk_bin, _, _, collision = demod_one_symbol_with_collision(
signal[s0:s0 + nsym], f0_cur, synth_res_val, mask,
)
pdu_syms.append(fsk_bin)
if collision:
pdu_collisions += 1
if constants.LOG_COLLISIONS:
print(f"[COLLISION] PDU symbol {p_idx}: collision detected at F0={f0_cur:.1f} Hz, channel={cur_ch}")

if len(pdu_syms) != num_pdu_symbols:
_last_attempt["reason"] = "pdu_incomplete"
Expand All @@ -371,6 +385,7 @@ def _decode_v1(signal, start_sample, sps):

if pdu_n_corr < 0:
_last_attempt["pdu_syms_head"] = pdu_syms[:10]
_last_attempt["pdu_collisions"] = pdu_collisions
cs_inc(chipset_name, "pdu_fail")
if _diag:
print(
Expand Down Expand Up @@ -420,6 +435,13 @@ def _decode_v1(signal, start_sample, sps):
f"data={mac_syms}, pdu_demod={pdu_syms}"
)

total_collisions = header_collisions + pdu_collisions
if constants.LOG_COLLISIONS and total_collisions > 0:
print(
f"[COLLISION] Packet 0x{ntw_id:08X} seq={seq_num}: "
f"{header_collisions} header + {pdu_collisions} PDU = {total_collisions} total collisions"
)

return (
{"F0_hz": float(F0), "total_energy_dB": float(total_energy_dBFS)},
{
Expand All @@ -429,6 +451,8 @@ def _decode_v1(signal, start_sample, sps):
"chipset": chipset_name,
"channel_num": channel_num, "hop_seq_idx": hop_seq_idx,
"header_n_corr": int(header_n_corr), "pdu_n_corr": int(pdu_n_corr),
"header_collisions": header_collisions,
"pdu_collisions": pdu_collisions,
"measured_synth_res": round(measured_synth_res, 2),
"num_pdu_symbols": num_pdu_symbols,
"freq_delta_hz": round(freq_delta_hz, 1),
Expand Down
36 changes: 36 additions & 0 deletions src/hubble_satnet_decoder/demod.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,39 @@ def demod_one_symbol(
fsk_bin = int(round((peak_freq - F0) / synth_res_val))
fsk_bin = max(0, min(constants.NUM_FSK_BINS - 1, fsk_bin))
return fsk_bin, peak_freq, float(psd_masked[peak_bin])


def demod_one_symbol_with_collision(
sig_segment: np.ndarray,
F0: float,
synth_res_val: float,
chan_mask: np.ndarray,
collision_threshold: float = 0.5,
null_bins: int = 2,
) -> tuple[int, float, float, bool]:
"""Like :func:`demod_one_symbol` but also detects potential collisions.

Returns ``(fsk_bin, peak_freq, peak_power, collision_detected)``.

A collision is flagged when a secondary peak (outside ±null_bins of the
primary) exceeds ``collision_threshold`` times the primary peak power.
"""
spectrum = np.fft.fft(sig_segment)
psd = np.abs(spectrum) ** 2
psd_masked = psd.copy()
psd_masked[~chan_mask] = 0.0

peak_bin = int(np.argmax(psd_masked))
peak_freq = interp_peak(psd_masked, peak_bin, constants.fft_freqs)
peak_power = float(psd_masked[peak_bin])

psd_second = psd_masked.copy()
null_start = max(0, peak_bin - null_bins)
null_end = min(len(psd_second), peak_bin + null_bins + 1)
psd_second[null_start:null_end] = 0.0
second_peak_power = float(np.max(psd_second))
collision = second_peak_power > (collision_threshold * peak_power)

fsk_bin = int(round((peak_freq - F0) / synth_res_val))
fsk_bin = max(0, min(constants.NUM_FSK_BINS - 1, fsk_bin))
return fsk_bin, peak_freq, peak_power, collision
Loading