Dependency injection

mediatpy.Mediator class does not know anything about third-party dependency containers, but for easing its integration with them, it supplies optional parameters in its constructor that you can use to create (or maybe delegate) objects when they are needed.

The available callbacks are:

  • request_handler_factory

  • pipeline_behavior_factory

  • notification_handler_factory

import asyncio
from typing import Type

from mediatpy import Mediator, Notification, NotificationHandler


class MyNotification(Notification):
    pass


async def my_custom_notification_handler_factory(notification_handler: Type[NotificationHandler]) -> NotificationHandler:
    # Here you could delegate to the container or do anything else to create the requested object
    return notification_handler()


mediator = Mediator(notification_handler_factory=my_custom_notification_handler_factory)


@mediator.register_notification_handler
class MyNotificationHandler(NotificationHandler[MyNotification]):
    async def handle(self, notification: MyNotification) -> None:
        print(self.__class__.__name__)


async def main():
    notification = MyNotification()
    await mediator.publish(notification)


if __name__ == '__main__':
    asyncio.run(main())