Skip to content

md.message

The md.message component defines contracts for messages and for interaction with
message queues, and provides a few useful tools out of the box.

Architecture overview

Architecture overview

Installation

pip install md.message --index-url https://source.md.land/python/

Usage

Receive and process message

Example of implementation (see examples/retrieve-and-process-message.py):

import typing

import md.message


# Implement contracts:
class ExampleMessage(md.message.MessageInterface[str]):
    """Minimal message implementation that carries a plain string payload."""

    def __init__(self, payload: str) -> None:
        """Keep ``payload`` so it can be read back through ``get_payload``."""
        self._payload: str = payload

    def get_payload(self) -> str:
        """Return the payload string passed to the constructor."""
        return self._payload


class ExampleReceive(md.message.ReceiveInterface[ExampleMessage]):
    """Example receiver that produces a fixed batch of in-memory messages."""

    def receive(self) -> typing.Iterable[ExampleMessage]:
        """Lazily yield 42 messages with payloads ``message #0`` to ``message #41``."""
        yield from (ExampleMessage(f'message #{number}') for number in range(42))

    def accept(self, message: ExampleMessage) -> None:
        """Print a line reporting that ``message`` was accepted (ack)."""
        payload = message.get_payload()
        print('Receive: Message accepted: ' + payload)

    def reject(self, message: ExampleMessage) -> None:
        """Print a line reporting that ``message`` was rejected (nack)."""
        payload = message.get_payload()
        print('Receive: Message rejected: ' + payload)


class ExampleHandle(md.message.HandleInterface[ExampleMessage]):
    """Example handler that deliberately fails on every second message."""

    def __init__(self) -> None:
        """Start with no messages handled yet."""
        self.__counter = 0  # number of handle() calls made so far

    def handle(self, message: ExampleMessage) -> None:
        """Print the processing outcome for ``message``.

        Calls are counted from zero: even-numbered calls print a success
        line, odd-numbered calls print a failure line and then raise.

        Raises:
            RuntimeError: on every second call (the odd-numbered ones).
        """
        try:
            if self.__counter % 2 != 0:
                print('Handle: Message processed (failure): ' + message.get_payload())
                raise RuntimeError
            print('Handle: Message processed (success): ' + message.get_payload())
        finally:
            # Count the call whether it succeeded or raised.
            self.__counter += 1


if __name__ == '__main__':
    # Example 1: receive and handle messages.
    # Both collaborators are annotated with their interface types rather
    # than with the concrete example classes.
    receive_message: md.message.ReceiveInterface[ExampleMessage] = ExampleReceive()
    handle_message: md.message.HandleInterface[ExampleMessage] = ExampleHandle()

    # Wire the receiver and the handler into the application, then run it.
    receive_application: md.message.ReceiveApplication[ExampleMessage]
    receive_application = md.message.ReceiveApplication(
        receive_message=receive_message,
        handle_message=handle_message,
    )
    receive_application.run()

Message send

Example of implementation (see examples/send-message.py):

import md.message


# Implement contracts:
class ExampleMessage(md.message.MessageInterface[str]):
    """Minimal message implementation that carries a plain string payload."""

    def __init__(self, payload: str) -> None:
        """Keep ``payload`` so it can be read back through ``get_payload``."""
        self._payload: str = payload

    def get_payload(self) -> str:
        """Return the payload string passed to the constructor."""
        return self._payload


class ExampleSend(md.message.SendInterface[ExampleMessage]):
    """Example sender that prints the payload instead of delivering it."""

    def send(self, message: ExampleMessage) -> None:
        """Print a ``Message sent: ...`` line containing the message payload."""
        payload = message.get_payload()
        print('Message sent: ' + payload)


if __name__ == '__main__':
    # Example 2: create the example sender and pass one message to it.
    send: md.message.SendInterface[ExampleMessage]
    send = ExampleSend()
    send.send(message=ExampleMessage(payload='example message'))