# -*- coding: utf-8 -*-
# Copyright © tandemdude 2020-present
# This file is part of Lightbulb.
# Lightbulb is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# Lightbulb is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# GNU Lesser General Public License for more details.
# You should have received a copy of the GNU Lesser General Public License
# along with Lightbulb. If not, see <>.
from __future__ import annotations

__all__ = ["BotApp", "when_mentioned_or"]

import asyncio
import functools
import importlib
import inspect
import logging
import os
import pathlib
import re
import sys
import typing as t
from importlib import util

import hikari
from multidict import CIMultiDict

from lightbulb import checks
from lightbulb import commands
from lightbulb import context as context_
from lightbulb import decorators
from lightbulb import errors
from lightbulb import events
from lightbulb import help_command as help_command_
from lightbulb import internal
from lightbulb import parser
from lightbulb import plugins as plugins_
from lightbulb.utils import data_store

_LOGGER = logging.getLogger("")
_APPLICATION_CMD_ERROR_REGEX: re.Pattern[str] = re.compile(

PrefixT = t.Union[
    t.Callable[["BotApp", hikari.Message], t.Union[t.Sequence[str], t.Coroutine[t.Any, t.Any, t.Sequence[str]]]],
CheckCoroT = t.TypeVar("CheckCoroT", bound=t.Callable[..., t.Union[bool, t.Coroutine[t.Any, t.Any, bool]]])

class _ExtensionT(t.Protocol):
    def load(self, bot: BotApp) -> None:

    def unload(self, bot: BotApp) -> None:

    commands.slash.SlashCommand: (
    commands.message.MessageCommand: (
    commands.user.UserCommand: (

[docs] def when_mentioned_or( prefix_provider: PrefixT, ) -> t.Callable[[BotApp, hikari.Message], t.Coroutine[t.Any, t.Any, t.Sequence[str]]]: """ Helper function which allows the bot's mentions to be used as the command prefix, as well as any other prefix(es) passed in or supplied by the ``prefix_provider``. Args: prefix_provider: A :obj:`str` prefix, Sequence[:obj:`str`] of prefixes, or sync or async callable that returns a prefix or sequence of prefixes. If ``None``, only the bot's mentions will be used. Example: .. code-block:: python # The below are all valid app = lightbulb.BotApp(prefix=lightbulb.when_mentioned_or("!"), ...) app = lightbulb.BotApp(prefix=lightbulb.when_mentioned_or(["!", "?"]), ...) # Using only mentions as the prefix app = lightbulb.BotApp(prefix=lightbulb.when_mentioned_or(None), ...) # Using with a get_prefix function def get_prefix(app, message): # Do something to get the prefixes return prefixes app = lightbulb.BotApp(prefix=lightbulb.when_mentioned_or(get_prefix), ...) """ async def get_prefixes(app: BotApp, message: hikari.Message) -> t.Sequence[str]: me = app.get_me() assert me is not None mentions = [f"<@{}> ", f"<@!{}> "] if callable(prefix_provider): prefixes = prefix_provider(app, message) if inspect.iscoroutine(prefixes): prefixes = await prefixes else: prefixes = prefix_provider if isinstance(prefixes, str): return mentions + [prefixes] elif isinstance(prefixes, t.Sequence): return mentions + list(prefixes) return mentions return get_prefixes
# str is by definition a sequence of str so these type hints are correct def _default_get_prefix(_: BotApp, __: hikari.Message, *, prefixes: t.Sequence[str]) -> t.Sequence[str]: return prefixes
[docs] class BotApp(hikari.GatewayBot): """ A subclassed implementation of the :obj:`~hikari.impl.gateway_bot.GatewayBot` class containing a command handler. This should be instantiated instead of the superclass if you wish to use the command handler implementation provided. Args: token (:obj:`str`): The bot account's token. prefix (Optional[PrefixT]): The command prefix to use for prefix commands, or ``None`` if prefix commands will not be used. ignore_bots (:obj:`bool`): Whether or not prefix commands should ignore bots for invocation. Defaults to ``True``. owner_ids (Sequence[:obj:`int`]): The IDs of the users that own the bot. If not provided then it will be fetched by :obj:`~BotApp.fetch_owner_ids`. default_enabled_guilds (Union[:obj:`int`, Sequence[:obj:`int`]]): The guild(s) to create application commands in by default if no guilds are specified per-command. Defaults to an empty tuple. help_class (Optional[Type[:obj:`~.help_command.BaseHelpCommand`]]): Class to use for the bot's help command. Defaults to :obj:`~.help_command.DefaultHelpCommand`. If ``None``, no help command will be added to the bot by default. help_slash_command (:obj:`bool`): Whether or not the help command should be implemented as a slash command as well as a prefix command. Defaults to ``False``. delete_unbound_commands (:obj:`bool`): Whether or not the bot should delete application commands that it cannot find an implementation for when the bot starts. Defaults to ``True``. case_insensitive_prefixes (:obj:`bool`): Wheter or not command prefixes should be case-insensitive. Defaults to ``False``. case_insensitive_prefix_commands (:obj:`bool`): Whether or not prefix command names should be case-insensitive. Defaults to ``False``. **kwargs (Any): Additional keyword arguments passed to the constructor of the :obj:`~hikari.impl.gateway_bot.GatewayBot` class. """ # noqa: E501 __slots__ = ( "get_prefix", "ignore_bots", "owner_ids", "application", "d", "_prefix_commands", "_slash_commands", "_message_commands", "_user_commands", "_plugins", "_checks", "extensions", "_current_extension", "default_enabled_guilds", "_help_command", "_delete_unbound_commands", "_case_insensitive_prefixes", "_case_insensitive_prefix_commands", "_running_tasks", ) def __init__( self, token: str, prefix: t.Optional[PrefixT] = None, ignore_bots: bool = True, owner_ids: t.Sequence[int] = (), default_enabled_guilds: t.Union[int, t.Sequence[int]] = (), help_class: t.Optional[t.Type[help_command_.BaseHelpCommand]] = help_command_.DefaultHelpCommand, help_slash_command: bool = False, delete_unbound_commands: bool = True, case_insensitive_prefixes: bool = False, case_insensitive_prefix_commands: bool = False, **kwargs: t.Any, ) -> None: super().__init__(token, **kwargs) # The prefix command handler expects an iterable to be returned from the get_prefix function, # so we have to wrap a single string prefix in a list here. if prefix is not None: prefix = [prefix] if isinstance(prefix, str) else prefix if isinstance(prefix, t.Sequence): # Create the default get prefix from the passed-in prefixes if a get_prefix function # was not provided prefix = functools.partial(_default_get_prefix, prefixes=prefix) self.get_prefix: t.Callable[ [BotApp, hikari.Message], t.Union[t.Sequence[str], t.Coroutine[t.Any, t.Any, t.Sequence[str]]] ] = prefix self._delete_unbound_commands = delete_unbound_commands self._case_insensitive_prefixes = case_insensitive_prefixes self._case_insensitive_prefix_commands = case_insensitive_prefix_commands self.ignore_bots: bool = ignore_bots """Whether or not other bots will be ignored when invoking prefix commands.""" self.owner_ids: t.Sequence[int] = owner_ids """The owner ID(s) for the owner(s) of the bot account.""" self.default_enabled_guilds: t.Sequence[int] = ( (default_enabled_guilds,) if isinstance(default_enabled_guilds, int) else default_enabled_guilds ) """The default guilds that application commands will be enabled in.""" self.application: t.Optional[hikari.Application] = None """The :obj:`~hikari.applications.Application` for the bot account. This will always be ``None`` before the bot has logged in.""" self.d: data_store.DataStore = data_store.DataStore() """A :obj:`~.utils.data_store.DataStore` instance enabling storage of custom data without subclassing.""" self.extensions: t.List[str] = [] """A list of the currently loaded extensions.""" self._current_extension: t.Optional[_ExtensionT] = None self._prefix_commands: t.MutableMapping[str, commands.prefix.PrefixCommand] = ( {} if not case_insensitive_prefix_commands else CIMultiDict() # type: ignore ) self._slash_commands: t.MutableMapping[str, commands.slash.SlashCommand] = {} self._message_commands: t.MutableMapping[str, commands.message.MessageCommand] = {} self._user_commands: t.MutableMapping[str, commands.user.UserCommand] = {} self._plugins: t.MutableMapping[str, plugins_.Plugin] = {} self._checks: t.List[t.Union[checks.Check, checks._ExclusiveCheck]] = [] self._help_command: t.Optional[help_command_.BaseHelpCommand] = None if help_class is not None: help_cmd_types: t.List[t.Type[commands.base.Command]] = [] if prefix is not None: help_cmd_types.append(commands.prefix.PrefixCommand) if help_slash_command: help_cmd_types.append(commands.slash.SlashCommand) if help_cmd_types: self._help_command = help_class(self) @decorators.option( "obj", "Object to get help for", required=False, modifier=commands.base.OptionModifier.CONSUME_REST ) @decorators.command("help", "Get help information for the bot", auto_defer=True) @decorators.implements(*help_cmd_types) async def __default_help(ctx: context_.base.Context) -> None: assert self._help_command is not None await self._help_command.send_help(ctx, ctx.options.obj) self.command(__default_help) # We need to store created tasks internally to ensure that they do not # get destroyed mid-execution. See asyncio.create_task documentation for more. self._running_tasks: t.List[asyncio.Task[t.Any]] = [] if prefix is not None: self.subscribe(hikari.MessageCreateEvent, self.handle_message_create_for_prefix_commands) self.subscribe(hikari.StartedEvent, self._manage_application_commands) self.subscribe(hikari.InteractionCreateEvent, self.handle_interaction_create_for_application_commands) self.subscribe(hikari.InteractionCreateEvent, self.handle_interaction_create_for_autocomplete)
[docs] def create_task(self, coro: t.Awaitable[t.Any], *, name: t.Optional[str] = None) -> asyncio.Task[t.Any]: """ Wrap the given awaitable into an :obj:`asyncio.Task` and schedule its execution. This method functions the same as :meth:`asyncio.create_task`, but keeps a reference to the task alive until execution is completed to ensure the task is not destroyed mid-execution. Args: coro (Awaitable[Any]): Coroutine to wrap into a task. Keyword Args: name (Optional[:obj:`str`]): The name of the task. Not required, defaults to ``None``. Returns: :obj:`asyncio.Task`: Created task object. .. versionadded:: 2.2.0 """ task: asyncio.Task[None] = asyncio.create_task(coro, name=name) # type: ignore[arg-type] self._running_tasks.append(task) task.add_done_callback(lambda task_: self._running_tasks.remove(task_)) return task
@property def help_command(self) -> t.Optional[help_command_.BaseHelpCommand]: """The current help command instance registered to the bot.""" return self._help_command @help_command.setter def help_command(self, val: help_command_.BaseHelpCommand) -> None: self._help_command = val @property def prefix_commands(self) -> t.MutableMapping[str, commands.prefix.PrefixCommand]: """Mapping of command name to command object containing all prefix commands registered to the bot.""" return self._prefix_commands @property def slash_commands(self) -> t.MutableMapping[str, commands.slash.SlashCommand]: """Mapping of command name to command object containing all slash commands registered to the bot.""" return self._slash_commands @property def message_commands(self) -> t.MutableMapping[str, commands.message.MessageCommand]: """Mapping of command name to command object containing all message commands registered to the bot.""" return self._message_commands @property def user_commands(self) -> t.MutableMapping[str, commands.user.UserCommand]: """Mapping of command name to command object containing all user commands registered to the bot.""" return self._user_commands @property def plugins(self) -> t.MutableMapping[str, plugins_.Plugin]: """Mapping of plugin name to plugin object containing all plugins registered to the bot.""" return self._plugins def _add_command_to_correct_attr(self, command: commands.base.Command) -> None: if isinstance(command, commands.prefix.PrefixCommand): for item in [, *command.aliases]: if item in self._prefix_commands: raise errors.CommandAlreadyExists( f"A prefix command with name or alias {item!r} is already registered." ) for item in [, *command.aliases]: self._prefix_commands[item] = command elif isinstance(command, commands.slash.SlashCommand): if in self._slash_commands: raise errors.CommandAlreadyExists(f"A slash command with name {!r} is already registered.") self._slash_commands[] = command elif isinstance(command, commands.message.MessageCommand): if in self._message_commands: raise errors.CommandAlreadyExists( f"A message command with name {!r} is already registered." ) self._message_commands[] = command elif isinstance(command, commands.user.UserCommand): if in self._user_commands: raise errors.CommandAlreadyExists(f"A user command with name {!r} is already registered.") self._user_commands[] = command def _get_application_command( self, interaction: hikari.CommandInteraction ) -> t.Optional[commands.base.ApplicationCommand]: if interaction.command_type is hikari.CommandType.SLASH: return self.get_slash_command(interaction.command_name) elif interaction.command_type is hikari.CommandType.USER: return self.get_user_command(interaction.command_name) elif interaction.command_type is hikari.CommandType.MESSAGE: return self.get_message_command(interaction.command_name) return None async def _manage_application_commands(self, _: hikari.StartingEvent) -> None: if self.application is None: self.application = await try: await internal.manage_application_commands(self) except hikari.ForbiddenError as exc: error_msg = str(exc) match = guild_id = "unknown" if match is None else raise errors.ApplicationCommandCreationFailed( f"Application command creation failed for guild {guild_id!r}. " + "Is your bot in the guild and was it invited with the 'applications.commands' scope?" ) from exc finally: await self.dispatch(events.LightbulbStartedEvent(app=self))
[docs] async def sync_application_commands(self) -> None: """ Sync all application commands registered to the bot with discord. Returns: ``None`` .. versionadded:: 2.2.0 """ await internal.manage_application_commands(self)
@staticmethod def _get_events_for_application_command( command: commands.base.ApplicationCommand, ) -> t.Tuple[ t.Type[events.CommandInvocationEvent], t.Type[events.CommandCompletionEvent], t.Type[events.CommandErrorEvent] ]: for k in APPLICATION_COMMANDS_EVENTS_MAPPING: if isinstance(command, k): return APPLICATION_COMMANDS_EVENTS_MAPPING[k] raise TypeError("Application command type not recognised")
[docs] def print_banner( # type: ignore[override] self, banner: t.Optional[str], allow_color: bool, force_color: bool, extra_args: t.Optional[t.Dict[str, str]] = None, ) -> None: super().print_banner(banner, allow_color, force_color) if banner == "hikari": sys.stdout.write("Thank you for using lightbulb!\n")
[docs] def load_extensions(self, *extensions: str) -> None: """ Load external extension(s) into the bot. Extension name follows the format ``<directory>.<filename>`` Each extension **must** contain a function ``load`` which takes a single argument which will be the ``BotApp`` instance you are loading the extension into. Args: extensions (:obj:`str`): The name of the extension(s) to load. Returns: ``None`` Raises: :obj:`~.errors.ExtensionAlreadyLoaded`: If the extension has already been loaded. :obj:`~.errors.ExtensionMissingLoad`: If the extension to be loaded does not contain a ``load`` function. :obj:`~.errors.ExtensionNotFound`: If the extension to be loaded does not exist. """ if len(extensions) > 1 or not extensions: for extension in extensions: self.load_extensions(extension) return extension = extensions[0] if extension in self.extensions: raise errors.ExtensionAlreadyLoaded(f"Extension {extension!r} is already loaded.") spec = util.find_spec(extension) if spec is None: raise errors.ExtensionNotFound(f"No extension by the name {extension!r} was found") module = importlib.import_module(extension) ext = t.cast(_ExtensionT, module) self._current_extension = ext if not hasattr(module, "load"): raise errors.ExtensionMissingLoad(f"Extension {extension!r} is missing a load function") else: ext.load(self) self.extensions.append(extension)"Extension loaded %r", extension) self._current_extension = None
[docs] def unload_extensions(self, *extensions: str) -> None: """ Unload external extension(s) from the bot. This method relies on a function, ``unload`` existing in the extensions which the bot will use to remove all commands and/or plugins from the bot. Args: extensions (:obj:`str`): The name of the extension(s) to unload. Returns: ``None`` Raises: :obj:`~.errors.ExtensionNotLoaded`: If the extension has not been loaded. :obj:`~.errors.ExtensionMissingUnload`: If the extension does not contain an ``unload`` function. :obj:`~.errors.ExtensionNotFound`: If the extension to be unloaded does not exist. """ if len(extensions) > 1 or not extensions: for extension in extensions: self.unload_extensions(extension) return extension = extensions[0] if extension not in self.extensions: raise errors.ExtensionNotLoaded(f"Extension {extension!r} is not loaded.") try: module = importlib.import_module(extension) except ModuleNotFoundError: raise errors.ExtensionNotFound(f"No extension by the name {extension!r} was found") from None ext = t.cast(_ExtensionT, module) self._current_extension = ext if not hasattr(module, "unload"): raise errors.ExtensionMissingUnload(f"Extension {extension!r} is missing an unload function") else: ext.unload(self) self.extensions.remove(extension) del sys.modules[extension]"Extension unloaded %r", extension) self._current_extension = None
[docs] def reload_extensions(self, *extensions: str) -> None: """ Reload bot extension(s). This method is atomic and so the bot will revert to the previous loaded state if an extension encounters a problem during unloading or loading. Args: extensions (:obj:`str`): The name of the extension(s) to be reloaded. Returns: ``None`` """ if len(extensions) > 1 or not extensions: for extension in extensions: self.reload_extensions(extension) return extension = extensions[0] try: old = sys.modules[extension] except KeyError: raise errors.ExtensionNotLoaded(f"Extension {extension!r} is not loaded.") try: self.unload_extensions(extension) self.load_extensions(extension) except Exception as e: sys.modules[extension] = old if not isinstance(e, errors.ExtensionAlreadyLoaded): self.load_extensions(extension) raise e else: del old
[docs] def load_extensions_from( self, *paths: t.Union[str, pathlib.Path], recursive: bool = False, must_exist: bool = True ) -> None: """ Load all external extensions from the given directories. Files that begin with an underscore ( _ ) are ignored. Every extension **must** contain a function ``load`` which takes a single argument which will be the ``BotApp`` instance you are loading the extension into. Args: *paths (Union[:obj:`str`, :obj:`pathlib.Path`]): The directories to load extensions from. These can be relative or absolute directories. In the case that a directory is absolute, the extension name will be set as though the directory was relative. Keyword Args: recursive (:obj:`bool`): Whether to search the directories recursively. Defaults to False. must_exist (:obj:`bool`): Whether all directories must exist before extensions can be loaded. If this is False and a directory does not exist, the directory will be ignored. If this is True, a :obj:`FileNotFoundError` is thrown if any directory does not exist. Defaults to True. Returns: ``None`` Raises: :obj:`~.errors.ExtensionAlreadyLoaded`: If the extension has already been loaded. :obj:`~.errors.ExtensionMissingLoad`: If the extension to be loaded does not contain a ``load`` function. :obj:`~.errors.ExtensionNotFound`: If the extension to be loaded does not exist. :obj:`FileNotFoundError`: If any directory to load extensions from does not exist and ``must_exist`` is True. :obj:`ValueError`: If the path provided is not relative to the current working directory. """ if len(paths) > 1 or not paths: for path_ in paths: self.load_extensions_from(path_, recursive=recursive, must_exist=must_exist) return path = paths[0] if isinstance(path, str): path = pathlib.Path(path) try: path = path.resolve().relative_to(pathlib.Path.cwd()) except ValueError: raise ValueError(f"'{path}' must be relative to the working directory") from None if not path.is_dir(): if must_exist: raise FileNotFoundError(f"'{path}' is not an existing directory") return glob = path.rglob if recursive else path.glob for ext_path in glob("[!_]*.py"): ext = str(ext_path.with_suffix("")).replace(os.sep, ".") self.load_extensions(ext)
[docs] async def fetch_owner_ids(self) -> t.Sequence[hikari.Snowflakeish]: """ Fetch the bot's owner IDs, or return the given owner IDs on instantiation if provided. Returns: Sequence[Snowflakeish]: The IDs of the bot's owners. """ if self.owner_ids: return self.owner_ids self.application = self.application or await owner_ids: t.List[hikari.Snowflake] = [] if self.application.owner is not None: owner_ids.append( if is not None: owner_ids.extend([member_id for member_id in]) return owner_ids
[docs] async def maybe_dispatch_error_event( self, event: events.CommandErrorEvent, priority_handlers: t.Sequence[ t.Union[None, t.Callable[[events.CommandErrorEvent], t.Coroutine[t.Any, t.Any, t.Optional[bool]]]] ], ) -> bool: """ Attempts to handle the event first using the given ``priority_handlers``, falling back to dispatching the given event to global listeners. Args: event (:obj:``): Event to attempt to handle. priority_handlers (Sequence[Union[``None``, ListenerT)]]: Handlers to attempt to use to handle the event before falling back to global event listeners. Returns: :obj:`bool`: Whether or not the given event was handled or dispatched successfully. """ handled = False for listener in priority_handlers: if handled: break if listener is not None: handled = bool(await listener(event)) if not handled: if self.get_listeners(type(event), polymorphic=True): await self.dispatch(event) handled = True elif self.get_listeners(events.CommandErrorEvent, polymorphic=True): await self.dispatch(event) handled = True return handled
@t.overload def check(self, check: t.Union[checks.Check, CheckCoroT]) -> checks.Check: ... @t.overload def check(self) -> t.Callable[[CheckCoroT], checks.Check]: ...
[docs] def check( self, check: t.Optional[t.Union[checks.Check, CheckCoroT, checks._ExclusiveCheck]] = None, ) -> t.Union[checks.Check, t.Callable[[CheckCoroT], checks.Check]]: """ Adds a :obj:`~.checks.Check` object or check function the bot's checks. This method can be used as a first or second order decorator, or called manually with the :obj:`~.checks.Check` instance or function to add as a check. If a function is decorated or passed in then it will be wrapped in a :obj:`~.checks.Check` object before it is added to the bot. """ if check is not None: if not isinstance(check, checks.Check): check = checks.Check(check) self._checks.append(check) check.add_to_object_hook(self) return check def decorate( check_func: t.Callable[[context_.base.Context], t.Union[bool, t.Coroutine[t.Any, t.Any, bool]]] ) -> checks.Check: new_check = checks.Check(check_func) self._checks.append(new_check) new_check.add_to_object_hook(self) return new_check return decorate
[docs] def get_prefix_command(self, name: str) -> t.Optional[commands.prefix.PrefixCommand]: """ Gets the prefix command with the given name, or ``None`` if no command with that name was found. Args: name (:obj:`str`): Name of the prefix command to get. Returns: Optional[:obj:`~.commands.prefix.PrefixCommand`]: Prefix command object with the given name, or ``None`` if not found. """ parts = name.split() if len(parts) == 1: return self._prefix_commands.get(name) maybe_group = self._prefix_commands.get(parts.pop(0)) if not isinstance(maybe_group, commands.prefix.PrefixCommandGroup): return None this: t.Optional[ t.Union[ commands.prefix.PrefixCommandGroup, commands.prefix.PrefixSubGroup, commands.prefix.PrefixSubCommand ] ] = maybe_group for part in parts: if this is None or isinstance(this, commands.prefix.PrefixSubCommand): return None this = this.get_subcommand(part) return this
[docs] def get_slash_command(self, name: str) -> t.Optional[commands.slash.SlashCommand]: """ Gets the slash command with the given name, or ``None`` if no command with that name was found. Args: name (:obj:`str`): Name of the slash command to get. Returns: Optional[:obj:`~.commands.slash.SlashCommand`]: Slash command object with the given name, or ``None`` if not found. """ return self._slash_commands.get(name)
[docs] def get_message_command(self, name: str) -> t.Optional[commands.message.MessageCommand]: """ Gets the message command with the given name, or ``None`` if no command with that name was found. Args: name (:obj:`str`): Name of the message command to get. Returns: Optional[:obj:`~.commands.message.MessageCommand`]: Message command object with the given name, or ``None`` if not found. """ return self._message_commands.get(name)
[docs] def get_user_command(self, name: str) -> t.Optional[commands.user.UserCommand]: """ Gets the user command with the given name, or ``None`` if no command with that name was found. Args: name (:obj:`str`): Name of the user command to get. Returns: Optional[:obj:`~.commands.user.UserCommand`]: User command object with the given name, or ``None`` if not found. """ return self._user_commands.get(name)
@t.overload def command(self, cmd_like: commands.base.CommandLike) -> commands.base.CommandLike: ... @t.overload def command(self) -> t.Callable[[commands.base.CommandLike], commands.base.CommandLike]: ...
[docs] def command( self, cmd_like: t.Optional[commands.base.CommandLike] = None ) -> t.Union[commands.base.CommandLike, t.Callable[[commands.base.CommandLike], commands.base.CommandLike]]: """ Adds a :obj:`~.commands.base.CommandLike` object as a command to the bot. This method can be used as a first or second order decorator, or called manually with the :obj:`~.commands.CommandLike` instance to add as a command. """ if cmd_like is not None: commands_to_impl: t.Sequence[t.Type[commands.base.Command]] = getattr( cmd_like.callback, "__cmd_types__", [] ) _LOGGER.debug( "Registering command %r. Requested types are: %s",, ",".join(c.__name__ for c in commands_to_impl), ) for command_cls in commands_to_impl: cmd = command_cls(self, cmd_like) if cmd.is_subcommand: continue cmd._validate_attributes() self._add_command_to_correct_attr(cmd) return cmd_like def decorate(cmd_like_: commands.base.CommandLike) -> commands.base.CommandLike: self.command(cmd_like_) return cmd_like_ return decorate
[docs] def remove_command(self, command: t.Union[commands.base.Command, commands.base.CommandLike]) -> None: """ Removes a command or command-like object from the bot. Args: command (Union[:obj:`~.commands.base.Command`, :obj:`~.commands.base.CommandLike`): Command or command-like object to remove from the bot. Returns: ``None`` """ _LOGGER.debug("Removing command %r (%s)",, command.__class__.__name__) if isinstance(command, commands.base.CommandLike): self._remove_commandlike(command) return if isinstance(command, commands.prefix.PrefixCommand): for item in [, *command.aliases]: self._prefix_commands.pop(item, None) elif isinstance(command, commands.slash.SlashCommand): self._slash_commands.pop(, None) elif isinstance(command, commands.message.MessageCommand): self._message_commands.pop(, None) elif isinstance(command, commands.user.UserCommand): self._user_commands.pop(, None)
def _remove_commandlike(self, cmd_like: commands.base.CommandLike) -> None: commands_to_remove: t.List[t.Optional[commands.base.Command]] = [] cmd_types: t.Sequence[t.Type[commands.base.Command]] = getattr(cmd_like.callback, "__cmd_types__", []) for cmd_type in cmd_types: if issubclass(cmd_type, commands.prefix.PrefixCommand): commands_to_remove.append(self.get_prefix_command( elif issubclass(cmd_type, commands.slash.SlashCommand): commands_to_remove.append(self.get_slash_command( elif issubclass(cmd_type, commands.message.MessageCommand): commands_to_remove.append(self.get_message_command( elif issubclass(cmd_type, commands.user.UserCommand): commands_to_remove.append(self.get_user_command( for command in filter(None, commands_to_remove): self.remove_command(command)
[docs] def get_plugin(self, name: str) -> t.Optional[plugins_.Plugin]: """ Gets the plugin with the given name, or ``None`` if no plugin with that name was found. Args: name (:obj:`str`): Name of the plugin to get. Returns: Optional[:obj:`~.plugins.Plugin`]: Plugin object with the given name, or ``None`` if not found. """ return self._plugins.get(name)
[docs] def add_plugin(self, plugin: plugins_.Plugin) -> None: """ Registers a plugin to the bot, adding all commands and listeners present in the plugin. Args: plugin (:obj:`~.plugins.Plugin`): Plugin to register to the bot. Returns: ``None`` """ = self for command in plugin._all_commands: self._add_command_to_correct_attr(command) for event, listeners in plugin._listeners.items(): for listener in listeners: self.subscribe(event, listener) _LOGGER.debug("Plugin registered %r", self._plugins[] = plugin
[docs] def remove_plugin(self, plugin_or_name: t.Union[plugins_.Plugin, str]) -> None: """ Unregisters a plugin from the bot, removing all commands and listeners present in the plugin. Args: plugin_or_name (Union[:obj:`~.plugins.Plugin`, :obj:`str`]): Plugin or name of the plugin to unregister from the bot. Returns: ``None`` """ plugin: t.Optional[t.Union[plugins_.Plugin, str]] = plugin_or_name if isinstance(plugin, str): plugin = self.get_plugin(plugin) if plugin is None: return assert isinstance(plugin, plugins_.Plugin) for command in plugin._raw_commands: self.remove_command(command) for event, listeners in plugin._listeners.items(): for listener in listeners: self.unsubscribe(event, listener) if plugin._remove_hook is not None: maybe_coro = plugin._remove_hook() if inspect.iscoroutine(maybe_coro): self.create_task(maybe_coro) self._plugins.pop(, None) _LOGGER.debug("Plugin removed %r",
[docs] async def purge_application_commands(self, *guild_ids: hikari.Snowflakeish, global_commands: bool = False) -> None: """ Purges all application commands from the guilds with the specified IDs, and all the global application commands if ``global_commands`` is ``True``. Useful if you want to teardown old slash commands from the bot and cannot be bothered to fetch them individually yourself. If neither `guild_ids` nor `global_commands` is specified then this method will do nothing. Args: *guild_ids (:obj:`hikari.Snowflakeish`): IDs for the guilds to purge application commands from. Keyword Args: global_commands (:obj:`bool`): Whether or not to purge global slash commands from the bot. Returns: ``None`` """ assert self.application is not None if global_commands: await, ()) if guild_ids: for guild_id in guild_ids: await, (), guild_id)
[docs] async def get_prefix_context( self, event: hikari.MessageCreateEvent, cls: t.Type[context_.prefix.PrefixContext] = context_.prefix.PrefixContext, ) -> t.Optional[context_.prefix.PrefixContext]: """ Get the :obj:`~.context.prefix.PrefixContext` instance for the given event, or ``None`` if no context could be created. Args: event (:obj:``): Event to get the prefix context for. cls (Type[:obj:`~.context.prefix.PrefixContext`]): Context class to instantiate. Defaults to :obj:`~.context.prefix.PrefixContext`. Returns: Optional[:obj:`~.context.prefix.PrefixContext`]: Prefix context instance for the given event. """ assert event.message.content is not None prefixes = self.get_prefix(self, event.message) if inspect.iscoroutine(prefixes): prefixes = await prefixes prefixes = t.cast(t.Sequence[str], prefixes) if isinstance(prefixes, str): prefixes = (prefixes,) message = event.message.content if self._case_insensitive_prefixes: message = message.lower() prefixes = tuple(map(str.lower, prefixes)) prefixes = sorted(prefixes, key=len, reverse=True) invoked_prefix = None for prefix in prefixes: if message.startswith(prefix): invoked_prefix = prefix break if invoked_prefix is None: return None new_content = event.message.content[len(invoked_prefix) :] if not new_content or new_content.isspace(): return None split_content = new_content.split(maxsplit=1) invoked_with, args = split_content[0], "".join(split_content[1:]) if not invoked_with: return None command = self.get_prefix_command(invoked_with) ctx = cls(self, event, command, invoked_with, invoked_prefix) if ctx.command is not None: ctx._parser = (ctx.command.parser or parser.Parser)(ctx, args) return ctx
[docs] async def process_prefix_commands(self, context: context_.prefix.PrefixContext) -> None: """ Invokes the appropriate command for the given context. Args: context (:obj:`.context.prefix.PrefixContext`): Context to invoke the command under. Returns: ``None`` """ if context.command is None: raise errors.CommandNotFound( f"A command with name or alias {context.invoked_with!r} does not exist", invoked_with=context.invoked_with, ) await context.invoke()
[docs] async def handle_message_create_for_prefix_commands(self, event: hikari.MessageCreateEvent) -> None: """ Prefix command :obj:`` listener. This handles fetching the context, dispatching events, and invoking the appropriate command. Args: event (:obj:``): Event that prefix commands will be processed for. Returns: ``None`` """ if self.ignore_bots and not event.is_human: return if not event.message.content: return context = await self.get_prefix_context(event) if context is None: return if context.command is not None: await self.dispatch(events.PrefixCommandInvocationEvent(app=self, command=context.command, context=context)) try: await self.process_prefix_commands(context) except Exception as exc: new_exc = exc if not isinstance(exc, errors.LightbulbError): assert context.command is not None new_exc = errors.CommandInvocationError( f"An error occurred during command {!r} invocation", original=exc ) assert isinstance(new_exc, errors.LightbulbError) error_event = events.PrefixCommandErrorEvent(app=self, exception=new_exc, context=context) handled = await self.maybe_dispatch_error_event( error_event, [ getattr(context.command, "error_handler", None), getattr(context.command.plugin, "_error_handler", None) if context.command is not None else None, ], ) if not handled: raise new_exc else: assert context.command is not None await self.dispatch(events.PrefixCommandCompletionEvent(app=self, command=context.command, context=context))
[docs] async def get_slash_context( self, event: hikari.InteractionCreateEvent, command: commands.slash.SlashCommand, cls: t.Type[context_.slash.SlashContext] = context_.slash.SlashContext, ) -> context_.slash.SlashContext: """ Get the :obj:`~.context.slash.SlashContext` instance for the given event. Args: event (:obj:``): Event to get the slash context for. command (:obj:`~.commands.slash.SlashCommand`): Command that the context is for. cls (Type[:obj:`~.context.slash.SlashContext`]): Context class to instantiate. Defaults to :obj:`~.context.slash.SlashContext`. Returns: :obj:`~.context.slash.SlashContext`: Slash context instance for the given event. """ return cls(self, event, command)
[docs] async def get_message_context( self, event: hikari.InteractionCreateEvent, command: commands.message.MessageCommand, cls: t.Type[context_.message.MessageContext] = context_.message.MessageContext, ) -> context_.message.MessageContext: """ Get the :obj:`~.context.message.MessageContext` instance for the given event. Args: event (:obj:``): Event to get the message context for. command (:obj:`~.commands.message.MessageCommand`): Command that the context is for. cls (Type[:obj:`~.context.message.MessageContext`]): Context class to instantiate. Defaults to :obj:`~.context.message.MessageContext`. Returns: :obj:`~.context.message.MessageContext`: Message context instance for the given event. """ return cls(self, event, command)
[docs] async def get_user_context( self, event: hikari.InteractionCreateEvent, command: commands.user.UserCommand, cls: t.Type[context_.user.UserContext] = context_.user.UserContext, ) -> context_.user.UserContext: """ Get the :obj:`~.context.user.UserContext` instance for the given event. Args: event (:obj:``): Event to get the user context for. command (:obj:`~.commands.slash.SlashCommand`): Command that the context is for. cls (Type[:obj:`~.context.user.UserContext`]): Context class to instantiate. Defaults to :obj:`~.context.user.UserContext`. Returns: :obj:`~.context.user.UserContext`: User context instance for the given event. """ return cls(self, event, command)
[docs] async def get_application_command_context( self, event: hikari.InteractionCreateEvent ) -> t.Optional[context_.base.ApplicationContext]: """ Get the appropriate subclass instance of :obj:`~.context.base.Application` for the given event. Args: event (:obj:``): Event to get the context for. Returns: :obj:`~.context.base.ApplicationContext`: Context instance for the given event. """ assert isinstance(event.interaction, hikari.CommandInteraction) cmd = self._get_application_command(event.interaction) if cmd is None: return None if isinstance(cmd, commands.slash.SlashCommand): return await self.get_slash_context(event, cmd) elif isinstance(cmd, commands.user.UserCommand): return await self.get_user_context(event, cmd) elif isinstance(cmd, commands.message.MessageCommand): return await self.get_message_context(event, cmd) return None
[docs] async def invoke_application_command(self, context: context_.base.ApplicationContext) -> None: """ Invokes the appropriate application command for the given context and handles event dispatching for the command invocation. Args: context (:obj:`~.context.base.ApplicationContext`): Context to invoke application commands under. Returns: ``None`` """ cmd_events = self._get_events_for_application_command(context.command) await self.dispatch(cmd_events[0](app=self, command=context.command, context=context)) try: await context.invoke() except Exception as exc: new_exc = exc if not isinstance(exc, errors.LightbulbError): new_exc = errors.CommandInvocationError( f"An error occurred during command {!r} invocation", original=exc ) assert isinstance(new_exc, errors.LightbulbError) error_event = cmd_events[2](app=self, exception=new_exc, context=context) handled = await self.maybe_dispatch_error_event( error_event, [ getattr(context.command, "error_handler", None), getattr(context.command.plugin, "_error_handler", None), ], ) if not handled: raise new_exc else: await self.dispatch(cmd_events[1](app=self, command=context.command, context=context))
[docs] async def handle_interaction_create_for_application_commands(self, event: hikari.InteractionCreateEvent) -> None: """ Application command :obj:`` listener. This handles fetching the context, dispatching events, and invoking the appropriate command. Args: event (:obj:``): Event that application commands will be processed for. Returns: ``None`` """ if not isinstance(event.interaction, hikari.CommandInteraction): return context = await self.get_application_command_context(event) if context is None: return await self.invoke_application_command(context)
[docs] async def handle_interaction_create_for_autocomplete(self, event: hikari.InteractionCreateEvent) -> None: """ Autocomplete :obj:`` listener. This handles resolving the function to use for autocompletion, response conversion into :obj:`~hikari.commands.CommandChoice` and responding to the interaction with the provided options Args: event (:obj:``): Event that autocomplete will be processed for. Returns: ``None`` """ if not isinstance(event.interaction, hikari.AutocompleteInteraction): return assert event.interaction.command_type is hikari.CommandType.SLASH assert event.interaction.options is not None and len(event.interaction.options) > 0 def is_focused(opt: hikari.AutocompleteInteractionOption) -> bool: return opt.is_focused def get_focused(opts: t.Sequence[hikari.AutocompleteInteractionOption]) -> hikari.AutocompleteInteractionOption: return next(filter(is_focused, opts), opts[0]) def flatten_command_option( opt: hikari.AutocompleteInteractionOption, ) -> t.Tuple[hikari.AutocompleteInteractionOption, t.Sequence[str]]: current = opt name = [] while current.type in (hikari.OptionType.SUB_COMMAND, hikari.OptionType.SUB_COMMAND_GROUP): assert current.options is not None current = get_focused(current.options) name.append( return current, name option, full_name = flatten_command_option(get_focused(event.interaction.options)) cmd = self.get_slash_command(event.interaction.command_name) if cmd is None: return for part in full_name[:-1]: if not isinstance(cmd, commands.slash.SlashGroupMixin): return cmd = cmd.get_subcommand(part) if cmd is None: return callback = cmd._initialiser._autocomplete_callbacks.get(full_name[-1]) if callback is None: return # Invoke the autocomplete callback response = await callback(option, event.interaction) if not response: await event.interaction.create_response([]) return def convert_response_value( val: t.Union[str, int, float, hikari.api.AutocompleteChoiceBuilder] ) -> hikari.api.AutocompleteChoiceBuilder: if isinstance(val, (str, int, float)): return hikari.impl.AutocompleteChoiceBuilder(name=str(val), value=val) return val resp_to_send: t.List[hikari.api.AutocompleteChoiceBuilder] = [] if isinstance(response, (str, int, float, hikari.api.AutocompleteChoiceBuilder)): resp_to_send.append(convert_response_value(response)) elif isinstance(response, for item in response: resp_to_send.append(convert_response_value(item)) else: _LOGGER.error("Invalid response returned from autocomplete handler %r", callback.__name__) # type: ignore[unreachable] try: await event.interaction.create_response(resp_to_send) except hikari.NotFoundError as e: _LOGGER.debug("Failed sending autocomplete response", exc_info=(type(e), e, e.__traceback__))