Skip to content

future

dandy.core.future.future

thread_pool_executor = concurrent.futures.ThreadPoolExecutor(max_workers=(settings.FUTURES_MAX_WORKERS)) module-attribute

R = TypeVar('R') module-attribute

AsyncFuture

Bases: Generic[R]

Source code in dandy/core/future/future.py
def __init__(self, callable_: Callable[..., R], *args, **kwargs):
    self._future: Future = thread_pool_executor.submit(callable_, *args, **kwargs)

    self._result: R = None
    self._result_fetched: bool = False
    self._result_timeout: float | None = None
    self._lock = threading.RLock()

result property

cancel

Source code in dandy/core/future/future.py
def cancel(self) -> bool:
    return self._future.cancel()

cancelled

Source code in dandy/core/future/future.py
def cancelled(self) -> bool:
    return self._future.cancelled()

done

Source code in dandy/core/future/future.py
def done(self) -> bool:
    return self._future.done()

get_result

Source code in dandy/core/future/future.py
def get_result(self, timeout_seconds: float | None = None) -> R:
    with self._lock:
        if self._result_fetched:
            return self._result

        try:
            self.set_timeout(timeout_seconds)

            self._result: R = self._future.result(timeout=self._result_timeout)
            self._result_fetched = True

        except concurrent.futures.TimeoutError as error:
            self.cancel()
            message = f"Future timed out after {self._result_timeout} seconds"
            raise FutureRecoverableException(message) from error

        return self._result

set_timeout

Source code in dandy/core/future/future.py
def set_timeout(self, seconds: float | None = None):
    if seconds is not None and seconds <= 0:
        message = f"Future timeout must be greater than 0.0, not {seconds}"
        raise FutureCriticalException(message)

    self._result_timeout = seconds