Skip to content

API reference documentation – codec module

Low-level codec classes for the Kamstrup KMP protocol.

This contains decoders/encoders for the Physical, Data Link and Application layer (generics).

Logic for specific messages (requests and responses) are not part of this module, but generic codecs such as floating point data format encoding/decoding used in requests/responses are.

Concrete classes

pykmp.codec.PhysicalCodec

Codec for the physical layer of the Kamstrup KMP protocol.

This codec is responsible for encoding/decoding a frame to/from the data link layer message for writing/reading to/from the IR head. What it does is:

  • Adding/removing start/stop bytes.
  • Stuffing/destuffing some special byte values.

Note that the codec is slightly different for sending or receiving messages to/from the meter.

See section 3.1 of the KMP protocol description document.

direction: PhysicalDirection = attrs.field() class-attribute instance-attribute

BYTE_STUFFING_MAP: Final[dict[bytes, bytes]] = {the_byte.to_bytes(1, 'big'): constants.ByteCode.STUFFING.value.to_bytes(1, 'big') + the_byte ^ 255.to_bytes(1, 'big')for the_byte in (constants.ByteCode.STUFFING.value, constants.ByteCode.ACK.value, constants.ByteCode.START_FROM_METER.value, constants.ByteCode.START_TO_METER.value, constants.ByteCode.STOP.value)} class-attribute instance-attribute

__attrs_post_init__() -> None

Select start byte value according to configuration (direction).

decode(frame: PhysicalBytes) -> DataLinkBytes

Decode a byte sequence of the physical layer into 'DataLinkBytes'.

The frame must start with a start byte and ends with an end byte, unless it's an ACK frame.

encode(data_bytes: DataLinkBytes) -> PhysicalBytes

Encode a byte sequence of the data link layer into 'PhysicalBytes'.

This 'stuffs' the special byte values and adds start/stop bytes.

For encoding an ACK message, see encode_ack().

encode_ack() -> PhysicalBytes classmethod

Encode an ACK message.

This type of message does not need andy stuffing or start/stop bytes.

pykmp.codec.DataLinkCodec

Codec for the data link layer of the Kamstrup KMP protocol.

This codec is responsible for encoding/decoding to/from the application layer messages. What it does is:

  • Destructuring the byte sequence into a destination address, application layer byte sequence and a CRC checksum.
  • CRC checksum calculation/verification.

See section 3.2 of the KMP protocol description document.

APPLICATION_BYTES_LENGTH_MIN: Final[int] = 1 class-attribute instance-attribute

crc_calculator: Final[crc.Calculator] = _create_kamstrup_crc16_ccitt_calculator() class-attribute instance-attribute

decode(raw: DataLinkBytes) -> DataLinkData

Decode a byte sequence of the data link layer into 'DataLinkData'.

This destructures it into the destination address, the application data and the CRC checksum. Also the latter will be verified.

encode(data: DataLinkData) -> DataLinkBytes

Encode a byte sequence of the application layer into 'DataLinkBytes'.

The structure includes the destination address and a CRC checksum (calculated here).

pykmp.codec.ApplicationCodec

Codec for the application layer of the Kamstrup KMP protocol.

This codec is responsible for encoding/decoding to/from the command data byte sequences. What it does is destructuring the byte sequence into a Command ID (CID) and the command data.

Note that this covers both requests and responses and command data may be emtpy.

See section 3.3 of the KMP protocol description document.

APPLICATION_BYTES_LENGTH_MIN: Final[int] = 1 class-attribute instance-attribute

decode(data: ApplicationBytes) -> ApplicationData classmethod

Decode a byte sequence of the application layer into 'ApplicationData'.

encode(to_encode: ApplicationData) -> ApplicationBytes classmethod

Encode to a byte sequence of the application layer 'ApplicationBytes'.

pykmp.codec.FloatCodec

Codec for the variable length base-10 floating point format in the KMP protocol.

The length of the mantissa is encoded and is commonly 32 bits (4 bytes). A visual representation of the format using an example:

data: 0x024300FB

    0x02     0x43     0x00     0xFB (hex)
00000010 01000011 00000000 11111011 (bin)

00000010 ________ ________ ________ <- length of significand
________ 0_______ ________ ________ <- sign bit for significand 'SI' (1=negative)
________ _1______ ________ ________ <- sign bit for exponent 'SE'
________ __000011 ________ ________ <- 6 exponent bits
________ ________ 00000000 11111011 <- significand (int) 'mantissa'

In the above example: - 0x02 decodes to a length of 2 bytes = 16 bits to read for the mantissa. - SI=0, so a positive value. - SE=1, thus a negative exponent. - exponent=0x03 = 3 (decimal) - mantissa=0x00FB = 251 (decimal)

Calculation with the above example data:

    -1|1 * (mantissa * 10 ^ ( -1|1   * exponent )

      1  * (  251    * 10 ^ (  -1    *    3     ) = 0.251

In short, The mantissa holds the significands of the data, the others are just for scale.

See also section 4.2 of the KMP protocol description document.

decode(data: bytes) -> decimal.Decimal classmethod

Decode a byte sequence of a floating point format to a decimal.Decimal.

decode_int_or_float(data: bytes) -> int | float classmethod

Decode a KMP protocol byte sequence of a floating point as int or float.

Returns an int if it can be encoded as a int ("nothing after the comma") to avoid losing significance.

This implements an alternative way to decoding the base-10 floating point. It is included for reference/testing. It may be slow or one may be unfamiliar with Python's decimal.Decimal types. Note that you may lose significance in this conversion to float, or some infamous floating point error like '63.440000000000005' instead of '63.44'.

Its use is discouraged and if one needs a float then convert the returned decimal.Decimal from the regular decode() method.

encode(*, to_encode: decimal.Decimal, significand_num_bytes: int | None = 4) -> bytes classmethod

Encode a decimal.Decimal value to a byte sequence in the KMP protocol.

Data classes & types

pykmp.codec.PhysicalBytes = NewType('PhysicalBytes', bytes) module-attribute

Distinct type (bytes) representing serialized bytes on the physical layer.

pykmp.codec.DataLinkBytes = NewType('DataLinkBytes', bytes) module-attribute

Distinct type (bytes) representing serialized bytes on the data link layer.

pykmp.codec.ApplicationBytes = NewType('ApplicationBytes', bytes) module-attribute

Distinct type (bytes) representing serialized data on the application layer.

pykmp.codec.ApplicationDataBytes = NewType('ApplicationDataBytes', bytes) module-attribute

Distinct type (bytes) representing serialized application (message) data.

pykmp.codec.ApplicationData

Data class for the data in the application layer of the Kamstrup KMP protocol.

command_id: int instance-attribute

data: ApplicationDataBytes instance-attribute

pykmp.codec.DataLinkData

Data class for the data in the data link layer of the Kamstrup KMP protocol.

destination_address: int instance-attribute

application_bytes: ApplicationBytes instance-attribute

crc_value: int | None = None class-attribute instance-attribute

pykmp.codec.PhysicalDirection

Bases: Enum

Specifies the direction of communication for the Kamstrup KMP protocol.

TO_METER = enum.auto() class-attribute instance-attribute

FROM_METER = enum.auto() class-attribute instance-attribute

Exceptions

pykmp.codec.AckReceivedException

Bases: Exception

Not an error; it was just an ACK and no data link bytes to return.

pykmp.codec.BaseCodecError

Bases: Exception

Base error for anything that originates from logic here.

pykmp.codec.OutOfRangeError

Bases: BaseCodecError

A value was found outside of a valid range.

Supports ranges with both lower and upper bounds, only upper and only lower. Range is inclusive.

what: str instance-attribute

valid_range: tuple[int, int] | tuple[int, None] | tuple[None, int] instance-attribute

actual: int instance-attribute

__str__() -> str

pykmp.codec.DataLengthUnexpectedError

Bases: BaseCodecError

what: str instance-attribute

actual: int instance-attribute

length_expected: int | None = None class-attribute instance-attribute

expected_is_minimum: bool = False class-attribute instance-attribute

__str__() -> str

pykmp.codec.BoundaryByteInvalidError

Bases: BaseCodecError

First or last byte is not a start or stop byte respectively.

what: Literal['start', 'stop'] instance-attribute

expected_byte: int instance-attribute

actual_byte: int instance-attribute

__str__() -> str

pykmp.codec.InvalidDestinationAddressError

Bases: BaseCodecError

Destination address for the data link layer is not within valid range.

__str__() -> str

pykmp.codec.UnsupportedDecimalExponentError

Bases: BaseCodecError

The Decimal value cannot be encoded to a base-10 float (non-integer exponent).

actual_exponent: Any instance-attribute

__str__() -> str

pykmp.codec.CrcChecksumInvalidError

Bases: BaseCodecError

CRC checksum validation of the data link byte sequence did not pass.