Source code for granular_configuration_language.yaml.file_ops._chain

from __future__ import annotations

import collections.abc as tabc
import os
import sys
import typing as typ
from functools import partial
from itertools import chain
from pathlib import Path

from granular_configuration_language.exceptions import ParsingTriedToCreateALoop
from granular_configuration_language.yaml.classes import LoadOptions, Tag

if sys.version_info >= (3, 12):

    def walkup(file: Path, relative_to: Path) -> Path:
        return file.relative_to(relative_to, walk_up=True)

else:
    import os

    def _get_segments(path: os.PathLike) -> tabc.Iterator[str]:
        head, tail = os.path.split(path)
        if tail:
            yield from _get_segments(head)
            yield tail
        else:
            yield head

    def get_segments(path: os.PathLike) -> list[str]:
        return list(_get_segments(path))

    def walkup(file: Path, relative_to: Path) -> Path:
        # Modified from the 3.12 pathlib.PurePath.relative_to implementation

        for step, path in enumerate([relative_to] + list(relative_to.parents)):  # noqa: B007
            if file.is_relative_to(path):
                break
            elif path.name == "..":  # pragma: no cover
                raise ValueError(f"'..' segment in {str(relative_to)!r} cannot be walked")
        else:
            raise ValueError(f"{str(file)!r} and {str(relative_to)!r} have different anchors")
        parts = [".."] * step + get_segments(file)[len(get_segments(path)) :]
        return Path(*parts)


ENV_VAR_FILE_EXTENSION: typ.Final = ".environment_variable-a5b55071-b86e-4f22-90fc-c9db335691f6"


def _pretty_source(source: Path, *, relative_to: Path, seen: set[str]) -> str:
    if source.suffix == ENV_VAR_FILE_EXTENSION:
        return "$" + source.stem
    elif source.name not in seen:
        seen.add(source.name)
        return source.name
    else:
        try:
            return str(walkup(source, relative_to))
        except ValueError:
            return "?/" + source.name


def _get_reversed_source_chain(options: LoadOptions) -> tabc.Iterator[Path]:
    if options.previous:
        yield from _get_reversed_source_chain(options.previous)

    if options.file_location:
        yield options.file_location


def stringify_source_chain(sources: tabc.Iterable[Path]) -> str:
    return "→".join(chain(map(partial(_pretty_source, relative_to=Path().resolve(), seen=set()), sources), ("...",)))


def is_in_chain(file: Path, options: LoadOptions) -> bool:
    # Note *.environment_variable don't exist, so `.resolve()` and `.samefile()` fail

    if (
        options.file_location
        and (file.name == options.file_location.name)
        and (file == options.file_location or file.samefile(options.file_location))
    ):
        return True
    elif options.previous:
        return is_in_chain(file, options.previous)
    else:
        return False


def make_chain_message(tag: Tag, value: str, options: LoadOptions) -> ParsingTriedToCreateALoop:
    return ParsingTriedToCreateALoop(
        f"`{tag} {value}` tried to load itself in chain: ({stringify_source_chain(_get_reversed_source_chain(options))})"
    )


[docs] def as_file_path(tag: Tag, file_name: str, options: LoadOptions) -> Path: """ .. versionadded:: 2.3.0 Converts the relative file name to a :py:class:`~pathlib.Path` and checks if it has already been loaded. :param Tag tag: Tag doing this, used for error reporting. :param str file_name: Name of the file being loaded :param LoadOptions options: options from the Tag doing this action, used for tracking chains. :return: Path instance :rtype: ~pathlib.Path """ result = options.relative_to_directory / file_name if is_in_chain(result, options): raise make_chain_message(tag, file_name, options) return result