"""
x2sys_cross - Calculate crossovers between track data files.
"""
import contextlib
import os
from collections.abc import Sequence
from pathlib import Path
from typing import Any, Literal
import pandas as pd
from pygmt._typing import PathLike
from pygmt.alias import AliasSystem
from pygmt.clib import Session
from pygmt.exceptions import GMTTypeError
from pygmt.helpers import (
    build_arg_list,
    data_kind,
    fmt_docstring,
    unique_name,
    use_alias,
)
@contextlib.contextmanager
def tempfile_from_dftrack(track, suffix):
    """
    Saves :class:`pandas.DataFrame` track table to a temporary tab-separated ASCII text
    file with a unique name (to prevent clashes when running x2sys_cross), adding a
    suffix extension to the end.
    Parameters
    ----------
    track : :class:`pandas.DataFrame`
        A table holding track data with coordinate (x, y) or (lon, lat) values,
        and (optionally) time (t).
    suffix : str
        File extension, e.g. xyz, tsv, etc.
    Yields
    ------
    tmpfilename : str
        A temporary tab-separated value file with a unique name holding the
        track data. E.g. "track-1a2b3c4.tsv".
    """
    try:
        tmpfilename = f"track-{unique_name()[:7]}.{suffix}"
        track.to_csv(
            path_or_buf=tmpfilename,
            sep="\t",
            index=False,
            na_rep="NaN",  # write a NaN value explicitly instead of a blank string
            date_format="%Y-%m-%dT%H:%M:%S.%fZ",  # ISO8601 format
        )
        yield tmpfilename
    finally:
        Path(tmpfilename).unlink()
[docs]
@fmt_docstring
@use_alias(
    A="combitable",
    C="runtimes",
    D="override",
    I="interpolation",
    S="speed",
    T="tag",
    Q="coe",
    W="numpoints",
    Z="trackvalues",
)
def x2sys_cross(
    tracks=None,
    outfile: PathLike | None = None,
    region: Sequence[float | str] | str | None = None,
    verbose: Literal["quiet", "error", "warning", "timing", "info", "compat", "debug"]
    | bool = False,
    **kwargs,
) -> pd.DataFrame | None:
    r"""
    Calculate crossovers between track data files.
    Determines all intersections between ("external cross-overs") or within
    ("internal cross-overs") tracks (Cartesian or geographic), and report the
    time, position, distance along track, heading and speed along each track
    segment, and the crossover error (COE) and mean values for all observables.
    By default, :func:`pygmt.x2sys_cross` will look for both external and
    internal COEs. As an option, you may choose to project all data using one
    of the map projections prior to calculating the COE.
    Full GMT docs at :gmt-docs:`supplements/x2sys/x2sys_cross.html`.
    {aliases}
       - R = region
       - V = verbose
    Parameters
    ----------
    tracks : :class:`pandas.DataFrame`, str, or list
        A table or a list of tables with (x, y) or (lon, lat) values in the
        first two columns. Track(s) can be provided as :class:`pandas.DataFrame` tables
        or file names. Supported file formats are ASCII, native binary, or
        COARDS netCDF 1-D data. More columns may also be present.
        If the file names are missing their file extension, we will append the suffix
        specified for this TAG. Track files will be searched for first in the current
        directory and second in all directories listed in $X2SYS_HOME/TAG/TAG_paths.txt
        (if it exists). [If environment variable :term:`X2SYS_HOME` is not set it will
        default to $GMT_SHAREDIR/x2sys]. (**Note**: MGD77 files will also be looked for
        via $MGD77_HOME/mgd77_paths.txt and .gmt files will be searched for via
        $GMT_SHAREDIR/mgg/gmtfile_paths).
    outfile
        The file name for the output ASCII txt file to store the table in.
    tag : str
        Specify the x2sys TAG which identifies the attributes of this data
        type.
    combitable : str
        Only process the pair-combinations found in the file *combitable*
        [Default process all possible combinations among the specified files].
        The file *combitable* is created by :gmt-docs:`x2sys_get's -L option
        <supplements/x2sys/x2sys_get.html#l>`.
    runtimes : bool or str
        Compute and append the processing run-time for each pair to the
        progress message (use ``runtimes=True``). Pass in a file name (e.g.
        ``runtimes="file.txt"``) to save these run-times to file. The idea here
        is to use the knowledge of run-times to split the main process in a
        number of sub-processes that can each be launched in a different
        processor of your multi-core machine. See the MATLAB function
        `split_file4coes.m
        <https://github.com/GenericMappingTools/gmt/blob/master/src/x2sys/>`_.
    override : bool or str
        **S**\|\ **N**.
        Control how geographic coordinates are handled (Cartesian data are
        unaffected). By default, we determine if the data are closer to one
        pole than the other, and then we use a cylindrical polar conversion to
        avoid problems with longitude jumps. You can turn this off entirely
        with ``override`` and then the calculations uses the original data (we
        have protections against longitude jumps). However, you can force the
        selection of the pole for the projection by appending **S** or **N**
        for the south or north pole, respectively. The conversion is used
        because the algorithm used to find crossovers is inherently a
        Cartesian algorithm that can run into trouble with data that has large
        longitudinal range at higher latitudes.
    interpolation : str
        **l**\|\ **a**\|\ **c**.
        Sets the interpolation mode for estimating values at the crossover.
        Choose among:
        - **l**: Linear interpolation [Default].
        - **a**: Akima spline interpolation.
        - **c**: Cubic spline interpolation.
    coe : str
        Use **e** for external COEs only, and **i** for internal COEs only
        [Default is all COEs].
    {region}
    speed : str or list
        **l**\|\ **u**\|\ **h**\ *speed*.
        Defines window of track speeds. If speeds are outside this window we do
        not calculate a COE. Specify:
        - **l** sets lower speed [Default is 0].
        - **u** sets upper speed [Default is infinity].
        - **h** does not limit the speed but sets a lower speed below which
          headings will not be computed (i.e., set to NaN) [Default
          calculates headings regardless of speed].
        For example, you can use ``speed=["l0", "u10", "h5"]`` to set a lower
        speed of 0, upper speed of 10, and disable heading calculations for
        speeds below 5.
    {verbose}
    numpoints : int
        Give the maximum number of data points on either side of the crossover
        to use in the spline interpolation [Default is 3].
    trackvalues : bool
        Report the values of each track at the crossover [Default reports the
        crossover value and the mean value].
    Returns
    -------
    crossover_errors
        Table containing crossover error information. A :class:`pandas.DataFrame` object
        is returned if ``outfile`` is not set, otherwise ``None`` is returned and output
        will be stored in file set by ``outfile``.
    """
    # Determine output type based on 'outfile' parameter
    output_type = "pandas" if outfile is None else "file"
    file_contexts: list[contextlib.AbstractContextManager[Any]] = []
    for track in tracks:
        match data_kind(track):
            case "file":
                file_contexts.append(contextlib.nullcontext(track))
            case "vectors":
                # find suffix (-E) of trackfiles used (e.g. xyz, csv, etc) from
                # $X2SYS_HOME/TAGNAME/TAGNAME.tag file
                tagfile = Path(
                    os.environ["X2SYS_HOME"], kwargs["T"], f"{kwargs['T']}.tag"
                )
                # Last line is like "-Dxyz -Etsv -I1/1"
                lastline = tagfile.read_text(encoding="utf8").splitlines()[-1]
                for item in sorted(lastline.split()):  # sort list alphabetically
                    if item.startswith(("-E", "-D")):  # prefer -Etsv over -Dxyz
                        suffix = item[2:]  # e.g. tsv (1st choice) or xyz (2nd choice)
                # Save pandas.DataFrame track data to temporary file
                file_contexts.append(tempfile_from_dftrack(track=track, suffix=suffix))
            case _:
                raise GMTTypeError(type(track))
    aliasdict = AliasSystem().add_common(
        R=region,
        V=verbose,
    )
    aliasdict.merge(kwargs)
    with Session() as lib:
        with lib.virtualfile_out(kind="dataset", fname=outfile) as vouttbl:
            with contextlib.ExitStack() as stack:
                fnames = [stack.enter_context(c) for c in file_contexts]
                lib.call_module(
                    module="x2sys_cross",
                    args=build_arg_list(aliasdict, infile=fnames, outfile=vouttbl),
                )
                result = lib.virtualfile_to_dataset(
                    vfname=vouttbl, output_type=output_type, header=2
                )
            if output_type == "file":
                return result
            # Convert 3rd and 4th columns to datetime/timedelta for pandas output.
            # These two columns have names "t_1"/"t_2" or "i_1"/"i_2".
            # "t_" means absolute datetimes and "i_" means dummy times.
            # Internally, they are all represented as double-precision numbers in GMT,
            # relative to TIME_EPOCH with the unit defined by TIME_UNIT.
            # In GMT, TIME_UNIT can be 'y' (year), 'o' (month), 'w' (week), 'd' (day),
            # 'h' (hour), 'm' (minute), 's' (second). Years are 365.2425 days and months
            # are of equal length.
            # pd.to_timedelta() supports unit of 'W'/'D'/'h'/'m'/'s'/'ms'/'us'/'ns'.
            match time_unit := lib.get_default("TIME_UNIT"):
                case "y":
                    unit = "s"
                    scale = 365.2425 * 86400.0
                case "o":
                    unit = "s"
                    scale = 365.2425 / 12.0 * 86400.0
                case "w" | "d" | "h" | "m" | "s":
                    unit = time_unit.upper() if time_unit in "wd" else time_unit
                    scale = 1.0
            if len(result) > 0:  # if crossovers exist (more than one output row)
                columns: pd.Index = result.columns[2:4]  # i_1/i_2 or t_1/t_2 columns
                result[columns] *= scale
                result[columns] = result[columns].apply(pd.to_timedelta, unit=unit)
                if columns[0][0] == "t":  # "t" or "i":
                    result[columns] += pd.Timestamp(lib.get_default("TIME_EPOCH"))
            return result