# -*- coding: utf-8 -*-
# Copyright © tandemdude 2020-present
#
# This file is part of Lightbulb.
#
# Lightbulb is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Lightbulb is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Lightbulb. If not, see <https://www.gnu.org/licenses/>.
from __future__ import annotations
__all__ = ["BaseParser", "Parser"]
import abc
import inspect
import logging
import typing as t
import hikari
from lightbulb import commands
from lightbulb import context as context_
from lightbulb import errors
from lightbulb.commands.base import OptionLike
from lightbulb.commands.base import OptionModifier
from lightbulb.converters import CONVERTER_TYPE_MAPPING
from lightbulb.converters import BaseConverter
T = t.TypeVar("T")
_quotes = {
'"': '"',
"'": "'",
"‘": "’",
"‚": "‛",
"“": "”",
"„": "‟",
"⹂": "⹂",
"「": "」",
"『": "』",
"〝": "〞",
"﹁": "﹂",
"﹃": "﹄",
""": """,
"「": "」",
"«": "»",
"‹": "›",
"《": "》",
"〈": "〉",
}
_LOGGER = logging.getLogger("lightbulb.utils.parser")
[docs]
class BaseParser(abc.ABC):
__slots__ = ("ctx", "_buffer", "len", "options")
len: int
ctx: context_.base.Context
_buffer: str
options: t.List[OptionLike]
def __init__(
self,
context: context_.base.Context,
buffer: t.Optional[str] = None,
options: t.Optional[t.List[OptionLike]] = None,
) -> None:
self.ctx = context
if buffer is None:
if not isinstance(self.ctx, context_.prefix.PrefixContext):
raise RuntimeError("Please provide the buffer to parse") # todo: proper error
message = self.ctx.event.message
assert message.content is not None
buffer = message.content[len(self.ctx.prefix) + len(self.ctx.invoked_with) :]
self.buffer = buffer
if options is not None:
self.options = options
else:
self.options = list(self.ctx.command.options.values()) if self.ctx.command else []
@property
def buffer(self) -> str:
return self._buffer
@buffer.setter
def buffer(self, val: str) -> None:
self._buffer = val
self.len = len(val)
@abc.abstractmethod
async def parse(self) -> t.Dict[str, t.Any]:
...
[docs]
class Parser(BaseParser):
__slots__ = ("_idx", "prev")
def __init__(
self,
context: context_.base.Context,
buffer: t.Optional[str] = None,
options: t.Optional[t.List[OptionLike]] = None,
) -> None:
self._idx = 0
self.prev = 0
super().__init__(context, buffer, options)
@property
def is_eof(self) -> bool:
return self.idx >= self.len
@property
def idx(self) -> int:
return self._idx
@idx.setter
def idx(self, val: int) -> None:
self.prev = self._idx
self._idx = val
def undo(self) -> None:
self.idx = self.prev
return None
def skip_ws(self) -> None:
prev = self.idx
if (char := self.get_current()) is not None and not char.isspace():
return None
while (char := self.get_char()) is not None and char.isspace():
pass
self.prev = prev
def get_char(self) -> t.Optional[str]:
self.idx += 1
return self.get_current()
def get_current(self) -> t.Optional[str]:
return None if self.is_eof else self.buffer[self.idx]
def get_previous(self) -> t.Optional[str]:
return None if self.idx == 0 else self.buffer[self.idx - 1]
[docs]
def get_word(self) -> str:
"""Gets the next word, will return an empty string if EOF."""
self.skip_ws()
prev = self.idx
while (char := self.get_char()) is not None and not char.isspace():
pass
self.prev = prev
return self.buffer[prev : self.idx]
def get_quoted_word(self) -> str:
self.skip_ws()
prev = self.idx
if (closing := _quotes.get(t.cast(str, self.get_current()))) is None:
return self.get_word()
while (char := self.get_char()) is not None:
if char == closing and self.get_previous() != "\\":
break
else:
# EOF
raise RuntimeError("expected a closing quote") # TODO: raise proper error
if (current := self.get_char()) is not None and not current.isspace():
raise RuntimeError("expected a space after the closing quote") # TODO: raise proper error
self.prev = prev
return self.buffer[prev + 1 : self.idx - 1].replace(f"\\{closing}", closing)
def read_rest(self) -> str:
self.skip_ws()
self.idx = self.len
return self.buffer[self.prev :]
def get_option(self) -> t.Optional[commands.base.OptionLike]:
if self.options:
return self.options.pop(0)
return None
async def parse(self) -> t.Dict[str, t.Any]:
ret = {}
attachments = list(self.ctx.attachments) if isinstance(self.ctx, context_.prefix.PrefixContext) else []
while option := self.get_option():
if option.arg_type in (hikari.OptionType.ATTACHMENT, hikari.Attachment):
if not attachments:
if not option.required:
ret[option.name] = option.default
continue
raise errors.MissingRequiredAttachmentArgument(
"Command invocation expects an attachment but none were found.", missing=option
)
ret[option.name] = attachments.pop(0)
continue
_LOGGER.debug("Getting arg for %s with type %s", option.name, option.arg_type)
if not (
raw_arg := self.read_rest()
if option.modifier is OptionModifier.CONSUME_REST
else self.get_quoted_word()
):
_LOGGER.debug("Arguments have exhausted")
if option.required:
raise errors.NotEnoughArguments(
"Command invocation is missing one or more required arguments.",
missing=[option, *(o for o in self.options if o.required)],
)
ret[option.name] = option.default
continue
_LOGGER.debug("Got raw arg %s", raw_arg)
convert = self._greedy_convert if option.modifier is OptionModifier.GREEDY else self._try_convert
await convert(raw_arg, option, ret)
self._validate(option, ret[option.name])
return ret
async def _try_convert(self, raw: str, option: commands.base.OptionLike, out: t.Dict[str, t.Any]) -> None:
try:
arg = await self._convert(raw, option.arg_type)
except Exception as e:
_LOGGER.debug("Failed to convert", exc_info=e)
if option.required:
raise errors.ConverterFailure(
f"Conversion failed for option {option.name!r}", opt=option, raw=raw
) from e
out[option.name] = option.default
_LOGGER.debug("Option has a default value, shifting to the next parameter")
self.undo()
else:
_LOGGER.debug("Successfully converted %s to %s", raw, arg)
out[option.name] = arg
async def _greedy_convert(self, raw: str, option: commands.base.OptionLike, out: t.Dict[str, t.Any]) -> None:
out[option.name] = args = []
_LOGGER.debug("Attempting to greedy convert %s to %s", raw, option.arg_type)
while raw:
try:
arg = await self._convert(raw, option.arg_type)
except Exception as e:
_LOGGER.debug("Done greedy converting", exc_info=e)
self.undo()
break
else:
_LOGGER.debug("Appending %s", arg)
args.append(arg)
raw = self.get_quoted_word()
async def _convert(
self,
value: str,
callback_or_type: t.Union[
t.Callable[[str], t.Union[T, t.Coroutine[t.Any, t.Any, T]]], t.Type[BaseConverter[T]]
],
) -> T:
callback_or_type = CONVERTER_TYPE_MAPPING.get(callback_or_type, callback_or_type)
_LOGGER.debug("Attempting to convert %s to %s", value, callback_or_type)
conversion_func = callback_or_type
if inspect.isclass(callback_or_type) and issubclass(callback_or_type, BaseConverter):
conversion_func = callback_or_type(self.ctx).convert
converted = conversion_func(value) # type: ignore
if inspect.iscoroutine(converted):
assert isinstance(converted, t.Awaitable)
converted = await converted
converted = t.cast(T, converted)
return converted
def _validate(self, option: commands.base.OptionLike, arg: t.Any) -> None:
if option.max_length and len(arg) > option.max_length:
raise errors.InvalidArgument("Value too long", opt=option, value=arg)
if option.min_length and len(arg) < option.min_length:
raise errors.InvalidArgument("Value too short", opt=option, value=arg)
if option.min_value and len(arg) < option.min_value:
raise errors.InvalidArgument("Value too small", opt=option, value=arg)
if option.max_value and len(arg) > option.max_value:
raise errors.InvalidArgument("Value too big", opt=option, value=arg)
if option.choices and arg not in option.choices:
raise errors.InvalidArgument("Value not in available choices", opt=option, value=arg)
if option.channel_types and isinstance(arg, hikari.PartialChannel):
if arg.type not in option.channel_types:
raise errors.InvalidArgument("Invalid channel type", opt=option, value=arg)