Source code for osmium_chat.bot

import asyncio
import logging
from collections.abc import Awaitable, Callable
from logging import Logger
from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
    from osmium_chat.invite import InvitePreview

from osmium_protos import PB_CommunityMember, PB_LookupInvite, PB_UpdateMessageCreated, PB_UseInvite

from osmium_chat.channel import Channel
from osmium_chat.client import Client
from osmium_chat.community import Community
from osmium_chat.commands import Command, Commands, CommandRestriction, StringView
from osmium_chat.context import Context
from osmium_chat.errors import CommandError, CommandNotFound, CommandRestrictionError
from osmium_chat.member import Member
from osmium_chat.message import Message
from osmium_chat.user.user import User


# Signature shared by every event listener: an async callable that accepts any
# arguments and resolves to ``None``.
EventHandler = Callable[..., Awaitable[None]]


class Bot:
    """The main entry point for an Osmium bot.

    Holds connection state, the registered event listeners, and the
    authenticated :class:`~osmium_chat.user.user.User` once connected.

    Commands and event listeners are defined by subclassing
    :class:`~osmium_chat.commands.Commands` and registering the subclass with
    :meth:`add_commands`.

    **Events**

    The built-in events (used with :func:`~osmium_chat.commands.listen`) are:

    - ``connect`` — fired with no arguments once the bot has authorized.
    - ``message`` — fired with a :class:`~osmium_chat.message.Message` for
      *every* inbound message, regardless of where it came from.
    - ``guild_message`` — fired with a :class:`~osmium_chat.message.Message`
      when the message was sent in a community (guild) channel.
    - ``dm_message`` — fired with a :class:`~osmium_chat.message.Message` when
      the message was a direct message to the bot.
    - ``command_error`` — fired with ``(ctx, error)`` when a command lookup or
      invocation fails.

    .. code-block:: python

        from osmium_chat import Bot, Context, Message, commands

        class MyCommands(commands.Commands):
            @commands.listen("message")
            async def on_message(self, message: Message) -> None:
                ...

            @commands.command("ping")
            async def ping(self, ctx: Context) -> None:
                await ctx.channel.send("pong")

        bot = Bot(prefix="!", client_id=12345)
        bot.add_commands(MyCommands)
        bot.run(token="...")
    """

    __slots__: tuple[str, ...] = (
        "prefix",
        "_logger",
        "_client",
        "_listeners",
        "_commands",
        "user",
    )

    def __init__(
        self,
        prefix: str,
        client_id: int,
        *,
        logger: Logger | None = None,
    ) -> None:
        """Create a bot.

        :param prefix: The command prefix the bot responds to (e.g. ``"!"``).
        :param client_id: The Osmium client id this bot authenticates as.
        :param logger: Optional logger; when omitted, the module logger
            returned by ``logging.getLogger(__name__)`` is used.
        """
        self.prefix = prefix
        # Go through ``getLogger`` rather than instantiating ``Logger``
        # directly: a hand-built logger is detached from the logging hierarchy
        # and therefore ignores the application's logging configuration.
        self._logger = logger if logger is not None else logging.getLogger(__name__)
        self._client: Client = Client(
            client_id=client_id,
            bot=self,
            logger=self._logger,
        )
        # Event name -> listeners, in registration order.
        self._listeners: dict[str, list[EventHandler]] = {}
        # Command name *and* every alias -> the same Command object.
        self._commands: dict[str, Command] = {}
        self.user: User | None = None

    async def dispatch(self, event: str, *args: Any, **kwargs: Any) -> None:
        """Invoke every listener registered for ``event``.

        Listeners are awaited one after another in registration order.
        Handler errors are logged and swallowed so one faulty listener can't
        take down the connection or block the others.

        :param event: The event name whose listeners should run.
        :param args: Positional arguments forwarded to each listener.
        :param kwargs: Keyword arguments forwarded to each listener.
        """
        # Iterate over a snapshot: a listener may register further listeners
        # while it is awaited, and those must not join the dispatch already in
        # progress.
        for handler in tuple(self._listeners.get(event, ())):
            try:
                await handler(*args, **kwargs)
            except Exception:
                self._logger.exception("Error in '%s' event handler", event)

    async def lookup_invite(self, code: str) -> "InvitePreview":
        """Fetch an invite by code and return its full metadata.

        :param code: The invite code to resolve.
        :returns: The :class:`~osmium_chat.invite.InvitePreview` for the invite.
        :raises RequestError: If the gateway cannot find the invite.
        :raises RuntimeError: If the response carries no invite preview.
        """
        # Imported here rather than at module level; the module-level import
        # is only performed under ``TYPE_CHECKING``.
        from osmium_chat.invite import InvitePreview

        result = await self._client.request(PB_LookupInvite(code=code))
        preview = result.invite_preview
        if preview is None:
            raise RuntimeError("Gateway did not return an invite preview")
        return InvitePreview(preview, self._client)

    def _add_command(self, command: Command) -> None:
        """Register ``command`` under its name and every alias.

        All keys are validated before any of them is inserted, so a clash
        leaves the registry exactly as it was.

        :param command: The command to register.
        :raises ValueError: If the name or any alias is already registered, or
            is repeated within the command's own name/aliases.
        """
        keys = (command.name, *command.aliases)
        seen: set[str] = set()
        for key in keys:
            if key in self._commands or key in seen:
                raise ValueError(f"Command name {key!r} is already registered")
            seen.add(key)
        for key in keys:
            self._commands[key] = command

    def add_commands(self, cls: type[Commands], *args: Any, **kwargs: Any) -> None:
        """Instantiate a :class:`~osmium_chat.commands.Commands` subclass and
        register all its decorated commands and listeners.

        The bot calls ``cls(self, *args, **kwargs)``, so any extra arguments
        are forwarded directly to the subclass ``__init__`` after ``bot``.
        This lets command collections accept configuration at registration
        time without needing globals or post-init setters:

        .. code-block:: python

            class Greeter(commands.Commands):
                def __init__(self, bot: Bot, greeting: str) -> None:
                    super().__init__(bot)
                    self.greeting = greeting

                @commands.command("hi")
                async def hi(self, ctx: Context) -> None:
                    await ctx.channel.send(self.greeting)

            bot.add_commands(Greeter, greeting="Howdy!")

        Registration is all-or-nothing: if any command name or alias clashes,
        nothing from ``cls`` stays registered.

        :param cls: An uninitialised :class:`~osmium_chat.commands.Commands`
            subclass.
        :param args: Extra positional arguments passed to ``cls.__init__``
            after ``bot``.
        :param kwargs: Extra keyword arguments passed to ``cls.__init__``.
        :raises ValueError: If any command name or alias is already registered.
        """
        instance = cls(self, *args, **kwargs)

        # Stage everything first so a failure cannot leave the collection
        # half-registered.
        commands: list[Command] = []
        listeners: list[tuple[str, EventHandler]] = []
        for attr_name in dir(cls):
            if attr_name.startswith("_"):
                continue
            # Metadata is read from the class attribute (the plain function);
            # the handler that gets registered is the method bound to
            # ``instance``.
            attr = getattr(cls, attr_name, None)
            if attr is None:
                continue

            cmd_meta = getattr(attr, "_command_meta", None)
            if cmd_meta is not None:
                commands.append(
                    Command(
                        getattr(instance, attr_name),
                        name=cmd_meta.name or attr_name,
                        aliases=cmd_meta.aliases,
                        restriction=cmd_meta.restriction,
                    )
                )
                continue

            listen_meta = getattr(attr, "_listen_meta", None)
            if listen_meta is not None:
                listeners.append((listen_meta.event, getattr(instance, attr_name)))

        registered: list[Command] = []
        try:
            for command in commands:
                self._add_command(command)
                registered.append(command)
        except ValueError:
            # Undo the commands that made it in before the clash.
            for command in registered:
                for key in (command.name, *command.aliases):
                    self._commands.pop(key, None)
            raise

        # Listeners cannot clash, so they are attached only once every command
        # has been accepted.
        for event, handler in listeners:
            self._listeners.setdefault(event, []).append(handler)

    def remove_commands(self, cls: type[Commands]) -> None:
        """Remove all commands and listeners that were registered from *cls*.

        :param cls: The same :class:`~osmium_chat.commands.Commands` subclass
            that was passed to :meth:`add_commands`.
        """
        for attr_name in dir(cls):
            if attr_name.startswith("_"):
                continue
            attr = getattr(cls, attr_name, None)
            if attr is None:
                continue

            cmd_meta = getattr(attr, "_command_meta", None)
            if cmd_meta is not None:
                # NOTE(review): commands are matched by registered name only,
                # so whichever command currently holds that name is removed,
                # even if a different collection registered it.
                cmd = self._commands.get(cmd_meta.name or attr_name)
                if cmd is not None:
                    # Drop the primary name and every alias pointing at the
                    # same Command object.
                    for key in [k for k, v in self._commands.items() if v is cmd]:
                        del self._commands[key]
                continue

            listen_meta = getattr(attr, "_listen_meta", None)
            if listen_meta is None:
                continue

            event = listen_meta.event
            # Registered handlers are bound methods; compare their underlying
            # function with the class attribute to find the ones from ``cls``.
            remaining = [
                handler
                for handler in self._listeners.get(event, [])
                if getattr(handler, "__func__", None) is not attr
            ]
            if remaining:
                self._listeners[event] = remaining
            else:
                # Don't keep (or create) empty entries for events that no
                # longer have listeners.
                self._listeners.pop(event, None)

    def get_command(self, name: str) -> Command | None:
        """Look up a command by name or alias.

        :param name: The name or alias to resolve.
        :returns: The matching command, or ``None`` if nothing is registered
            under ``name``.
        """
        return self._commands.get(name)

    async def process_commands(self, update: PB_UpdateMessageCreated) -> None:
        """Turn an inbound message into a command invocation.

        Builds the :class:`~osmium_chat.context.Context`, fires the ``message``
        event followed by ``guild_message`` or ``dm_message``, and — if the
        message was not sent by the bot itself, starts with the prefix and
        names a known command — parses its arguments and invokes it. Command
        failures are surfaced through the ``command_error`` event.

        Updates without a message or without a ``chat_ref`` are ignored.

        :param update: The decoded ``message_created`` payload from the gateway.
        """
        if update.message is None or update.message.chat_ref is None:
            return

        chat_ref = update.message.chat_ref
        channel_ref = chat_ref.channel
        community = (
            Community.from_id(channel_ref.community_id, self._client)
            if channel_ref is not None
            else None
        )

        # Inside a community the author is wrapped as a Member; elsewhere as a
        # plain User.
        author: Member | User | None
        if update.author and community is not None:
            author = Member(
                PB_CommunityMember(id=update.author.id, community_id=community.id),
                update.author,
                self._client,
                community=community,
            )
        elif update.author:
            author = User(update.author, self._client)
        else:
            author = None

        channel = Channel(
            chat_ref,
            self._client,
            id=channel_ref.channel_id if channel_ref is not None else None,
            community_id=channel_ref.community_id if channel_ref is not None else None,
            community=community,
        )
        message = Message(
            update.message,
            self._client,
            author=author,
            channel=channel,
            community=community,
        )
        ctx = Context(
            bot=self,
            message=message,
            author=author,
            channel=channel,
            community=community,
            prefix=self.prefix,
        )

        await self.dispatch("message", message)

        # Fire the finer-grained event for where the message came from. A
        # ``chat_ref`` carrying a ``channel`` is a community (guild) channel;
        # one carrying a ``user`` is a direct message.
        if channel_ref is not None:
            await self.dispatch("guild_message", message)
        elif chat_ref.user is not None:
            await self.dispatch("dm_message", message)

        # Never react to our own messages, to avoid command loops.
        if self.user is not None and message.author_id == self.user.id:
            return

        content = str(message.content_raw)
        if not content.startswith(self.prefix):
            return

        view = StringView(content[len(self.prefix):])
        name = view.get_word()
        if not name:
            return
        ctx.invoked_with = name

        command = self.get_command(name)
        if command is None:
            await self.dispatch("command_error", ctx, CommandNotFound(name))
            return

        # A context without a community is treated as a direct message.
        is_dm = ctx.community is None
        restriction = command.restriction
        violates_restriction = (
            restriction is CommandRestriction.DM_ONLY and not is_dm
        ) or (restriction is CommandRestriction.COMMUNITY_ONLY and is_dm)
        if violates_restriction:
            await self.dispatch(
                "command_error",
                ctx,
                CommandRestrictionError(command.name, command.restriction),
            )
            return

        try:
            await command.invoke(ctx, view)
        except CommandError as error:
            await self.dispatch("command_error", ctx, error)
        except Exception as error:
            # Unexpected failures are logged with a traceback as well as being
            # reported through ``command_error``.
            self._logger.exception("Unhandled error in command %r", name)
            await self.dispatch("command_error", ctx, error)

    async def use_invite(self, invite_code: str) -> None:
        """Redeem an invite code on behalf of the bot.

        :param invite_code: The invite code to redeem.
        """
        # Lazy %-formatting: the message is only built if the record is emitted.
        self._logger.info("Using invite with code: %s", invite_code)
        await self._client.send_pb(PB_UseInvite(code=invite_code))

    async def connect(self, token: str) -> None:
        """Connect to Osmium and run the bot until the connection closes.

        This authenticates with ``token``, fires the ``connect`` event, then
        blocks processing incoming messages. The work is delegated to the
        underlying client's ``connect``.

        :param token: The authorization token for this bot.
        """
        await self._client.connect(token)

    def run(self, token: str) -> None:
        """Start the bot's event loop and connect, blocking until it closes.

        A synchronous convenience wrapper around :meth:`connect` for use as a
        program's entry point.

        :param token: The authorization token for this bot.
        """
        asyncio.run(self.connect(token))