# -*- coding: utf-8 -*-
# Copyright © tandemdude 2020-present
#
# This file is part of Lightbulb.
#
# Lightbulb is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Lightbulb is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Lightbulb. If not, see <https://www.gnu.org/licenses/>.
from __future__ import annotations
# Names exported when this module is star-imported.
__all__ = [
    "ReactionNavigator",
    "ButtonNavigator",
    "ReactionButton",
    "ComponentButton",
    "next_page",
    "prev_page",
    "first_page",
    "last_page",
    "stop",
]
import asyncio
import typing as t
import hikari
from hikari.api.special_endpoints import MessageActionRowBuilder
from hikari.components import ButtonStyle
from lightbulb import context as context_
# Type of the individual pages held by a navigator.
T = t.TypeVar("T")
async def next_page(nav: t.Union[ReactionNavigator[T], ButtonNavigator[T]], _: hikari.Event) -> None:
    """
    Navigator button callback that advances the navigator to the following page.

    Advancing from the final page wraps around to the first page.
    """
    following = nav.current_page_index + 1
    nav.current_page_index = following % len(nav.pages)
async def prev_page(nav: t.Union[ReactionNavigator[T], ButtonNavigator[T]], _: hikari.Event) -> None:
    """
    Navigator button callback that moves the navigator back to the preceding page.

    Moving back from the first page wraps around to the final page.
    """
    preceding = nav.current_page_index - 1
    nav.current_page_index = preceding if preceding >= 0 else len(nav.pages) - 1
async def first_page(nav: t.Union[ReactionNavigator[T], ButtonNavigator[T]], _: hikari.Event) -> None:
    """
    Navigator button callback that jumps the navigator to its first page.
    """
    first_index = 0
    nav.current_page_index = first_index
async def last_page(nav: t.Union[ReactionNavigator[T], ButtonNavigator[T]], _: hikari.Event) -> None:
    """
    Navigator button callback that jumps the navigator to its final page.
    """
    final_index = len(nav.pages) - 1
    nav.current_page_index = final_index
async def stop(nav: t.Union[ReactionNavigator[T], ButtonNavigator[T]], _: hikari.Event) -> None:
    """
    Navigator button callback that ends navigation.

    Detaches the navigator's listener, deletes the navigator message, clears the
    stored message reference and cancels the pending timeout task, if any.
    """
    assert nav._msg is not None
    await nav._remove_listener()
    # Read the message from the navigator again rather than caching it before the await above.
    await nav._msg.delete()
    nav._msg = None

    pending_timeout = nav._timeout_task
    if pending_timeout is None:
        return
    pending_timeout.cancel()
class ReactionNavigator(t.Generic[T]):
    """
    A reaction navigator system for navigating through a list of items that can be sent through the
    ``content`` argument of :obj:`hikari.Message.respond`.

    Default buttons:

    - ``\\N{BLACK LEFT-POINTING DOUBLE TRIANGLE WITH VERTICAL BAR}\\N{VARIATION SELECTOR-16}`` (Go to first page)

    - ``\\N{BLACK LEFT-POINTING TRIANGLE}\\N{VARIATION SELECTOR-16}`` (Go to previous page)

    - ``\\N{BLACK SQUARE FOR STOP}\\N{VARIATION SELECTOR-16}`` (Stop navigation)

    - ``\\N{BLACK RIGHT-POINTING TRIANGLE}\\N{VARIATION SELECTOR-16}`` (Go to next page)

    - ``\\N{BLACK RIGHT-POINTING DOUBLE TRIANGLE WITH VERTICAL BAR}\\N{VARIATION SELECTOR-16}`` (Go to last page)

    If exactly one page is given and no buttons are specified, a single ``\\N{CROSS MARK}``
    button (Stop navigation) is used instead of the default buttons.

    Args:
        pages (Iterable[T]): Pages to navigate through. At least one page is required.

    Keyword Args:
        buttons (Optional[Sequence[:obj:`~.utils.nav.ReactionButton`]]): Buttons to
            use the navigator with. Uses the default buttons if not specified.
        timeout (:obj:`float`): The navigator timeout in seconds. After the timeout has expired, navigator reactions
            will no longer work. Defaults to 120 (2 minutes).

    Raises:
        :obj:`ValueError`: If no pages are given.
        :obj:`TypeError`: If any given button is not a :obj:`~.utils.nav.ReactionButton`.

    Example:

        .. code-block:: python

            from lightbulb.utils import pag, nav

            @bot.command()
            async def foo(ctx):
                paginated_help = pag.StringPaginator()
                for l in thing_that_creates_a_lot_of_text.split("\\n"):
                    paginated_help.add_line(l)
                navigator = nav.ReactionNavigator(paginated_help.build_pages())
                await navigator.run(ctx)
    """

    __slots__ = ("pages", "buttons", "_timeout", "current_page_index", "_context", "_msg", "_timeout_task")

    def __init__(
        self,
        pages: t.Union[t.Iterable[T], t.Iterator[T]],
        *,
        buttons: t.Optional[t.Sequence[ReactionButton]] = None,
        timeout: float = 120,
    ) -> None:
        # Materialise the pages before checking for emptiness: iterators and generators
        # are always truthy, so testing the raw argument would accept an empty one.
        # Falsy arguments are mapped to an empty tuple so they still raise ValueError.
        page_tuple: t.Tuple[T, ...] = tuple(pages) if pages else ()
        if not page_tuple:
            raise ValueError("You cannot pass fewer than 1 page to the navigator.")
        self.pages: t.Sequence[T] = page_tuple

        if buttons is not None:
            if any(not isinstance(btn, ReactionButton) for btn in buttons):
                raise TypeError("Buttons must be an instance of ReactionButton")

        self.buttons: t.Sequence[ReactionButton]
        if len(self.pages) == 1 and not buttons:
            # Nothing to page through, so only offer a way to dismiss the message.
            self.buttons = [ReactionButton("\N{CROSS MARK}", stop)]
        else:
            self.buttons = buttons if buttons is not None else self.create_default_buttons()

        self._timeout: float = timeout
        # Index into ``pages`` of the page currently shown.
        self.current_page_index: int = 0
        # The three attributes below stay ``None`` until ``run`` is called.
        self._context: t.Optional[context_.base.Context] = None
        self._msg: t.Optional[hikari.Message] = None
        self._timeout_task: t.Optional[asyncio.Task[None]] = None

    async def _edit_msg(self, message: hikari.Message, page: T) -> hikari.Message:
        """Edit ``message`` to show ``page`` and return the result of the edit."""
        return await message.edit(page)

    async def _send_initial_msg(self, page: T) -> hikari.Message:
        """Respond to the stored context with ``page`` and return the resulting message."""
        assert self._context is not None
        resp = await self._context.respond(page)
        return await resp.message()

    def create_default_buttons(self) -> t.List[ReactionButton]:
        """Build the default button set: first, previous, stop, next and last page."""
        buttons = [
            ReactionButton(
                "\N{BLACK LEFT-POINTING DOUBLE TRIANGLE WITH VERTICAL BAR}\N{VARIATION SELECTOR-16}", first_page
            ),
            ReactionButton("\N{BLACK LEFT-POINTING TRIANGLE}\N{VARIATION SELECTOR-16}", prev_page),
            ReactionButton("\N{BLACK SQUARE FOR STOP}\N{VARIATION SELECTOR-16}", stop),
            ReactionButton("\N{BLACK RIGHT-POINTING TRIANGLE}\N{VARIATION SELECTOR-16}", next_page),
            ReactionButton(
                "\N{BLACK RIGHT-POINTING DOUBLE TRIANGLE WITH VERTICAL BAR}\N{VARIATION SELECTOR-16}", last_page
            ),
        ]
        return buttons

    async def _process_reaction_add(self, event: hikari.ReactionAddEvent) -> None:
        """
        Listener for reaction-add events.

        Ignores events that do not come from the context's author or do not target the
        navigator message. Otherwise presses the first button that reports itself as
        pressed, shows the (possibly changed) current page and tries to remove the
        author's reaction.
        """
        # ``run`` subscribes this listener before the initial message has been sent, so an
        # event can arrive while ``_msg`` is still unset. Ignore it rather than raising
        # from the listener.
        if self._context is None or self._msg is None:
            return

        if event.user_id != self._context.author.id or event.message_id != self._msg.id:
            return

        for button in self.buttons:
            if button.is_pressed(event):
                await button.press(self, event)
                # The button callback may have cleared the message (see ``stop``).
                if self._msg is not None:
                    await self._edit_msg(self._msg, self.pages[self.current_page_index])
                    try:
                        await self._msg.remove_reaction(button.emoji, user=self._context.author)
                    except hikari.ForbiddenError:
                        # Best effort: the reaction is left in place if removal is forbidden.
                        pass
                break

    async def _remove_listener(self) -> None:
        """Unsubscribe the reaction listener and try to clear all reactions from the message."""
        assert self._context is not None
        self._context.app.unsubscribe(hikari.ReactionAddEvent, self._process_reaction_add)

        if self._msg is None:
            return

        try:
            await self._msg.remove_all_reactions()
        except (hikari.ForbiddenError, hikari.NotFoundError):
            # Best effort: removal being forbidden or the target not being found is ignored.
            pass

    async def _timeout_coro(self) -> None:
        """Wait for the configured timeout, then detach the listener; cancellation is swallowed."""
        try:
            await asyncio.sleep(self._timeout)
            await self._remove_listener()
        except asyncio.CancelledError:
            pass

    async def run(self, context: context_.base.Context) -> None:
        """
        Run the navigator under the given context.

        Args:
            context (:obj:`~.context.base.Context`): Context
                to run the navigator under.

        Returns:
            ``None``

        Raises:
            :obj:`hikari.MissingIntentError`: If the bot does not have the relevant reaction intent(s) for
                the navigator to function.
        """
        intent_to_check_for = (
            hikari.Intents.GUILD_MESSAGE_REACTIONS
            if context.guild_id is not None
            else hikari.Intents.DM_MESSAGE_REACTIONS
        )
        if not (context.app.intents & intent_to_check_for) == intent_to_check_for:
            raise hikari.MissingIntentError(intent_to_check_for)

        self._context = context

        # Subscribe before sending so reactions added while the buttons are still being
        # attached are not missed.
        context.app.subscribe(hikari.ReactionAddEvent, self._process_reaction_add)
        self._msg = await self._send_initial_msg(self.pages[self.current_page_index])

        for button in self.buttons:
            if self._msg is None:
                # A button callback (e.g. ``stop``) cleared the message while reactions were
                # still being added; there is nothing left to decorate.
                break
            await self._msg.add_reaction(button.emoji)

        if self._msg is None:
            # The navigator was stopped during set-up, so do not schedule a timeout for it.
            return

        if self._timeout_task is not None:
            self._timeout_task.cancel()
        self._timeout_task = asyncio.create_task(self._timeout_coro())