API reference documentation â codec module
Low-level codec classes for the Kamstrup KMP protocol.
This contains decoders/encoders for the Physical, Data Link and Application layer (generics).
Logic for specific messages (requests and responses) are not part of this module, but generic codecs such as floating point data format encoding/decoding used in requests/responses are.
Concrete classes
pykmp.codec.PhysicalCodec
Codec for the physical layer of the Kamstrup KMP protocol.
This codec is responsible for encoding/decoding a frame to/from the data link layer message for writing/reading to/from the IR head. What it does is:
- Adding/removing start/stop bytes.
- Stuffing/destuffing some special byte values.
Note that the codec is slightly different for sending or receiving messages to/from the meter.
See section 3.1 of the KMP protocol description document.
direction: PhysicalDirection = attrs.field()
class-attribute
instance-attribute
BYTE_STUFFING_MAP: Final[dict[bytes, bytes]] = {the_byte.to_bytes(1, 'big'): constants.ByteCode.STUFFING.value.to_bytes(1, 'big') + the_byte ^ 255.to_bytes(1, 'big')for the_byte in (constants.ByteCode.STUFFING.value, constants.ByteCode.ACK.value, constants.ByteCode.START_FROM_METER.value, constants.ByteCode.START_TO_METER.value, constants.ByteCode.STOP.value)}
class-attribute
instance-attribute
__attrs_post_init__() -> None
Select start byte value according to configuration (direction).
decode(frame: PhysicalBytes) -> DataLinkBytes
Decode a byte sequence of the physical layer into 'DataLinkBytes'.
The frame
must start with a start byte and ends with an end byte, unless it's
an ACK frame.
encode(data_bytes: DataLinkBytes) -> PhysicalBytes
Encode a byte sequence of the data link layer into 'PhysicalBytes'.
This 'stuffs' the special byte values and adds start/stop bytes.
For encoding an ACK message, see encode_ack().
encode_ack() -> PhysicalBytes
classmethod
Encode an ACK message.
This type of message does not need andy stuffing or start/stop bytes.
pykmp.codec.DataLinkCodec
Codec for the data link layer of the Kamstrup KMP protocol.
This codec is responsible for encoding/decoding to/from the application layer messages. What it does is:
- Destructuring the byte sequence into a destination address, application layer byte sequence and a CRC checksum.
- CRC checksum calculation/verification.
See section 3.2 of the KMP protocol description document.
DATA_LINK_BYTES_LENGTH_MIN: Final[int] = 4
class-attribute
instance-attribute
APPLICATION_BYTES_LENGTH_MIN: Final[int] = 1
class-attribute
instance-attribute
crc_calculator: Final[crc.Calculator] = _create_kamstrup_crc16_ccitt_calculator()
class-attribute
instance-attribute
decode(raw: DataLinkBytes) -> DataLinkData
Decode a byte sequence of the data link layer into 'DataLinkData'.
This destructures it into the destination address, the application data and the CRC checksum. Also the latter will be verified.
encode(data: DataLinkData) -> DataLinkBytes
Encode a byte sequence of the application layer into 'DataLinkBytes'.
The structure includes the destination address and a CRC checksum (calculated here).
pykmp.codec.ApplicationCodec
Codec for the application layer of the Kamstrup KMP protocol.
This codec is responsible for encoding/decoding to/from the command data byte sequences. What it does is destructuring the byte sequence into a Command ID (CID) and the command data.
Note that this covers both requests and responses and command data may be emtpy.
See section 3.3 of the KMP protocol description document.
APPLICATION_BYTES_LENGTH_MIN: Final[int] = 1
class-attribute
instance-attribute
decode(data: ApplicationBytes) -> ApplicationData
classmethod
Decode a byte sequence of the application layer into 'ApplicationData'.
encode(to_encode: ApplicationData) -> ApplicationBytes
classmethod
Encode to a byte sequence of the application layer 'ApplicationBytes'.
pykmp.codec.FloatCodec
Codec for the variable length base-10 floating point format in the KMP protocol.
The length of the mantissa is encoded and is commonly 32 bits (4 bytes). A visual representation of the format using an example:
data: 0x024300FB
0x02 0x43 0x00 0xFB (hex)
00000010 01000011 00000000 11111011 (bin)
00000010 ________ ________ ________ <- length of significand
________ 0_______ ________ ________ <- sign bit for significand 'SI' (1=negative)
________ _1______ ________ ________ <- sign bit for exponent 'SE'
________ __000011 ________ ________ <- 6 exponent bits
________ ________ 00000000 11111011 <- significand (int) 'mantissa'
In the above example: - 0x02 decodes to a length of 2 bytes = 16 bits to read for the mantissa. - SI=0, so a positive value. - SE=1, thus a negative exponent. - exponent=0x03 = 3 (decimal) - mantissa=0x00FB = 251 (decimal)
Calculation with the above example data:
In short, The mantissa holds the significands of the data, the others are just for scale.
See also section 4.2 of the KMP protocol description document.
decode(data: bytes) -> decimal.Decimal
classmethod
Decode a byte sequence of a floating point format to a decimal.Decimal.
decode_int_or_float(data: bytes) -> int | float
classmethod
Decode a KMP protocol byte sequence of a floating point as int or float.
Returns an int if it can be encoded as a int ("nothing after the comma") to avoid losing significance.
This implements an alternative way to decoding the base-10 floating point. It is included for reference/testing. It may be slow or one may be unfamiliar with Python's decimal.Decimal types. Note that you may lose significance in this conversion to float, or some infamous floating point error like '63.440000000000005' instead of '63.44'.
Its use is discouraged and if one needs a float then convert the returned
decimal.Decimal from the regular decode()
method.
encode(*, to_encode: decimal.Decimal, significand_num_bytes: int | None = 4) -> bytes
classmethod
Encode a decimal.Decimal value to a byte sequence in the KMP protocol.
Data classes & types
pykmp.codec.PhysicalBytes = NewType('PhysicalBytes', bytes)
module-attribute
Distinct type (bytes) representing serialized bytes on the physical layer.
pykmp.codec.DataLinkBytes = NewType('DataLinkBytes', bytes)
module-attribute
Distinct type (bytes) representing serialized bytes on the data link layer.
pykmp.codec.ApplicationBytes = NewType('ApplicationBytes', bytes)
module-attribute
Distinct type (bytes) representing serialized data on the application layer.
pykmp.codec.ApplicationDataBytes = NewType('ApplicationDataBytes', bytes)
module-attribute
Distinct type (bytes) representing serialized application (message) data.
pykmp.codec.ApplicationData
Data class for the data in the application layer of the Kamstrup KMP protocol.
command_id: int
instance-attribute
data: ApplicationDataBytes
instance-attribute
pykmp.codec.DataLinkData
Data class for the data in the data link layer of the Kamstrup KMP protocol.
destination_address: int
instance-attribute
application_bytes: ApplicationBytes
instance-attribute
crc_value: int | None = None
class-attribute
instance-attribute
pykmp.codec.PhysicalDirection
Bases: Enum
Specifies the direction of communication for the Kamstrup KMP protocol.
TO_METER = enum.auto()
class-attribute
instance-attribute
FROM_METER = enum.auto()
class-attribute
instance-attribute
Exceptions
pykmp.codec.AckReceivedException
Bases: Exception
Not an error; it was just an ACK and no data link bytes to return.
pykmp.codec.BaseCodecError
Bases: Exception
Base error for anything that originates from logic here.
pykmp.codec.OutOfRangeError
Bases: BaseCodecError
A value was found outside of a valid range.
Supports ranges with both lower and upper bounds, only upper and only lower. Range is inclusive.
what: str
instance-attribute
valid_range: tuple[int, int] | tuple[int, None] | tuple[None, int]
instance-attribute
actual: int
instance-attribute
__str__() -> str
pykmp.codec.DataLengthUnexpectedError
Bases: BaseCodecError
what: str
instance-attribute
actual: int
instance-attribute
length_expected: int | None = None
class-attribute
instance-attribute
expected_is_minimum: bool = False
class-attribute
instance-attribute
__str__() -> str
pykmp.codec.BoundaryByteInvalidError
Bases: BaseCodecError
First or last byte is not a start or stop byte respectively.
what: Literal['start', 'stop']
instance-attribute
expected_byte: int
instance-attribute
actual_byte: int
instance-attribute
__str__() -> str
pykmp.codec.InvalidDestinationAddressError
Bases: BaseCodecError
Destination address for the data link layer is not within valid range.
__str__() -> str
pykmp.codec.UnsupportedDecimalExponentError
Bases: BaseCodecError
The Decimal value cannot be encoded to a base-10 float (non-integer exponent).
actual_exponent: Any
instance-attribute
__str__() -> str
pykmp.codec.CrcChecksumInvalidError
Bases: BaseCodecError
CRC checksum validation of the data link byte sequence did not pass.