lightbulb.commands.execution

class lightbulb.commands.execution.ExecutionHook(step: ExecutionStep, skip_when_failed: bool, func: ExecutionHookFunc)[source]

Dataclass representing a command execution hook executed before the invocation method is called.

Parameters:
  • step (ExecutionStep) – The step that this hook should be run during.

  • skip_when_failed (bool) – Whether this hook should be skipped if the pipeline has already failed.

  • func – The function that this hook executes. May either be synchronous or asynchronous, and must take (at least) two arguments - and instance of ExecutionPipeline and Context respectively.

func: ExecutionHookFunc

The function that this hook executes.

skip_when_failed: bool

Whether this hook should be skipped if the pipeline has already failed.

step: ExecutionStep

The step that this hook should be run during.

class lightbulb.commands.execution.ExecutionPipeline(context: context_.Context, order: t.Sequence[ExecutionStep])[source]

Class representing an entire command execution flow. Handles processing command hooks, including failure handling and collecting, as well as the calling of the command invocation function if all hooks succeed.

Warning

A single hook failure will not prevent future hooks from being executed. If a hook should not be executed if previous ones have failed you can set the skip_when_failed parameter to prevent this from happening.

@lightbulb.hook(lightbulb.ExecutionSteps.CHECKS, skip_when_failed=True)
async def some_hook(pl: lightbulb.ExecutionPipeline, ctx: lightbulb.Context) -> None:
    ...

Alternatively if you wish to customize the behaviour further you can add a guard clause in the hook function.

@lightbulb.hook(lightbulb.ExecutionSteps.CHECKS, skip_when_failed=True)
async def some_hook(pl: lightbulb.ExecutionPipeline, ctx: lightbulb.Context) -> None:
    # Prevent the hook from running if previous hooks (or the command invocation) failed.
    # Also see 'ExecutionPipeline.any_hook_failed' and 'ExecutionPipeline.invocation_failed' for
    # alternative behaviour.
    if pl.failed:
        return

    ...
property any_hook_failed: bool

Whether any single invocation hook threw an exception.

Note

This will be True even if the failed hook(s) were executed after the command invocation function. Use invocation_failed if you need to know if the invocation function threw an exception.

property failed: bool

Whether this pipeline has failed.

A pipeline is considered failed if any single hook execution failed, or the command invocation failed.

Note

This will be True even if the failed hook(s) were executed after the command invocation function. Use invocation_failed if you need to know if the invocation function threw an exception.

property invocation_failed: bool

Whether the command invocation function threw an exception.

class lightbulb.commands.execution.ExecutionStep(name: str)[source]

Dataclass representing an execution step processed prior to the command invocation function being called.

Parameters:

name (str) – The name of the execution step.

name: str

The name of the execution step

final class lightbulb.commands.execution.ExecutionSteps[source]

Collection of the default execution steps lightbulb implements.

CHECKS = ExecutionStep(name='CHECKS')

Step for execution of command check logic.

COOLDOWNS = ExecutionStep(name='COOLDOWNS')

Step for execution of command cooldown logic.

INVOKE = ExecutionStep(name='INVOKE')

Step for command invocation. No hooks should ever use this step.

MAX_CONCURRENCY = ExecutionStep(name='MAX_CONCURRENCY')

Step for execution of maximum command concurrency logic.

POST_INVOKE = ExecutionStep(name='POST_INVOKE')

Step for post-invocation logic.

lightbulb.commands.execution.hook(step: ExecutionStep, skip_when_failed: bool = False) t.Callable[[ExecutionHookFunc], ExecutionHook][source]

Second order decorator to convert a function into an execution hook for the given step. Also enables dependency injection on the decorated function.

The decorated function can be either synchronous or asyncronous and must take at minimum the two arguments seen below. pl is an instance of ExecutionPipeline which is used to manage the command execution flow, and ctx is an instance of Context which contains information about the current invocation.

def example_hook(pl: lightbulb.ExecutionPipeline, ctx: lightbulb.Context) -> None:
    # Hook logic
    ...
Parameters:
  • step (ExecutionStep) – The step that this hook should be run during.

  • skip_when_failed (bool) – Whether this hook should be skipped if the ExecutionPipeline has already failed due to a different hook or command invocation exception. Defaults to False.

Returns:

The created execution hook.

Return type:

ExecutionHook

Example

To implement a custom hook to block execution of a command on days other than monday.

@lightbulb.hook(lightbulb.ExecutionStep.CHECKS)
def only_on_mondays(pl: lightbulb.ExecutionPipeline, _: lightbulb.Context) -> None:
    # Check if today is Monday (0)
    if datetime.date.today().weekday() != 0:
        # Fail the pipeline execution
        raise RuntimeError("This command can only be used on mondays!")
lightbulb.commands.execution.invoke(func: t.Callable[..., t.Awaitable[t.Any]]) t.Callable[[context_.Context], t.Awaitable[t.Any]][source]

First order decorator to mark a method as the invocation method to be used for the command. Also enables dependency injection on the decorated method. The decorated method must have the first parameter (non-self) accepting an instance of Context. Remaining parameters will attempt to be dependency injected.

Parameters:

func – The method to be marked as the command invocation method.

Returns:

The decorated method with dependency injection enabled.

Example

class ExampleCommand(
    lightbulb.SlashCommand,
    name="example",
    description="example"
):
    @lightbulb.invoke
    async def invoke(self, ctx: lightbulb.Context) -> None:
        await ctx.respond("example")

Note

The command invocation function will never be called if any of the hooks for that command caused the pipeline to fail.