API reference documentation - client module
Convenience classes for client-side communication with a meter over the KMP protocol.
PySerialClientCommunicator is the highest-level class for communicating with the meter.
Concrete classes
pykmp.client.ClientCodec
Wires up the codecs of all layers for communication to the meter.
destination_address: int = attrs.field(default=constants.DestinationAddress.HEAT_METER.value)
class-attribute
instance-attribute
application_codec: codec.ApplicationCodec = attrs.field(factory=codec.ApplicationCodec)
class-attribute
instance-attribute
data_link_codec: codec.DataLinkCodec = attrs.field(factory=codec.DataLinkCodec)
class-attribute
instance-attribute
physical_codec_encode: ClassVar = codec.PhysicalCodec(direction=codec.PhysicalDirection.TO_METER)
class-attribute
instance-attribute
physical_codec_decode: ClassVar = codec.PhysicalCodec(direction=codec.PhysicalDirection.FROM_METER)
class-attribute
instance-attribute
encode(request: ClientContextRequest[ClientContextResponse[CCReq_t_co]]) -> EncodedClientRequest[CCReq_t_co]
Encode a request message to bytes for sending on the physical layer.
decode(*, frame: EncodedClientResponse[ClientContextRequest[CCResp_t_co]]) -> CCResp_t_co
Decode bytes from the physical layer to a response message.
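To illustrate what "wiring up the codecs of all layers" can look like, here is a minimal, self-contained sketch that does not use pykmp. All `Toy*` names, the framing bytes, the default destination value and the one-byte checksum are assumptions made up for this example; the real codecs in `pykmp.codec` may frame and check data quite differently. The sketch also skips the application layer and passes already-encoded application bytes in.

```python
from dataclasses import dataclass, field
from typing import ClassVar

# Placeholder values chosen for this sketch; they are not taken from this reference.
START_TO_METER = 0x80
START_FROM_METER = 0x40
STOP = 0x0D
TOY_DESTINATION = 0x3F


@dataclass(frozen=True)
class ToyPhysicalCodec:
    """Adds or strips a start and stop byte (no byte stuffing in this sketch)."""

    start_byte: int

    def encode(self, data: bytes) -> bytes:
        return bytes([self.start_byte]) + data + bytes([STOP])

    def decode(self, frame: bytes) -> bytes:
        if len(frame) < 2 or frame[0] != self.start_byte or frame[-1] != STOP:
            raise ValueError("malformed frame")
        return frame[1:-1]


@dataclass(frozen=True)
class ToyDataLinkCodec:
    """Prefixes the destination address and appends a one-byte checksum."""

    def encode(self, destination_address: int, payload: bytes) -> bytes:
        body = bytes([destination_address]) + payload
        return body + bytes([sum(body) % 256])

    def decode(self, data: bytes) -> bytes:
        if len(data) < 2:
            raise ValueError("data too short")
        body, checksum = data[:-1], data[-1]
        if sum(body) % 256 != checksum:
            raise ValueError("checksum mismatch")
        return body[1:]  # drop the address byte, keep the payload


@dataclass
class ToyClientCodec:
    """Chains the layers: one physical codec per direction, shared by all instances."""

    destination_address: int = TOY_DESTINATION
    data_link_codec: ToyDataLinkCodec = field(default_factory=ToyDataLinkCodec)
    physical_codec_encode: ClassVar[ToyPhysicalCodec] = ToyPhysicalCodec(START_TO_METER)
    physical_codec_decode: ClassVar[ToyPhysicalCodec] = ToyPhysicalCodec(START_FROM_METER)

    def encode(self, application_bytes: bytes) -> bytes:
        linked = self.data_link_codec.encode(self.destination_address, application_bytes)
        return self.physical_codec_encode.encode(linked)

    def decode(self, frame: bytes) -> bytes:
        linked = self.physical_codec_decode.decode(frame)
        return self.data_link_codec.decode(linked)
```

Keeping separate physical codecs for each direction mirrors the `physical_codec_encode` and `physical_codec_decode` attributes above: in this sketch, a frame encoded for the meter is rejected by the decoder because its start byte differs.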
pykmp.client.PySerialClientCommunicator
Bases: ClientCommunicator
Uses PySerial to connect, read and write to a meter.
For connecting over TCP (e.g. with ser2net) use 'socket://
DEFAULT_READ_TIMEOUT_SECONDS: float = 2.0
class-attribute
serial_device: str = attrs.field()
class-attribute
instance-attribute
timeout_seconds: float = attrs.field(default=DEFAULT_READ_TIMEOUT_SECONDS)
class-attribute
instance-attribute
send_request(*, message: ClientContextRequest[CCResp_t_co], destination_address: int = DESTINATION_ADDRESS_DEFAULT) -> CCResp_t_co
Encode and send a request, return decoded response.
__attrs_post_init__() -> None
Initialize a serial.Serial object in self._serial.
read(*, num_bytes: int | None = None) -> codec.PhysicalBytes
Read from serial device or network socket.
If num_bytes is provided, reads that many bytes; otherwise reads until a stop byte.
read_num_bytes(num_bytes: int) -> codec.PhysicalBytes
Read num_bytes number of bytes from serial device or network socket.
read_until_stop() -> codec.PhysicalBytes
Read bytes from serial device or network socket until a stop byte.
write(data: codec.PhysicalBytes) -> None
Write the bytes to the serial device or network socket.
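The relationship between `read`, `read_num_bytes` and `read_until_stop` can be sketched without any serial hardware. The example below is an assumption-laden stand-in: `StreamReader` and `STOP_BYTE` are hypothetical names, an `io.BytesIO` object plays the role of the serial port, and the stop byte value is chosen only for illustration. It is not the implementation used by PySerialClientCommunicator.

```python
import io
from typing import Optional

STOP_BYTE = b"\r"  # assumed stop byte for this sketch


class StreamReader:
    """Reads from any binary stream; io.BytesIO stands in for a serial port."""

    def __init__(self, stream: io.BufferedIOBase) -> None:
        self._stream = stream

    def read(self, *, num_bytes: Optional[int] = None) -> bytes:
        # Dispatch on num_bytes, as described for read() above.
        if num_bytes is None:
            return self.read_until_stop()
        return self.read_num_bytes(num_bytes)

    def read_num_bytes(self, num_bytes: int) -> bytes:
        return self._stream.read(num_bytes)

    def read_until_stop(self) -> bytes:
        received = bytearray()
        while True:
            chunk = self._stream.read(1)
            if not chunk:  # end of stream; a real port would time out instead
                break
            received += chunk
            if chunk == STOP_BYTE:
                break
        return bytes(received)
```

A usage example: `StreamReader(io.BytesIO(data)).read()` returns everything up to and including the first stop byte, while `read(num_bytes=1)` returns a single byte. Reading one byte at a time is simple but slow; PySerial also provides `Serial.read_until`, though whether pykmp relies on it is not stated in this reference.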
Constants
pykmp.client.DESTINATION_ADDRESS_DEFAULT = attrs.fields(ClientCodec).destination_address.default
module-attribute
pykmp.client.PYSERIAL_AVAILABLE = bool(importlib.util.find_spec('serial'))
module-attribute
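The `PYSERIAL_AVAILABLE` expression above is an instance of a common optional-dependency check: `importlib.util.find_spec` locates a top-level module without importing it. A minimal sketch of the same pattern, where `is_available` and the probed module names are hypothetical and used only for illustration:

```python
import importlib.util


def is_available(module_name: str) -> bool:
    """Return True if a top-level module can be found, without importing it."""
    return importlib.util.find_spec(module_name) is not None


# "json" ships with Python; the second name is assumed not to be installed.
JSON_AVAILABLE = is_available("json")
MISSING_AVAILABLE = is_available("surely_not_an_installed_module_xyz")
```

Calling code can then branch on such a flag before importing the optional package, for example to raise a clearer error message when PySerial is missing.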
Data classes & types
pykmp.client.CCReq_t_co = TypeVar('CCReq_t_co', bound='ClientContextRequest[Any]', covariant=True)
module-attribute
pykmp.client.CCResp_t_co = TypeVar('CCResp_t_co', bound='ClientContextResponse[Any]', covariant=True)
module-attribute
pykmp.client.ClientContextRequest
Bases: SupportsEncode, BaseRequest[CCResp_t_co], Protocol[CCResp_t_co]
Generic request KMP message type that can be used in a client context.
pykmp.client.ClientContextResponse
Bases: SupportsDecode, BaseResponse[CCReq_t_co], Protocol[CCReq_t_co]
Generic response KMP message type that can be used in a client context.
pykmp.client.EncodedClientRequest
Bases: Generic[CCReq_t_co]
Wraps the physical bytes type to include request type information.
physical_bytes: codec.PhysicalBytes
instance-attribute
request_cls: type[ClientContextRequest[ClientContextResponse[CCReq_t_co]]]
instance-attribute
pykmp.client.EncodedClientResponse
Bases: Generic[CCReq_t_co]
Wraps the physical bytes type to include request (response) type information.
physical_bytes: codec.PhysicalBytes
instance-attribute
request_cls: type[CCReq_t_co]
instance-attribute
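The purpose of carrying type information alongside the physical bytes can be shown with a simplified sketch: a request type is parameterized by its response type, so a type checker can infer what decoding will return. Every name below (`ToyRequest`, `ToyResponse`, `PingRequest`, `PingResponse`, `EncodedRequest`, `encode_request`, `decode_response`) is hypothetical, and the design is an assumption about the general idea rather than a copy of pykmp's classes.

```python
from dataclasses import dataclass
from typing import Generic, Type, TypeVar

Resp = TypeVar("Resp", bound="ToyResponse")


@dataclass(frozen=True)
class ToyResponse:
    payload: bytes

    @classmethod
    def decode(cls: Type[Resp], data: bytes) -> Resp:
        return cls(payload=data)


class ToyRequest(Generic[Resp]):
    """A request that knows which response class answers it."""

    response_cls: Type[Resp]

    def encode(self) -> bytes:
        raise NotImplementedError


class PingResponse(ToyResponse):
    """Hypothetical response type."""


class PingRequest(ToyRequest[PingResponse]):
    """Hypothetical request type, paired with PingResponse."""

    response_cls = PingResponse

    def encode(self) -> bytes:
        return b"\x02"


@dataclass(frozen=True)
class EncodedRequest(Generic[Resp]):
    """Raw bytes plus the request class they were encoded from."""

    physical_bytes: bytes
    request_cls: Type[ToyRequest[Resp]]


def encode_request(request: ToyRequest[Resp]) -> EncodedRequest[Resp]:
    return EncodedRequest(physical_bytes=request.encode(), request_cls=type(request))


def decode_response(encoded: EncodedRequest[Resp], raw: bytes) -> Resp:
    # The stored request class selects the matching response class.
    return encoded.request_cls.response_cls.decode(raw)
```

With this pairing, `decode_response(encode_request(PingRequest()), raw)` is typed as `PingResponse`, without a cast at the call site.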
Base classes
pykmp.client.ClientCommunicator
Bases: Protocol
Wraps the codecs and the communication with the meter.
read(*, num_bytes: int | None = None) -> codec.PhysicalBytes
Read num_bytes number of bytes (or until stop byte if None) from [...].
If num_bytes is provided, it reads num_bytes number of bytes, else it will read until stop byte.
write(data: codec.PhysicalBytes) -> None
Write the bytes to [...].
send_request(*, message: ClientContextRequest[CCResp_t_co], destination_address: int = DESTINATION_ADDRESS_DEFAULT) -> CCResp_t_co
Encode and send a request, return decoded response.
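Because ClientCommunicator is a Protocol, any class with matching `read` and `write` methods could in principle serve as a transport, which is handy for tests without hardware. The sketch below mirrors only the `read`/`write` signatures listed above using a local, hypothetical `ByteTransport` protocol; `CannedTransport` and `exchange` are likewise made-up names, and plain `bytes` stands in for `codec.PhysicalBytes`.

```python
from typing import Optional, Protocol, runtime_checkable


@runtime_checkable
class ByteTransport(Protocol):
    """Local stand-in mirroring the read/write signatures listed above."""

    def read(self, *, num_bytes: Optional[int] = None) -> bytes: ...

    def write(self, data: bytes) -> None: ...


class CannedTransport:
    """Records written bytes and replays a prepared reply."""

    def __init__(self, reply: bytes) -> None:
        self.written = bytearray()
        self._pending = bytearray(reply)

    def write(self, data: bytes) -> None:
        self.written += data

    def read(self, *, num_bytes: Optional[int] = None) -> bytes:
        # With num_bytes=None, return everything pending; this stands in
        # for "read until stop byte" in a real transport.
        count = len(self._pending) if num_bytes is None else num_bytes
        chunk = bytes(self._pending[:count])
        del self._pending[:count]
        return chunk


def exchange(transport: ByteTransport, request: bytes) -> bytes:
    """Write a request frame, then read the reply frame."""
    transport.write(request)
    return transport.read()
```

Note that `CannedTransport` never inherits from `ByteTransport`; structural typing means the matching method signatures are enough.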