Module slack_sdk.http_retry.async_handler

An asyncio-compatible RetryHandler interface. You can pass an array of handlers to customize the retry logic in supported API clients.

Expand source code
"""asyncio compatible RetryHandler interface.
You can pass an array of handlers to customize retry logics in supported API clients.
"""

import asyncio
from typing import Optional

from slack_sdk.http_retry.state import RetryState
from slack_sdk.http_retry.request import HttpRequest
from slack_sdk.http_retry.response import HttpResponse
from slack_sdk.http_retry.interval_calculator import RetryIntervalCalculator
from slack_sdk.http_retry.builtin_interval_calculators import (
    BackoffRetryIntervalCalculator,
)

# Module-level calculator instance, used as the default value of the
# ``interval_calculator`` argument of ``AsyncRetryHandler.__init__``.
default_interval_calculator = BackoffRetryIntervalCalculator()


class AsyncRetryHandler:
    """Base class for retry handlers that work with asyncio.

    Supported API clients accept a list of handlers, which lets callers
    customize how retries are decided. The decision itself lives in
    ``_can_retry_async``, which raises ``NotImplementedError`` here and is
    meant to be overridden.
    """

    max_retry_count: int
    interval_calculator: RetryIntervalCalculator

    def __init__(
        self,
        max_retry_count: int = 1,
        interval_calculator: RetryIntervalCalculator = default_interval_calculator,
    ):
        """Store the retry limit and the interval calculator.

        Args:
            max_retry_count: Upper bound on the number of retries.
            interval_calculator: Object whose ``calculate_sleep_duration``
                gives the pause before the next attempt.
        """
        self.max_retry_count = max_retry_count
        self.interval_calculator = interval_calculator

    async def can_retry_async(
        self,
        *,
        state: RetryState,
        request: HttpRequest,
        response: Optional[HttpResponse] = None,
        error: Optional[Exception] = None,
    ) -> bool:
        """Tell whether another attempt may be made.

        Returns ``False`` without consulting ``_can_retry_async`` once
        ``state.current_attempt`` has reached ``max_retry_count``; otherwise
        the answer of ``_can_retry_async`` is returned as-is.
        """
        budget_exhausted = state.current_attempt >= self.max_retry_count
        if budget_exhausted:
            return False
        # All arguments are forwarded unchanged to the overridable hook.
        verdict = await self._can_retry_async(
            state=state, request=request, response=response, error=error
        )
        return verdict

    async def _can_retry_async(
        self,
        *,
        state: RetryState,
        request: HttpRequest,
        response: Optional[HttpResponse] = None,
        error: Optional[Exception] = None,
    ) -> bool:
        """Hook holding the actual retry decision; not implemented here.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError()

    async def prepare_for_next_attempt_async(
        self,
        *,
        state: RetryState,
        request: HttpRequest,
        response: Optional[HttpResponse] = None,
        error: Optional[Exception] = None,
    ) -> None:
        """Flag the next attempt, wait for the computed interval, bump the counter.

        ``request``, ``response`` and ``error`` are accepted but not used by
        this implementation.
        """
        state.next_attempt_requested = True
        compute_pause = self.interval_calculator.calculate_sleep_duration
        pause = compute_pause(state.current_attempt)
        # The counter is incremented only after the wait has finished.
        await asyncio.sleep(pause)
        state.increment_current_attempt()

Classes

class AsyncRetryHandler (max_retry_count: int = 1, interval_calculator: RetryIntervalCalculator = <slack_sdk.http_retry.builtin_interval_calculators.BackoffRetryIntervalCalculator object>)

An asyncio-compatible RetryHandler interface. You can pass an array of handlers to customize the retry logic in supported API clients.

RetryHandler interface.

Args

max_retry_count
The maximum times to do retries
interval_calculator
Pass an interval calculator for customizing the logic
Expand source code
class AsyncRetryHandler:
    """Base class for retry handlers that work with asyncio.

    Supported API clients accept a list of handlers, which lets callers
    customize how retries are decided. The decision itself lives in
    ``_can_retry_async``, which raises ``NotImplementedError`` here and is
    meant to be overridden.
    """

    max_retry_count: int
    interval_calculator: RetryIntervalCalculator

    def __init__(
        self,
        max_retry_count: int = 1,
        interval_calculator: RetryIntervalCalculator = default_interval_calculator,
    ):
        """Store the retry limit and the interval calculator.

        Args:
            max_retry_count: Upper bound on the number of retries.
            interval_calculator: Object whose ``calculate_sleep_duration``
                gives the pause before the next attempt.
        """
        self.max_retry_count = max_retry_count
        self.interval_calculator = interval_calculator

    async def can_retry_async(
        self,
        *,
        state: RetryState,
        request: HttpRequest,
        response: Optional[HttpResponse] = None,
        error: Optional[Exception] = None,
    ) -> bool:
        """Tell whether another attempt may be made.

        Returns ``False`` without consulting ``_can_retry_async`` once
        ``state.current_attempt`` has reached ``max_retry_count``; otherwise
        the answer of ``_can_retry_async`` is returned as-is.
        """
        budget_exhausted = state.current_attempt >= self.max_retry_count
        if budget_exhausted:
            return False
        # All arguments are forwarded unchanged to the overridable hook.
        verdict = await self._can_retry_async(
            state=state, request=request, response=response, error=error
        )
        return verdict

    async def _can_retry_async(
        self,
        *,
        state: RetryState,
        request: HttpRequest,
        response: Optional[HttpResponse] = None,
        error: Optional[Exception] = None,
    ) -> bool:
        """Hook holding the actual retry decision; not implemented here.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError()

    async def prepare_for_next_attempt_async(
        self,
        *,
        state: RetryState,
        request: HttpRequest,
        response: Optional[HttpResponse] = None,
        error: Optional[Exception] = None,
    ) -> None:
        """Flag the next attempt, wait for the computed interval, bump the counter.

        ``request``, ``response`` and ``error`` are accepted but not used by
        this implementation.
        """
        state.next_attempt_requested = True
        compute_pause = self.interval_calculator.calculate_sleep_duration
        pause = compute_pause(state.current_attempt)
        # The counter is incremented only after the wait has finished.
        await asyncio.sleep(pause)
        state.increment_current_attempt()

Subclasses

Class variables

var interval_calculator : RetryIntervalCalculator
var max_retry_count : int

Methods

async def can_retry_async(self, *, state: RetryState, request: HttpRequest, response: Optional[HttpResponse] = None, error: Optional[Exception] = None) ‑> bool
Expand source code
async def can_retry_async(
    self,
    *,
    state: RetryState,
    request: HttpRequest,
    response: Optional[HttpResponse] = None,
    error: Optional[Exception] = None,
) -> bool:
    """Tell whether another attempt may be made.

    Returns ``False`` without consulting ``_can_retry_async`` once
    ``state.current_attempt`` has reached ``max_retry_count``; otherwise
    the answer of ``_can_retry_async`` is returned as-is.
    """
    budget_exhausted = state.current_attempt >= self.max_retry_count
    if budget_exhausted:
        return False
    # All arguments are forwarded unchanged to the overridable hook.
    verdict = await self._can_retry_async(
        state=state, request=request, response=response, error=error
    )
    return verdict
async def prepare_for_next_attempt_async(self, *, state: RetryState, request: HttpRequest, response: Optional[HttpResponse] = None, error: Optional[Exception] = None) ‑> None
Expand source code
async def prepare_for_next_attempt_async(
    self,
    *,
    state: RetryState,
    request: HttpRequest,
    response: Optional[HttpResponse] = None,
    error: Optional[Exception] = None,
) -> None:
    """Flag the next attempt, wait for the computed interval, bump the counter.

    ``request``, ``response`` and ``error`` are accepted but not used by
    this implementation.
    """
    state.next_attempt_requested = True
    compute_pause = self.interval_calculator.calculate_sleep_duration
    pause = compute_pause(state.current_attempt)
    # The counter is incremented only after the wait has finished.
    await asyncio.sleep(pause)
    state.increment_current_attempt()