# -*- coding: utf-8 -*-
# Copyright © tandemdude 2020-present
#
# This file is part of Lightbulb.
#
# Lightbulb is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Lightbulb is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Lightbulb. If not, see <https://www.gnu.org/licenses/>.
"""
An implementation of simple repeating asyncio tasks.
.. versionadded:: 2.2.0
Setup
-----
In order for tasks to function correctly you must first call :meth:`load` after initialising
your instance of the :obj:`~lightbulb.app.BotApp` class. See below:
.. code-block:: python
import lightbulb
from lightbulb.ext import tasks
app = lightbulb.BotApp(...)
tasks.load(app)
Creating Tasks
--------------
Tasks are created using the :obj:`~task` decorator.
.. code-block:: python
from lightbulb.ext import tasks
# s=30 means this will run every 30 seconds
# you can also use (m)inutes, (h)ours, and (d)ays
@tasks.task(s=30)
async def print_every_30_seconds():
print("Task called")
# Instead of having to call .start() manually, you can pass auto_start=True
# into the task decorator if you wish
print_every_30_seconds.start()
See the :obj:`~task` decorator api reference for valid kwargs you can pass.
API Reference
-------------
"""
from __future__ import annotations
__all__ = ["load", "task", "wait_until_started", "Task", "Trigger", "UniformTrigger", "CronTrigger", "triggers"]
import asyncio
import functools
import inspect
import logging
import typing as t
import hikari
from . import triggers
from .triggers import *
if t.TYPE_CHECKING:
import lightbulb
_LOGGER = logging.getLogger("lightbulb.ext.tasks")
TaskCallbackT = t.TypeVar("TaskCallbackT", bound=t.Callable[..., t.Union[t.Any, t.Coroutine[t.Any, t.Any, t.Any]]])
TaskErrorHandlerT = t.TypeVar(
"TaskErrorHandlerT", bound=t.Callable[..., t.Union[bool, t.Coroutine[t.Any, t.Any, bool]]]
)
class _BindableObjectWithCallback:
__slots__ = ("_callback", "_bound")
def __init__(self, callback: t.Callable[..., t.Any]) -> None:
self._callback = callback
self._bound: bool = False
def __get__(self, instance: t.Any, _: t.Type[t.Any]) -> _BindableObjectWithCallback:
if self._bound or instance is None:
return self
self._callback = functools.partial(self._callback, instance)
return self
def __call__(self, *args: t.Any, **kwargs: t.Any) -> t.Any:
return self._callback(*args, **kwargs)
[docs]
class Task(_BindableObjectWithCallback):
"""
Class representing a repeating task.
Args:
callback: The function that will be executed every time the task is run.
trigger (:obj:`.triggers.Trigger`): The task trigger to use.
auto_start (:obj:`bool`): Whether the task will be automatically started when instantiated.
max_consecutive_failures (:obj:`int`): The number of consecutive task failures that will be ignored
before the task is cancelled.
max_executions (Optional[:obj:`int`]): The maximum number of times that the task will be executed. If ``None``,
the task will run indefinitely.
pass_app (:obj:`bool`): Whether the :obj:`lightbulb.app.BotApp` object will be passed into the task upon
execution.
"""
__slots__ = (
"_callback",
"_trigger",
"_stopped",
"_task",
"_error_handler",
"_consecutive_failures",
"_max_consecutive_failures",
"_max_executions",
"_n_executions",
"_pass_app",
"_wait_before_execution",
)
_app: t.Optional[lightbulb.BotApp] = None
_app_starting: asyncio.Event = asyncio.Event()
_app_started: asyncio.Event = asyncio.Event()
_tasks: t.List[Task] = []
def __init__(
self,
callback: TaskCallbackT,
trigger: triggers.Trigger,
auto_start: bool,
max_consecutive_failures: int,
max_executions: t.Optional[int],
pass_app: bool,
wait_before_execution: hikari.UndefinedOr[bool],
) -> None:
super().__init__(callback)
self._trigger: triggers.Trigger = trigger
self._stopped: bool = False
self._task: t.Optional[asyncio.Task[t.Any]] = None
self._error_handler: t.Optional[
t.Callable[[BaseException], t.Union[t.Any, t.Coroutine[t.Any, t.Any, t.Any]]]
] = None
self._max_consecutive_failures: int = max_consecutive_failures
self._consecutive_failures: int = 0
self._max_executions: t.Optional[int] = max_executions
self._n_executions: int = 0
self._pass_app: bool = pass_app
self._wait_before_execution: bool = (
wait_before_execution if wait_before_execution is not hikari.UNDEFINED else trigger.wait_before_execution
)
if auto_start:
self.start()
def __get__(self, instance: t.Any, owner: t.Type[t.Any]) -> Task:
# We override this to keep linters happy
super().__get__(instance, owner)
return self
@property
def __name__(self) -> str:
return getattr(
self._callback.func if isinstance(self._callback, functools.partial) else self._callback,
"__name__",
"__unknown_name__",
)
@property
def _is_event_loop_running(self) -> bool:
try:
asyncio.get_running_loop()
except RuntimeError:
return False
return True
@classmethod
async def _app_starting_listener(cls, _: hikari.StartingEvent) -> None:
assert Task._app is not None
Task._app_starting.set()
for task_ in Task._tasks:
if not task_.is_running:
task_.start()
@classmethod
async def _app_started_listener(cls, _: hikari.StartedEvent) -> None:
assert Task._app is not None
Task._app_started.set()
@classmethod
async def _app_stopping_listener(cls, _: hikari.StoppingEvent) -> None:
assert Task._app is not None
Task._app_starting.clear()
Task._app_started.clear()
for task_ in Task._tasks:
task_.stop()
async def _loop(self) -> None:
if self._wait_before_execution:
await asyncio.sleep(self._trigger.get_interval())
while not self._stopped:
if self._max_executions is not None and self._n_executions >= self._max_executions:
break
_LOGGER.debug("Running task %r", self.__name__)
self._n_executions += 1
try:
maybe_coro = self._callback(*([Task._app] if self._pass_app else []))
if inspect.iscoroutine(maybe_coro):
await maybe_coro
self._consecutive_failures = 0
except Exception as e:
out: t.Any = False
if self._error_handler is not None:
out = self._error_handler(e)
if inspect.iscoroutine(out):
out = await out
if not out:
self._consecutive_failures += 1
if self._consecutive_failures >= self._max_consecutive_failures:
_LOGGER.error(
"Task failed repeatedly and has been cancelled", exc_info=(type(e), e, e.__traceback__)
)
break
_LOGGER.error(
"Error occurred during task execution and was not handled. "
"Task will be cancelled after %s more consecutive failure(s)",
self._max_consecutive_failures - self._consecutive_failures,
exc_info=(type(e), e, e.__traceback__),
)
await asyncio.sleep(self._trigger.get_interval())
assert self._task is not None
self._task.cancel()
Task._tasks.remove(self)
self._task = None
self._stopped = True
self._n_executions = 0
_LOGGER.debug("Stopped task %r", self.__name__)
@property
def n_executions(self) -> int:
"""The number of times the task has been executed since being started."""
return self._n_executions
@property
def is_running(self) -> bool:
"""Whether the task represented by this object is currently running or not."""
return self._task is not None and not self._task.done()
@t.overload
def set_error_handler(self, func: TaskErrorHandlerT) -> TaskErrorHandlerT:
...
@t.overload
def set_error_handler(self) -> t.Callable[[TaskErrorHandlerT], TaskErrorHandlerT]:
...
[docs]
def set_error_handler(
self, func: t.Optional[TaskErrorHandlerT] = None
) -> t.Union[TaskErrorHandlerT, t.Callable[[TaskErrorHandlerT], TaskErrorHandlerT]]:
"""
Sets the function to use as the error handler for this task. This can be used as a first
or second order decorator, or called with the function to set as the error handler.
The error handler should return a boolean indicating whether the error could be handled.
If ``False`` or a falsy value is returned, the related execution of the task will be considered
to be a failure.
"""
if func is not None:
if isinstance(func, _BindableObjectWithCallback):
return self._error_handler # type: ignore[unreachable]
self._error_handler = _BindableObjectWithCallback(func)
return self._error_handler
def decorate(func_: TaskErrorHandlerT) -> TaskErrorHandlerT:
if isinstance(func_, _BindableObjectWithCallback):
return func_ # type: ignore[unreachable]
self._error_handler = _BindableObjectWithCallback(func_)
return self._error_handler
return decorate
[docs]
def start(self) -> None:
"""
Start the task if the event loop has been established, or schedule the task to be started
once the event loop has been established.
Returns:
``None``
"""
Task._tasks.append(self)
if self._is_event_loop_running:
_LOGGER.debug("Starting task %r", self.__name__)
self._task = asyncio.create_task(self._loop())
[docs]
def stop(self) -> None:
"""
Stop the task after the completion of the current iteration, if it was running.
Returns:
``None``
"""
if self.is_running:
self._stopped = True
[docs]
def cancel(self) -> None:
"""
Cancel the task if it was running.
Returns:
``None``
"""
if not self.is_running:
return
assert self._task is not None
self._task.cancel()
self._task = None
self._stopped = True
self._n_executions = 0
[docs]
async def wait_until_started() -> None:
"""
Wait until the bot has started. Note that :obj:`.load` **must** have been
called in order for this to function.
Roughly equivalent to:
.. code-block:: python
# Where app is an instance of 'lightbulb.BotApp'
await app.wait_for(hikari.StartedEvent, timeout=None)
Returns:
``None``
"""
await Task._app_started.wait()
@t.overload
def task(
*,
s: float = 0,
m: float = 0,
h: float = 0,
d: float = 0,
auto_start: bool = False,
max_consecutive_failures: int = 3,
max_executions: t.Optional[int] = None,
pass_app: bool = False,
wait_before_execution: hikari.UndefinedOr[bool] = hikari.UNDEFINED,
cls: t.Type[Task] = Task,
) -> t.Callable[[TaskCallbackT], Task]:
...
@t.overload
def task(
trigger: triggers.Trigger,
/,
*,
auto_start: bool = False,
max_consecutive_failures: int = 3,
max_executions: t.Optional[int] = None,
pass_app: bool = False,
wait_before_execution: hikari.UndefinedOr[bool] = hikari.UNDEFINED,
cls: t.Type[Task] = Task,
) -> t.Callable[[TaskCallbackT], Task]:
...
@t.overload
def task(
trigger: t.Type[triggers.UniformTrigger],
/,
*,
s: float = 0,
m: float = 0,
h: float = 0,
d: float = 0,
auto_start: bool = False,
max_consecutive_failures: int = 3,
max_executions: t.Optional[int] = None,
pass_app: bool = False,
wait_before_execution: hikari.UndefinedOr[bool] = hikari.UNDEFINED,
cls: t.Type[Task] = Task,
) -> t.Callable[[TaskCallbackT], Task]:
...
[docs]
def task(
trigger: t.Optional[t.Union[triggers.Trigger, t.Type[triggers.Trigger]]] = None,
/,
*,
s: float = 0,
m: float = 0,
h: float = 0,
d: float = 0,
auto_start: bool = False,
max_consecutive_failures: int = 3,
max_executions: t.Optional[int] = None,
pass_app: bool = False,
wait_before_execution: hikari.UndefinedOr[bool] = hikari.UNDEFINED,
cls: t.Type[Task] = Task,
) -> t.Callable[[TaskCallbackT], Task]:
"""
Second order decorator to register a function as a repeating task. The decorated function can
be a synchronous or asynchronous function, and any return value will be discarded.
Args:
trigger (Optional[:obj:`~.triggers.Trigger`]): Trigger to use to set the interval between task executions.
If not provided, and interval values were provided then this defaults to :obj:`~.triggers.UniformTrigger`.
Keyword Args:
s (:obj:`float`): Number of seconds between task executions.
m (:obj:`float`): Number of minutes between task executions.
h (:obj:`float`): Number of hours between task executions.
d (:obj:`float`): Number of days between task executions.
auto_start (:obj:`bool`): Whether the task will be started automatically when created. If ``False``,
:meth:`~Task.start` will have to be called manually in order to start the task's execution.
max_consecutive_failures (:obj:`int`): The number of consecutive task failures that will be ignored
before the task's execution is cancelled. Defaults to ``3``, minimum ``1``.
max_executions (Optional[:obj:`int`]): The maximum number of times that the task will run. If ``None``, the
task will run indefinitely.
pass_app (:obj:`bool`): Whether the :obj:`lightbulb.app.BotApp` instance should be passed into the
task on execution. Defaults to ``False``.
wait_before_execution (UndefinedOr[:obj:`bool`]): Whether the task will wait its given interval before the first
time the task is executed. Defaults to ``UNDEFINED`` (will use the trigger's default).
cls (Type[:obj:`~Task`]): Task class to use.
.. versionadded:: 2.2.2
``wait_before_execution`` kwarg.
"""
def decorate(func: TaskCallbackT) -> Task:
nonlocal trigger
if any([s, m, h, d]):
trigger = trigger or triggers.UniformTrigger
assert inspect.isclass(trigger) and issubclass(trigger, triggers.UniformTrigger)
return cls(
func,
(trigger or triggers.UniformTrigger)(s + (m * 60) + (h * 3600) + (d * 86400)),
auto_start,
max(max_consecutive_failures, 1),
max_executions,
pass_app,
wait_before_execution,
)
if trigger is None:
raise ValueError("Interval values were not provided and no trigger was passed")
assert isinstance(trigger, triggers.Trigger)
return cls(
func, trigger, auto_start, max(max_consecutive_failures, 1), max_executions, pass_app, wait_before_execution
)
return decorate
[docs]
def load(app: lightbulb.BotApp) -> None:
"""
Add the task system to the bot, enabling tasks to be run
once the bot has started.
Args:
app (:obj:`~lightbulb.app.BotApp`): :obj:`~lightbulb.app.BotApp` instance
that tasks will be enabled for.
Returns:
``None``
"""
Task._app = app
app.subscribe(hikari.StartingEvent, Task._app_starting_listener)
app.subscribe(hikari.StartedEvent, Task._app_started_listener)
app.subscribe(hikari.StoppingEvent, Task._app_stopping_listener)