Skip to content

dandy

dandy

__all__ = ['Agent', 'BaseIntel', 'BaseListIntel', 'Bot', 'cache_to_memory', 'cache_to_sqlite', 'generate_cache_key', 'Map', 'MemoryCache', 'Prompt', 'recorder_to_html_file', 'recorder_to_json_file', 'recorder_to_markdown_file', 'Recorder', 'SqliteCache'] module-attribute

BaseIntel

Bases: BaseModel, ABC

Base class for all Dandy intel

check_inc_ex classmethod

Source code in dandy/intel/intel.py
@classmethod
def check_inc_ex(
        cls,
        include_dict: Dict,
        exclude_dict: Dict,
):
    """
    Verify that every key of the include / exclude mappings is a field of this class.

    An empty (falsy) mapping is skipped. The include mapping is checked before the exclude mapping.

    :param include_dict: mapping whose keys are the field names to include
    :param exclude_dict: mapping whose keys are the field names to exclude
    :raises IntelCriticalException: if a mapping names a field that is not in ``cls.model_fields``
    """
    field_names = set(cls.model_fields.keys())

    for action, requested_dict in (('include', include_dict), ('exclude', exclude_dict)):
        if not requested_dict:
            continue

        # The offending names are the requested ones the class lacks, i.e. requested - known.
        # (known - requested would list valid fields that simply were not asked for.)
        unknown_field_names = set(requested_dict.keys()).difference(field_names)

        if unknown_field_names:
            raise IntelCriticalException(
                f'{action} failed on {cls.__name__} because it does not have the following fields: {unknown_field_names}.'
            )

model_to_kwargs

Source code in dandy/intel/intel.py
def model_to_kwargs(self) -> dict:
    """
    Return the result of calling ``dict()`` on this object.

    :return: a new ``dict`` built from this object
    """
    kwargs = dict(self)

    return kwargs

model_inc_ex_class_copy classmethod

Source code in dandy/intel/intel.py
@classmethod
def model_inc_ex_class_copy(
        cls,
        include: Union[IncEx, Dict, None] = None,
        exclude: Union[IncEx, Dict, None] = None,
        intel_object: Union[Self, None] = None
) -> Type[BaseIntel]:
    """
    Build a new model class containing only the fields selected by ``include`` or ``exclude``.

    :param include: field names to keep; a dict value that is itself a dict selects fields of a nested intel
    :param exclude: field names to drop; a dict value that is itself a dict selects fields of a nested intel
    :param intel_object: optional instance; when given, a required field may be left out as long as
        the instance's value for that field is not ``None``
    :return: a model class named after ``cls``
    :raises IntelCriticalException: if ``include`` and ``exclude`` are both truthy, if a required field
        would be dropped without a usable value on ``intel_object``, or if ``check_inc_ex`` rejects a name
    """
    # Nothing to filter: return a new model that simply derives from this class.
    if include is None and exclude is None:
        return create_model(
            cls.__name__,
            __base__=cls
        )

    if include and exclude:
        raise IntelCriticalException('include and exclude cannot be used together')

    # Normalize a selection to a dict: dicts pass through, any other iterable becomes {name: True}.
    def inc_ex_dict(inc_ex: Union[IncEx, None]) -> Dict:
        if inc_ex is not None:
            return inc_ex if isinstance(inc_ex, dict) else {key: True for key in inc_ex}
        else:
            return {}

    include_dict = inc_ex_dict(include)
    exclude_dict = inc_ex_dict(exclude)

    cls.check_inc_ex(include_dict, exclude_dict)

    # field name -> (annotation, default) tuples handed to create_model below.
    processed_fields = {}

    for field_name, field_info in cls.model_fields.items():

        include_value = include_dict.get(field_name)
        exclude_value = exclude_dict.get(field_name)

        # Plain (non-nested) selection: make sure a required field is not being dropped.
        if not isinstance(include_value, Dict) and not isinstance(exclude_value, Dict):
            # Exclude mode: this required field is explicitly excluded.
            if include is None and exclude_value and field_info.is_required():
                if intel_object is None:
                    raise IntelCriticalException(f"{field_name} is required and cannot be excluded")

                elif getattr(intel_object, field_name) is None:
                    raise IntelCriticalException(f"{field_name} is required and has no value therefore cannot be excluded")

            # Include mode: this required field was not listed.
            if exclude is None and include_value is None and field_info.is_required():
                if intel_object is None:
                    raise IntelCriticalException(f"{field_name} is required and must be included")

                elif getattr(intel_object, field_name) is None:
                    raise IntelCriticalException(f"{field_name} is required and has no value therefore it must be included")

        # FieldAnnotation is a project helper; only its .first_inner, .origin and .base attributes are used here.
        field_annotation = FieldAnnotation(field_info.annotation, field_name)
        # Second tuple element for create_model: the default factory when set, otherwise the default.
        field_factory = field_info.default_factory or field_info.default

        # Nested selection: the value for this field is itself an include/exclude dict.
        if isinstance(include_value, Dict) or isinstance(exclude_value, Dict):

            if issubclass(field_annotation.first_inner, BaseIntel):
                sub_model: Type[BaseIntel] = field_annotation.first_inner

                # NOTE(review): intel_object is not forwarded to the nested call — confirm this is intended.
                new_sub_model = sub_model.model_inc_ex_class_copy(
                    include=include_value,
                    exclude=exclude_value,
                )

                # Re-wrap the filtered sub model in the original origin type when there is one.
                processed_fields[field_name] = (
                    new_sub_model if field_annotation.origin is None else field_annotation.origin[new_sub_model],
                    field_factory,
                )

            else:
                # Inner type is not an intel class, so the nested selection is not applied to it.
                processed_fields[field_name] = (
                    field_annotation.first_inner if field_annotation.origin is None else field_annotation.origin[
                        field_annotation.first_inner],
                    field_factory,
                )

        # Keep the field when it is included (include mode) or not excluded (exclude mode).
        elif (include_value and exclude is None) or (exclude_value is None and include is None):
            processed_fields[field_name] = (
                field_annotation.base,
                field_factory,
            )

        # Fields matching neither branch are left out of the new model.

    # The filtered model derives from BaseIntel (not from cls) and reuses this class's name.
    return create_model(
        cls.__name__,
        **processed_fields,
        __base__=BaseIntel
    )

model_json_inc_ex_schema classmethod

Source code in dandy/intel/intel.py
@classmethod
def model_json_inc_ex_schema(
        cls,
        include: Union[IncEx, None] = None,
        exclude: Union[IncEx, None] = None,
) -> Dict:
    """
    Return the JSON schema of the class produced by ``model_inc_ex_class_copy``.

    :param include: forwarded to ``model_inc_ex_class_copy``
    :param exclude: forwarded to ``model_inc_ex_class_copy``
    :return: the result of ``model_json_schema()`` on the filtered class
    """
    filtered_class = cls.model_inc_ex_class_copy(include=include, exclude=exclude)

    return filtered_class.model_json_schema()

model_object_json_inc_ex_schema

Source code in dandy/intel/intel.py
def model_object_json_inc_ex_schema(
        self,
        include: Union[IncEx, None] = None,
        exclude: Union[IncEx, None] = None
) -> Dict:
    """
    Return the JSON schema of the filtered class, passing this instance as ``intel_object``.

    :param include: forwarded to ``model_inc_ex_class_copy``
    :param exclude: forwarded to ``model_inc_ex_class_copy``
    :return: the result of ``model_json_schema()`` on the filtered class
    """
    filtered_class = self.model_inc_ex_class_copy(
        include=include,
        exclude=exclude,
        intel_object=self,
    )

    return filtered_class.model_json_schema()

model_validate_and_copy

Copies this object, applying field updates from a dict, and validates the result

Parameters:

  • update (dict) –

Returns:

Source code in dandy/intel/intel.py
def model_validate_and_copy(self, update: dict) -> Self:
    """
    Copy this object with field updates from a dict, then validate the dumped copy.

    :param update: field values passed to ``model_copy(update=...)``
    :return: the result of ``model_validate`` on the dumped, updated copy
    """
    updated_copy = self.model_copy(update=update)
    dumped_copy = updated_copy.model_dump(warnings=False)

    return self.model_validate(obj=dumped_copy)

model_validate_json_and_copy

Copies this object, applying field updates parsed from a JSON string, and validates the result

Parameters:

  • json_data (str) –

Returns:

Source code in dandy/intel/intel.py
def model_validate_json_and_copy(self, json_data: str) -> Self:
    """
    Copy this object with field updates parsed from a JSON string, then validate.

    :param json_data: JSON text parsed with ``from_json`` and used as the update
    :return: the result of ``model_validate_and_copy`` for the parsed update
    """
    parsed_update = from_json(json_data)

    return self.model_validate_and_copy(update=parsed_update)

BaseListIntel

Bases: BaseIntel, ABC, Generic[T]

model_post_init

Source code in dandy/intel/intel.py
def model_post_init(self, __context):
    """
    Find the single list-annotated field of this class and remember its name in ``_list_name``.

    :param __context: unused by this method
    :raises ValueError: if the class does not have exactly one field whose annotation origin is ``list``
    """
    list_fields = []

    for name, field in self.__class__.model_fields.items():
        if get_origin(field.annotation) is list:
            list_fields.append(name)

    if len(list_fields) != 1:
        raise ValueError('BaseListIntel sub classes can only have exactly one list field attribute and must be declared with typing')

    (self._list_name,) = list_fields

__getitem__

Source code in dandy/intel/intel.py
def __getitem__(self, index) -> List[T] | T:
    """Return ``index`` (an index or slice) of the list stored in the field named by ``_list_name``."""
    items = getattr(self, self._list_name)

    return items[index]

__iter__

Source code in dandy/intel/intel.py
def __iter__(self) -> Generator[T, None, None]:
    """Yield each element of the list stored in the field named by ``_list_name``."""
    for item in getattr(self, self._list_name):
        yield item

__len__

Source code in dandy/intel/intel.py
def __len__(self) -> int:
    """Return the length of the list stored in the field named by ``_list_name``."""
    items = getattr(self, self._list_name)

    return len(items)

__setitem__

Source code in dandy/intel/intel.py
def __setitem__(self, index, value: T):
    """Assign ``value`` at ``index`` in the list stored in the field named by ``_list_name``."""
    items = getattr(self, self._list_name)
    items[index] = value

append

Source code in dandy/intel/intel.py
def append(self, item: T):
    """Append ``item`` to the list stored in the field named by ``_list_name``."""
    items = getattr(self, self._list_name)
    items.append(item)

extend

Source code in dandy/intel/intel.py
def extend(self, items: List[T]):
    """Extend the list stored in the field named by ``_list_name`` with ``items``."""
    stored_items = getattr(self, self._list_name)
    stored_items.extend(items)

Bot dataclass

Bases: BaseProcessor, LlmProcessorMixin, HttpProcessorMixin, VisionProcessorMixin

services = BotService() class-attribute

description = 'Base Dandy Bot Class That Can Do Anything' class-attribute instance-attribute

process

Source code in dandy/processor/bot/bot.py
def process(
        self,
        prompt: PromptOrStr,
        intel_class: type[IntelType] | None = None,
) -> IntelType:
    """
    Send the prompt to ``self.llm.prompt_to_intel`` and return its result.

    :param prompt: passed through as ``prompt``
    :param intel_class: passed through as ``intel_class``
    :return: whatever ``self.llm.prompt_to_intel`` returns
    """
    llm = self.llm
    intel = llm.prompt_to_intel(prompt=prompt, intel_class=intel_class)

    return intel

Agent dataclass

Bases: BaseProcessor, LlmProcessorMixin, HttpProcessorMixin, VisionProcessorMixin

plan_time_limit_seconds = settings.DEFAULT_AGENT_PLAN_TIME_LIMIT_SECONDS class-attribute instance-attribute

plan_task_count_limit = settings.DEFAULT_AGENT_PLAN_TASK_COUNT_LIMIT class-attribute instance-attribute

processors = (Bot,) class-attribute instance-attribute

services = AgentService() class-attribute

__init_subclass__

Source code in dandy/processor/agent/agent.py
def __init_subclass__(cls, **kwargs):
    """
    Validate the class attributes every subclass must define.

    :param kwargs: class keyword arguments, forwarded to the parent ``__init_subclass__``
    :raises AgentCriticalException: if ``processors`` is ``None`` or empty, or if
        ``_processors_strategy_class`` is ``None``
    """
    # Chain to the parent hook so base-class subclass hooks still run and class keyword
    # arguments are handled by the rest of the hierarchy instead of being silently dropped.
    super().__init_subclass__(**kwargs)

    if cls.processors is None or len(cls.processors) == 0:
        raise AgentCriticalException(
            f'{cls.__name__} must have a sequence of "BaseProcessor" sub classes defined on the "processors" class attribute.'
        )

    if cls._processors_strategy_class is None:
        raise AgentCriticalException(
            f'{cls.__name__} must have a "BaseProcessorsStrategy" sub class defined on the "_processors_strategy_class" class attribute.'
        )

__post_init__

Source code in dandy/processor/agent/agent.py
def __post_init__(self):
    """Create ``_processors_strategy`` from ``_processors_strategy_class`` when none is set yet."""
    if self._processors_strategy is not None:
        return

    strategy_class = self._processors_strategy_class
    self._processors_strategy = strategy_class(self.processors)

process

Source code in dandy/processor/agent/agent.py
def process(
        self,
        prompt: PromptOrStr,
        intel_class: Type[IntelType] | None = None,
        intel_object: IntelType | None = None,
        images: List[str] | None = None,
        image_files: List[str | Path] | None = None,
        include_fields: IncEx | None = None,
        exclude_fields: IncEx | None = None,
        postfix_system_prompt: PromptOrStrOrNone = None,
        message_history: MessageHistory | None = None,
) -> IntelType:
    """
    Create a plan for the prompt, run every task of the plan, then answer the prompt using the plan results.

    :param prompt: the request; used to create the plan and as the prompt of the final LLM call
    :param intel_class: forwarded to ``self.llm.prompt_to_intel``
    :param intel_object: forwarded to ``self.llm.prompt_to_intel``
    :param images: forwarded to ``self.llm.prompt_to_intel``
    :param image_files: forwarded to ``self.llm.prompt_to_intel``
    :param include_fields: forwarded to ``self.llm.prompt_to_intel``
    :param exclude_fields: forwarded to ``self.llm.prompt_to_intel``
    :param postfix_system_prompt: optional prompt or string; the plan results are appended to it
    :param message_history: forwarded to ``self.llm.prompt_to_intel``
    :return: the value returned by ``self.llm.prompt_to_intel``
    :raises AgentOverThoughtRecoverableException: if the plan reports that it exceeded its time limit
        while it is still incomplete
    """

    recorder_event_id = self._recorder_event_id

    recorder_add_llm_agent_create_plan_event(
        prompt,
        self._processors_strategy,
        recorder_event_id
    )

    plan = self._create_plan(prompt)

    recorder_add_llm_agent_finished_creating_plan_event(
        plan,
        recorder_event_id
    )

    recorder_add_llm_agent_running_plan_event(
        plan,
        recorder_event_id
    )

    while plan.is_incomplete:
        # The time limit is checked once per task, before the task starts.
        if plan.has_exceeded_time_limit:
            raise AgentOverThoughtRecoverableException(
                f'{self.__class__.__name__} exceeded the time limit of {self.plan_time_limit_seconds} seconds running a plan.'
            )

        task = plan.active_task

        recorder_add_llm_agent_start_task_event(
            task,
            self._processors_strategy,
            recorder_event_id
        )

        resource = self._processors_strategy.get_processor_from_key(task.processors_key)

        # Only the 'actual_result' field of the task is requested from the processor.
        updated_task = resource.use(
            prompt=agent_do_task_prompt(task),
            intel_object=task,
            include_fields={'actual_result'}
        )

        task.actual_result = updated_task.actual_result
        plan.set_active_task_complete()

        recorder_add_llm_agent_completed_task_event(
            task,
            self._processors_strategy,
            recorder_event_id
        )

    recorder_add_llm_agent_done_executing_plan_event(
        plan,
        recorder_event_id
    )

    recorder_add_llm_agent_processing_final_result_event(
        plan,
        recorder_event_id
    )

    if postfix_system_prompt is None:
        postfix_system_prompt = Prompt()
    elif isinstance(postfix_system_prompt, str):
        # A plain string has no `.text()` / `.line_break()` / `.prompt()` builders,
        # so wrap it in a Prompt before the plan results are appended.
        postfix_system_prompt = Prompt(input=postfix_system_prompt)

    # NOTE: a Prompt object passed in by the caller is extended in place by the calls below.
    postfix_system_prompt.text('Use the results of the below simulated plan to accomplish the user request:')
    postfix_system_prompt.line_break()
    postfix_system_prompt.prompt(plan.to_prompt())

    return self.llm.prompt_to_intel(
        prompt=prompt,
        intel_class=intel_class,
        intel_object=intel_object,
        images=images,
        image_files=image_files,
        include_fields=include_fields,
        exclude_fields=exclude_fields,
        postfix_system_prompt=postfix_system_prompt,
        message_history=message_history,
    )

Map dataclass

Bases: BaseProcessor, LlmProcessorMixin

mapping_keys_description = None class-attribute instance-attribute

mapping = None class-attribute instance-attribute

services = MapService() class-attribute

__init_subclass__

Source code in dandy/processor/map/map.py
def __init_subclass__(cls):
    """
    Validate that a subclass sets the class attributes a map needs.

    :raises MapCriticalException: if ``mapping_keys_description`` or ``mapping`` is ``None``
    """
    super().__init_subclass__()

    # Checked in this order so the first unset attribute is the one reported.
    for attribute_name in ('mapping_keys_description', 'mapping'):
        if getattr(cls, attribute_name) is None:
            raise MapCriticalException(f'{cls.__name__} `{attribute_name}` is not set.')

__post_init__

Source code in dandy/processor/map/map.py
def __post_init__(self):
    """
    Fall back to the class-level map settings when unset, then check that every mapping key is a string.

    :raises MapCriticalException: if a key of ``mapping`` is not a ``str``
    """
    map_class = self.__class__

    if self.mapping_keys_description is None:
        self.mapping_keys_description = map_class.mapping_keys_description

    if self.mapping is None:
        self.mapping = map_class.mapping

    for key in self.mapping.keys():
        if isinstance(key, str):
            continue

        raise MapCriticalException(f'Mapping keys must be strings, found {key} ({type(key)}).')

__getitem__

Source code in dandy/processor/map/map.py
def __getitem__(self, item):
    """Return ``self.mapping[item]``."""
    mapped_value = self.mapping[item]

    return mapped_value

as_enum

Source code in dandy/processor/map/map.py
def as_enum(self) -> Enum:
    """
    Build an ``Enum`` from ``_keyed_mapping``.

    Each member is named after the first element of a mapping value and holds the mapping key as its
    value. The enum is named ``<ClassName>Enum``.

    :return: the enum created by the functional ``Enum`` API
    """
    enum_name = f'{self.__class__.__name__}Enum'

    members = {}
    for key, value in self._keyed_mapping.items():
        members[value[0]] = key

    return Enum(enum_name, members)

process

Source code in dandy/processor/map/map.py
def process(
        self,
        prompt: PromptOrStr,
        max_return_values: int | None = None,
) -> MapValuesIntel:
    """
    Delegate to ``self._process_map_to_intel``.

    :param prompt: passed as the first positional argument
    :param max_return_values: passed as the second positional argument
    :return: whatever ``self._process_map_to_intel`` returns
    """
    map_values_intel = self._process_map_to_intel(prompt, max_return_values)

    return map_values_intel

process_to_future

Source code in dandy/processor/map/map.py
def process_to_future(self, *args, **kwargs) -> AsyncFuture[MapValuesIntel]:
    """
    Wrap a call to ``self.process`` in an ``AsyncFuture``.

    :param args: positional arguments handed to the future along with ``self.process``
    :param kwargs: keyword arguments handed to the future along with ``self.process``
    :return: the constructed ``AsyncFuture[MapValuesIntel]``
    """
    future_class = AsyncFuture[MapValuesIntel]

    return future_class(self.process, *args, **kwargs)

MemoryCache

Bases: BaseCache

cache_name instance-attribute

limit instance-attribute

__len__

Source code in dandy/cache/memory/cache.py
def __len__(self) -> int:
    """Return the number of entries in ``self._cache``."""
    entry_count = len(self._cache)

    return entry_count

get

Source code in dandy/cache/memory/cache.py
def get(self, key: str) -> Union[Any, None]:
    """
    Look up ``key`` in ``self._cache``.

    :param key: the cache key
    :return: the result of ``self._cache.get(key)``
    """
    cached_value = self._cache.get(key)

    return cached_value

set

Source code in dandy/cache/memory/cache.py
def set(self, key: str, value: Any):
    """
    Store ``value`` under ``key`` and then run ``clean()``.

    :param key: the cache key
    :param value: the value to store
    """
    cache = self._cache
    cache[key] = value

    # Cleaning runs after every write.
    self.clean()

clean

Source code in dandy/cache/memory/cache.py
def clean(self):
    """
    Evict entries from the front of ``self._cache`` until it holds at most ``self.limit`` entries.

    Entries are removed with ``popitem(last=False)``, i.e. in insertion order.
    """
    # Loop instead of evicting once: when the cache is over the limit by more than one entry,
    # a single eviction would leave it over the limit.
    while len(self._cache) > self.limit:
        self._cache.popitem(last=False)

clear classmethod

Source code in dandy/cache/memory/cache.py
@classmethod
def clear(cls, cache_name: str = dandy.constants.DEFAULT_CACHE_NAME):
    """
    Empty the named cache held in ``_memory_cache``; do nothing when the name is not present.

    :param cache_name: name of the cache to clear
    """
    if cache_name not in _memory_cache:
        return

    _memory_cache[cache_name].clear()

clear_all classmethod

Source code in dandy/cache/memory/cache.py
@classmethod
def clear_all(cls):
    """Call ``clear()`` on ``_memory_cache`` itself, removing every named cache it holds."""
    memory_cache = _memory_cache
    memory_cache.clear()

destroy_all classmethod

Source code in dandy/cache/memory/cache.py
@classmethod
def destroy_all(cls):
    """Delegate to ``clear_all()``."""
    clear_everything = cls.clear_all
    clear_everything()

Prompt dataclass

input = (None,) class-attribute instance-attribute

tag = (None,) class-attribute instance-attribute

estimated_token_count property

__post_init__

Source code in dandy/llm/prompt/prompt.py
def __post_init__(
        self,
):
    """Start with an empty snippet list and seed it from ``input`` when that is a ``Prompt`` or a ``str``."""
    self.snippets: List[snippet.BaseSnippet] = []

    initial_input = self.input

    # A Prompt is flattened to its rendered string before being added as text.
    if isinstance(initial_input, Prompt):
        self.text(text=initial_input.to_str())

    if isinstance(initial_input, str):
        self.text(text=initial_input)

__str__

Source code in dandy/llm/prompt/prompt.py
def __str__(self) -> str:
    """Return the same string as ``to_str()``."""
    rendered = self.to_str()

    return rendered

to_str

Source code in dandy/llm/prompt/prompt.py
def to_str(self) -> str:
    """
    Concatenate the rendered snippets, wrapping them in ``<tag>`` ... ``</tag>`` when ``tag`` is a string.

    :return: the rendered prompt text
    """
    body = ''.join(part.to_str() for part in self.snippets)

    if not isinstance(self.tag, str):
        return body

    return f'<{self.tag}>\n{body}\n</{self.tag}>\n'

dict

Source code in dandy/llm/prompt/prompt.py
def dict(
        self,
        dictionary: Dict,
        triple_quote: bool = False
) -> Self:
    """
    Append a ``DictionarySnippet`` to this prompt.

    :param dictionary: passed to the snippet as ``dictionary``
    :param triple_quote: passed to the snippet as ``triple_quote``
    :return: this prompt, for chaining
    """
    dictionary_snippet = snippet.DictionarySnippet(
        dictionary=dictionary,
        triple_quote=triple_quote,
    )
    self.snippets.append(dictionary_snippet)

    return self

divider

Source code in dandy/llm/prompt/prompt.py
def divider(self) -> Self:
    """
    Append a ``DividerSnippet`` to this prompt.

    :return: this prompt, for chaining
    """
    divider_snippet = snippet.DividerSnippet()
    self.snippets.append(divider_snippet)

    return self

array

Source code in dandy/llm/prompt/prompt.py
def array(self, items: List[str]) -> Self:
    """
    Append an ``ArraySnippet`` to this prompt.

    :param items: passed to the snippet as ``items``
    :return: this prompt, for chaining
    """
    array_snippet = snippet.ArraySnippet(items=items)
    self.snippets.append(array_snippet)

    return self

array_random_order

Source code in dandy/llm/prompt/prompt.py
def array_random_order(self, items: List[str]) -> Self:
    """
    Append an ``ArrayRandomOrderSnippet`` to this prompt.

    :param items: passed to the snippet as ``items``
    :return: this prompt, for chaining
    """
    random_order_snippet = snippet.ArrayRandomOrderSnippet(items=items)
    self.snippets.append(random_order_snippet)

    return self

file

Source code in dandy/llm/prompt/prompt.py
def file(
        self,
        file_path: Union[str, Path],
        encoding: str = 'utf-8',
        triple_quote: bool = False
) -> Self:
    """
    Append a ``FileSnippet`` to this prompt.

    :param file_path: passed to the snippet as ``file_path``
    :param encoding: passed to the snippet as ``encoding``
    :param triple_quote: passed to the snippet as ``triple_quote``
    :return: this prompt, for chaining
    """
    file_snippet = snippet.FileSnippet(
        file_path=file_path,
        encoding=encoding,
        triple_quote=triple_quote,
    )
    self.snippets.append(file_snippet)

    return self

heading

Source code in dandy/llm/prompt/prompt.py
def heading(
        self,
        heading: str,
) -> Self:
    """
    Append a ``HeadingSnippet`` to this prompt.

    :param heading: passed to the snippet as ``heading``
    :return: this prompt, for chaining
    """
    heading_snippet = snippet.HeadingSnippet(heading=heading)
    self.snippets.append(heading_snippet)

    return self

line_break

Source code in dandy/llm/prompt/prompt.py
def line_break(self) -> Self:
    """
    Append a ``LineBreakSnippet`` to this prompt.

    :return: this prompt, for chaining
    """
    line_break_snippet = snippet.LineBreakSnippet()
    self.snippets.append(line_break_snippet)

    return self

list

Source code in dandy/llm/prompt/prompt.py
def list(
        self,
        items: List[str],
        triple_quote: bool = False
) -> Self:
    """
    Add the items through ``unordered_list``.

    :param items: passed to ``unordered_list`` as ``items``
    :param triple_quote: passed to ``unordered_list`` as ``triple_quote``
    :return: this prompt, for chaining
    """
    list_kwargs = {'items': items, 'triple_quote': triple_quote}
    self.unordered_list(**list_kwargs)

    return self

intel

Source code in dandy/llm/prompt/prompt.py
def intel(
        self,
        intel: BaseIntel,
        triple_quote: bool = False
) -> Self:
    """
    Append an ``IntelSnippet`` to this prompt.

    :param intel: passed to the snippet as ``intel``
    :param triple_quote: passed to the snippet as ``triple_quote``
    :return: this prompt, for chaining
    """
    intel_snippet = snippet.IntelSnippet(
        intel=intel,
        triple_quote=triple_quote,
    )
    self.snippets.append(intel_snippet)

    return self

intel_schema

Source code in dandy/llm/prompt/prompt.py
def intel_schema(
        self,
        intel_class: Type[BaseIntel],
        triple_quote: bool = False
) -> Self:
    """
    Append an ``IntelSchemaSnippet`` to this prompt.

    :param intel_class: passed to the snippet as ``intel_class``
    :param triple_quote: passed to the snippet as ``triple_quote``
    :return: this prompt, for chaining
    """
    schema_snippet = snippet.IntelSchemaSnippet(
        intel_class=intel_class,
        triple_quote=triple_quote,
    )
    self.snippets.append(schema_snippet)

    return self

module_source

Source code in dandy/llm/prompt/prompt.py
def module_source(
        self,
        module_name: str,
        triple_quote: bool = True
) -> Self:
    """
    Append a ``ModuleSourceSnippet`` to this prompt, labelled with the module name.

    :param module_name: passed to the snippet as ``module_name`` and as ``triple_quote_label``
    :param triple_quote: passed to the snippet as ``triple_quote``
    :return: this prompt, for chaining
    """
    module_snippet = snippet.ModuleSourceSnippet(
        module_name=module_name,
        triple_quote=triple_quote,
        triple_quote_label=module_name,
    )
    self.snippets.append(module_snippet)

    return self

object_source

Source code in dandy/llm/prompt/prompt.py
def object_source(
        self,
        object_module_name: str,
        triple_quote: bool = True
) -> Self:
    """
    Append an ``ObjectSourceSnippet`` to this prompt, labelled with the object module name.

    :param object_module_name: passed to the snippet as ``object_module_name`` and as ``triple_quote_label``
    :param triple_quote: passed to the snippet as ``triple_quote``
    :return: this prompt, for chaining
    """
    object_snippet = snippet.ObjectSourceSnippet(
        object_module_name=object_module_name,
        triple_quote=triple_quote,
        triple_quote_label=object_module_name,
    )
    self.snippets.append(object_snippet)

    return self

ordered_list

Source code in dandy/llm/prompt/prompt.py
def ordered_list(
        self,
        items: List[str],
        triple_quote: bool = False
) -> Self:
    """
    Append an ``OrderedListSnippet`` to this prompt.

    :param items: passed to the snippet as ``items``
    :param triple_quote: passed to the snippet as ``triple_quote``
    :return: this prompt, for chaining
    """
    ordered_snippet = snippet.OrderedListSnippet(
        items=items,
        triple_quote=triple_quote,
    )
    self.snippets.append(ordered_snippet)

    return self

prompt

Source code in dandy/llm/prompt/prompt.py
def prompt(
        self,
        prompt: Self | str,
        triple_quote: bool = False
) -> Self:
    """
    Append a ``PromptSnippet`` to this prompt.

    :param prompt: passed to the snippet as ``prompt``
    :param triple_quote: passed to the snippet as ``triple_quote``
    :return: this prompt, for chaining
    """
    prompt_snippet = snippet.PromptSnippet(
        prompt=prompt,
        triple_quote=triple_quote,
    )
    self.snippets.append(prompt_snippet)

    return self

random_choice

Source code in dandy/llm/prompt/prompt.py
def random_choice(
        self,
        choices: List[str],
        triple_quote: bool = False
) -> Self:
    """
    Append a ``RandomChoiceSnippet`` to this prompt.

    :param choices: passed to the snippet as ``choices``
    :param triple_quote: passed to the snippet as ``triple_quote``
    :return: this prompt, for chaining
    """
    choice_snippet = snippet.RandomChoiceSnippet(
        choices=choices,
        triple_quote=triple_quote,
    )
    self.snippets.append(choice_snippet)

    return self

sub_heading

Source code in dandy/llm/prompt/prompt.py
def sub_heading(
        self,
        sub_heading: str,
) -> Self:
    """
    Append a ``SubHeadingSnippet`` to this prompt.

    :param sub_heading: passed to the snippet as ``sub_heading``
    :return: this prompt, for chaining
    """
    sub_heading_snippet = snippet.SubHeadingSnippet(sub_heading=sub_heading)
    self.snippets.append(sub_heading_snippet)

    return self

text

Source code in dandy/llm/prompt/prompt.py
def text(
        self,
        text: str = '',
        label: str = '',
        triple_quote: bool = False,
        triple_quote_label: Union[str, None] = None,
) -> Self:
    """
    Append a ``TextSnippet`` to this prompt.

    :param text: passed to the snippet as ``text``
    :param label: passed to the snippet as ``label``
    :param triple_quote: passed to the snippet as ``triple_quote``
    :param triple_quote_label: passed to the snippet as ``triple_quote_label``
    :return: this prompt, for chaining
    """
    text_snippet = snippet.TextSnippet(
        text=text,
        label=label,
        triple_quote=triple_quote,
        triple_quote_label=triple_quote_label,
    )
    self.snippets.append(text_snippet)

    return self

title

Source code in dandy/llm/prompt/prompt.py
def title(
        self,
        title: str,
) -> Self:
    """
    Append a ``TitleSnippet`` to this prompt.

    :param title: passed to the snippet as ``title``
    :return: this prompt, for chaining
    """
    title_snippet = snippet.TitleSnippet(title=title)
    self.snippets.append(title_snippet)

    return self

unordered_list

Source code in dandy/llm/prompt/prompt.py
def unordered_list(
        self,
        items: List[str],
        triple_quote: bool = False
) -> Self:
    """
    Append an ``UnorderedListSnippet`` to this prompt.

    :param items: passed to the snippet as ``items``
    :param triple_quote: passed to the snippet as ``triple_quote``
    :return: this prompt, for chaining
    """
    unordered_snippet = snippet.UnorderedListSnippet(
        items=items,
        triple_quote=triple_quote,
    )
    self.snippets.append(unordered_snippet)

    return self

unordered_random_list

Source code in dandy/llm/prompt/prompt.py
def unordered_random_list(
        self,
        items: List[str],
        triple_quote: bool = False
) -> Self:
    """
    Append an ``UnorderedRandomListSnippet`` to this prompt.

    :param items: passed to the snippet as ``items``
    :param triple_quote: passed to the snippet as ``triple_quote``
    :return: this prompt, for chaining
    """
    random_list_snippet = snippet.UnorderedRandomListSnippet(
        items=items,
        triple_quote=triple_quote,
    )
    self.snippets.append(random_list_snippet)

    return self

Recorder

Bases: Singleton

recordings = dict() class-attribute instance-attribute

renderers = {'html': HtmlRecordingRenderer, 'json': JsonRecordingRenderer, 'markdown': MarkdownRecordingRenderer} class-attribute instance-attribute

add_event classmethod

Source code in dandy/recorder/recorder.py
@classmethod
def add_event(cls, event: Event):
    """
    Add ``event`` to the event store of every recording that is currently running.

    :param event: the event handed to each running recording's ``event_store.add_event``
    """
    running_recordings = (
        recording for recording in cls.recordings.values() if recording.is_running
    )

    for recording in running_recordings:
        recording.event_store.add_event(event)

check_recording_is_valid classmethod

Source code in dandy/recorder/recorder.py
@classmethod
def check_recording_is_valid(cls, recording_name: str = RECORDING_DEFAULT_NAME):
    """
    Ensure a recording with the given name exists.

    :param recording_name: name to look up in ``cls.recordings``
    :raises RecorderCriticalException: if no recording with that name exists; the message lists the
        existing recording names when there are any
    """
    if recording_name in cls.recordings:
        return

    # List the alternatives only when some exist. Testing for an empty dict here would mean
    # the choices are only ever "listed" when there is nothing to list.
    choices_message = ''

    if cls.recordings:
        choices_message = f' Choices are {list(cls.recordings.keys())}'

    raise RecorderCriticalException(f'Recording "{recording_name}" does not exist.{choices_message}')

delete_all_recordings classmethod

Source code in dandy/recorder/recorder.py
@classmethod
def delete_all_recordings(cls):
    """Remove every entry from ``cls.recordings``."""
    recordings = cls.recordings
    recordings.clear()

delete_recording classmethod

Source code in dandy/recorder/recorder.py
@classmethod
def delete_recording(cls, recording_name: str = RECORDING_DEFAULT_NAME):
    """
    Remove the named recording from ``cls.recordings`` after checking that it exists.

    :param recording_name: name of the recording to remove
    """
    cls.check_recording_is_valid(recording_name)

    recordings = cls.recordings
    del recordings[recording_name]

get_recording classmethod

Source code in dandy/recorder/recorder.py
@classmethod
def get_recording(cls, recording_name: str = RECORDING_DEFAULT_NAME) -> Recording:
    """
    Return the named recording after checking that it exists.

    :param recording_name: name of the recording to return
    :return: the entry of ``cls.recordings`` stored under that name
    """
    cls.check_recording_is_valid(recording_name)

    recording = cls.recordings[recording_name]

    return recording

is_recording classmethod

Source code in dandy/recorder/recorder.py
@classmethod
def is_recording(cls):
    """
    Report whether at least one recording is running.

    :return: ``True`` if any entry of ``cls.recordings`` has a truthy ``is_running``
    """
    running_flags = [recording.is_running for recording in cls.recordings.values()]

    return any(running_flags)

start_recording classmethod

Source code in dandy/recorder/recorder.py
@classmethod
def start_recording(cls, recording_name: str = RECORDING_DEFAULT_NAME):
    """
    Create a new ``Recording`` under the given name, replacing any existing entry, and start it.

    :param recording_name: key under which the recording is stored in ``cls.recordings``
    """
    recordings = cls.recordings
    recordings[recording_name] = Recording(name=recording_name)

    recordings[recording_name].start()

stop_recording classmethod

Source code in dandy/recorder/recorder.py
@classmethod
def stop_recording(cls, recording_name: str = RECORDING_DEFAULT_NAME):
    """
    Stop the named recording after checking that it exists.

    :param recording_name: name of the recording to stop
    """
    cls.check_recording_is_valid(recording_name)

    recording = cls.recordings[recording_name]
    recording.stop()

stop_all_recording classmethod

Source code in dandy/recorder/recorder.py
@classmethod
def stop_all_recording(cls):
    """Call ``stop()`` on every recording in ``cls.recordings``."""
    recordings = cls.recordings

    for recording_name in recordings:
        recordings[recording_name].stop()

to_html_file classmethod

Source code in dandy/recorder/recorder.py
@classmethod
def to_html_file(
        cls,
        recording_name: str = RECORDING_DEFAULT_NAME,
        path: Path | str = DEFAULT_RECORDER_OUTPUT_PATH
):
    """
    Delegate to ``cls._to_file`` with the ``'html'`` format.

    :param recording_name: passed as the first argument
    :param path: passed as the third argument
    """
    output_format = 'html'

    cls._to_file(recording_name, output_format, path)

to_html_str classmethod

Source code in dandy/recorder/recorder.py
@classmethod
def to_html_str(
        cls,
        recording_name: str = RECORDING_DEFAULT_NAME,
) -> str:
    """
    Delegate to ``cls._to_str`` with the ``'html'`` format.

    :param recording_name: passed as the first argument
    :return: whatever ``cls._to_str`` returns
    """
    output_format = 'html'

    return cls._to_str(recording_name, output_format)

to_json_file classmethod

Source code in dandy/recorder/recorder.py
@classmethod
def to_json_file(
        cls,
        recording_name: str = RECORDING_DEFAULT_NAME,
        path: Path | str = DEFAULT_RECORDER_OUTPUT_PATH
):
    """
    Delegate to ``cls._to_file`` with the ``'json'`` format.

    :param recording_name: passed as the first argument
    :param path: passed as the third argument
    """
    output_format = 'json'

    cls._to_file(recording_name, output_format, path)

to_json_str classmethod

Source code in dandy/recorder/recorder.py
@classmethod
def to_json_str(
        cls,
        recording_name: str = RECORDING_DEFAULT_NAME,
) -> str:
    """
    Delegate to ``cls._to_str`` with the ``'json'`` format.

    :param recording_name: passed as the first argument
    :return: whatever ``cls._to_str`` returns
    """
    output_format = 'json'

    return cls._to_str(recording_name, output_format)

to_markdown_file classmethod

Source code in dandy/recorder/recorder.py
@classmethod
def to_markdown_file(
        cls,
        recording_name: str = RECORDING_DEFAULT_NAME,
        path: Path | str = DEFAULT_RECORDER_OUTPUT_PATH
):
    """
    Delegate to ``cls._to_file`` with the ``'markdown'`` format.

    :param recording_name: passed as the first argument
    :param path: passed as the third argument
    """
    output_format = 'markdown'

    cls._to_file(recording_name, output_format, path)

to_markdown_str classmethod

Source code in dandy/recorder/recorder.py
@classmethod
def to_markdown_str(
        cls,
        recording_name: str = RECORDING_DEFAULT_NAME,
) -> str:
    """
    Delegate to ``cls._to_str`` with the ``'markdown'`` format.

    :param recording_name: passed as the first argument
    :return: whatever ``cls._to_str`` returns
    """
    output_format = 'markdown'

    return cls._to_str(recording_name, output_format)

SqliteCache

Bases: BaseCache

cache_name instance-attribute

limit instance-attribute

model_post_init

Source code in dandy/cache/sqlite/cache.py
def model_post_init(self, __context: Any):
    """
    Call ``self._create_table()`` once the model has been initialised.

    :param __context: unused by this method
    """
    create_table = self._create_table
    create_table()

__len__

Source code in dandy/cache/sqlite/cache.py
def __len__(self) -> int:
    """
    Count the rows stored for this cache name.

    :return: ``0`` when the table does not exist, otherwise the ``COUNT(*)`` for ``self.cache_name``
    """
    if self._table_exists():
        with SqliteConnection(SQLITE_CACHE_DB_NAME) as connection:
            cursor = connection.cursor()
            count_sql = f'SELECT COUNT(*) FROM {SQLITE_CACHE_TABLE_NAME} WHERE cache_name = ?'

            cursor.execute(count_sql, (self.cache_name,))
            count_row = cursor.fetchone()

            return count_row[0]

    return 0

get

Source code in dandy/cache/sqlite/cache.py
def get(self, key: str) -> Union[Any, None]:
    """
    Load the value stored under ``key`` for this cache name.

    :param key: the cache key
    :return: the unpickled value, or ``None`` when the table or the row does not exist
    """
    if not self._table_exists():
        return None

    select_sql = f'SELECT value FROM {SQLITE_CACHE_TABLE_NAME} WHERE key = ? AND cache_name = ?'

    with SqliteConnection(SQLITE_CACHE_DB_NAME) as connection:
        cursor = connection.cursor()
        cursor.execute(select_sql, (key, self.cache_name))
        row = cursor.fetchone()

        if not row:
            return None

        # NOTE(review): unpickling can execute arbitrary code, so the cache database must be trusted.
        return pickle.loads(row[0])

set

Source code in dandy/cache/sqlite/cache.py
def set(self, key: str, value: Any):
    """
    Pickle ``value`` and store it under ``key`` for this cache name, then run ``clean()``.

    An INSERT is attempted first; when it raises ``sqlite3.IntegrityError`` the existing row is
    updated instead.

    :param key: the cache key
    :param value: the value to pickle and store
    """
    with SqliteConnection(SQLITE_CACHE_DB_NAME) as connection:
        cursor = connection.cursor()

        pickled_value = pickle.dumps(value)

        insert_sql = f'INSERT INTO {SQLITE_CACHE_TABLE_NAME} (key, value, cache_name) VALUES (?, ?, ?)'
        update_sql = f'UPDATE {SQLITE_CACHE_TABLE_NAME} SET value = ? WHERE key = ? AND cache_name = ?'

        try:
            cursor.execute(insert_sql, (key, pickled_value, self.cache_name))
        except sqlite3.IntegrityError:
            cursor.execute(update_sql, (pickled_value, key, self.cache_name))

        connection.commit()
        # Cleaning runs while this connection is still open.
        self.clean()

clean

Source code in dandy/cache/sqlite/cache.py
def clean(self):
    """
    Evict the oldest rows of this cache once it has grown far enough past ``self.limit``.

    Rows are evicted in batches of 10% of the limit (at least one row), ordered by ``created_at``.
    Nothing is deleted until the cache exceeds the limit by one full batch.
    """
    # int(limit * 0.10) is 0 for limits below 10; a zero batch would make the DELETE below a
    # no-op (LIMIT 0) and such a cache would never shrink, so evict at least one row per pass.
    excess_threshold = max(1, int(self.limit * 0.10))
    excess_rows = len(self) - self.limit

    if excess_rows < excess_threshold:
        return

    with SqliteConnection(SQLITE_CACHE_DB_NAME) as connection:
        cursor = connection.cursor()

        # The outer DELETE is scoped to this cache name as well, so rows of other caches that
        # happen to share a key with an evicted row are left alone.
        cursor.execute(
            f'DELETE FROM {SQLITE_CACHE_TABLE_NAME} WHERE cache_name = ? AND key IN (SELECT key FROM {SQLITE_CACHE_TABLE_NAME} WHERE cache_name = ? ORDER BY created_at LIMIT ?)',
            (self.cache_name, self.cache_name, excess_threshold)
        )

        connection.commit()

clear classmethod

Source code in dandy/cache/sqlite/cache.py
@classmethod
def clear(cls, cache_name: str = dandy.constants.DEFAULT_CACHE_NAME):
    """
    Delete every row stored for ``cache_name``; do nothing when the table does not exist.

    :param cache_name: name of the cache whose rows are deleted
    """
    if not cls._table_exists():
        return

    delete_sql = f'DELETE FROM {SQLITE_CACHE_TABLE_NAME} WHERE cache_name = ?'

    with SqliteConnection(SQLITE_CACHE_DB_NAME) as connection:
        cursor = connection.cursor()
        cursor.execute(delete_sql, (cache_name,))
        connection.commit()

clear_all classmethod

Source code in dandy/cache/sqlite/cache.py
@classmethod
def clear_all(cls):
    """Delete every row of the cache table; do nothing when the table does not exist."""
    if not cls._table_exists():
        return

    with SqliteConnection(SQLITE_CACHE_DB_NAME) as connection:
        cursor = connection.cursor()
        delete_sql = f'DELETE FROM {SQLITE_CACHE_TABLE_NAME}'
        cursor.execute(delete_sql)
        connection.commit()

destroy_all classmethod

Source code in dandy/cache/sqlite/cache.py
@classmethod
def destroy_all(cls, cache_name: str = dandy.constants.DEFAULT_CACHE_NAME):
    """
    Delete the cache database file via ``SqliteConnection.delete_db_file()``.

    :param cache_name: accepted but not used by this method
    """
    connection = SqliteConnection(SQLITE_CACHE_DB_NAME)
    connection.delete_db_file()

generate_cache_key

Source code in dandy/cache/utils.py
def generate_cache_key(func: object, *args, **kwargs) -> str:
    """
    Build a SHA-256 hex digest identifying a call to ``func``.

    The key is derived from ``func.__module__``, ``func.__qualname__`` and the keyword arguments,
    sorted and converted with ``convert_to_hashable_str``. Positional ``args`` are accepted but do
    not contribute to the key.

    :param func: the callable being cached
    :param args: positional arguments of the call (currently ignored)
    :param kwargs: keyword arguments of the call
    :return: the hexadecimal SHA-256 digest
    """
    # Todo: Why are the args all over the map
    # hashable_args = tuple([convert_to_hashable_str(arg) for arg in args])

    kwarg_pairs = [
        (name, convert_to_hashable_str(value)) for name, value in kwargs.items()
    ]
    kwarg_pairs.sort()
    hashable_kwargs = tuple(kwarg_pairs)

    # Todo: Why are the args all over the map
    # hashable_args would sit between the qualname and the kwargs.
    key_source = str((func.__module__, func.__qualname__, hashable_kwargs))

    return hashlib.sha256(key_source.encode()).hexdigest()

cache_to_memory

Source code in dandy/cache/memory/decorators.py
def cache_to_memory(
        cache_name: str = dandy.constants.DEFAULT_CACHE_NAME,
        limit: int = settings.CACHE_MEMORY_LIMIT
):
    """
    Decorator factory that routes calls through ``cache_decorator_function`` with a ``MemoryCache``.

    A ``MemoryCache`` is constructed on every call of the decorated function.

    :param cache_name: passed to ``MemoryCache`` as ``cache_name``
    :param limit: passed to ``MemoryCache`` as ``limit``
    :return: a decorator for the function to cache
    """
    def decorate(func):
        @wraps(func)
        def cached_call(*args, **kwargs):
            memory_cache = MemoryCache(cache_name=cache_name, limit=limit)

            return cache_decorator_function(memory_cache, func, *args, **kwargs)

        return cached_call

    return decorate

cache_to_sqlite

Source code in dandy/cache/sqlite/decorators.py
def cache_to_sqlite(
        cache_name: str = dandy.constants.DEFAULT_CACHE_NAME,
        limit: int = settings.CACHE_SQLITE_LIMIT
):
    """
    Decorator factory that routes calls through ``cache_decorator_function`` with a ``SqliteCache``.

    A ``SqliteCache`` is constructed on every call of the decorated function.

    :param cache_name: passed to ``SqliteCache`` as ``cache_name``
    :param limit: passed to ``SqliteCache`` as ``limit``
    :return: a decorator for the function to cache
    """
    def decorate(func):
        @wraps(func)
        def cached_call(*args, **kwargs):
            sqlite_cache = SqliteCache(cache_name=cache_name, limit=limit)

            return cache_decorator_function(sqlite_cache, func, *args, **kwargs)

        return cached_call

    return decorate

recorder_to_html_file

Source code in dandy/recorder/decorators.py
def recorder_to_html_file(recording_name: str = RECORDING_DEFAULT_NAME, path: Path | str = DEFAULT_RECORDER_OUTPUT_PATH):
    """
    Decorator factory that runs the function through ``_recorder_to_file_decorator_function`` with the ``'html'`` format.

    :param recording_name: forwarded to the helper
    :param path: forwarded to the helper
    :return: a decorator for the function to record
    """
    def decorate(func):
        @wraps(func)
        def recorded_call(*args, **kwargs):
            return _recorder_to_file_decorator_function(
                func,
                args,
                kwargs,
                recording_name,
                'html',
                path,
            )

        return recorded_call

    return decorate

recorder_to_json_file

Source code in dandy/recorder/decorators.py
def recorder_to_json_file(recording_name: str = RECORDING_DEFAULT_NAME, path: Path | str = DEFAULT_RECORDER_OUTPUT_PATH):
    """
    Decorator factory that runs the function through ``_recorder_to_file_decorator_function`` with the ``'json'`` format.

    :param recording_name: forwarded to the helper
    :param path: forwarded to the helper
    :return: a decorator for the function to record
    """
    def decorate(func):
        @wraps(func)
        def recorded_call(*args, **kwargs):
            return _recorder_to_file_decorator_function(
                func,
                args,
                kwargs,
                recording_name,
                'json',
                path,
            )

        return recorded_call

    return decorate

recorder_to_markdown_file

Source code in dandy/recorder/decorators.py
def recorder_to_markdown_file(recording_name: str = RECORDING_DEFAULT_NAME, path: Path | str = DEFAULT_RECORDER_OUTPUT_PATH):
    """
    Decorator factory that runs the function through ``_recorder_to_file_decorator_function`` with the ``'markdown'`` format.

    :param recording_name: forwarded to the helper
    :param path: forwarded to the helper
    :return: a decorator for the function to record
    """
    def decorate(func):
        @wraps(func)
        def recorded_call(*args, **kwargs):
            return _recorder_to_file_decorator_function(
                func,
                args,
                kwargs,
                recording_name,
                'markdown',
                path,
            )

        return recorded_call

    return decorate