# Source code for lightbulb.commands.base

# -*- coding: utf-8 -*-
# Copyright © tandemdude 2020-present
#
# This file is part of Lightbulb.
#
# Lightbulb is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Lightbulb is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Lightbulb. If not, see <https://www.gnu.org/licenses/>.
from __future__ import annotations

__all__ = ["OptionModifier", "OptionLike", "CommandLike", "Command", "ApplicationCommand", "SubCommandTrait"]

import abc
import asyncio
import collections
import datetime
import enum
import inspect
import re
import typing as t

import attrs
import hikari

from lightbulb import errors

if t.TYPE_CHECKING:
    from lightbulb import app as app_
    from lightbulb import buckets
    from lightbulb import checks
    from lightbulb import context as context_
    from lightbulb import cooldowns
    from lightbulb import events
    from lightbulb import parser as parser_
    from lightbulb import plugins

# Python types an autocomplete callback may hand back directly as a suggestion value.
_AutocompleteableOptionT = t.Union[str, int, float]
# Type variable for autocomplete callbacks: any callable returning a coroutine that resolves to
# a single value, a single choice builder, or a sequence mixing the two.
AutocompleteCallbackT = t.TypeVar(
    "AutocompleteCallbackT",
    bound=t.Callable[
        ...,
        t.Coroutine[
            t.Any,
            t.Any,
            t.Union[
                _AutocompleteableOptionT,
                hikari.api.AutocompleteChoiceBuilder,
                t.Sequence[t.Union[_AutocompleteableOptionT, hikari.api.AutocompleteChoiceBuilder]],
            ],
        ],
    ],
)
# Type variable for error handler callbacks: any callable returning a coroutine that resolves to
# ``bool`` or ``None``. Being a TypeVar lets decorators return the exact type they were given.
ErrorHandlerCallbackT = t.TypeVar(
    "ErrorHandlerCallbackT", bound=t.Callable[..., t.Coroutine[t.Any, t.Any, t.Optional[bool]]]
)


class CommandCallbackT(t.Protocol):
    """Structural type of a command callback: takes the invocation context plus keyword arguments."""

    def __call__(self, context: context_.base.Context, **kwargs: t.Any) -> t.Coroutine[t.Any, t.Any, None]:
        """Run the command for ``context``; any additional values are supplied as keyword arguments."""


# Lookup table from the Python/hikari type given as an option's ``arg_type`` to the
# ``hikari.OptionType`` used when the option is converted for an application command
# (see ``OptionLike.as_application_command_option``).
OPTION_TYPE_MAPPING = {
    str: hikari.OptionType.STRING,
    int: hikari.OptionType.INTEGER,
    float: hikari.OptionType.FLOAT,
    bool: hikari.OptionType.BOOLEAN,
    hikari.User: hikari.OptionType.USER,
    hikari.Member: hikari.OptionType.USER,
    hikari.GuildChannel: hikari.OptionType.CHANNEL,
    hikari.TextableGuildChannel: hikari.OptionType.CHANNEL,
    hikari.TextableChannel: hikari.OptionType.CHANNEL,
    hikari.GuildCategory: hikari.OptionType.CHANNEL,
    hikari.GuildVoiceChannel: hikari.OptionType.CHANNEL,
    hikari.Role: hikari.OptionType.ROLE,
    # The types below are mapped to plain STRING options.
    hikari.Emoji: hikari.OptionType.STRING,
    hikari.Guild: hikari.OptionType.STRING,
    hikari.Message: hikari.OptionType.STRING,
    hikari.Invite: hikari.OptionType.STRING,
    hikari.Colour: hikari.OptionType.STRING,
    hikari.Color: hikari.OptionType.STRING,
    hikari.Snowflake: hikari.OptionType.STRING,
    datetime.datetime: hikari.OptionType.STRING,
    hikari.Attachment: hikari.OptionType.ATTACHMENT,
}
# Pattern an application command option name is checked against: 1 to 32 characters, each a
# word character or a hyphen (``re.U`` selects Unicode matching for ``\w``).
OPTION_NAME_REGEX: re.Pattern[str] = re.compile(r"^[\w-]{1,32}$", re.U)


class _HasRecreateSubcommands(t.Protocol):
    """Structural type for objects that expose an ``app`` and can rebuild their subcommands."""

    app: app_.BotApp

    def recreate_subcommands(self, raw_cmds: t.Sequence[CommandLike], app: app_.BotApp) -> None:
        """Rebuild the subcommands of this object from ``raw_cmds`` for the given ``app``."""


class _SubcommandListProxy(collections.UserList):  # type: ignore
    """
    List wrapper that notifies its registered parents whenever an item is appended.

    Each parent has ``recreate_subcommands`` called with the current list contents
    and the parent's own ``app`` after every ``append``.
    """

    def __init__(self, *args: t.Any, parent: _HasRecreateSubcommands, **kwargs: t.Any) -> None:
        super().__init__(*args, **kwargs)
        # Start with the single owner given at construction; more can be added via ``add_parent``.
        owners = [parent]
        self.parents = owners

    def append(self, item: t.Any) -> None:
        """Append ``item`` and then ask every registered parent to recreate its subcommands."""
        super().append(item)
        for owner in self.parents:
            current_items = self.data
            owner.recreate_subcommands(current_items, owner.app)

    def add_parent(self, parent: _HasRecreateSubcommands) -> _SubcommandListProxy:
        """Register an additional parent to notify on ``append`` and return this proxy."""
        self.parents.append(parent)
        return self


def _get_choice_objects_from_choices(
    choices: t.Sequence[t.Union[str, int, float, hikari.CommandChoice]]
) -> t.Sequence[hikari.CommandChoice]:
    """
    Normalise a sequence of choices into ``hikari.CommandChoice`` objects.

    Items that already are ``hikari.CommandChoice`` instances are kept as-is; every other
    item becomes a choice whose name is ``str(item)`` and whose value is the item itself.
    """
    converted: t.List[hikari.CommandChoice] = []
    for choice in choices:
        if not isinstance(choice, hikari.CommandChoice):
            choice = hikari.CommandChoice(name=str(choice), value=choice)
        converted.append(choice)
    return converted


class OptionModifier(enum.Enum):
    """Enum representing option modifiers that affect parsing for prefix commands."""

    NONE = enum.auto()
    """No modifier. This will be parsed as a normal argument."""
    GREEDY = enum.auto()
    """Greedy option. This will consume arguments until the string is exhausted or conversion fails."""
    CONSUME_REST = enum.auto()
    """Consume rest option. This will consume the entire remainder of the string."""
@attrs.define(slots=True)
class OptionLike:
    """
    Generic dataclass representing a command option. Compatible with both prefix and application commands.
    """

    name: str
    """The name of the option."""
    description: str
    """The description of the option"""
    arg_type: t.Any = str
    """The type of the option."""
    required: bool = True
    """Whether or not the option is required. This will be inferred from whether or not
    a default value was provided if unspecified."""
    choices: t.Optional[t.Sequence[t.Union[str, int, float, hikari.CommandChoice]]] = None
    """The option's choices. This only affects slash commands."""
    channel_types: t.Optional[t.Sequence[hikari.ChannelType]] = None
    """The channel types for this option. This only affects slash commands."""
    default: hikari.UndefinedOr[t.Any] = hikari.UNDEFINED
    """The default value for this option."""
    modifier: OptionModifier = OptionModifier.NONE
    """Additional modifier controlling how the option should be parsed. This only affects prefix commands."""
    min_value: t.Optional[t.Union[float, int]] = None
    """
    The minimum value permitted for this option (inclusive). The option must be ``INTEGER`` or ``FLOAT`` to use this.

    .. versionadded:: 2.1.3
    """
    max_value: t.Optional[t.Union[float, int]] = None
    """
    The maximum value permitted for this option (inclusive). The option must be ``INTEGER`` or ``FLOAT`` to use this.

    .. versionadded:: 2.1.3
    """
    min_length: t.Optional[int] = None
    """
    The minimum length permitted for this option. The option must be ``STRING`` to use this.

    .. versionadded:: 2.3.2
    """
    max_length: t.Optional[int] = None
    """
    The maximum length permitted for this option. The option must be ``STRING`` to use this.

    .. versionadded:: 2.3.2
    """
    autocomplete: bool = False
    """Whether the option should be autocompleted or not. This only affects slash commands."""
    name_localizations: t.Mapping[t.Union[hikari.Locale, str], str] = attrs.field(factory=dict)
    """
    A mapping of locale to name localizations for this option

    .. versionadded:: 2.3.0
    """
    description_localizations: t.Mapping[t.Union[hikari.Locale, str], str] = attrs.field(factory=dict)
    """
    A mapping of locale to description localizations for this option

    .. versionadded:: 2.3.0
    """

    def as_application_command_option(self) -> hikari.CommandOption:
        """
        Convert this object into a :obj:`~hikari.commands.CommandOption`.

        Returns:
            :obj:`~hikari.commands.CommandOption`: Created ``CommandOption`` object.

        Raises:
            :obj:`ValueError`: If the name does not match ``OPTION_NAME_REGEX`` or is not lowercase,
                the description is not 1-100 characters long, a value/length bound or autocomplete
                is used with an incompatible option type, or more than 25 choices are given.
        """
        if not OPTION_NAME_REGEX.fullmatch(self.name) or self.name != self.name.lower():
            # Report the pattern that is actually enforced so the message cannot drift from the check.
            raise ValueError(
                f"Application command option {self.name!r}: "
                + f"name must match regex '{OPTION_NAME_REGEX.pattern}' and be all lowercase"
            )
        if len(self.description) < 1 or len(self.description) > 100:
            raise ValueError(
                f"Application command option {self.name!r}: description must be from 1-100 characters long"
            )

        # ``arg_type`` may be a mapped Python/hikari type or already an ``OptionType``;
        # anything else falls back to a STRING option.
        arg_type = OPTION_TYPE_MAPPING.get(self.arg_type, self.arg_type)
        if not isinstance(arg_type, hikari.OptionType):
            arg_type = hikari.OptionType.STRING

        if (self.min_value is not None or self.max_value is not None) and arg_type not in (
            hikari.OptionType.INTEGER,
            hikari.OptionType.FLOAT,
        ):
            raise ValueError(
                f"Application command option {self.name!r}: "
                + "'min_value' or 'max_value' was provided but the option type is not numeric"
            )

        if (self.min_length is not None or self.max_length is not None) and arg_type is not hikari.OptionType.STRING:
            raise ValueError(
                f"Application command option {self.name!r}: "
                + "'min_length' or 'max_length' was provided but the option type is not string"
            )

        if (
            arg_type not in (hikari.OptionType.INTEGER, hikari.OptionType.FLOAT, hikari.OptionType.STRING)
            and self.autocomplete
        ):
            raise ValueError(
                f"Application command option {self.name!r}: "
                + "'autocomplete' is True but the option type does not support choices"
            )

        kwargs: t.MutableMapping[str, t.Any] = {
            "type": arg_type,
            "name": self.name,
            "description": self.description,
            "is_required": self.required,
            "autocomplete": self.autocomplete,
            "name_localizations": self.name_localizations,
            "description_localizations": self.description_localizations,
        }

        # Optional settings are only passed on when they were actually provided.
        if self.choices:
            if len(self.choices) > 25:
                raise ValueError("Application command options can have at most 25 choices")
            kwargs["choices"] = _get_choice_objects_from_choices(self.choices)

        if self.channel_types:
            kwargs["channel_types"] = self.channel_types

        # Bounds are coerced to match the option type: ``int`` for INTEGER, ``float`` otherwise.
        if self.min_value is not None:
            kwargs["min_value"] = (
                int(self.min_value) if arg_type is hikari.OptionType.INTEGER else float(self.min_value)
            )
        if self.max_value is not None:
            kwargs["max_value"] = (
                int(self.max_value) if arg_type is hikari.OptionType.INTEGER else float(self.max_value)
            )

        if self.min_length is not None:
            kwargs["min_length"] = self.min_length
        if self.max_length is not None:
            kwargs["max_length"] = self.max_length

        return hikari.CommandOption(**kwargs)
@attrs.define(slots=True)
class CommandLike:
    """Generic dataclass representing a command. This can be converted into any command object."""

    callback: CommandCallbackT
    """The callback function for the command."""
    name: str
    """The name of the command."""
    description: str
    """The description of the command."""
    options: t.MutableMapping[str, OptionLike] = attrs.field(factory=dict)
    """The options for the command."""
    checks: t.Sequence[t.Union[checks.Check, checks._ExclusiveCheck]] = attrs.field(factory=list)
    """The checks for the command."""
    error_handler: t.Optional[
        t.Callable[[events.CommandErrorEvent], t.Coroutine[t.Any, t.Any, t.Optional[bool]]]
    ] = None
    """The error handler for the command."""
    aliases: t.Sequence[str] = attrs.field(factory=list)
    """The aliases for the command. This only affects prefix commands."""
    guilds: hikari.UndefinedOr[t.Sequence[int]] = hikari.UNDEFINED
    """The guilds for the command. This only affects application commands."""
    subcommands: t.List[CommandLike] = attrs.field(factory=list)
    """Subcommands for the command."""
    parser: t.Optional[t.Type[parser_.BaseParser]] = None
    """The argument parser to use for prefix commands."""
    cooldown_manager: t.Optional[cooldowns.CooldownManager] = None
    """The cooldown manager for the command."""
    help_getter: t.Optional[t.Callable[[Command, context_.base.Context], str]] = None
    """The function to call to get the command's long help text."""
    auto_defer: bool = False
    """Whether or not to automatically defer the response when the command is invoked."""
    ephemeral: bool = False
    """Whether or not to send responses from this command as ephemeral messages by default."""
    check_exempt: t.Optional[t.Callable[[context_.base.Context], t.Union[bool, t.Coroutine[t.Any, t.Any, bool]]]] = None
    """Check exempt predicate to use for the command."""
    hidden: bool = False
    """Whether or not the command should be hidden from the help command."""
    inherit_checks: bool = False
    """Whether or not the command should inherit checks from the parent group."""
    pass_options: bool = False
    """
    Whether or not the command will have its options passed as keyword arguments when invoked.

    .. versionadded:: 2.2.1
    """
    max_concurrency: t.Optional[t.Tuple[int, t.Type[buckets.Bucket]]] = None
    """
    The max concurrency rule for the command.

    .. versionadded:: 2.2.1
    """
    app_command_default_member_permissions: t.Optional[hikari.Permissions] = None
    """
    The default member permissions for this command, if an application command.

    .. versionadded:: 2.2.3
    """
    app_command_dm_enabled: bool = True
    """
    Whether this command will be enabled in DMs, if an application command.

    .. versionadded:: 2.2.3
    """
    app_command_bypass_author_permission_checks: bool = False
    """
    Whether invocations of this command will bypass author permission checks, if an application command.

    .. versionadded:: 2.2.3
    """
    name_localizations: t.Mapping[t.Union[hikari.Locale, str], str] = attrs.field(factory=dict)
    """
    A mapping of locale to name localizations for this command

    .. versionadded:: 2.3.0
    """
    description_localizations: t.Mapping[t.Union[hikari.Locale, str], str] = attrs.field(factory=dict)
    """
    A mapping of locale to description localizations for this command

    .. versionadded:: 2.3.0
    """
    nsfw: bool = False
    """
    Whether the command should only be enabled in NSFW channels.

    For prefix commands, this will add an NSFW-channel only check to the command automatically.
    For slash commands, this will behave as specified in the Discord documentation.

    .. versionadded:: 2.3.1
    """
    # Option name -> autocomplete callback; filled in by the ``autocomplete`` decorator and
    # excluded from the generated ``__init__``.
    _autocomplete_callbacks: t.Dict[
        str,
        t.Callable[
            [hikari.CommandInteractionOption, hikari.AutocompleteInteraction],
            t.Coroutine[
                t.Any,
                t.Any,
                t.Union[
                    _AutocompleteableOptionT,
                    hikari.api.AutocompleteChoiceBuilder,
                    t.Sequence[t.Union[_AutocompleteableOptionT, hikari.api.AutocompleteChoiceBuilder]],
                ],
            ],
        ],
    ] = attrs.field(factory=dict, init=False)

    async def __call__(self, context: context_.base.Context) -> None:
        """Await the stored callback with ``context`` only; no options are forwarded here."""
        await self.callback(context)

    @t.overload
    def set_error_handler(self) -> t.Callable[[ErrorHandlerCallbackT], ErrorHandlerCallbackT]:
        ...

    @t.overload
    def set_error_handler(self, func: ErrorHandlerCallbackT) -> ErrorHandlerCallbackT:
        ...

    def set_error_handler(
        self,
        func: t.Optional[ErrorHandlerCallbackT] = None,
    ) -> t.Union[ErrorHandlerCallbackT, t.Callable[[ErrorHandlerCallbackT], ErrorHandlerCallbackT]]:
        """
        Registers a coroutine function as an error handler for this command. This can be used as a first or second
        order decorator, or called manually with the function to register.

        Args:
            func: The error handler to register, or ``None`` to get a decorator back instead.

        Returns:
            The function that was passed in, or a decorator that registers and returns
            its argument when ``func`` is ``None``.

        Example:

            .. code-block:: python

                @lightbulb.command(...)
                @lightbulb.implements(...)
                async def foo(ctx: lightbulb.Context) -> None:
                    ...

                # Valid
                @foo.set_error_handler
                async def foo_error_handler(event: lightbulb.CommandErrorEvent) -> bool:
                    ...

                # Also valid
                @foo.set_error_handler()
                async def foo_error_handler(event: lightbulb.CommandErrorEvent) -> bool:
                    ...

                # Also valid
                async def foo_error_handler(event: lightbulb.CommandErrorEvent) -> bool:
                    ...

                foo.set_error_handler(foo_error_handler)
        """
        if func is not None:
            # Direct call or first-order decorator use.
            self.error_handler = func
            return func

        def decorate(func_: ErrorHandlerCallbackT) -> ErrorHandlerCallbackT:
            self.error_handler = func_
            return func_

        return decorate

    @t.overload
    def child(self) -> t.Callable[[CommandLike], CommandLike]:
        ...

    @t.overload
    def child(self, cmd_like: CommandLike) -> CommandLike:
        ...

    def child(
        self, cmd_like: t.Optional[CommandLike] = None
    ) -> t.Union[CommandLike, t.Callable[[CommandLike], CommandLike]]:
        """
        Registers a :obj:`~CommandLike` object as a child to this command. This can be used as a first or second
        order decorator, or called manually with the :obj:`~CommandLike` instance to add as a child.

        Args:
            cmd_like: The :obj:`~CommandLike` to add as a child, or ``None`` to get a decorator back instead.

        Returns:
            The object that was passed in, or a decorator that appends its argument to
            ``subcommands`` and returns it when ``cmd_like`` is ``None``.

        Example:

            .. code-block:: python

                @lightbulb.command(...)
                @lightbulb.implements(...)
                async def foo(ctx: lightbulb.Context) -> None:
                    ...

                # Valid
                @foo.child
                @lightbulb.command(...)
                @lightbulb.implements(...)
                async def foo_child(event: lightbulb.Context) -> None:
                    ...

                # Also valid
                @foo.child()
                @lightbulb.command(...)
                @lightbulb.implements(...)
                async def foo_child(event: lightbulb.Context) -> None:
                    ...

                # Also valid
                @lightbulb.command(...)
                @lightbulb.implements(...)
                async def foo_child(event: lightbulb.Context) -> None:
                    ...

                foo.child(foo_child)
        """
        if cmd_like is not None:
            # Direct call or first-order decorator use.
            self.subcommands.append(cmd_like)
            return cmd_like

        def decorate(cmd_like_: CommandLike) -> CommandLike:
            self.subcommands.append(cmd_like_)
            return cmd_like_

        return decorate

    def autocomplete(self, opt1: str, *opts: str) -> t.Callable[[AutocompleteCallbackT], AutocompleteCallbackT]:
        """
        Second order decorator that registers a function as an autocomplete callback for this command.

        The autocomplete callback **must** be an asynchronous function that takes exactly two arguments: ``option``
        (an instance of :obj:`hikari.interactions.command_interactions.AutocompleteInteractionOption`) which is the
        option being autocompleted, and ``interaction`` (an instance of
        :obj:`hikari.interactions.command_interactions.AutocompleteInteraction`) which is the interaction that
        triggered the autocomplete.

        Autocomplete can only be enabled for options with type :obj:`str`, :obj:`int`, or :obj:`float`.
        The callback should return one of the following: a single item of the option type, a sequence of items
        of the option type, a single ``hikari.api.AutocompleteChoiceBuilder``, or a sequence of
        ``hikari.api.AutocompleteChoiceBuilder``.

        Args:
            opt1 (:obj:`str`): Option that this callback will do autocomplete for.
            *opts (:obj:`str`): Additional options that this callback will do autocomplete for.

        Returns:
            A decorator that stores the decorated function for every named option and returns it unchanged.

        Example:

            .. code-block:: python

                @lightbulb.option("foo", "bar", autocomplete=True)
                @lightbulb.command(...)
                @lightbulb.implements(lightbulb.SlashCommand)
                async def foo(ctx: lightbulb.Context) -> None:
                    ...

                @foo.autocomplete("foo")  # Name of the option you want to autocomplete
                async def foo_autocomplete(
                    opt: hikari.AutocompleteInteractionOption, inter: hikari.AutocompleteInteraction
                ) -> Union[str, Sequence[str], hikari.api.AutocompleteChoiceBuilder, Sequence[hikari.api.AutocompleteChoiceBuilder]]:
                    ...
        """  # noqa: E501

        def decorate(func: AutocompleteCallbackT) -> AutocompleteCallbackT:
            # The same callback may serve several options; a later registration replaces an earlier one.
            for opt in [opt1, *opts]:
                self._autocomplete_callbacks[opt] = func
            return func

        return decorate
class SubCommandTrait(abc.ABC):
    """
    Trait that all subcommands and subgroups have. You can check if any given command is a subcommand by
    checking ``issubclass`` on the command's class or ``isinstance`` if you have the object.
    """

    __slots__ = ()
class Command(abc.ABC):
    """
    Abstract base class for all command types.

    Most attributes are read-only views onto the ``CommandLike`` initialiser the command was built from.

    Args:
        app (:obj:`~.app.BotApp`): The ``BotApp`` instance that the command is registered to.
        initialiser (:obj:`~CommandLike`): The ``CommandLike`` object to create the command from.
    """

    __slots__ = (
        "_initialiser",
        "app",
        "parent",
        "_plugin",
        "_max_concurrency_semaphores",
    )

    def __init__(self, app: app_.BotApp, initialiser: CommandLike) -> None:
        self._initialiser = initialiser
        self._plugin: t.Optional[plugins.Plugin] = None
        self.app: app_.BotApp = app
        """The ``BotApp`` instance the command is registered to."""
        self.parent: t.Optional[Command] = None
        """The parent for the command."""
        # One semaphore per bucket hash, created lazily on first invocation for that hash.
        self._max_concurrency_semaphores: t.Dict[t.Hashable, asyncio.Semaphore] = {}

    @property
    def _help_getter(self) -> t.Optional[t.Callable[[Command, context_.base.Context], str]]:
        return self._initialiser.help_getter

    @property
    def callback(self) -> CommandCallbackT:
        """The callback function for the command."""
        return self._initialiser.callback

    @property
    def name(self) -> str:
        """The name of the command."""
        return self._initialiser.name

    @property
    def description(self) -> str:
        """The description of the command."""
        return self._initialiser.description

    @property
    def options(self) -> t.MutableMapping[str, OptionLike]:
        """The options for the command."""
        return self._initialiser.options

    @property
    def checks(self) -> t.Sequence[t.Union[checks.Check, checks._ExclusiveCheck]]:
        """The checks for the command."""
        return self._initialiser.checks

    @property
    def aliases(self) -> t.Sequence[str]:
        """The aliases for the command. This value means nothing for application commands."""
        return self._initialiser.aliases

    @property
    def error_handler(
        self,
    ) -> t.Optional[t.Callable[[events.CommandErrorEvent], t.Coroutine[t.Any, t.Any, t.Optional[bool]]]]:
        """The error handler function for the command."""
        return self._initialiser.error_handler

    @property
    def parser(self) -> t.Optional[t.Type[parser_.BaseParser]]:
        """The argument parser to use for prefix commands."""
        return self._initialiser.parser

    @property
    def cooldown_manager(self) -> t.Optional[cooldowns.CooldownManager]:
        """The cooldown manager instance to use for the command."""
        return self._initialiser.cooldown_manager

    @property
    def auto_defer(self) -> bool:
        """Whether to automatically defer the response when the command is invoked."""
        return self._initialiser.auto_defer

    @property
    def default_ephemeral(self) -> bool:
        """Whether to send responses from this command as ephemeral messages by default."""
        return self._initialiser.ephemeral

    @property
    def check_exempt(self) -> t.Callable[[context_.base.Context], t.Union[bool, t.Coroutine[t.Any, t.Any, bool]]]:
        """Check exempt predicate to use for the command. Defaults to a predicate that always returns ``False``."""
        return self._initialiser.check_exempt or (lambda _: False)

    @property
    def hidden(self) -> bool:
        """Whether the command should be hidden from the help command."""
        return self._initialiser.hidden

    @property
    def inherit_checks(self) -> bool:
        """Whether the command should inherit checks from the parent group."""
        return self._initialiser.inherit_checks

    @property
    def pass_options(self) -> bool:
        """Whether the command will have its options passed as keyword arguments when invoked."""
        return self._initialiser.pass_options

    @property
    def max_concurrency(self) -> t.Optional[t.Tuple[int, t.Type[buckets.Bucket]]]:
        """The max concurrency rule for the command."""
        return self._initialiser.max_concurrency

    @property
    def app_command_default_member_permissions(self) -> t.Optional[hikari.Permissions]:
        """The default member permissions for this command, as an application command."""
        return self._initialiser.app_command_default_member_permissions

    @property
    def app_command_dm_enabled(self) -> bool:
        """Whether this command, as an application command, is enabled in DMs."""
        return self._initialiser.app_command_dm_enabled

    @property
    def app_command_bypass_author_permission_checks(self) -> bool:
        """Whether invocations of this command as an application command will bypass author permission checks."""
        return self._initialiser.app_command_bypass_author_permission_checks

    @property
    def name_localizations(self) -> t.Mapping[t.Union[hikari.Locale, str], str]:
        """
        A mapping of locale to name localizations for this command

        .. versionadded:: 2.3.0
        """
        return self._initialiser.name_localizations

    @property
    def description_localizations(self) -> t.Mapping[t.Union[hikari.Locale, str], str]:
        """
        A mapping of locale to description localizations for this command

        .. versionadded:: 2.3.0
        """
        return self._initialiser.description_localizations

    @property
    def nsfw(self) -> bool:
        """
        Whether the command should only be enabled in NSFW channels.

        For prefix commands, this will add an NSFW-channel only check to the command automatically.
        For slash commands, this will behave as specified in the Discord documentation.

        .. versionadded:: 2.3.1
        """
        return self._initialiser.nsfw

    def __hash__(self) -> int:
        return hash(self.name)

    async def __call__(self, context: context_.base.Context, **kwargs: t.Any) -> None:
        if self.pass_options:
            # Explicitly supplied keyword arguments take precedence over the context's raw options.
            for opt in context.raw_options:
                kwargs.setdefault(opt, context.raw_options[opt])
        return await self.callback(context, **kwargs)

    def _validate_attributes(self) -> None:
        pass

    def _set_plugin(self, pl: plugins.Plugin) -> None:
        self._plugin = pl

    @property
    def plugin(self) -> t.Optional[plugins.Plugin]:
        """The plugin that the command belongs to."""
        return self._plugin

    @plugin.setter
    def plugin(self, pl: plugins.Plugin) -> None:
        self._set_plugin(pl)

    @property
    def bot(self) -> app_.BotApp:
        """Alias for :obj:`~Command.app`"""
        return self.app

    def get_help(self, context: context_.base.Context) -> str:
        """
        Get the help text for the command under the given context. This method calls the help getter
        provided by the :obj:`~.decorators.set_help` decorator. An empty string will be returned if no help
        getter function was set.

        Args:
            context (:obj:`~.context.base.Context`): Context to get the help text under.

        Returns:
            :obj:`str`: Command's help text.
        """
        if self._help_getter is None:
            return ""
        return self._help_getter(self, context)

    @property
    def is_subcommand(self) -> bool:
        """Boolean representing whether this object is a subcommand."""
        return isinstance(self, SubCommandTrait)

    @property
    def qualname(self) -> str:
        """The qualified name for the command."""
        return self.name

    @property
    @abc.abstractmethod
    def signature(self) -> str:
        """The command's text signature."""
        ...

    async def _evaluate_max_concurrency(self, context: context_.base.Context) -> None:
        # Acquire a slot for this context's bucket, raising instead of waiting when none is free.
        if self.max_concurrency is None:
            return

        bucket_hash = self.max_concurrency[1].extract_hash(context)
        if bucket_hash not in self._max_concurrency_semaphores:
            self._max_concurrency_semaphores[bucket_hash] = asyncio.Semaphore(self.max_concurrency[0])

        if self._max_concurrency_semaphores[bucket_hash].locked():
            assert context.invoked is not None
            raise errors.MaxConcurrencyLimitReached(
                f"Maximum concurrency limit for command '{context.invoked.qualname}' exceeded",
                bucket=self.max_concurrency[1],
            )

        await self._max_concurrency_semaphores[bucket_hash].acquire()

    def _release_max_concurrency(self, context: context_.base.Context) -> None:
        # Give back the slot taken by ``_evaluate_max_concurrency`` for this context's bucket.
        if self.max_concurrency is None:
            return

        if sem := self._max_concurrency_semaphores.get(self.max_concurrency[1].extract_hash(context)):
            sem.release()

    async def invoke(self, context: context_.base.Context, **kwargs: t.Any) -> None:
        """
        Invokes the command under the given context. All checks, cooldowns and concurrency limits will be
        processed prior to invocation.

        Args:
            context (:obj:`~.context.base.Context`): Context to invoke the command under.
            **kwargs: Keyword arguments forwarded to the command callback.
        """
        context._invoked = self

        # Acquired outside the ``try`` so a rejected invocation never releases a slot it does not hold.
        await self._evaluate_max_concurrency(context)
        try:
            await self.evaluate_checks(context)
            await self.evaluate_cooldowns(context)
            await self(context, **kwargs)
        finally:
            # Exceptions propagate unchanged; the concurrency slot is released either way.
            self._release_max_concurrency(context)

    async def evaluate_checks(self, context: context_.base.Context) -> bool:
        """
        Evaluate the command's checks under the given context. This method will either return
        ``True`` if all the checks passed, or it will raise :obj:`~.errors.CheckFailure`.

        Checks are taken from the app, the plugin, the command itself and, when
        ``inherit_checks`` is set, the parent. All of them are run even after one fails.
        """
        exempt = self.check_exempt(context)
        if inspect.iscoroutine(exempt):
            exempt = await exempt

        if exempt:
            return True

        parent_checks = self.parent.checks if self.inherit_checks and self.parent is not None else []

        failed_checks: t.List[errors.CheckFailure] = []
        for check in [*self.app._checks, *getattr(self.plugin, "_checks", []), *self.checks, *parent_checks]:
            try:
                result = check(context)
                if inspect.iscoroutine(result):
                    result = await result

                if not result:
                    failed_checks.append(errors.CheckFailure(f"Check {check.__name__} failed for command {self.name}"))
            except Exception as ex:
                # Wrap foreign exceptions so callers only ever see ``CheckFailure``,
                # keeping the original as the cause.
                if not isinstance(ex, errors.CheckFailure):
                    error = errors.CheckFailure(str(ex))
                    error.__cause__ = ex
                else:
                    error = ex
                failed_checks.append(error)

        if len(failed_checks) > 1:
            raise errors.CheckFailure(
                "Multiple checks failed: " + ", ".join(str(ex) for ex in failed_checks), causes=failed_checks
            )
        elif failed_checks:
            raise failed_checks[0]
        return True

    async def evaluate_cooldowns(self, context: context_.base.Context) -> None:
        """
        Evaluate the command's cooldown under the given context. This method will either return
        ``None`` if the command is not on cooldown or raise :obj:`.errors.CommandIsOnCooldown`.
        """
        if self.cooldown_manager is not None:
            await self.cooldown_manager.add_cooldown(context)
class ApplicationCommand(Command, abc.ABC):
    """Abstract base class for all application command types."""

    __slots__ = ("_guilds", "instances")

    def __init__(self, app: app_.BotApp, initialiser: CommandLike) -> None:
        super().__init__(app, initialiser)
        self._guilds = initialiser.guilds
        self.instances: t.Dict[t.Union[int, None], hikari.PartialCommand] = {}
        """Mapping of guild ID to created hikari ``PartialCommand`` objects for this command."""

    @property
    def guilds(self) -> t.Sequence[int]:
        """The guilds that this command is available in."""
        # Fall back to the app-wide default when no guilds were given for this command.
        return self.app.default_enabled_guilds if self._guilds is hikari.UNDEFINED else self._guilds

    @property
    def signature(self) -> str:
        sig = self.qualname
        if self.options:
            # Required options render as ``<name>``, optional ones as ``[name=default]``.
            sig += " " + " ".join(
                f"<{o.name}>" if o.required else f"[{o.name}={o.default}]" for o in self.options.values()
            )
        return sig

    async def create(self, guild: t.Optional[int] = None) -> hikari.PartialCommand:
        """
        Creates the command in the guild with the given ID, or globally if no guild ID was provided.
        The created command is stored in ``instances`` under the guild ID (``None`` for global).

        Args:
            guild (Optional[:obj:`int`]): ID of the guild to create the command in, or ``None`` if to create globally.

        Returns:
            :obj:`~hikari.commands.PartialCommand`: Created hikari ``Command`` object.
        """
        assert self.app.application is not None

        kwargs = self.as_create_kwargs()
        kwargs.update({"guild": guild} if guild is not None else {})
        kwargs["nsfw"] = self.nsfw

        # ``dm_enabled`` is only sent for global commands.
        if guild is None:
            kwargs["dm_enabled"] = self.app_command_dm_enabled
        if self.app_command_default_member_permissions is not None:
            kwargs["default_member_permissions"] = self.app_command_default_member_permissions
        if self.name_localizations:
            kwargs["name_localizations"] = self.name_localizations
        if self.description_localizations:
            kwargs["description_localizations"] = self.description_localizations

        # The command type selects the REST endpoint and is not forwarded to the slash endpoint.
        cmd_type: hikari.CommandType = kwargs.pop("type")
        created_cmd: hikari.PartialCommand
        if cmd_type is hikari.CommandType.SLASH:
            created_cmd = await self.app.rest.create_slash_command(
                self.app.application,
                **kwargs,
            )
        else:
            created_cmd = await self.app.rest.create_context_menu_command(
                self.app.application,
                type=cmd_type,
                **kwargs,
            )

        self.instances[guild] = created_cmd
        assert isinstance(created_cmd, hikari.PartialCommand)
        return created_cmd

    async def _auto_create(self) -> None:
        # Create once per configured guild, or once globally when no guilds are configured.
        if self.guilds:
            for guild_id in self.guilds:
                await self.create(guild_id)
        else:
            await self.create()

    async def delete(self, guild: t.Optional[int] = None) -> None:
        """
        Deletes the command in the guild with the given ID, or globally if no guild ID was provided.
        Does nothing if no created command is recorded for that guild.

        Args:
            guild (Optional[:obj:`int`]): ID of the guild to delete the command in, or ``None`` if to delete globally.

        Returns:
            ``None``
        """
        cmd = self.instances.pop(guild, None)
        if cmd is None:
            return
        await cmd.delete()

    async def _auto_delete(self) -> None:
        # Delete every recorded instance, then forget them all.
        for cmd in self.instances.values():
            await cmd.delete()
        self.instances.clear()

    @abc.abstractmethod
    def as_create_kwargs(self) -> t.Dict[str, t.Any]:
        """
        Converts this class into a dictionary of kwargs required
        by :obj:`~hikari.api.rest.RESTClient.create_application_command`.

        Returns:
            Dict[:obj:`str`, Any]: Kwargs required in order to create the command.
        """
        ...