Skip to content

Split code

Splitting handlers into modules

You can move handlers into different modules to structure your code.

misc.py
from aiohttp import web

from waio.bot import Bot, Dispatcher
from waio.logs import loguru_filter
from waio.storage import RedisStorage

# Set the waio log filter level to DEBUG.
loguru_filter.set_level("DEBUG")

# Redis-backed storage that is handed to the dispatcher below.
# NOTE(review): prefix_fsm is presumably the key prefix for FSM data — confirm.
storage = RedisStorage(prefix_fsm="fsm", redis_url="redis://localhost:6379")

# "API_KEY" and "SRC_NAME" look like placeholders; substitute real values.
bot = Bot(apikey="API_KEY", src_name="SRC_NAME", phone_number=7928994433)

# Dispatcher built from the bot and storage above; main.py imports it as `dp`.
dp = Dispatcher(bot=bot, storage=storage)

# aiohttp application; main.py imports it as `webhook`, adds a route and runs it.
webhook = web.Application()
handlers_foo.py
from re import Match

from waio.types import Event


async def start_commands(event: Event):
    """Answer with the commands/TEXT filter label and the incoming message text."""
    incoming_text = event.message.payload.text
    reply = f"Filter used: [commands and content_type:TEXT], msg: {incoming_text}"
    await event.answer(reply)


async def start_photo(event: Event):
    """Answer with the PHOTO filter label and the URL found in the message payload."""
    photo_url = event.message.payload.url
    reply = f"Filter used: [content_type:PHOTO], url_photo: {photo_url}"
    await event.answer(reply)


async def start_regex(event: Event, regex: Match):
    """Answer with the regex filter label and the captured cart and item ids.

    ``regex`` must expose the named groups ``cart_id`` and ``item_id``.
    """
    # One group() call with two names returns both captures as a tuple.
    cart_id, item_id = regex.group("cart_id", "item_id")
    reply = f"Filter used: [regex], cart_id: {cart_id}, item_id: {item_id}"
    await event.answer(reply)


async def start_text_equals(event: Event):
    """Answer with the text_equals filter label and the value of ``event.text``."""
    reply = f"Filter used: [text_equals], msg: {event.text}"
    await event.answer(reply)

Move the handlers into separate files, but do not attach decorators to them. Then register each handler with
dp.register_message_handler(handler=..., **filters), passing your function and its filters.

main.py
from aiohttp import web
from waio.types import ContentType

from misc import dp, webhook
from handlers_foo import start_commands, start_photo, start_regex, start_text_equals

# Register start_commands with the commands filter ("start", "echo") and the
# TEXT content type.
dp.register_message_handler(
    handler=start_commands, commands=["start", "echo"], content_type=[ContentType.TEXT]
)

# Register start_photo with the PHOTO content type.
dp.register_message_handler(handler=start_photo, content_type=[ContentType.PHOTO])
# The named groups cart_id and item_id in this pattern are the ones start_regex
# reads back through regex.group(...).
dp.register_message_handler(
    handler=start_regex, regex=r"cart_id_(?P<cart_id>\d+)_item_id_(?P<item_id>\d+)$"
)
# Register start_text_equals with the text_equals filter for "foo" and "bar".
dp.register_message_handler(handler=start_text_equals, text_equals=["foo", "bar"])


async def handler_gupshup(request):
    """Pass the JSON body of a webhook request to the dispatcher and reply 200."""
    payload = await request.json()
    # The dispatcher is awaited before the response is built.
    await dp.handle_event(payload)
    return web.Response(status=200)


if __name__ == "__main__":
    # POST requests to the hook path are served by handler_gupshup.
    hook_route = web.post("/api/v1/gupshup/hook", handler_gupshup)
    webhook.add_routes([hook_route])
    # Start the aiohttp application on port 8017.
    web.run_app(webhook, port=8017)