Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/pykmp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
RegisterData,
RegisterID,
)
from .registers import RegisterOutput

# `PySerialClientCommunicator` is intentionally not re-exported from the package root.
# Doing so would make the optional `pyserial` dependency an unconditional import-time
Expand Down
3 changes: 2 additions & 1 deletion src/pykmp/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@
267: "Cooling energy E3 hires",
346: "Module SW rev",
347: "Customer number",
348: "Date and Time", # TODO: unknown unit 79, 28591984415535
348: "Date and Time",
355: "COP Year",
362: "Tariff TA4",
364: "Heat energy A1", # Heat energy with discount A1, t2 < t5 limit
Expand Down Expand Up @@ -235,6 +235,7 @@
64: "Datetime",
65: "imp/l",
66: "l/imp",
79: "DST YY-MM-DD hh:mm:ss",
85: "%RH",
86: "%O\N{SUBSCRIPT TWO}",
87: "m/s",
Expand Down
84 changes: 84 additions & 0 deletions src/pykmp/registers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
# SPDX-FileCopyrightText: 2023 Gert van Dijk <github@gertvandijk.nl>
#
# SPDX-License-Identifier: Apache-2.0

from __future__ import annotations

import decimal
from typing import Final

import attrs

from . import codec, constants, messages


REGISTERS_NAMES_LEN_MAX: Final[int] = max(
len(name) for name in constants.REGISTERS.values()
)


@attrs.define(kw_only=True)
class RegisterOutput:
id_int: int
id_hex: str = attrs.field(init=False)
name: str = attrs.field(init=False)
unit_int: int
unit_hex: str = attrs.field(init=False)
unit_str: str = attrs.field(init=False)
value_float: float = attrs.field(init=False, default=None)
value_str: str = attrs.field() # best: uses decimal.Decimal without loss
value_dec: decimal.Decimal = attrs.field()

def __attrs_post_init__(self) -> None:
self.id_hex = f"0x{self.id_int:04X}"
self.unit_hex = f"0x{self.unit_int:02X}"
self.name = constants.REGISTERS.get(self.id_int, f"<unknown reg {self.id_int}>")
self.unit_str = constants.UNITS_NAMES.get(
self.unit_int, f"<unknown unit {self.unit_int}>"
)
if self.value_dec is not None:
self.value_float = float(self.value_dec)
self.value_str = str(self.value_dec)

@classmethod
def from_register_data(cls, reg: messages.RegisterData) -> Self:
value_dec = None
value_str = None
match reg.unit:
case 0x2f:
# hh:mm:ss
d = int.from_bytes(reg.value[2:], 'big')
value_str = f'{(d//10000):02}:{(d // 100 % 100):02}:{(d % 100):02}'
pass
case 0x30:
# yy-mm-dd
d = int.from_bytes(reg.value[2:], 'big')
value_str = f'{(2000 + d//10000):02}-{(d // 100 % 100):02}-{(d % 100):02}'
pass
case 0x32:
# mm-dd
d = int.from_bytes(reg.value[2:], 'big')
value_str = f'{(d // 100 % 100):02}-{(d % 100):02}'
case 0x4f:
# DST yy:mm:dd hh:mm:ss
dst = reg.value[2]
value_str = f'{(2000 + reg.value[3]):02}-{reg.value[4]:02}-{reg.value[5]:02} ' \
+ f'{reg.value[6]:02}:{reg.value[7]:02}:{reg.value[8]:02}' \
+ f'{"+" if dst > 0 else "-"}{(dst // 60):02}:{(dst % 60):02}'
case 0x36:
# ASCII
value_str = bytes(reg.value[2:]).decode('ascii')
case _:
value_dec = codec.FloatCodec.decode(reg.value)
return cls(
id_int=reg.id_,
unit_int=reg.unit,
value_dec=value_dec,
value_str=value_str,
)

def to_pretty_line(self) -> str:
return (
f"{self.id_int!r:>4} → {self.name:<{REGISTERS_NAMES_LEN_MAX}} = "
f"{self.value_str} {self.unit_str}"
)
47 changes: 2 additions & 45 deletions src/pykmp/tool/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import click

import pykmp
from pykmp import client, codec, constants, messages
from pykmp import client, constants, messages, registers

if TYPE_CHECKING:
from collections.abc import Collection, Sequence
Expand Down Expand Up @@ -147,49 +147,6 @@ def get_serial(ctx: click.Context) -> None:
click.echo(f"Meter serial is: {response.serial}")


REGISTERS_NAMES_LEN_MAX: Final[int] = max(
len(name) for name in constants.REGISTERS.values()
)


@attrs.define(kw_only=True)
class RegisterOutput:
id_int: int
id_hex: str = attrs.field(init=False)
name: str = attrs.field(init=False)
unit_int: int
unit_hex: str = attrs.field(init=False)
unit_str: str = attrs.field(init=False)
value_float: float = attrs.field(init=False)
value_str: str = attrs.field(init=False) # best: uses decimal.Decimal without loss
value_dec: decimal.Decimal

def __attrs_post_init__(self) -> None:
self.id_hex = f"0x{self.id_int:04X}"
self.unit_hex = f"0x{self.unit_int:02X}"
self.name = constants.REGISTERS.get(self.id_int, f"<unknown reg {self.id_int}>")
self.unit_str = constants.UNITS_NAMES.get(
self.unit_int, f"<unknown unit {self.unit_int}>"
)
self.value_float = float(self.value_dec)
self.value_str = str(self.value_dec)

@classmethod
def from_register_data(cls, reg: messages.RegisterData) -> Self:
value_dec = codec.FloatCodec.decode(reg.value)
return cls(
id_int=reg.id_,
unit_int=reg.unit,
value_dec=value_dec,
)

def to_pretty_line(self) -> str:
return (
f"{self.id_int!r:>4} → {self.name:<{REGISTERS_NAMES_LEN_MAX}} = "
f"{self.value_str} {self.unit_str}"
)


def warn_registers_unknowns(
registers: Collection[messages.RegisterData],
) -> None:
Expand Down Expand Up @@ -279,7 +236,7 @@ def get_register(

warn_registers_unknowns(response.registers.values())
outputs = (
RegisterOutput.from_register_data(reg) for reg in response.registers.values()
registers.RegisterOutput.from_register_data(reg) for reg in response.registers.values()
)

match output_format:
Expand Down
96 changes: 96 additions & 0 deletions tests/test_registers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
# SPDX-FileCopyrightText: 2026 Jan Kundrát <jkt@jankundrat.com>
#
# SPDX-License-Identifier: Apache-2.0

from __future__ import annotations

import pytest
import decimal

from pykmp import messages, registers

@pytest.mark.parametrize(
("id_", "unit", "blob_with_size", "value_str", "value_dec", "unit_str"),
[
pytest.param(
1001,
51,
'04 00 00 00 00 00',
'0',
decimal.Decimal('0'),
'no unit (number)',
),
pytest.param(
60,
8,
'04 43 00 05 c4 a6',
'378.022',
decimal.Decimal('378.022'),
'GJ',
),
pytest.param(
1002,
47,
'04 00 00 00 00 04',
'00:00:04',
None,
'hh:mm:ss',
),
pytest.param(
1003,
48,
'04 00 00 03 ae 4f',
'2024-12-31',
None,
'yy:mm:dd',
),
pytest.param(
0,
50,
'04 00 00 03 38',
'08-24',
None,
'mm:dd',
),
pytest.param(
348,
79,
'07 00 00 18 0c 1f 00 00 04',
'2024-12-31 00:00:04-00:00',
None,
'DST YY-MM-DD hh:mm:ss',
),
# FIXME: how to print the DST? This (unadjusted time, and a DST remark at the end) is what the vendor's
# documentation is using, but the LogView SW actually shows '07 00 3c 1a 04 15 0c 07 1e' as
# '2026-04-21 13:07:30', i.e., with the DST offset of one hour already added.
pytest.param(
348,
79,
'07 00 3C 10 06 1E 0E 30 37',
'2016-06-30 14:48:55+01:00',
None,
'DST YY-MM-DD hh:mm:ss',
),
pytest.param(
254,
54,
'0C 00 30 32 4B 35 32 41 43 31 41 37 43 5A',
'02K52AC1A7CZ',
None,
'ASCII',
),
]
)
def test_register_parsing(id_, unit, blob_with_size, value_str, value_dec, unit_str):
data = messages.RegisterData(id_=id_, unit=unit, value=bytes.fromhex(blob_with_size))
parsed = registers.RegisterOutput.from_register_data(data)
assert parsed.value_str == value_str
assert parsed.value_dec == value_dec
assert parsed.unit_str == unit_str


def test_register_pretty_line():
data = messages.RegisterData(id_=1001, unit=51, value=bytes.fromhex('04 00 00 00 00 00'))
parsed = registers.RegisterOutput.from_register_data(data)
for part in ('1001', 'Fabrication No', ' 0 no unit (number)'):
assert part in parsed.to_pretty_line()