"""Integral Fréchet distance."""
import functools
import numpy as np
from numba import njit
from .util import sanitize_vertices
__all__ = [
"ifd",
"ifd_owp",
]
EPSILON = np.finfo(np.float64).eps
def sanitize_vertices_ifd(degenerate_fallback, owp):
"""Decorator to sanitize the vertices for IFD and its variants.
The integral-based Fréchet distance variants must return NaN for two
degenerate curves with zero-lengths to avoid returning false zero-value.
Use this decorator with :func:`sanitize_vertices`.
"""
def decorator(func):
@functools.wraps(func)
def wrapper(P, Q, *args, **kwargs):
if len(P) != 1 and len(Q) != 1:
return func(P, Q, *args, **kwargs)
elif len(P) == 1 and len(Q) == 1:
if owp:
return np.float64(np.nan), np.empty((0, 2), dtype=np.float64)
else:
return np.float64(np.nan)
# degenerate cases
if len(P) == 1:
curve, point, curve_idx = Q, P[0], 1
else:
curve, point, curve_idx = P, Q[0], 0
dist = degenerate_fallback(curve, point)
if owp:
path = np.zeros((2, 2), dtype=np.float64)
curve_len = np.sum(np.linalg.norm(np.diff(curve, axis=0), axis=-1))
path[1, curve_idx] = curve_len
return dist, path
else:
return dist
return wrapper
return decorator
@njit(cache=True)
def ifd_degenerate(curve, point):
ret = 0
for i in range(len(curve) - 1):
a, b = curve[i], curve[i + 1]
ret += _line_point_square_integrate(a, b, point)
return ret
[docs]
@sanitize_vertices(owp=False)
@sanitize_vertices_ifd(ifd_degenerate, owp=False)
def ifd(P, Q, delta):
r"""Integral Fréchet distance between two open polygonal curves.
Let :math:`f, g: [0, 1] \to \Omega` be curves defined in a metric space
:math:`\Omega`. Let :math:`\alpha, \beta: [0, 1] \to [0, 1]` be continuous
non-decreasing surjections, and define :math:`\pi: [0, 1] \to [0, 1] \times
[0, 1]` such that :math:`\pi(t) = \left(\alpha(t), \beta(t)\right)`.
The integral Fréchet distance between :math:`f` and :math:`g` is defined as
.. math::
\inf_{\pi} \int_0^1
\delta\left(\pi(t)\right) \cdot
\lVert \pi'(t) \rVert
\mathrm{d}t,
where :math:`\delta\left(\pi(t)\right)` is a distance between
:math:`f\left(\alpha(t)\right)` and :math:`g\left(\beta(t)\right)` in
:math:`\Omega` and :math:`\lVert \cdot \rVert` is a norm in :math:`[0, 1]
\times [0, 1]`. In this implementation, we use the squared Euclidean distance
for :math:`\delta` and the Manhattan norm for :math:`\lVert \cdot \rVert`.
Parameters
----------
P : array_like
A :math:`p` by :math:`n` array of :math:`p` vertices in an
:math:`n`-dimensional space.
Q : array_like
A :math:`q` by :math:`n` array of :math:`q` vertices in an
:math:`n`-dimensional space.
delta : double
Maximum length of edges between Steiner points.
Returns
-------
dist : double
The integral Fréchet distance between *P* and *Q*, NaN if any vertice
is empty or both vertices consist of a single point.
Raises
------
ValueError
If *P* and *Q* are not 2-dimensional arrays with same number of columns.
See Also
--------
ifd_owp : Integral Fréchet distance with optimal warping path.
Notes
-----
This function implements the algorithm of Brankovic et al [#]_.
References
----------
.. [#] Brankovic, M., et al. "(k, l)-Medians Clustering of Trajectories Using
Continuous Dynamic Time Warping." Proceedings of the 28th International
Conference on Advances in Geographic Information Systems. 2020.
Examples
--------
>>> ifd([[0, 0], [0.5, 0], [1, 0]], [[0, 1], [1, 1]], 0.1)
2.0
"""
ret = _ifd(*_sample_ifd_pts(P, Q, delta))
return float(ret)
@njit(cache=True)
def _ifd(
P_edge_len,
P_subedges_num,
P_pts,
Q_edge_len,
Q_subedges_num,
Q_pts,
):
NP = len(P_subedges_num) + 1
P_vert_indices = np.empty(NP, dtype=np.int_)
P_vert_indices[0] = 0
P_vert_indices[1:] = np.cumsum(P_subedges_num)
NQ = len(Q_subedges_num) + 1
Q_vert_indices = np.empty(NQ, dtype=np.int_)
Q_vert_indices[0] = 0
Q_vert_indices[1:] = np.cumsum(Q_subedges_num)
# Cost containers; elements will be updated.
P_costs = np.empty(len(P_pts), dtype=np.float64)
P_costs[0] = 0
Q_costs = np.empty(len(Q_pts), dtype=np.float64)
Q_costs[0] = 0
# TODO: parallelize this i-loop.
# Must ensure that cell (i - 1, j) is computed before (i, j).
p0 = P_costs[:1] # will be updated during i-loop when j == 0
for i in range(NP - 1):
p_pts = P_pts[P_vert_indices[i] : P_vert_indices[i + 1] + 1]
q0 = Q_costs[:1] # will be updated during j-loop
for j in range(NQ - 1):
q_pts = Q_pts[Q_vert_indices[j] : Q_vert_indices[j + 1] + 1]
if j == 0:
p_costs = np.concatenate(
(p0, P_costs[P_vert_indices[i] + 1 : P_vert_indices[i + 1] + 1])
)
else:
p_costs = P_costs[P_vert_indices[i] : P_vert_indices[i + 1] + 1].copy()
q_costs = np.concatenate(
(q0, Q_costs[Q_vert_indices[j] + 1 : Q_vert_indices[j + 1] + 1])
)
p1, q1 = _cell_owcs(
P_edge_len[i],
p_pts,
p_costs,
P_costs[P_vert_indices[i] : P_vert_indices[i + 1] + 1],
Q_edge_len[j],
q_pts,
q_costs,
Q_costs[Q_vert_indices[j] : Q_vert_indices[j + 1] + 1],
i == 0,
j == 0,
i == NP - 2,
j == NQ - 2,
)
# store for the next loops
if j == 0:
p0 = p1
q0 = q1
return Q_costs[-1]
@njit(cache=True)
def _cell_owcs(
L1,
p_pts,
p_costs,
p_costs_out,
L2,
q_pts,
q_costs,
q_costs_out,
p_is_initial,
q_is_initial,
p_is_last,
q_is_last,
):
"""Optimal warping costs for all points in an cell."""
# p_costs = lower boundary, p_costs_out = upper boundary,
# q_costs = left boundary, q_costs_out = right boundary of the cell.
P1, Q1, u, v, b, delta_P, delta_Q = _cell_info(p_pts, L1, q_pts, L2)
# Will be reused for each border point (t) to find best starting point (s).
p_cost_candidates = np.empty(len(p_pts), dtype=np.float64)
q_cost_candidates = np.empty(len(q_pts), dtype=np.float64)
s = np.empty((2,), dtype=np.float64)
t = np.empty((2,), dtype=np.float64)
# compute upper boundary
t[1] = L2
if q_is_last: # No need steiner points on upper boundary. Just check corner point.
start_idx = len(p_pts) - 1
else:
start_idx = 0
for i in range(start_idx, len(p_pts)): # Fill p_costs_out[i]
t[0] = delta_P * i
s[0] = 0
if p_is_initial: # No steiner points on left boundary; just check [0, 0]
q_end_idx = 1
else:
q_end_idx = len(q_pts)
for j in range(0, q_end_idx):
s[1] = delta_Q * j
cost, _, _ = _st_owp(P1, u, Q1, v, b, s, t)
q_cost_candidates[j] = q_costs[j] + cost
s[1] = 0
if q_is_initial: # No steiner points on bottom boundary; just check [0, 0]
p_end_idx = 1
else:
p_end_idx = i + 1
p_cost_candidates[0] = q_cost_candidates[0] # cost from [0, 0] already known.
for i_ in range(1, p_end_idx): # let bottom border points be (s). (to right)
s[0] = delta_P * i_
cost, _, _ = _st_owp(P1, u, Q1, v, b, s, t)
p_cost_candidates[i_] = p_costs[i_] + cost
p_costs_out[i] = min(
np.min(p_cost_candidates[:p_end_idx]), np.min(q_cost_candidates[:q_end_idx])
)
# compute right boundary
t[0] = L1
if p_is_last: # No need steiner points on right boundary. Just check corner point.
start_idx = len(q_pts) - 1
elif q_is_initial:
start_idx = 0
else: # LR corner already computed by the lower cell
start_idx = 1
# Don't need to compute the last j (already done by P loop just above)
for j in range(start_idx, len(q_pts) - 1):
t[1] = delta_Q * j
s[1] = 0
if q_is_initial: # No steiner points on bottom boundary; just check [0, 0]
p_end_idx = 1
else:
p_end_idx = len(p_pts)
for i in range(0, p_end_idx):
s[0] = delta_P * i
cost, _, _ = _st_owp(P1, u, Q1, v, b, s, t)
p_cost_candidates[i] = p_costs[i] + cost
s[0] = 0
if p_is_initial: # No steiner points on left boundary; just check [0, 0]
q_end_idx = 1
else:
q_end_idx = j + 1
q_cost_candidates[0] = p_cost_candidates[0] # cost from [0, 0] already known.
for j_ in range(1, q_end_idx): # cost from [0, 0] already known.
s[1] = delta_Q * j_
cost, _, _ = _st_owp(P1, u, Q1, v, b, s, t)
q_cost_candidates[j_] = q_costs[j_] + cost
q_costs_out[j] = min(
np.min(p_cost_candidates[:p_end_idx]), np.min(q_cost_candidates[:q_end_idx])
)
q_costs_out[-1] = p_costs_out[-1]
# Lower-right corner and upper-left corner of cells.
return q_costs_out[:1], p_costs_out[:1]
[docs]
@sanitize_vertices(owp=True)
@sanitize_vertices_ifd(ifd_degenerate, owp=True)
def ifd_owp(P, Q, delta):
"""Integral Fréchet distance and its optimal warping path.
Parameters
----------
P : array_like
A :math:`p` by :math:`n` array of :math:`p` vertices in an
:math:`n`-dimensional space.
Q : array_like
A :math:`q` by :math:`n` array of :math:`q` vertices in an
:math:`n`-dimensional space.
delta : double
Maximum length of edges between Steiner points.
Returns
-------
dist : double
The integral Fréchet distance between *P* and *Q*, NaN if any vertice
is empty or both vertices consist of a single point.
owp : ndarray
Optimal warping path, empty if any vertice is empty or both vertices
consist of a single point.
Raises
------
ValueError
If *P* and *Q* are not 2-dimensional arrays with same number of columns.
Examples
--------
>>> dist, path = ifd_owp([[0, 0], [0.5, 0], [1, 0]], [[0.5, 1], [1.5, 1]], 0.1)
>>> import matplotlib.pyplot as plt #doctest: +SKIP
>>> plt.plot(*path.T) #doctest: +SKIP
"""
dist, owp, count = _ifd_owp(*_sample_ifd_pts(P, Q, delta))
return float(dist), owp[:count]
@njit(cache=True)
def _ifd_owp(
P_edge_len,
P_subedges_num,
P_pts,
Q_edge_len,
Q_subedges_num,
Q_pts,
):
# Same as _ifd(), but stores paths so needs more memory.
NP = len(P_subedges_num) + 1
P_vert_indices = np.empty(NP, dtype=np.int_)
P_vert_indices[0] = 0
P_vert_indices[1:] = np.cumsum(P_subedges_num)
NQ = len(Q_subedges_num) + 1
Q_vert_indices = np.empty(NQ, dtype=np.int_)
Q_vert_indices[0] = 0
Q_vert_indices[1:] = np.cumsum(Q_subedges_num)
# Cost containers; elements will be updated.
P_costs = np.empty(len(P_pts), dtype=np.float64)
P_costs[0] = 0
Q_costs = np.empty(len(Q_pts), dtype=np.float64)
Q_costs[0] = 0
# Path containers; elements will be updated.
# Path passes (NP + NQ - 3) cells, and has max 4 vertices in each cell.
# (NP + NQ - 4) vertices overlap.
MAX_PATH_VERT_NUM = (NP + NQ - 3) * 4 - (NP + NQ - 4)
P_paths = np.empty((len(P_pts), MAX_PATH_VERT_NUM, 2), dtype=np.float64)
P_paths[0, 0] = [0, 0]
Q_paths = np.empty((len(Q_pts), MAX_PATH_VERT_NUM, 2), dtype=np.float64)
Q_paths[0, 0] = [0, 0]
P_count = np.empty(len(P_pts), dtype=np.int32)
P_count[0] = 1
Q_count = np.empty(len(Q_pts), dtype=np.int32)
Q_count[0] = 1
# TODO: parallelize this i-loop.
p0_cost = P_costs[:1]
p0_path = P_paths[:1]
p0_count = P_count[:1]
for i in range(NP - 1):
p_pts = P_pts[P_vert_indices[i] : P_vert_indices[i + 1] + 1]
q0_cost = Q_costs[:1]
q0_path = Q_paths[:1]
q0_count = Q_count[:1]
for j in range(NQ - 1):
q_pts = Q_pts[Q_vert_indices[j] : Q_vert_indices[j + 1] + 1]
if j == 0:
p_costs = np.concatenate(
(
p0_cost,
P_costs[P_vert_indices[i] + 1 : P_vert_indices[i + 1] + 1],
)
)
p_paths = np.concatenate(
(
p0_path,
P_paths[P_vert_indices[i] + 1 : P_vert_indices[i + 1] + 1],
)
)
p_count = np.concatenate(
(
p0_count,
P_count[P_vert_indices[i] + 1 : P_vert_indices[i + 1] + 1],
)
)
else:
p_costs = P_costs[P_vert_indices[i] : P_vert_indices[i + 1] + 1].copy()
p_paths = P_paths[P_vert_indices[i] : P_vert_indices[i + 1] + 1].copy()
p_count = P_count[P_vert_indices[i] : P_vert_indices[i + 1] + 1].copy()
q_costs = np.concatenate(
(q0_cost, Q_costs[Q_vert_indices[j] + 1 : Q_vert_indices[j + 1] + 1])
)
q_paths = np.concatenate(
(
q0_path,
Q_paths[Q_vert_indices[j] + 1 : Q_vert_indices[j + 1] + 1],
)
)
q_count = np.concatenate(
(q0_count, Q_count[Q_vert_indices[j] + 1 : Q_vert_indices[j + 1] + 1])
)
p1_cost, p1_path, p1_count, q1_cost, q1_path, q1_count = _cell_owps(
P_edge_len[i],
p_pts,
p_costs,
P_costs[P_vert_indices[i] : P_vert_indices[i + 1] + 1],
p_paths,
P_paths[P_vert_indices[i] : P_vert_indices[i + 1] + 1],
p_count,
P_count[P_vert_indices[i] : P_vert_indices[i + 1] + 1],
Q_edge_len[j],
q_pts,
q_costs,
Q_costs[Q_vert_indices[j] : Q_vert_indices[j + 1] + 1],
q_paths,
Q_paths[Q_vert_indices[j] : Q_vert_indices[j + 1] + 1],
q_count,
Q_count[Q_vert_indices[j] : Q_vert_indices[j + 1] + 1],
i == 0,
j == 0,
i == NP - 2,
j == NQ - 2,
)
if j == 0:
p0_cost = p1_cost
p0_path = p1_path
p0_count = p1_count
q0_cost = q1_cost
q0_path = q1_path
q0_count = q1_count
return Q_costs[-1], Q_paths[-1], Q_count[-1]
@njit(cache=True)
def _cell_owps(
L1,
p_pts,
p_costs,
p_costs_out,
p_paths,
p_paths_out,
p_count,
p_count_out,
L2,
q_pts,
q_costs,
q_costs_out,
q_paths,
q_paths_out,
q_count,
q_count_out,
p_is_initial,
q_is_initial,
p_is_last,
q_is_last,
):
"""Optimal warping paths and their costs for all points in an cell."""
P1, Q1, u, v, b, delta_P, delta_Q = _cell_info(p_pts, L1, q_pts, L2)
p_cost_candidates = np.empty(len(p_pts), dtype=np.float64)
q_cost_candidates = np.empty(len(q_pts), dtype=np.float64)
p_path_candidates = np.empty((len(p_pts), 3, 2), dtype=np.float64)
q_path_candidates = np.empty((len(q_pts), 3, 2), dtype=np.float64)
p_count_candidates = np.empty(len(p_pts), dtype=np.int32)
q_count_candidates = np.empty(len(q_pts), dtype=np.int32)
s = np.empty((2,), dtype=np.float64)
t = np.empty((2,), dtype=np.float64)
# compute upper boundary
t[1] = L2
if q_is_last: # No need steiner points on upper boundary. Just check corner point.
start_idx = len(p_pts) - 1
else:
start_idx = 0
for i in range(start_idx, len(p_pts)): # Fill p_costs_out[i]
t[0] = delta_P * i
s[0] = 0
if p_is_initial: # No steiner points on left boundary; just check [0, 0]
q_end_idx = 1
else:
q_end_idx = len(q_pts)
for j in range(0, q_end_idx):
s[1] = delta_Q * j
cost, vert, count = _st_owp(P1, u, Q1, v, b, s, t)
q_cost_candidates[j] = q_costs[j] + cost
q_path_candidates[j, : count - 1] = (vert - vert[0])[1:count]
q_count_candidates[j] = count - 1
s[1] = 0
if q_is_initial: # No steiner points on bottom boundary; just check [0, 0]
p_end_idx = 1
else:
p_end_idx = i + 1
p_cost_candidates[0] = q_cost_candidates[0] # cost from [0, 0] already known.
p_path_candidates[0] = q_path_candidates[0] # path from [0, 0] already known.
p_count_candidates[0] = q_count_candidates[0]
for i_ in range(1, p_end_idx): # let bottom border points be (s). (to right)
s[0] = delta_P * i_
cost, vert, count = _st_owp(P1, u, Q1, v, b, s, t)
p_cost_candidates[i_] = p_costs[i_] + cost
p_path_candidates[i_, : count - 1] = (vert - vert[0])[1:count]
p_count_candidates[i_] = count - 1
p_min_idx = np.argmin(p_cost_candidates[:p_end_idx])
q_min_idx = np.argmin(q_cost_candidates[:q_end_idx])
from_p_mincost = p_cost_candidates[p_min_idx]
from_q_mincost = q_cost_candidates[q_min_idx]
if from_p_mincost > from_q_mincost:
mincost = from_q_mincost
prevcount = q_count[q_min_idx]
prevpath = q_paths[q_min_idx, :prevcount]
mincount = q_count_candidates[q_min_idx]
minpath = q_path_candidates[q_min_idx, :mincount]
else:
mincost = from_p_mincost
prevcount = p_count[p_min_idx]
prevpath = p_paths[p_min_idx, :prevcount]
mincount = p_count_candidates[p_min_idx]
minpath = p_path_candidates[p_min_idx, :mincount]
p_costs_out[i] = mincost
p_count_out[i] = prevcount + mincount
p_paths_out[i, :prevcount] = prevpath
p_paths_out[i, prevcount : prevcount + mincount] = prevpath[-1] + minpath
# compute right boundary
t[0] = L1
if p_is_last: # No need steiner points on right boundary. Just check corner point.
start_idx = len(q_pts) - 1
elif q_is_initial:
start_idx = 0
else: # LR corner already computed by the lower cell (MUST exclude for path track)
start_idx = 1
# Don't need to compute the last j (already done by P loop just above)
for j in range(start_idx, len(q_pts) - 1):
t[1] = delta_Q * j
s[1] = 0
if q_is_initial: # No steiner points on bottom boundary; just check [0, 0]
p_end_idx = 1
else:
p_end_idx = len(p_pts)
for i in range(0, p_end_idx):
s[0] = delta_P * i
cost, vert, count = _st_owp(P1, u, Q1, v, b, s, t)
p_cost_candidates[i] = p_costs[i] + cost
p_path_candidates[i, : count - 1] = (vert - vert[0])[1:count]
p_count_candidates[i] = count - 1
s[0] = 0
if p_is_initial: # No steiner points on left boundary; just check [0, 0]
q_end_idx = 1
else:
q_end_idx = j + 1
q_cost_candidates[0] = p_cost_candidates[0] # cost from [0, 0] already known.
q_path_candidates[0] = p_path_candidates[0] # path from [0, 0] already known.
q_count_candidates[0] = p_count_candidates[0]
for j_ in range(1, q_end_idx): # cost from [0, 0] already known.
s[1] = delta_Q * j_
cost, vert, count = _st_owp(P1, u, Q1, v, b, s, t)
q_cost_candidates[j_] = q_costs[j_] + cost
q_path_candidates[j_, : count - 1] = (vert - vert[0])[1:count]
q_count_candidates[j_] = count - 1
p_min_idx = np.argmin(p_cost_candidates[:p_end_idx])
q_min_idx = np.argmin(q_cost_candidates[:q_end_idx])
from_p_mincost = p_cost_candidates[p_min_idx]
from_q_mincost = q_cost_candidates[q_min_idx]
if from_p_mincost > from_q_mincost:
mincost = from_q_mincost
prevcount = q_count[q_min_idx]
prevpath = q_paths[q_min_idx, :prevcount]
mincount = q_count_candidates[q_min_idx]
minpath = q_path_candidates[q_min_idx, :mincount]
else:
mincost = from_p_mincost
prevcount = p_count[p_min_idx]
prevpath = p_paths[p_min_idx, :prevcount]
mincount = p_count_candidates[p_min_idx]
minpath = p_path_candidates[p_min_idx, :mincount]
q_costs_out[j] = mincost
q_count_out[j] = prevcount + mincount
q_paths_out[j, :prevcount] = prevpath
q_paths_out[j, prevcount : prevcount + mincount] = prevpath[-1] + minpath
q_costs_out[-1] = p_costs_out[-1]
q_paths_out[-1] = p_paths_out[-1]
q_count_out[-1] = p_count_out[-1]
return (
q_costs_out[:1],
q_paths_out[:1],
q_count_out[:1],
p_costs_out[:1],
p_paths_out[:1],
p_count_out[:1],
)
@njit(cache=True)
def _st_owp(P1, u, Q1, v, b, s, t):
"""Optimal warping path between two points in a cell and its cost."""
P_s = P1 + u * s[0]
P_t = P1 + u * t[0]
Q_s = Q1 + v * s[1]
Q_t = Q1 + v * t[1]
verts = np.empty((4, 2), dtype=np.float64)
verts[0] = s
count = 1
if s[1] > s[0] + b:
cs_x, cs_y = s[1] - b, s[1]
else:
cs_x, cs_y = s[0], s[0] + b
if t[1] < t[0] + b:
ct_x, ct_y = t[1] - b, t[1]
else:
ct_x, ct_y = t[0], t[0] + b
if cs_x < ct_x: # pass through lm
P_cs = P1 + u * cs_x
P_ct = P1 + u * ct_x
Q_cs = Q1 + v * cs_y
Q_ct = Q1 + v * ct_y
if s[1] > s[0] + b: # right
s_to_cs = _line_point_square_integrate(P_s, P_cs, Q_s)
else: # up
s_to_cs = _line_point_square_integrate(Q_s, Q_cs, P_s)
cs_to_ct = _line_line_square_integrate(P_cs, P_ct, Q_cs, Q_ct)
if t[1] > t[0] + b: # up
ct_to_t = _line_point_square_integrate(Q_ct, Q_t, P_t)
else: # right
ct_to_t = _line_point_square_integrate(P_ct, P_t, Q_t)
cost = s_to_cs + cs_to_ct + ct_to_t
if s_to_cs > 0:
verts[count] = (cs_x, cs_y)
count += 1
if cs_to_ct > 0:
verts[count] = (ct_x, ct_y)
count += 1
if ct_to_t > 0:
verts[count] = t
count += 1
else: # pass c'
if s[1] > s[0] + b: # right -> up
cost1 = _line_point_square_integrate(P_s, P_t, Q_s)
cost2 = _line_point_square_integrate(Q_s, Q_t, P_t)
c_prime = (t[0], s[1])
else: # up -> right
cost1 = _line_point_square_integrate(Q_s, Q_t, P_s)
cost2 = _line_point_square_integrate(P_s, P_t, Q_t)
c_prime = (s[0], t[1])
cost = cost1 + cost2
if cost1 > 0:
verts[count] = c_prime
count += 1
if cost2 > 0:
verts[count] = t
count += 1
return cost, verts, count
def _sample_ifd_pts(P, Q, delta):
# No need to add Steiner points if the other polyline is just a line segment.
if len(Q) == 2:
P_edge_len = np.linalg.norm(np.diff(P, axis=0), axis=-1)
P_subedges_num = np.ones(len(P) - 1, dtype=np.int_)
P_pts = P
else:
P_edge_len, P_subedges_num, P_pts = _sample_pts(P, delta)
if len(P) == 2:
Q_edge_len = np.linalg.norm(np.diff(Q, axis=0), axis=-1)
Q_subedges_num = np.ones(len(Q) - 1, dtype=np.int_)
Q_pts = Q
else:
Q_edge_len, Q_subedges_num, Q_pts = _sample_pts(Q, delta)
return (
P_edge_len,
P_subedges_num,
P_pts,
Q_edge_len,
Q_subedges_num,
Q_pts,
)
@njit(cache=True)
def _sample_pts(vert, delta):
N, D = vert.shape
vert_diff = vert[1:] - vert[:-1]
edge_lens = np.empty(N - 1, dtype=np.float64)
for i in range(N - 1):
edge_lens[i] = np.linalg.norm(vert_diff[i])
subedges_num = np.ceil(edge_lens / delta).astype(np.int_)
pts = np.empty((np.sum(subedges_num) + 1, D), dtype=np.float64)
count = 0
for cell_idx in range(N - 1):
P0 = vert[cell_idx]
v = vert_diff[cell_idx]
n = subedges_num[cell_idx]
for i in range(n):
pts[count + i] = P0 + (i / n) * v
count += n
pts[count] = vert[N - 1]
return edge_lens, subedges_num, pts
@njit(cache=True)
def _cell_info(P_pts, L1, Q_pts, L2):
P1 = P_pts[0]
P2 = P_pts[-1]
Q1 = Q_pts[0]
Q2 = Q_pts[-1]
P1P2 = P2 - P1
Q1Q2 = Q2 - Q1
if L1 < EPSILON:
u = np.array([0, 0], np.float64)
else:
u = (P1P2) / L1
if L2 < EPSILON:
v = np.array([0, 0], np.float64)
else:
v = (Q1Q2) / L2
# Find lm: y = x + b.
# Can be acquired by finding points where distance is minimum.
w = P1 - Q1
u_dot_v = np.dot(u, v)
if 1 - u_dot_v < EPSILON:
# P and Q are parallel; equations degenerate into s - (u.v)t = -u.w
b = np.dot(u, w) / u_dot_v
elif u_dot_v + 1 < EPSILON:
# P and Q are antiparallel.
# Any value is OK, so just set b=0.
b = np.float64(0)
else:
# P and Q intersects.
# Find points P(s) and Q(t) where P and Q intersects.
# (s, t) is on y = x + b
A = np.array([[1, -u_dot_v], [-u_dot_v, 1]], dtype=np.float64)
B = np.array([-np.dot(u, w), np.dot(v, w)], dtype=np.float64)
s, t = np.linalg.solve(A, B)
b = t - s
delta_P = L1 / (len(P_pts) - 1)
delta_Q = L2 / (len(Q_pts) - 1)
return P1, Q1, u, v, b, delta_P, delta_Q
@njit(cache=True)
def _line_point_square_integrate(a, b, p):
r"""Analytic integration from AP to BP (squared).
.. math::
\int_0^1 \lVert (A - P) + (B - A) t \rVert^2 \cdot \lVert (B - A) \rVert dt
"""
# integrate (A*t**2 + B*t + C) * sqrt(A) dt over t [0, 1]
# where A = dot(ab, ab), B = 2 * dot(pa, ab) and C = dot(pa, pa).
ab = b - a
pa = a - p
A = np.dot(ab, ab)
B = 2 * np.dot(ab, pa)
C = np.dot(pa, pa)
return (A / 3 + B / 2 + C) * np.sqrt(A)
@njit(cache=True)
def _line_line_square_integrate(a, b, c, d):
r"""Analytic integration from AC to BD (squared).
.. math::
\int_0^1 \lVert (A - C) + (B - A + C - D)t \rVert^2 \cdot
\left( \lVert B - A \rVert + \lVert D - C \rVert \right) dt
"""
# integrate (A*t**2 + B*t + C) * (sqrt(D) + sqrt(E)) dt over t [0, 1]
# where A = dot(vu, vu), B = 2 * dot(vu, w), C = dot(w, w), D = dot(u, u),
# and E = dot(v, v); where u = b - a, v = d - c and w = a - c.
u = b - a
v = d - c
w = a - c
vu = u - v
A = np.dot(vu, vu)
B = 2 * np.dot(vu, w)
C = np.dot(w, w)
D = np.dot(u, u)
E = np.dot(v, v)
return (A / 3 + B / 2 + C) * (np.sqrt(D) + np.sqrt(E))