Source code for curvesimilarities.integfrechet

"""Integral Fréchet distance."""

import functools

import numpy as np
from numba import njit
from numba.np.extensions import cross2d

from .util import sanitize_vertices

__all__ = [
    "ifd",
    "ifd_owp",
]


EPSILON = np.finfo(np.float64).eps


def sanitize_vertices_ifd(degenerate_fallback, owp):
    """Decorator to sanitize the vertices for IFD and its variants.

    The integral-based Fréchet distance variants must return NaN for two
    degenerate curves with zero-lengths to avoid returning false zero-value.
    Use this decorator with :func:`sanitize_vertices`.
    """

    def decorator(func):

        @functools.wraps(func)
        def wrapper(P, Q, *args, **kwargs):
            if len(P) != 1 and len(Q) != 1:
                return func(P, Q, *args, **kwargs)
            elif len(P) == 1 and len(Q) == 1:
                if owp:
                    return np.float64(np.nan), np.empty((0, 2), dtype=np.float64)
                else:
                    return np.float64(np.nan)

            # degenerate cases
            if len(P) == 1:
                curve, point, curve_idx = Q, P[0], 1
            else:
                curve, point, curve_idx = P, Q[0], 0

            dist = degenerate_fallback(curve, point)
            if owp:
                path = np.zeros((2, 2), dtype=np.float64)
                curve_len = np.sum(np.linalg.norm(np.diff(curve, axis=0), axis=-1))
                path[1, curve_idx] = curve_len
                return dist, path
            else:
                return dist

        return wrapper

    return decorator


@njit(cache=True)
def ifd_degenerate(curve, point):
    ret = 0
    for i in range(len(curve) - 1):
        a, b = curve[i], curve[i + 1]
        ret += _line_point_integrate(a, b, point)
    return ret


[docs] @sanitize_vertices(owp=False) @sanitize_vertices_ifd(ifd_degenerate, owp=False) def ifd(P, Q, delta): r"""Integral Fréchet distance between two open polygonal curves. Let :math:`f, g: [0, 1] \to \Omega` be curves defined in a metric space :math:`\Omega`. Let :math:`\alpha, \beta: [0, 1] \to [0, 1]` be continuous non-decreasing surjections, and define :math:`\pi: [0, 1] \to [0, 1] \times [0, 1]` such that :math:`\pi(t) = \left(\alpha(t), \beta(t)\right)`. The integral Fréchet distance between :math:`f` and :math:`g` is defined as .. math:: \inf_{\pi} \int_0^1 \delta\left(\pi(t)\right) \cdot \lVert \pi'(t) \rVert \mathrm{d}t, where :math:`\delta\left(\pi(t)\right)` is a distance between :math:`f\left(\alpha(t)\right)` and :math:`g\left(\beta(t)\right)` in :math:`\Omega` and :math:`\lVert \cdot \rVert` is a norm in :math:`[0, 1] \times [0, 1]`. In this implementation, we use the Euclidean distance for :math:`\delta` and the Manhattan norm for :math:`\lVert \cdot \rVert`. Parameters ---------- P : array_like A :math:`p` by :math:`n` array of :math:`p` vertices in an :math:`n`-dimensional space. Q : array_like A :math:`q` by :math:`n` array of :math:`q` vertices in an :math:`n`-dimensional space. delta : double Maximum length of edges between Steiner points. Returns ------- dist : double The integral Fréchet distance between *P* and *Q*, NaN if any vertice is empty or both vertices consist of a single point. Raises ------ ValueError If *P* and *Q* are not 2-dimensional arrays with same number of columns. See Also -------- ifd_owp : Integral Fréchet distance with optimal warping path. Notes ----- This function implements the algorithm of Brankovic et al [#]_. References ---------- .. [#] Brankovic, M., et al. "(k, l)-Medians Clustering of Trajectories Using Continuous Dynamic Time Warping." Proceedings of the 28th International Conference on Advances in Geographic Information Systems. 2020. Examples -------- >>> ifd([[0, 0], [0.5, 0], [1, 0]], [[0, 1], [1, 1]], 0.1) 2.0 """ ret = _ifd( *_sample_ifd_pts(P, Q, delta), _line_point_integrate, _line_line_integrate ) return float(ret)
@njit(cache=True) def _ifd( P_edge_len, P_subedges_num, P_pts, Q_edge_len, Q_subedges_num, Q_pts, line_point_cost, line_line_cost, ): NP = len(P_subedges_num) + 1 P_vert_indices = np.empty(NP, dtype=np.int_) P_vert_indices[0] = 0 P_vert_indices[1:] = np.cumsum(P_subedges_num) NQ = len(Q_subedges_num) + 1 Q_vert_indices = np.empty(NQ, dtype=np.int_) Q_vert_indices[0] = 0 Q_vert_indices[1:] = np.cumsum(Q_subedges_num) # Cost containers; elements will be updated. P_costs = np.empty(len(P_pts), dtype=np.float64) P_costs[0] = 0 Q_costs = np.empty(len(Q_pts), dtype=np.float64) Q_costs[0] = 0 # TODO: parallelize this i-loop. # Must ensure that cell (i - 1, j) is computed before (i, j). p0 = P_costs[:1] # will be updated during i-loop when j == 0 for i in range(NP - 1): p_pts = P_pts[P_vert_indices[i] : P_vert_indices[i + 1] + 1] q0 = Q_costs[:1] # will be updated during j-loop for j in range(NQ - 1): q_pts = Q_pts[Q_vert_indices[j] : Q_vert_indices[j + 1] + 1] if j == 0: p_costs = np.concatenate( (p0, P_costs[P_vert_indices[i] + 1 : P_vert_indices[i + 1] + 1]) ) else: p_costs = P_costs[P_vert_indices[i] : P_vert_indices[i + 1] + 1].copy() q_costs = np.concatenate( (q0, Q_costs[Q_vert_indices[j] + 1 : Q_vert_indices[j + 1] + 1]) ) p1, q1 = _cell_owcs( P_edge_len[i], p_pts, p_costs, P_costs[P_vert_indices[i] : P_vert_indices[i + 1] + 1], Q_edge_len[j], q_pts, q_costs, Q_costs[Q_vert_indices[j] : Q_vert_indices[j + 1] + 1], i == 0, j == 0, i == NP - 2, j == NQ - 2, line_point_cost, line_line_cost, ) # store for the next loops if j == 0: p0 = p1 q0 = q1 return Q_costs[-1] @njit(cache=True) def _cell_owcs( L1, p_pts, p_costs, p_costs_out, L2, q_pts, q_costs, q_costs_out, p_is_initial, q_is_initial, p_is_last, q_is_last, line_point_cost, line_line_cost, ): """Optimal warping costs for all points in an cell.""" # p_costs = lower boundary, p_costs_out = upper boundary, # q_costs = left boundary, q_costs_out = right boundary of the cell. P1, Q1, u, v, b, delta_P, delta_Q = _cell_info(p_pts, L1, q_pts, L2) # Will be reused for each border point (t) to find best starting point (s). p_cost_candidates = np.empty(len(p_pts), dtype=np.float64) q_cost_candidates = np.empty(len(q_pts), dtype=np.float64) s = np.empty((2,), dtype=np.float64) t = np.empty((2,), dtype=np.float64) # compute upper boundary t[1] = L2 if q_is_last: # No need steiner points on upper boundary. Just check corner point. start_idx = len(p_pts) - 1 else: start_idx = 0 for i in range(start_idx, len(p_pts)): # Fill p_costs_out[i] t[0] = delta_P * i s[0] = 0 if p_is_initial: # No steiner points on left boundary; just check [0, 0] q_end_idx = 1 else: q_end_idx = len(q_pts) for j in range(0, q_end_idx): s[1] = delta_Q * j _, cost = _st_owp(P1, u, Q1, v, b, s, t, line_point_cost, line_line_cost) q_cost_candidates[j] = q_costs[j] + cost s[1] = 0 if q_is_initial: # No steiner points on bottom boundary; just check [0, 0] p_end_idx = 1 else: p_end_idx = i + 1 p_cost_candidates[0] = q_cost_candidates[0] # cost from [0, 0] already known. for i_ in range(1, p_end_idx): # let bottom border points be (s). (to right) s[0] = delta_P * i_ _, cost = _st_owp(P1, u, Q1, v, b, s, t, line_point_cost, line_line_cost) p_cost_candidates[i_] = p_costs[i_] + cost p_costs_out[i] = min( np.min(p_cost_candidates[:p_end_idx]), np.min(q_cost_candidates[:q_end_idx]) ) # compute right boundary t[0] = L1 if p_is_last: # No need steiner points on right boundary. Just check corner point. start_idx = len(q_pts) - 1 elif q_is_initial: start_idx = 0 else: # LR corner already computed by the lower cell start_idx = 1 # Don't need to compute the last j (already done by P loop just above) for j in range(start_idx, len(q_pts) - 1): t[1] = delta_Q * j s[1] = 0 if q_is_initial: # No steiner points on bottom boundary; just check [0, 0] p_end_idx = 1 else: p_end_idx = len(p_pts) for i in range(0, p_end_idx): s[0] = delta_P * i _, cost = _st_owp(P1, u, Q1, v, b, s, t, line_point_cost, line_line_cost) p_cost_candidates[i] = p_costs[i] + cost s[0] = 0 if p_is_initial: # No steiner points on left boundary; just check [0, 0] q_end_idx = 1 else: q_end_idx = j + 1 q_cost_candidates[0] = p_cost_candidates[0] # cost from [0, 0] already known. for j_ in range(1, q_end_idx): # cost from [0, 0] already known. s[1] = delta_Q * j_ _, cost = _st_owp(P1, u, Q1, v, b, s, t, line_point_cost, line_line_cost) q_cost_candidates[j_] = q_costs[j_] + cost q_costs_out[j] = min( np.min(p_cost_candidates[:p_end_idx]), np.min(q_cost_candidates[:q_end_idx]) ) q_costs_out[-1] = p_costs_out[-1] # Lower-right corner and upper-left corner of cells. return q_costs_out[:1], p_costs_out[:1]
[docs] @sanitize_vertices(owp=True) @sanitize_vertices_ifd(ifd_degenerate, owp=True) def ifd_owp(P, Q, delta): """Integral Fréchet distance and its optimal warping path. Parameters ---------- P : array_like A :math:`p` by :math:`n` array of :math:`p` vertices in an :math:`n`-dimensional space. Q : array_like A :math:`q` by :math:`n` array of :math:`q` vertices in an :math:`n`-dimensional space. delta : double Maximum length of edges between Steiner points. Returns ------- dist : double The integral Fréchet distance between *P* and *Q*, NaN if any vertice is empty or both vertices consist of a single point. owp : ndarray Optimal warping path, empty if any vertice is empty or both vertices consist of a single point. Raises ------ ValueError If *P* and *Q* are not 2-dimensional arrays with same number of columns. Examples -------- >>> dist, path = ifd_owp([[0, 0], [0.5, 0], [1, 0]], [[0.5, 1], [1.5, 1]], 0.1) >>> import matplotlib.pyplot as plt #doctest: +SKIP >>> plt.plot(*path.T) #doctest: +SKIP """ dist, owp = _ifd_owp( *_sample_ifd_pts(P, Q, delta), _line_point_integrate, _line_line_integrate ) return float(dist), _refine_path(owp)
@njit(cache=True) def _ifd_owp( P_edge_len, P_subedges_num, P_pts, Q_edge_len, Q_subedges_num, Q_pts, line_point_cost, line_line_cost, ): # Same as _ifd(), but stores paths so needs more memory. NP = len(P_subedges_num) + 1 P_vert_indices = np.empty(NP, dtype=np.int_) P_vert_indices[0] = 0 P_vert_indices[1:] = np.cumsum(P_subedges_num) NQ = len(Q_subedges_num) + 1 Q_vert_indices = np.empty(NQ, dtype=np.int_) Q_vert_indices[0] = 0 Q_vert_indices[1:] = np.cumsum(Q_subedges_num) # Cost containers; elements will be updated. P_costs = np.empty(len(P_pts), dtype=np.float64) P_costs[0] = 0 Q_costs = np.empty(len(Q_pts), dtype=np.float64) Q_costs[0] = 0 # Path containers; elements will be updated. # Path passes (NP + NQ - 3) cells, and has 4 vertices in each cell ([s, cs, ct, t] # or [s, c', c', t]). (NP + NQ - 4) vertices overlap. MAX_PATH_VERT_NUM = (NP + NQ - 3) * 4 - (NP + NQ - 4) P_paths = np.empty((len(P_pts), MAX_PATH_VERT_NUM, 2), dtype=np.float64) P_paths[0, 0] = [0, 0] Q_paths = np.empty((len(Q_pts), MAX_PATH_VERT_NUM, 2), dtype=np.float64) Q_paths[0, 0] = [0, 0] # TODO: parallelize this i-loop. p0_cost = P_costs[:1] p0_path = P_paths[:1, :1] for i in range(NP - 1): p_pts = P_pts[P_vert_indices[i] : P_vert_indices[i + 1] + 1] q0_cost = Q_costs[:1] q0_path = Q_paths[:1, : 3 * i + 1] for j in range(NQ - 1): q_pts = Q_pts[Q_vert_indices[j] : Q_vert_indices[j + 1] + 1] pc = 3 * (i + j) + 1 # path count if j == 0: p_costs = np.concatenate( ( p0_cost, P_costs[P_vert_indices[i] + 1 : P_vert_indices[i + 1] + 1], ) ) p_paths = np.concatenate( ( p0_path, P_paths[P_vert_indices[i] + 1 : P_vert_indices[i + 1] + 1, :pc], ) ) else: p_costs = P_costs[P_vert_indices[i] : P_vert_indices[i + 1] + 1].copy() p_paths = P_paths[ P_vert_indices[i] : P_vert_indices[i + 1] + 1, :pc ].copy() q_costs = np.concatenate( (q0_cost, Q_costs[Q_vert_indices[j] + 1 : Q_vert_indices[j + 1] + 1]) ) q_paths = np.concatenate( ( q0_path, Q_paths[Q_vert_indices[j] + 1 : Q_vert_indices[j + 1] + 1, :pc], ) ) p1_cost, p1_path, q1_cost, q1_path = _cell_owps( P_edge_len[i], p_pts, p_costs, P_costs[P_vert_indices[i] : P_vert_indices[i + 1] + 1], p_paths, P_paths[P_vert_indices[i] : P_vert_indices[i + 1] + 1, : pc + 3], Q_edge_len[j], q_pts, q_costs, Q_costs[Q_vert_indices[j] : Q_vert_indices[j + 1] + 1], q_paths, Q_paths[Q_vert_indices[j] : Q_vert_indices[j + 1] + 1, : pc + 3], i == 0, j == 0, i == NP - 2, j == NQ - 2, line_point_cost, line_line_cost, ) if j == 0: p0_cost = p1_cost p0_path = p1_path q0_cost = q1_cost q0_path = q1_path return Q_costs[-1], Q_paths[-1] @njit(cache=True) def _cell_owps( L1, p_pts, p_costs, p_costs_out, p_paths, p_paths_out, L2, q_pts, q_costs, q_costs_out, q_paths, q_paths_out, p_is_initial, q_is_initial, p_is_last, q_is_last, line_point_cost, line_line_cost, ): """Optimal warping paths and their costs for all points in an cell.""" P1, Q1, u, v, b, delta_P, delta_Q = _cell_info(p_pts, L1, q_pts, L2) p_cost_candidates = np.empty(len(p_pts), dtype=np.float64) q_cost_candidates = np.empty(len(q_pts), dtype=np.float64) p_path_candidates = np.empty((len(p_pts), 3, 2), dtype=np.float64) q_path_candidates = np.empty((len(q_pts), 3, 2), dtype=np.float64) s = np.empty((2,), dtype=np.float64) t = np.empty((2,), dtype=np.float64) # compute upper boundary t[1] = L2 if q_is_last: # No need steiner points on upper boundary. Just check corner point. start_idx = len(p_pts) - 1 else: start_idx = 0 for i in range(start_idx, len(p_pts)): # Fill p_costs_out[i] t[0] = delta_P * i s[0] = 0 if p_is_initial: # No steiner points on left boundary; just check [0, 0] q_end_idx = 1 else: q_end_idx = len(q_pts) for j in range(0, q_end_idx): s[1] = delta_Q * j vert, cost = _st_owp(P1, u, Q1, v, b, s, t, line_point_cost, line_line_cost) q_cost_candidates[j] = q_costs[j] + cost q_path_candidates[j] = vert[1:] - vert[0] s[1] = 0 if q_is_initial: # No steiner points on bottom boundary; just check [0, 0] p_end_idx = 1 else: p_end_idx = i + 1 p_cost_candidates[0] = q_cost_candidates[0] # cost from [0, 0] already known. p_path_candidates[0] = q_path_candidates[0] # path from [0, 0] already known. for i_ in range(1, p_end_idx): # let bottom border points be (s). (to right) s[0] = delta_P * i_ vert, cost = _st_owp(P1, u, Q1, v, b, s, t, line_point_cost, line_line_cost) p_cost_candidates[i_] = p_costs[i_] + cost p_path_candidates[i_] = vert[1:] - vert[0] p_min_idx = np.argmin(p_cost_candidates[:p_end_idx]) q_min_idx = np.argmin(q_cost_candidates[:q_end_idx]) from_p_mincost = p_cost_candidates[p_min_idx] from_q_mincost = q_cost_candidates[q_min_idx] if from_p_mincost > from_q_mincost: mincost = from_q_mincost prevpath = q_paths[q_min_idx] minpath = q_path_candidates[q_min_idx] else: mincost = from_p_mincost prevpath = p_paths[p_min_idx] minpath = p_path_candidates[p_min_idx] p_costs_out[i] = mincost p_paths_out[i, :-3] = prevpath p_paths_out[i, -3:] = prevpath[-1] + minpath # compute right boundary t[0] = L1 if p_is_last: # No need steiner points on right boundary. Just check corner point. start_idx = len(q_pts) - 1 elif q_is_initial: start_idx = 0 else: # LR corner already computed by the lower cell (MUST exclude for path track) start_idx = 1 # Don't need to compute the last j (already done by P loop just above) for j in range(start_idx, len(q_pts) - 1): t[1] = delta_Q * j s[1] = 0 if q_is_initial: # No steiner points on bottom boundary; just check [0, 0] p_end_idx = 1 else: p_end_idx = len(p_pts) for i in range(0, p_end_idx): s[0] = delta_P * i vert, cost = _st_owp(P1, u, Q1, v, b, s, t, line_point_cost, line_line_cost) p_cost_candidates[i] = p_costs[i] + cost p_path_candidates[i] = vert[1:] - vert[0] s[0] = 0 if p_is_initial: # No steiner points on left boundary; just check [0, 0] q_end_idx = 1 else: q_end_idx = j + 1 q_cost_candidates[0] = p_cost_candidates[0] # cost from [0, 0] already known. q_path_candidates[0] = p_path_candidates[0] # path from [0, 0] already known. for j_ in range(1, q_end_idx): # cost from [0, 0] already known. s[1] = delta_Q * j_ vert, cost = _st_owp(P1, u, Q1, v, b, s, t, line_point_cost, line_line_cost) q_cost_candidates[j_] = q_costs[j_] + cost q_path_candidates[j_] = vert[1:] - vert[0] p_min_idx = np.argmin(p_cost_candidates[:p_end_idx]) q_min_idx = np.argmin(q_cost_candidates[:q_end_idx]) from_p_mincost = p_cost_candidates[p_min_idx] from_q_mincost = q_cost_candidates[q_min_idx] if from_p_mincost > from_q_mincost: mincost = from_q_mincost prevpath = q_paths[q_min_idx] minpath = q_path_candidates[q_min_idx] else: mincost = from_p_mincost prevpath = p_paths[p_min_idx] minpath = p_path_candidates[p_min_idx] q_costs_out[j] = mincost q_paths_out[j, :-3] = prevpath q_paths_out[j, -3:] = prevpath[-1] + minpath q_costs_out[-1] = p_costs_out[-1] q_paths_out[-1] = p_paths_out[-1] return q_costs_out[:1], q_paths_out[:1], p_costs_out[:1], p_paths_out[:1] @njit(cache=True) def _st_owp(P1, u, Q1, v, b, s, t, line_point_cost, line_line_cost): """Optimal warping path between two points in a cell and its cost.""" # Find steiner points in curve space P_s = P1 + u * s[0] P_t = P1 + u * t[0] Q_s = Q1 + v * s[1] Q_t = Q1 + v * t[1] if s[1] > s[0] + b: cs = np.array([s[1] - b, s[1]]) else: cs = np.array([s[0], s[0] + b]) if t[1] < t[0] + b: ct = np.array([t[1] - b, t[1]]) else: ct = np.array([t[0], t[0] + b]) if cs[0] < ct[0]: # pass through lm P_cs = P1 + u * cs[0] P_ct = P1 + u * ct[0] Q_cs = Q1 + v * cs[1] Q_ct = Q1 + v * ct[1] if s[1] > s[0] + b: # right s_to_cs = line_point_cost(P_s, P_cs, Q_s) else: # up s_to_cs = line_point_cost(Q_s, Q_cs, P_s) cs_to_ct = line_line_cost(P_cs, P_ct, Q_cs, Q_ct) if t[1] > t[0] + b: # up ct_to_t = line_point_cost(Q_ct, Q_t, P_t) else: # right ct_to_t = line_point_cost(P_ct, P_t, Q_t) cost = s_to_cs + cs_to_ct + ct_to_t verts = np.stack((s, cs, ct, t)) else: # pass c' if s[1] > s[0] + b: # right -> up cost = line_point_cost(P_s, P_t, Q_s) + line_point_cost(Q_s, Q_t, P_t) c_prime = np.array((t[0], s[1])) else: # up -> right cost = line_point_cost(Q_s, Q_t, P_s) + line_point_cost(P_s, P_t, Q_t) c_prime = np.array((s[0], t[1])) # Force len=4 because homogeneous output type is easier to handle verts = np.stack((s, c_prime, c_prime, t)) return verts, cost def _sample_ifd_pts(P, Q, delta): # No need to add Steiner points if the other polyline is just a line segment. if len(Q) == 2: P_edge_len = np.linalg.norm(np.diff(P, axis=0), axis=-1) P_subedges_num = np.ones(len(P) - 1, dtype=np.int_) P_pts = P else: P_edge_len, P_subedges_num, P_pts = _sample_pts(P, delta) if len(P) == 2: Q_edge_len = np.linalg.norm(np.diff(Q, axis=0), axis=-1) Q_subedges_num = np.ones(len(Q) - 1, dtype=np.int_) Q_pts = Q else: Q_edge_len, Q_subedges_num, Q_pts = _sample_pts(Q, delta) return ( P_edge_len, P_subedges_num, P_pts, Q_edge_len, Q_subedges_num, Q_pts, ) @njit(cache=True) def _sample_pts(vert, delta): N, D = vert.shape vert_diff = vert[1:] - vert[:-1] edge_lens = np.empty(N - 1, dtype=np.float64) for i in range(N - 1): edge_lens[i] = np.linalg.norm(vert_diff[i]) subedges_num = np.ceil(edge_lens / delta).astype(np.int_) pts = np.empty((np.sum(subedges_num) + 1, D), dtype=np.float64) count = 0 for cell_idx in range(N - 1): P0 = vert[cell_idx] v = vert_diff[cell_idx] n = subedges_num[cell_idx] for i in range(n): pts[count + i] = P0 + (i / n) * v count += n pts[count] = vert[N - 1] return edge_lens, subedges_num, pts @njit(cache=True) def _cell_info(P_pts, L1, Q_pts, L2): P1 = P_pts[0] P2 = P_pts[-1] Q1 = Q_pts[0] Q2 = Q_pts[-1] P1P2 = P2 - P1 Q1Q2 = Q2 - Q1 if L1 < EPSILON: u = np.array([0, 0], np.float64) else: u = (P1P2) / L1 if L2 < EPSILON: v = np.array([0, 0], np.float64) else: v = (Q1Q2) / L2 # Find lm: y = x + b. # Can be acquired by finding points where distance is minimum. w = P1 - Q1 u_dot_v = np.dot(u, v) if np.abs(1 - u_dot_v**2) > EPSILON: # Find points P(s) and Q(t) where P and Q intersects. # (s, t) is on y = x + b A = np.array([[1, -u_dot_v], [-u_dot_v, 1]], dtype=np.float64) B = np.array([-np.dot(u, w), np.dot(v, w)], dtype=np.float64) s, t = np.linalg.solve(A, B) b = t - s else: # P and Q are parallel; equations degenerate into s - (u.v)t = -u.w b = np.dot(u, w) / u_dot_v delta_P = L1 / (len(P_pts) - 1) delta_Q = L2 / (len(Q_pts) - 1) return P1, Q1, u, v, b, delta_P, delta_Q @njit(cache=True) def _refine_path(path): prev = path[0] count = 1 for i in range(1, len(path)): current = path[i] if np.all(prev == current): continue else: path[count] = current prev = current count += 1 return path[:count] @njit(cache=True) def _line_point_integrate(a, b, p): r"""Analytic integration from AP to BP. .. math:: \int_0^1 \lVert (A - P) + (B - A) t \rVert \cdot \lVert (B - A) \rVert dt """ # Goal: integrate sqrt(A*t**2 + B*t + C) * sqrt(A) dt over t [0, 1] # where A = dot(ab, ab), B = 2 * dot(pa, ab) and C = dot(pa, pa). # Can be simplified to A * integral sqrt(t**2 + B/A*t + C/A) dt. # Rewrite: A * integral sqrt(t**2 + B*t + C) dt over t [0, 1] # where B = 2 * dot(pa, ab) / A and C = dot(pa, pa) / A. ab = b - a A = np.dot(ab, ab) if A < EPSILON: # Degenerate: ab does not form line segement. return 0 pa = a - p if np.abs(cross2d(ab, pa)) < EPSILON: # Degenerate: a, b, p all on a same line. t = np.dot(ab, pa) / A L1 = np.dot(pa, pa) bp = p - b L2 = np.dot(bp, bp) if t < 0: return (L2 - L1) / 2 elif t > 1: return (L1 - L2) / 2 else: return (L1 + L2) / 2 B = 2 * np.dot(ab, pa) / A C = np.dot(pa, pa) / A integ = ( 4 * np.sqrt(1 + B + C) + 2 * B * (-np.sqrt(C) + np.sqrt(1 + B + C)) - (B**2 - 4 * C) * np.log((2 + B + 2 * np.sqrt(1 + B + C)) / (B + 2 * np.sqrt(C))) ) / 8 return A * integ @njit(cache=True) def _line_line_integrate(a, b, c, d): r"""Analytic integration from AC to BD. .. math:: \int_0^1 \lVert (A - C) + (B - A + C - D)t \rVert \cdot \left( \lVert B - A \rVert + \lVert D - C \rVert \right) dt """ # Goal: integrate sqrt(A*t**2 + B*t + C) * (sqrt(D) + sqrt(E)) dt over t [0, 1] # where A = dot(vu, vu), B = 2 * dot(vu, w), C = dot(w, w), D = dot(u, u), # and E = dot(v, v); where u = b - a, v = d - c and w = a - c. # Rewrite: (sqrt(A*D) + sqrt(A*E)) * integral sqrt(t**2 + B*t + C) dt over t [0, 1] # where B = 2 * dot(vu, w) / A and C = dot(w, w) / A u, v, w = b - a, d - c, a - c vu = u - v A = np.dot(vu, vu) C, D, E = np.dot(w, w), np.dot(u, u), np.dot(v, v) if A < EPSILON: # Degenerate: ab and cd has same direction and magnitude return np.sqrt(C * D) + np.sqrt(C * E) B, C = 2 * np.dot(vu, w) / A, C / A if np.abs(B) < EPSILON and C < EPSILON: # Degenerate: B and C are 0 (either w = 0 or A is too large) return (np.sqrt(A * D) + np.sqrt(A * E)) / 2 numer = 2 + B + 2 * np.sqrt(1 + B + C) denom = B + 2 * np.sqrt(C) if np.abs(numer) < EPSILON or np.abs(denom) < EPSILON: # Degenerate: u-v and w are on the opposite direction # B**2 = 4*C, therefore integral sqrt((t + B/2)**2) dt over t [0, 1] if B > 0 or B < -2: integ = (np.abs(B / 2) + np.abs(1 + B / 2)) / 2 else: integ = (np.abs(B / 2) ** 2) / 2 + (np.abs(1 + B / 2) ** 2) / 2 else: integ = ( 4 * np.sqrt(1 + B + C) + 2 * B * (-np.sqrt(C) + np.sqrt(1 + B + C)) - (B**2 - 4 * C) * np.log(numer / denom) ) / 8 return np.sqrt(A * D) + np.sqrt(A * E) * integ