Source code for lightbulb.converters.special

# -*- coding: utf-8 -*-
# Copyright © tandemdude 2020-present
#
# This file is part of Lightbulb.
#
# Lightbulb is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Lightbulb is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Lightbulb. If not, see <https://www.gnu.org/licenses/>.
from __future__ import annotations

# Public names exported by ``lightbulb.converters.special``: every converter
# class defined in this module, plus the ``ColorConverter`` spelling alias.
__all__ = [
    "BooleanConverter",
    "UserConverter",
    "MemberConverter",
    "GuildChannelConverter",
    "TextableGuildChannelConverter",
    "GuildCategoryConverter",
    "GuildVoiceChannelConverter",
    "RoleConverter",
    "EmojiConverter",
    "GuildConverter",
    "MessageConverter",
    "InviteConverter",
    "ColourConverter",
    "ColorConverter",
    "TimestampConverter",
    "SnowflakeConverter",
]

import datetime
import re
import typing as t

import hikari

from lightbulb.converters import base
from lightbulb.utils import search

if t.TYPE_CHECKING:
    from lightbulb import context as context_

# Generic type variable; lets ``_raise_if_not_none`` return the same type it was given.
T = t.TypeVar("T")

# Mention patterns. In each of the first five, group 1 captures the run of digits.
# ``<@DIGITS>`` or ``<@!DIGITS>``
USER_MENTION_REGEX: t.Final[re.Pattern[str]] = re.compile(r"<@!?(\d+)>")
# ``<#DIGITS>``
CHANNEL_MENTION_REGEX: t.Final[re.Pattern[str]] = re.compile(r"<#(\d+)>")
# ``<@&DIGITS>``
ROLE_MENTION_REGEX: t.Final[re.Pattern[str]] = re.compile(r"<@&(\d+)>")
# ``<:NAME:DIGITS>`` or ``<a:NAME:DIGITS>``
EMOJI_MENTION_REGEX: t.Final[re.Pattern[str]] = re.compile(r"<a?:\w+:(\d+)>")
# ``<t:DIGITS>`` with an optional ``:X`` suffix, X being one of ``tTdDfFR``.
# ``TimestampConverter`` interprets group 1 as a number of seconds.
TIMESTAMP_MENTION_REGEX: t.Final[re.Pattern[str]] = re.compile(r"<t:(\d+)(?::[tTdDfFR])?>")
# ``DIGITS-DIGITS``; ``MessageConverter`` reads group 1 as the channel ID and
# group 2 as the message ID.
CHANNEL_MESSAGE_REGEX: t.Final[re.Pattern[str]] = re.compile(r"(\d+)-(\d+)")

# Textual inputs recognised by ``BooleanConverter`` and the boolean each one maps to.
BOOLEAN_MAPPING: t.Dict[str, bool] = {"yes": True, "y": True, "1": True, "no": False, "n": False, "0": False}


def _resolve_id_from_arg(arg_string: str, regex: re.Pattern[str]) -> hikari.Snowflake:
    """Turn a raw argument into a snowflake, unwrapping a mention if present.

    Args:
        arg_string: The raw argument text, either a bare ID or a mention.
        regex: Pattern tried against the start of ``arg_string``; when it
            matches, its first capture group is used as the ID text.

    Returns:
        The ``hikari.Snowflake`` built from the extracted (or original) text.

    Note:
        Callers in this module catch ``ValueError`` from this function to
        detect arguments that are not IDs and fall back to a name lookup.
    """
    mention = regex.match(arg_string)
    raw_id = arg_string if mention is None else mention.group(1)
    return hikari.Snowflake(raw_id)


async def _get_or_fetch_guild_channel_from_id(
    context: context_.base.Context, channel_id: hikari.Snowflake
) -> t.Optional[hikari.GuildChannel]:
    """Look a channel up in the cache, falling back to a REST fetch on a miss.

    Args:
        context: Context whose ``app`` provides the cache and REST client.
        channel_id: ID of the channel to resolve.

    Returns:
        The cached channel when the cache has it, otherwise whatever
        ``rest.fetch_channel`` returns for ``channel_id``.
    """
    cached = context.app.cache.get_guild_channel(channel_id)
    if cached is not None:
        return cached
    # Cache miss: ask the API. The fetched object is not type-checked here;
    # the converters that call this helper do their own isinstance check.
    return await context.app.rest.fetch_channel(channel_id)  # type: ignore


async def _try_convert_to_guild_channel(context: context_.base.Context, arg: str) -> t.Optional[hikari.GuildChannel]:
    """Resolve a guild channel from an ID, a channel mention, or a channel name.

    Args:
        context: Context the command was invoked under.
        arg: Raw argument text.

    Returns:
        The resolved channel, or ``None`` when a name lookup finds nothing.

    Raises:
        TypeError: If the command was not invoked in a guild.
    """
    guild_id = context.guild_id
    if guild_id is None:
        raise TypeError("Cannot resolve a guild channel object for a command run outside of a guild")

    try:
        channel_id = _resolve_id_from_arg(arg, CHANNEL_MENTION_REGEX)
    except ValueError:
        # Not an ID or mention: treat the argument as a channel name and
        # search the guild's cached channels.
        guild_channels = context.app.cache.get_guild_channels_view_for_guild(guild_id)
        return search.get(guild_channels.values(), name=arg)

    return await _get_or_fetch_guild_channel_from_id(context, channel_id)


def _raise_if_not_none(obj: t.Optional[T]) -> T:
    """Return ``obj`` unchanged, or raise when it is ``None``.

    Despite the name, the error is raised when ``obj`` *is* ``None``; any
    other value (including falsy ones such as ``0`` or ``""``) is returned.

    Args:
        obj: The possibly-missing conversion result.

    Returns:
        ``obj`` itself when it is not ``None``.

    Raises:
        TypeError: If ``obj`` is ``None``.
    """
    if obj is not None:
        return obj
    raise TypeError("No object could be resolved from the argument")


class BooleanConverter(base.BaseConverter[bool]):
    """Implementation of the base converter for converting arguments into a boolean."""

    __slots__ = ()

    async def convert(self, arg: str) -> bool:
        """Convert a yes/no style argument into a boolean.

        Matching is case-insensitive, so ``"Yes"`` and ``"N"`` are accepted
        alongside the lower-case spellings.

        Args:
            arg: Raw argument text.

        Returns:
            The boolean that ``BOOLEAN_MAPPING`` associates with the input.

        Raises:
            TypeError: If the input is not one of the recognised spellings.
        """
        try:
            # Keys in BOOLEAN_MAPPING are lower-case; normalise before lookup.
            return BOOLEAN_MAPPING[arg.lower()]
        except KeyError:
            # ``from None``: the KeyError is an implementation detail.
            raise TypeError(
                "Invalid input for boolean type. Valid inputs are: 'yes', 'y', '1', 'no', 'n', '0'"
            ) from None
class UserConverter(base.BaseConverter[hikari.User]):
    """Implementation of the base converter for converting arguments into a User object."""

    __slots__ = ()

    async def convert(self, arg: str) -> hikari.User:
        """Resolve a user from an ID, a user mention, or a username.

        IDs and mentions are looked up in the cache and then fetched over
        REST on a miss. Anything else is matched against cached users by
        ``username`` or ``username#discriminator``.

        Args:
            arg: Raw argument text.

        Returns:
            The resolved user.

        Raises:
            TypeError: If no user could be resolved.
        """
        app = self.context.app
        try:
            user_id = _resolve_id_from_arg(arg, USER_MENTION_REGEX)
        except ValueError:

            def _name_matches(candidate: hikari.User) -> bool:
                return candidate.username == arg or f"{candidate.username}#{candidate.discriminator}" == arg

            resolved = search.find(app.cache.get_users_view().values(), _name_matches)
        else:
            resolved = app.cache.get_user(user_id)
            if resolved is None:
                resolved = await app.rest.fetch_user(user_id)
        return _raise_if_not_none(resolved)
class MemberConverter(base.BaseConverter[hikari.Member]):
    """Implementation of the base converter for converting arguments into a Member object."""

    __slots__ = ()

    async def convert(self, arg: str) -> hikari.Member:
        """Resolve a guild member from an ID, a user mention, or a name.

        IDs and mentions are looked up in the cache and then fetched over
        REST on a miss. Anything else is matched against the guild's cached
        members by ``username``, ``nickname`` or ``username#discriminator``.

        Args:
            arg: Raw argument text.

        Returns:
            The resolved member.

        Raises:
            TypeError: If the command was not invoked in a guild, or if no
                member could be resolved.
        """
        guild_id = self.context.guild_id
        if guild_id is None:
            raise TypeError("Cannot resolve a member object for a command run outside of a guild")

        app = self.context.app
        try:
            user_id = _resolve_id_from_arg(arg, USER_MENTION_REGEX)
        except ValueError:

            def _name_matches(candidate: hikari.Member) -> bool:
                return (
                    candidate.username == arg
                    or candidate.nickname == arg
                    or f"{candidate.username}#{candidate.discriminator}" == arg
                )

            guild_members = app.cache.get_members_view_for_guild(guild_id)
            resolved = search.find(guild_members.values(), _name_matches)
        else:
            resolved = app.cache.get_member(guild_id, user_id)
            if resolved is None:
                resolved = await app.rest.fetch_member(guild_id, user_id)
        return _raise_if_not_none(resolved)
class GuildChannelConverter(base.BaseConverter[hikari.GuildChannel]):
    """Implementation of the base converter for converting arguments into a GuildChannel object."""

    __slots__ = ()

    async def convert(self, arg: str) -> hikari.GuildChannel:
        """Resolve a guild channel from an ID, a channel mention, or a name.

        Args:
            arg: Raw argument text.

        Returns:
            The resolved channel.

        Raises:
            TypeError: If the command was not invoked in a guild, or if the
                argument does not resolve to a ``hikari.GuildChannel``.
        """
        resolved = await _try_convert_to_guild_channel(self.context, arg)
        if isinstance(resolved, hikari.GuildChannel):
            return resolved
        raise TypeError("No object could be resolved from the argument")
class TextableGuildChannelConverter(base.BaseConverter[hikari.TextableGuildChannel]):
    """Implementation of the base converter for converting arguments into a TextableGuildChannel object."""

    __slots__ = ()

    async def convert(self, arg: str) -> hikari.TextableGuildChannel:
        """Resolve a textable guild channel from an ID, a mention, or a name.

        Args:
            arg: Raw argument text.

        Returns:
            The resolved channel.

        Raises:
            TypeError: If the command was not invoked in a guild, or if the
                argument does not resolve to a ``hikari.TextableGuildChannel``.
        """
        resolved = await _try_convert_to_guild_channel(self.context, arg)
        if isinstance(resolved, hikari.TextableGuildChannel):
            return resolved
        raise TypeError("No object could be resolved from the argument")
class GuildCategoryConverter(base.BaseConverter[hikari.GuildCategory]):
    """Implementation of the base converter for converting arguments into a GuildCategory object."""

    __slots__ = ()

    async def convert(self, arg: str) -> hikari.GuildCategory:
        """Resolve a guild category from an ID, a channel mention, or a name.

        Args:
            arg: Raw argument text.

        Returns:
            The resolved category.

        Raises:
            TypeError: If the command was not invoked in a guild, or if the
                argument does not resolve to a ``hikari.GuildCategory``.
        """
        resolved = await _try_convert_to_guild_channel(self.context, arg)
        if isinstance(resolved, hikari.GuildCategory):
            return resolved
        raise TypeError("No object could be resolved from the argument")
class GuildVoiceChannelConverter(base.BaseConverter[hikari.GuildVoiceChannel]):
    """Implementation of the base converter for converting arguments into a GuildVoiceChannel object."""

    __slots__ = ()

    async def convert(self, arg: str) -> hikari.GuildVoiceChannel:
        """Resolve a guild voice channel from an ID, a mention, or a name.

        Args:
            arg: Raw argument text.

        Returns:
            The resolved voice channel.

        Raises:
            TypeError: If the command was not invoked in a guild, or if the
                argument does not resolve to a ``hikari.GuildVoiceChannel``.
        """
        resolved = await _try_convert_to_guild_channel(self.context, arg)
        if isinstance(resolved, hikari.GuildVoiceChannel):
            return resolved
        raise TypeError("No object could be resolved from the argument")
class RoleConverter(base.BaseConverter[hikari.Role]):
    """Implementation of the base converter for converting arguments into a Role object."""

    __slots__ = ()

    async def convert(self, arg: str) -> hikari.Role:
        """Resolve a role from an ID, a role mention, or a role name.

        IDs and mentions are looked up in the cache; on a miss the guild's
        roles are fetched over REST and searched by ID. Anything else is
        matched by name against the guild's cached roles.

        Args:
            arg: Raw argument text.

        Returns:
            The resolved role.

        Raises:
            TypeError: If the command was not invoked in a guild, or if no
                role could be resolved.
        """
        guild_id = self.context.guild_id
        if guild_id is None:
            raise TypeError("Cannot resolve a role object for a command run outside of a guild")

        app = self.context.app
        try:
            role_id = _resolve_id_from_arg(arg, ROLE_MENTION_REGEX)
        except ValueError:
            cached_roles = app.cache.get_roles_view_for_guild(guild_id)
            resolved = search.get(cached_roles.values(), name=arg)
        else:
            resolved = app.cache.get_role(role_id)
            if resolved is None:
                # Fetch every role of the guild and pick the wanted one by ID.
                roles_by_id = {fetched.id: fetched for fetched in await app.rest.fetch_roles(guild_id)}
                resolved = roles_by_id.get(role_id)
        return _raise_if_not_none(resolved)
class EmojiConverter(base.BaseConverter[hikari.Emoji]):
    """Implementation of the base converter for converting arguments into an Emoji object."""

    __slots__ = ()

    async def convert(self, arg: str) -> hikari.Emoji:
        """Parse the argument into an emoji via ``hikari.Emoji.parse``.

        Args:
            arg: Raw argument text.

        Returns:
            The emoji object produced by ``hikari.Emoji.parse``.
        """
        emoji = hikari.Emoji.parse(arg)
        return emoji
class GuildConverter(base.BaseConverter[hikari.GuildPreview]):
    """Implementation of the base converter for converting arguments into a GuildPreview object."""

    __slots__ = ()

    async def convert(self, arg: str) -> hikari.GuildPreview:
        """Resolve a guild preview from a guild ID or a guild name.

        An all-digit argument is used directly as the guild ID. Any other
        argument is matched by name against the cached available guilds.
        Either way the preview itself is fetched over REST.

        Args:
            arg: Raw argument text.

        Returns:
            The fetched guild preview.

        Raises:
            TypeError: If the command was not invoked in a guild, or if no
                guild could be resolved.
        """
        if self.context.guild_id is None:
            raise TypeError("Cannot resolve a guild object for a command run outside of a guild")

        app = self.context.app
        if arg.isdigit():
            return _raise_if_not_none(await app.rest.fetch_guild_preview(int(arg)))

        named_guild = search.get(app.cache.get_available_guilds_view().values(), name=arg)
        preview = None if named_guild is None else await app.rest.fetch_guild_preview(named_guild.id)
        return _raise_if_not_none(preview)
class MessageConverter(base.BaseConverter[hikari.Message]):
    """Implementation of the base converter for converting arguments into a Message object."""

    __slots__ = ()

    async def convert(self, arg: str) -> hikari.Message:
        """Resolve a message from an ID, a ``channel-message`` pair, or a link.

        Accepted forms:

        * a bare message ID (the invoking channel is assumed);
        * ``CHANNEL_ID-MESSAGE_ID``;
        * a ``/``-separated path whose last two segments are the channel ID
          and the message ID.

        Args:
            arg: Raw argument text.

        Returns:
            The message fetched over REST.

        Raises:
            ValueError: If the argument cannot be parsed into a channel ID
                and a message ID, or if it refers to a channel other than
                the one the command was invoked in.
        """
        if arg.isdigit():
            message_id, channel_id = int(arg), int(self.context.channel_id)
        elif match := CHANNEL_MESSAGE_REGEX.match(arg):
            message_id, channel_id = int(match.group(2)), int(match.group(1))
        else:
            parts = arg.rstrip("/").split("/")
            # Without at least two segments, ``parts[-2]`` would leak an
            # IndexError; report it like every other malformed argument.
            if len(parts) < 2:
                raise ValueError("Could not resolve a channel ID and message ID from the argument")
            message_id, channel_id = int(parts[-1]), int(parts[-2])

        if channel_id != self.context.channel_id:
            raise ValueError("Cannot convert to message for a different channel than the command was invoked in")

        return await self.context.app.rest.fetch_message(channel_id, message_id)
class InviteConverter(base.BaseConverter[hikari.Invite]):
    """Implementation of the base converter for converting arguments into an Invite object."""

    __slots__ = ()

    async def convert(self, arg: str) -> hikari.Invite:
        """Resolve an invite from an invite code or a ``/``-separated link.

        The code is taken as the text after the last ``/`` (ignoring any
        trailing slashes). It is looked up in the cache, then fetched over
        REST on a miss.

        Args:
            arg: Raw argument text.

        Returns:
            The resolved invite.

        Raises:
            TypeError: If no invite could be resolved.
        """
        code = arg.rstrip("/").rpartition("/")[2]
        app = self.context.app

        cached = app.cache.get_invite(code)
        if cached is not None:
            return cached
        return _raise_if_not_none(await app.rest.fetch_invite(code))
class ColourConverter(base.BaseConverter[hikari.Colour]):
    """Implementation of the base converter for converting arguments into a Colour object."""

    __slots__ = ()

    async def convert(self, arg: str) -> hikari.Colour:
        """Build a colour from the argument via ``hikari.Colour.of``.

        Args:
            arg: Raw argument text.

        Returns:
            The colour object produced by ``hikari.Colour.of``.
        """
        colour = hikari.Colour.of(arg)
        return colour
ColorConverter = ColourConverter
"""Alias for :obj:`~ColourConverter`."""
class TimestampConverter(base.BaseConverter[datetime.datetime]):
    """Implementation of the base converter for converting arguments into a datetime object."""

    __slots__ = ()

    async def convert(self, arg: str) -> datetime.datetime:
        """Convert a ``<t:SECONDS>`` / ``<t:SECONDS:STYLE>`` mention into a datetime.

        Args:
            arg: Raw argument text.

        Returns:
            A timezone-aware (UTC) datetime for the captured number of seconds.

        Raises:
            TypeError: If the argument does not start with a timestamp mention.
        """
        mention = TIMESTAMP_MENTION_REGEX.match(arg)
        if mention is None:
            raise TypeError("Could not resolve timestamp")

        seconds = int(mention.group(1))
        return datetime.datetime.fromtimestamp(seconds, datetime.timezone.utc)
class SnowflakeConverter(base.BaseConverter[hikari.Snowflake]):
    """Implementation of the base converter for converting arguments into a Snowflake object."""

    __slots__ = ()

    async def convert(self, arg: str) -> hikari.Snowflake:
        """Convert a bare ID or a mention into a snowflake.

        Mention punctuation (``<``, ``@``, ``#``, ``!``, ``&``, ``>``) is
        stripped from both ends of the argument before conversion, so
        ``<@123>``, ``<@!123>``, ``<@&123>`` and ``<#123>`` all yield ``123``.

        Args:
            arg: Raw argument text.

        Returns:
            The snowflake built from the remaining text.

        Raises:
            TypeError: If the remaining text is not a valid snowflake.
        """
        try:
            # "@" must be in the strip set: user and role mentions start with
            # "<@", and without it the "!" and "&" entries can never be reached.
            snowflake = hikari.Snowflake(arg.strip("<@#!&>"))
        except ValueError:
            raise TypeError("Could not resolve snowflake") from None
        return snowflake