Skip to content

API reference documentation – client module

Convenience in client communication with the KMP protocol (to meter).

The PySerialClientCommunicator provides the most high-level class to communicate with the meter.

Concrete classes

pykmp.client.ClientCodec

Wires up the codecs of all layers for communication to the meter.

destination_address: int = attrs.field(default=constants.DestinationAddress.HEAT_METER.value) class-attribute instance-attribute

application_codec: codec.ApplicationCodec = attrs.field(factory=codec.ApplicationCodec) class-attribute instance-attribute

physical_codec_encode: ClassVar = codec.PhysicalCodec(direction=codec.PhysicalDirection.TO_METER) class-attribute instance-attribute

physical_codec_decode: ClassVar = codec.PhysicalCodec(direction=codec.PhysicalDirection.FROM_METER) class-attribute instance-attribute

encode(request: ClientContextRequest[ClientContextResponse[CCReq_t_co]]) -> EncodedClientRequest[CCReq_t_co]

Encode a request message to bytes for sending on the physical layer.

decode(*, frame: EncodedClientResponse[ClientContextRequest[CCResp_t_co]]) -> CCResp_t_co

Decode bytes from the physical layer to a response message.

pykmp.client.PySerialClientCommunicator

Bases: ClientCommunicator

Uses PySerial to connect, read and write to a meter.

For connecting over TCP (e.g. with ser2net) use 'socket://:' as 'serial_device' string.

DEFAULT_READ_TIMEOUT_SECONDS: float = 2.0 class-attribute

serial_device: str = attrs.field() class-attribute instance-attribute

timeout_seconds: float = attrs.field(default=DEFAULT_READ_TIMEOUT_SECONDS) class-attribute instance-attribute

send_request(*, message: ClientContextRequest[CCResp_t_co], destination_address: int = DESTINATION_ADDRESS_DEFAULT) -> CCResp_t_co

Encode and send a request, return decoded response.

__attrs_post_init__() -> None

Initialize a serial.Serial object in self._serial.

read(*, num_bytes: int | None = None) -> codec.PhysicalBytes

Read from serial device or network socket.

If num_bytes is provided, it reads num_bytes number of bytes, else it will read until stop byte.

read_num_bytes(num_bytes: int) -> codec.PhysicalBytes

Read num_bytes number of bytes from serial device or network socket.

read_until_stop() -> codec.PhysicalBytes

Read bytes from serial device or network socket until a stop byte.

write(data: codec.PhysicalBytes) -> None

Write the bytes to the serial device or network socket.

Constants

pykmp.client.DESTINATION_ADDRESS_DEFAULT = attrs.fields(ClientCodec).destination_address.default module-attribute

pykmp.client.PYSERIAL_AVAILABLE = bool(importlib.util.find_spec('serial')) module-attribute

Data classes & types

pykmp.client.CCReq_t_co = TypeVar('CCReq_t_co', bound='ClientContextRequest[Any]', covariant=True) module-attribute

pykmp.client.CCResp_t_co = TypeVar('CCResp_t_co', bound='ClientContextResponse[Any]', covariant=True) module-attribute

pykmp.client.ClientContextRequest

Bases: SupportsEncode, BaseRequest[CCResp_t_co], Protocol[CCResp_t_co]

Generic request KMP message type that can be used in a client context.

pykmp.client.ClientContextResponse

Bases: SupportsDecode, BaseResponse[CCReq_t_co], Protocol[CCReq_t_co]

Generic response KMP message type that can be used in a client context.

pykmp.client.EncodedClientRequest

Bases: Generic[CCReq_t_co]

Wraps the physical bytes type to include request type information.

physical_bytes: codec.PhysicalBytes instance-attribute

request_cls: type[ClientContextRequest[ClientContextResponse[CCReq_t_co]]] instance-attribute

pykmp.client.EncodedClientResponse

Bases: Generic[CCReq_t_co]

Wraps the physical bytes type to include request (response) type information.

physical_bytes: codec.PhysicalBytes instance-attribute

request_cls: type[CCReq_t_co] instance-attribute

Base classes

pykmp.client.ClientCommunicator

Bases: Protocol

Wrap the codecs and communication communication with the meter.

read(*, num_bytes: int | None = None) -> codec.PhysicalBytes

Read num_bytes number of bytes (or until stop byte if None) from [...].

If num_bytes is provided, it reads num_bytes number of bytes, else it will read until stop byte.

write(data: codec.PhysicalBytes) -> None

Write the bytes to [...].

send_request(*, message: ClientContextRequest[CCResp_t_co], destination_address: int = DESTINATION_ADDRESS_DEFAULT) -> CCResp_t_co

Encode and send a request, return decoded response.