Source code for lightbulb.cooldowns

# -*- coding: utf-8 -*-
# Copyright © tandemdude 2020-present
#
# This file is part of Lightbulb.
#
# Lightbulb is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Lightbulb is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Lightbulb. If not, see <https://www.gnu.org/licenses/>.
from __future__ import annotations

__all__ = ["CooldownManager"]

import inspect
import time
import typing as t

from lightbulb import buckets
from lightbulb import cooldown_algorithms
from lightbulb import errors

if t.TYPE_CHECKING:
    from lightbulb.context import base as ctx_base


[docs] class CooldownManager: """ The cooldown manager for a command. """ __slots__ = ("callback", "cooldowns") def __init__( self, callback: t.Callable[[ctx_base.Context], t.Union[buckets.Bucket, t.Coroutine[t.Any, t.Any, buckets.Bucket]]], ) -> None: self.callback = callback self.cooldowns: t.MutableMapping[t.Hashable, buckets.Bucket] = {} """Mapping of a hashable to a :obj:`~.buckets.Bucket` representing the currently stored cooldowns.""" async def _get_bucket(self, context: ctx_base.Context) -> buckets.Bucket: bucket = self.callback(context) if inspect.iscoroutine(bucket): bucket = await bucket assert isinstance(bucket, buckets.Bucket) return bucket
[docs] async def add_cooldown(self, context: ctx_base.Context) -> None: """ Add a cooldown under the given context. If an expired bucket already exists then it will be overwritten. Args: context (:obj:`~.context.base.Context`): The context to add a cooldown under. Returns: ``None`` Raises: :obj:`~.errors.CommandIsOnCooldown`: The command is currently on cooldown for the given context. """ bucket = await self._get_bucket(context) cooldown_hash = bucket.extract_hash(context) cooldown_bucket = self.cooldowns.get(cooldown_hash) if cooldown_bucket is not None: cooldown_status = cooldown_bucket.acquire() if inspect.iscoroutine(cooldown_status): cooldown_status = await cooldown_status if cooldown_status is cooldown_algorithms.CooldownStatus.ACTIVE: # Cooldown has been activated assert cooldown_bucket.start_time is not None raise errors.CommandIsOnCooldown( "This command is on cooldown", retry_after=(cooldown_bucket.start_time + cooldown_bucket.length) - time.perf_counter(), ) elif cooldown_status is cooldown_algorithms.CooldownStatus.INACTIVE: # Cooldown has not yet been activated. return self.cooldowns[cooldown_hash] = bucket maybe_coro = self.cooldowns[cooldown_hash].acquire() if inspect.iscoroutine(maybe_coro): await maybe_coro
[docs] async def reset_cooldown(self, context: ctx_base.Context) -> None: """ Reset the cooldown under the given context. Args: context (:obj:`~.context.base.Context`): The context to reset the cooldown under. Returns: ``None`` """ bucket = await self._get_bucket(context) del self.cooldowns[bucket.extract_hash(context)]