# This module is part of GitPython and is released under the
# 3-Clause BSD License: https://opensource.org/license/bsd-3-clause/

"""Standalone functions to accompany the index implementation and make it more
versatile."""

__all__ = [
    "write_cache",
    "read_cache",
    "write_tree_from_cache",
    "entry_key",
    "stat_mode_to_index_mode",
    "S_IFGITLINK",
    "run_commit_hook",
    "hook_path",
]

from io import BytesIO
import os
import os.path as osp
from pathlib import Path
from stat import S_IFDIR, S_IFLNK, S_IFMT, S_IFREG, S_ISDIR, S_ISLNK, S_IXUSR
import subprocess
import sys

from gitdb.base import IStream
from gitdb.typ import str_tree_type

from git.cmd import handle_process_output, safer_popen
from git.compat import defenc, force_bytes, force_text, safe_decode
from git.exc import HookExecutionError, UnmergedEntriesError
from git.objects.fun import (
    traverse_tree_recursive,
    traverse_trees_recursive,
    tree_to_stream,
)
from git.util import IndexFileSHA1Writer, finalize_process

from .typ import BaseIndexEntry, IndexEntry, CE_NAMEMASK, CE_STAGESHIFT
from .util import pack, unpack

# typing -----------------------------------------------------------------------------

from typing import Dict, IO, List, Sequence, TYPE_CHECKING, Tuple, Type, Union, cast

from git.types import PathLike

if TYPE_CHECKING:
    from git.db import GitCmdObjectDB
    from git.objects.tree import TreeCacheTup

    from .base import IndexFile

# ------------------------------------------------------------------------------------

S_IFGITLINK = S_IFLNK | S_IFDIR
"""Flags for a submodule."""

CE_NAMEMASK_INV = ~CE_NAMEMASK


def hook_path(name: str, git_dir: PathLike) -> str:
    """:return: path to the given named hook in the given git repository directory"""
    return osp.join(git_dir, "hooks", name)


def _has_file_extension(path: str) -> str:
    return osp.splitext(path)[1]


def run_commit_hook(name: str, index: "IndexFile", *args: str) -> None:
    """Run the commit hook of the given name. Silently ignore hooks that do not exist.

    :param name:
        Name of hook, like ``pre-commit``.

    :param index:
        :class:`~git.index.base.IndexFile` instance.

    :param args:
        Arguments passed to hook file.

    :raise git.exc.HookExecutionError:
    """
    hp = hook_path(name, index.repo.git_dir)
    if not os.access(hp, os.X_OK):
        return

    env = os.environ.copy()
    env["GIT_INDEX_FILE"] = safe_decode(str(index.path))
    env["GIT_EDITOR"] = ":"
    cmd = [hp]
    try:
        if sys.platform == "win32" and not _has_file_extension(hp):
            # Windows only uses extensions to determine how to open files
            # (doesn't understand shebangs). Try using bash to run the hook.
            relative_hp = Path(hp).relative_to(index.repo.working_dir).as_posix()
            cmd = ["bash.exe", relative_hp]

        process = safer_popen(
            cmd + list(args),
            env=env,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            cwd=index.repo.working_dir,
        )
    except Exception as ex:
        raise HookExecutionError(hp, ex) from ex
    else:
        stdout_list: List[str] = []
        stderr_list: List[str] = []
        handle_process_output(process, stdout_list.append, stderr_list.append, finalize_process)
        stdout = "".join(stdout_list)
        stderr = "".join(stderr_list)
        if process.returncode != 0:
            stdout = force_text(stdout, defenc)
            stderr = force_text(stderr, defenc)
            raise HookExecutionError(hp, process.returncode, stderr, stdout)
    # END handle return code


def stat_mode_to_index_mode(mode: int) -> int:
    """Convert the given mode from a stat call to the corresponding index mode and
    return it."""
    if S_ISLNK(mode):  # symlinks
        return S_IFLNK
    if S_ISDIR(mode) or S_IFMT(mode) == S_IFGITLINK:  # submodules
        return S_IFGITLINK
    return S_IFREG | (mode & S_IXUSR and 0o755 or 0o644)  # blobs with or without executable bit


def write_cache(
    entries: Sequence[Union[BaseIndexEntry, "IndexEntry"]],
    stream: IO[bytes],
    extension_data: Union[None, bytes] = None,
    ShaStreamCls: Type[IndexFileSHA1Writer] = IndexFileSHA1Writer,
) -> None:
    """Write the cache represented by entries to a stream.

    :param entries:
        **Sorted** list of entries.

    :param stream:
        Stream to wrap into the AdapterStreamCls - it is used for final output.

    :param ShaStreamCls:
        Type to use when writing to the stream. It produces a sha while writing to it,
        before the data is passed on to the wrapped stream.

    :param extension_data:
        Any kind of data to write as a trailer, it must begin a 4 byte identifier,
        followed by its size (4 bytes).
    """
    # Wrap the stream into a compatible writer.
    stream_sha = ShaStreamCls(stream)

    tell = stream_sha.tell
    write = stream_sha.write

    # Header
    version = 2
    write(b"DIRC")
    write(pack(">LL", version, len(entries)))

    # Body
    for entry in entries:
        beginoffset = tell()
        write(entry.ctime_bytes)  # ctime
        write(entry.mtime_bytes)  # mtime
        path_str = str(entry.path)
        path: bytes = force_bytes(path_str, encoding=defenc)
        plen = len(path) & CE_NAMEMASK  # Path length
        assert plen == len(path), "Path %s too long to fit into index" % entry.path
        flags = plen | (entry.flags & CE_NAMEMASK_INV)  # Clear possible previous values.
        write(
            pack(
                ">LLLLLL20sH",
                entry.dev,
                entry.inode,
                entry.mode,
                entry.uid,
                entry.gid,
                entry.size,
                entry.binsha,
                flags,
            )
        )
        write(path)
        real_size = (tell() - beginoffset + 8) & ~7
        write(b"\0" * ((beginoffset + real_size) - tell()))
    # END for each entry

    # Write previously cached extensions data.
    if extension_data is not None:
        stream_sha.write(extension_data)

    # Write the sha over the content.
    stream_sha.write_sha()


def read_header(stream: IO[bytes]) -> Tuple[int, int]:
    """Return tuple(version_long, num_entries) from the given stream."""
    type_id = stream.read(4)
    if type_id != b"DIRC":
        raise AssertionError("Invalid index file header: %r" % type_id)
    unpacked = cast(Tuple[int, int], unpack(">LL", stream.read(4 * 2)))
    version, num_entries = unpacked

    # TODO: Handle version 3: extended data, see read-cache.c.
    assert version in (1, 2)
    return version, num_entries


def entry_key(*entry: Union[BaseIndexEntry, PathLike, int]) -> Tuple[PathLike, int]:
    """
    :return:
        Key suitable to be used for the
        :attr:`index.entries <git.index.base.IndexFile.entries>` dictionary.

    :param entry:
        One instance of type BaseIndexEntry or the path and the stage.
    """

    # def is_entry_key_tup(entry_key: Tuple) -> TypeGuard[Tuple[PathLike, int]]:
    #     return isinstance(entry_key, tuple) and len(entry_key) == 2

    if len(entry) == 1:
        entry_first = entry[0]
        assert isinstance(entry_first, BaseIndexEntry)
        return (entry_first.path, entry_first.stage)
    else:
        # assert is_entry_key_tup(entry)
        entry = cast(Tuple[PathLike, int], entry)
        return entry
    # END handle entry


def read_cache(
    stream: IO[bytes],
) -> Tuple[int, Dict[Tuple[PathLike, int], "IndexEntry"], bytes, bytes]:
    """Read a cache file from the given stream.

    :return:
        tuple(version, entries_dict, extension_data, content_sha)

        * *version* is the integer version number.
        * *entries_dict* is a dictionary which maps IndexEntry instances to a path at a
          stage.
        * *extension_data* is ``""`` or 4 bytes of type + 4 bytes of size + size bytes.
        * *content_sha* is a 20 byte sha on all cache file contents.
    """
    version, num_entries = read_header(stream)
    count = 0
    entries: Dict[Tuple[PathLike, int], "IndexEntry"] = {}

    read = stream.read
    tell = stream.tell
    while count < num_entries:
        beginoffset = tell()
        ctime = unpack(">8s", read(8))[0]
        mtime = unpack(">8s", read(8))[0]
        (dev, ino, mode, uid, gid, size, sha, flags) = unpack(">LLLLLL20sH", read(20 + 4 * 6 + 2))
        path_size = flags & CE_NAMEMASK
        path = read(path_size).decode(defenc)

        real_size = (tell() - beginoffset + 8) & ~7
        read((beginoffset + real_size) - tell())
        entry = IndexEntry((mode, sha, flags, path, ctime, mtime, dev, ino, uid, gid, size))
        # entry_key would be the method to use, but we save the effort.
        entries[(path, entry.stage)] = entry
        count += 1
    # END for each entry

    # The footer contains extension data and a sha on the content so far.
    # Keep the extension footer,and verify we have a sha in the end.
    # Extension data format is:
    #   4 bytes ID
    #   4 bytes length of chunk
    #   Repeated 0 - N times
    extension_data = stream.read(~0)
    assert len(extension_data) > 19, (
        "Index Footer was not at least a sha on content as it was only %i bytes in size" % len(extension_data)
    )

    content_sha = extension_data[-20:]

    # Truncate the sha in the end as we will dynamically create it anyway.
    extension_data = extension_data[:-20]

    return (version, entries, extension_data, content_sha)


def write_tree_from_cache(
    entries: List[IndexEntry], odb: "GitCmdObjectDB", sl: slice, si: int = 0
) -> Tuple[bytes, List["TreeCacheTup"]]:
    R"""Create a tree from the given sorted list of entries and put the respective
    trees into the given object database.

    :param entries:
        **Sorted** list of :class:`~git.index.typ.IndexEntry`\s.

    :param odb:
        Object database to store the trees in.

    :param si:
        Start index at which we should start creating subtrees.

    :param sl:
        Slice indicating the range we should process on the entries list.

    :return:
        tuple(binsha, list(tree_entry, ...))

        A tuple of a sha and a list of tree entries being a tuple of hexsha, mode, name.
    """
    tree_items: List["TreeCacheTup"] = []

    ci = sl.start
    end = sl.stop
    while ci < end:
        entry = entries[ci]
        if entry.stage != 0:
            raise UnmergedEntriesError(entry)
        # END abort on unmerged
        ci += 1
        rbound = entry.path.find("/", si)
        if rbound == -1:
            # It's not a tree.
            tree_items.append((entry.binsha, entry.mode, entry.path[si:]))
        else:
            # Find common base range.
            base = entry.path[si:rbound]
            xi = ci
            while xi < end:
                oentry = entries[xi]
                orbound = oentry.path.find("/", si)
                if orbound == -1 or oentry.path[si:orbound] != base:
                    break
                # END abort on base mismatch
                xi += 1
            # END find common base

            # Enter recursion.
            # ci - 1 as we want to count our current item as well.
            sha, _tree_entry_list = write_tree_from_cache(entries, odb, slice(ci - 1, xi), rbound + 1)
            tree_items.append((sha, S_IFDIR, base))

            # Skip ahead.
            ci = xi
        # END handle bounds
    # END for each entry

    # Finally create the tree.
    sio = BytesIO()
    tree_to_stream(tree_items, sio.write)  # Writes to stream as bytes, but doesn't change tree_items.
    sio.seek(0)

    istream = odb.store(IStream(str_tree_type, len(sio.getvalue()), sio))
    return (istream.binsha, tree_items)


def _tree_entry_to_baseindexentry(tree_entry: "TreeCacheTup", stage: int) -> BaseIndexEntry:
    return BaseIndexEntry((tree_entry[1], tree_entry[0], stage << CE_STAGESHIFT, tree_entry[2]))


def aggressive_tree_merge(odb: "GitCmdObjectDB", tree_shas: Sequence[bytes]) -> List[BaseIndexEntry]:
    R"""
    :return:
        List of :class:`~git.index.typ.BaseIndexEntry`\s representing the aggressive
        merge of the given trees. All valid entries are on stage 0, whereas the
        conflicting ones are left on stage 1, 2 or 3, whereas stage 1 corresponds to the
        common ancestor tree, 2 to our tree and 3 to 'their' tree.

    :param tree_shas:
        1, 2 or 3 trees as identified by their binary 20 byte shas. If 1 or two, the
        entries will effectively correspond to the last given tree. If 3 are given, a 3
        way merge is performed.
    """
    out: List[BaseIndexEntry] = []

    # One and two way is the same for us, as we don't have to handle an existing
    # index, instrea
    if len(tree_shas) in (1, 2):
        for entry in traverse_tree_recursive(odb, tree_shas[-1], ""):
            out.append(_tree_entry_to_baseindexentry(entry, 0))
        # END for each entry
        return out
    # END handle single tree

    if len(tree_shas) > 3:
        raise ValueError("Cannot handle %i trees at once" % len(tree_shas))

    # Three trees.
    for base, ours, theirs in traverse_trees_recursive(odb, tree_shas, ""):
        if base is not None:
            # Base version exists.
            if ours is not None:
                # Ours exists.
                if theirs is not None:
                    # It exists in all branches. Ff it was changed in both
                    # its a conflict. Otherwise, we take the changed version.
                    # This should be the most common branch, so it comes first.
                    if (base[0] != ours[0] and base[0] != theirs[0] and ours[0] != theirs[0]) or (
                        base[1] != ours[1] and base[1] != theirs[1] and ours[1] != theirs[1]
                    ):
                        # Changed by both.
                        out.append(_tree_entry_to_baseindexentry(base, 1))
                        out.append(_tree_entry_to_baseindexentry(ours, 2))
                        out.append(_tree_entry_to_baseindexentry(theirs, 3))
                    elif base[0] != ours[0] or base[1] != ours[1]:
                        # Only we changed it.
                        out.append(_tree_entry_to_baseindexentry(ours, 0))
                    else:
                        # Either nobody changed it, or they did. In either
                        # case, use theirs.
                        out.append(_tree_entry_to_baseindexentry(theirs, 0))
                    # END handle modification
                else:
                    if ours[0] != base[0] or ours[1] != base[1]:
                        # They deleted it, we changed it, conflict.
                        out.append(_tree_entry_to_baseindexentry(base, 1))
                        out.append(_tree_entry_to_baseindexentry(ours, 2))
                    # else:
                    #   # We didn't change it, ignore.
                    #   pass
                    # END handle our change
                # END handle theirs
            else:
                if theirs is None:
                    # Deleted in both, its fine - it's out.
                    pass
                else:
                    if theirs[0] != base[0] or theirs[1] != base[1]:
                        # Deleted in ours, changed theirs, conflict.
                        out.append(_tree_entry_to_baseindexentry(base, 1))
                        out.append(_tree_entry_to_baseindexentry(theirs, 3))
                    # END theirs changed
                    # else:
                    #   # Theirs didn't change.
                    #   pass
                # END handle theirs
            # END handle ours
        else:
            # All three can't be None.
            if ours is None:
                # Added in their branch.
                assert theirs is not None
                out.append(_tree_entry_to_baseindexentry(theirs, 0))
            elif theirs is None:
                # Added in our branch.
                out.append(_tree_entry_to_baseindexentry(ours, 0))
            else:
                # Both have it, except for the base, see whether it changed.
                if ours[0] != theirs[0] or ours[1] != theirs[1]:
                    out.append(_tree_entry_to_baseindexentry(ours, 2))
                    out.append(_tree_entry_to_baseindexentry(theirs, 3))
                else:
                    # It was added the same in both.
                    out.append(_tree_entry_to_baseindexentry(ours, 0))
                # END handle two items
            # END handle heads
        # END handle base exists
    # END for each entries tuple

    return out
