import math
from enum import Enum
from functools import partial
from itertools import product
import torch
from scipy.special import stdtr, stdtrit
from scipy.stats import kendalltau
__all__ = [
"ENUM_FUNC_BIDEP",
"cdf_func_kernel",
"chatterjee_xi",
"debye1",
"ferreira_tail_dep_coeff",
"kendall_tau",
"mutual_info",
"ref_count_hfunc",
"solve_ITP",
"wasserstein_dist_ind",
]
_CDF_MIN, _CDF_MAX = 0.0, 1.0
_NU_MIN, _NU_MAX = 1.001, 49.999
_RHO_MIN, _RHO_MAX = -0.99, 0.99
_TAU_MIN, _TAU_MAX = -0.999, 0.999
# def kendall_tau(
# x: torch.Tensor,
# y: torch.Tensor,
# tau_min: float = _TAU_MIN,
# tau_max: float = _TAU_MAX,
# ) -> float:
# """https://gist.github.com/ili3p/f2b38b898f6eab0d87ec248ea39fde94
# x,y are both of shape (n, 1)
# """
# n = x.shape[0]
# n *= n - 1
# return (
# ((x.T - x).sign() * (y.T - y).sign()).sum().div(n).clamp(min=tau_min, max=tau_max).item()
# )
def kendall_tau(
    x: torch.Tensor,
    y: torch.Tensor,
    tau_min: float = _TAU_MIN,
    tau_max: float = _TAU_MAX,
) -> tuple[float, float]:
    """Kendall's tau (clamped into [tau_min, tau_max]) and its p-value;
    x, y are both of shape (n, 1)"""
    res = kendalltau(x.cpu().ravel(), y.cpu().ravel())
    return (
        max(min(res.correlation.item(), tau_max), tau_min),
        res.pvalue.item(),
    )
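# Usage sketch (hypothetical obs u, v of shape (n, 1)):
#   tau, pval = kendall_tau(u, v)
# tau is the clamped estimate; pval is the two-sided p-value from scipy.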
def mutual_info(x: torch.Tensor, y: torch.Tensor, is_sklearn: bool = True) -> float:
"""mutual information, need scikit-learn or fastkde installed.
x,y are both of shape (n, 1)
https://scikit-learn.org/stable/modules/generated/sklearn.feature_selection.mutual_info_regression.html
Purkayastha, S., & Song, P. X. K. (2024).
fastMI: A fast and consistent copula-based nonparametric estimator of mutual information.
Journal of Multivariate Analysis, 201, 105270.
"""
if is_sklearn:
from sklearn.feature_selection import mutual_info_regression
return mutual_info_regression(
X=x.cpu(),
# ! notice the shape of y
y=y.cpu().ravel(),
discrete_features=False,
n_neighbors=3,
copy=True,
random_state=None,
)[0].item()
else:
# Purkayastha, S., & Song, P. X. K. (2024).
from fastkde.fastKDE import pdf_at_points
from numpy import log
        # fastkde expects CPU numpy arrays
        x_, y_ = x.cpu().ravel().numpy(), y.cpu().ravel().numpy()
joint = pdf_at_points(var1=x_, var2=y_)
joint = joint[joint > 0]
margin_x = pdf_at_points(var1=x_)
margin_x = margin_x[margin_x > 0]
margin_y = pdf_at_points(var1=y_)
margin_y = margin_y[margin_y > 0]
return (log(joint).mean() - log(margin_x).mean() - log(margin_y).mean()).item()
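# Usage sketch: mutual_info(u, v) uses sklearn's k-nearest-neighbor estimator
# (n_neighbors=3); mutual_info(u, v, is_sklearn=False) uses the fastKDE-based
# estimator of Purkayastha and Song (2024) instead.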
def ferreira_tail_dep_coeff(x: torch.Tensor, y: torch.Tensor) -> float:
"""pairwise tail dependence coefficient (λ) estimator, max of rotation 0, 90, 180, 270
x and y are both of shape (n, 1) inside (0, 1)
symmetric for (x,y), (y,1-x), (1-x,1-y), (1-y,x), (y,x), (1-x,y), (1-y,1-x), (x,1-y)
Ferreira, M.S., 2013. Nonparametric estimation of the tail-dependence coefficient;
"""
x1 = 1 - x
y1 = 1 - y
return (
3
- (
1
- torch.as_tensor(
[
torch.max(x, y).mean(),
torch.max(y, x1).mean(),
torch.max(x1, y1).mean(),
torch.max(y1, x).mean(),
]
)
.clamp(0.5, 0.6666666666666666)
.min()
).reciprocal()
).item()
def chatterjee_xi(x: torch.Tensor, y: torch.Tensor, M: int = 1) -> float:
"""revised Chatterjee's rank correlation coefficient (ξ), taken max to be symmetric
Chatterjee, S., 2021. A new coefficient of correlation.
Journal of the American Statistical Association, 116(536), pp.2009-2022.
Lin, Z. and Han, F., 2023. On boosting the power of Chatterjee’s rank correlation. Biometrika, 110(2), pp.283-299.
"a large negative value of ξ has only one possible interpretation: the data does not resemble an iid sample."
:param x: obs of shape (n,1)
:type x: torch.Tensor
:param y: obs of shape (n,1)
:type y: torch.Tensor
:param M: num of right nearest neighbors
:type M: int
:return: revised Chatterjee's rank correlation coefficient (ξ), taken max to be symmetric for (X,Y) and (Y,X)
:rtype: float
"""
    # 1-based ranks of x and of y, each then reordered by the other's sort order:
    # ranks of x in the order of (ranks of y), ranks of y in the order of (ranks of x)
xrank, yrank = (
x.argsort(dim=0).argsort(dim=0) + 1,
y.argsort(dim=0).argsort(dim=0) + 1,
)
xrank, yrank = xrank[yrank.argsort(dim=0)], yrank[xrank.argsort(dim=0)]
# the inner sum inside the numerator term, ∑min(Ri, Rjm(i))
# max for symmetry as following Remark 1 in Chatterjee (2021)
def xy_sum(m: int) -> tuple:
return (
(torch.min(xrank[:-m], xrank[m:])).sum() + xrank[-m:].sum(),
(torch.min(yrank[:-m], yrank[m:])).sum() + yrank[-m:].sum(),
)
# whole eq. 3 in Lin and Han (2023)
n = x.shape[0]
return -2.0 + 24.0 * (
torch.as_tensor([xy_sum(m) for m in range(1, M + 1)]).sum(dim=0).max().item()
) / (M * (1.0 + n) * (1.0 + M + 4.0 * n))
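# Usage sketch (hypothetical obs u, v of shape (n, 1)):
#   xi = chatterjee_xi(u, v, M=3)
# M > 1 right nearest neighbors follows the boosted variant of Lin and Han (2023).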
def wasserstein_dist_ind(
x: torch.Tensor,
y: torch.Tensor,
p: int = 2,
reg: float = 0.1,
num_step: int = 50,
seed: int = 0,
) -> float:
"""Wasserstein distance from bicop obs to indep bicop, by ot.sinkhorn2 (averaged for each observation).
Need pot installed.
:param x: copula obs of shape (n,1)
:type x: torch.Tensor
:param y: copula obs of shape (n,1)
:type y: torch.Tensor
:param p: p-norm to calculate distance between each vector pair, defaults to 2
:type p: int, optional
:param reg: regularization strength, defaults to 0.1
:type reg: float, optional
:param num_step: number of steps in the independent bivariate copula grid, defaults to 50
:type num_step: int, optional
:param seed: random seed for torch.manual_seed(), defaults to 0
:type seed: int, optional
:return: Wasserstein distance from bicop obs to indep bicop
:rtype: float
"""
from ot import sinkhorn2
torch.manual_seed(seed=seed)
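    # cost matrix M: pairwise p-norm distances from the obs to a uniform
    # num_step-by-num_step grid on [0, 1]^2, the discretized independence copula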
return sinkhorn2(
a=torch.ones_like(x) / x.shape[0],
b=torch.ones(size=(num_step**2, 1), device=x.device, dtype=x.dtype) / num_step**2,
M=torch.cdist(
x1=torch.hstack([x, y]),
x2=torch.as_tensor(
(
*product(
torch.linspace(0, 1, num_step),
torch.linspace(0, 1, num_step),
),
),
device=x.device,
dtype=x.dtype,
),
p=p,
),
reg=reg,
).item()
class ENUM_FUNC_BIDEP(Enum):
"""an enum class for bivariate dependence measures"""
chatterjee_xi = partial(chatterjee_xi)
ferreira_tail_dep_coeff = partial(ferreira_tail_dep_coeff)
kendall_tau = partial(kendall_tau)
mutual_info = partial(mutual_info)
wasserstein_dist_ind = partial(wasserstein_dist_ind)
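# Usage sketch: ENUM_FUNC_BIDEP["kendall_tau"].value(u, v) dispatches a measure
# by name for hypothetical obs u, v of shape (n, 1).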
def cdf_func_kernel(obs: torch.Tensor, is_scott: bool = True) -> callable:
"""kernel density estimation (KDE) function of the cumulative distribution function (CDF)
:param obs: observations, of shape (n,1)
:type obs: torch.Tensor
:param is_scott: whether to use Scott's rule for bandwidth, defaults to True
:type is_scott: bool, optional
:return: a CDF function by KDE
:rtype: callable
"""
    if is_scott:
        # * bandwidth by Scott 1992: 1.059 * min(std, IQR / 1.349) * n^(-1/5)
        band_width = (
            torch.nanquantile(input=obs, q=0.75) - torch.nanquantile(input=obs, q=0.25)
        ) / 1.349
        band_width = obs.std().clamp_max(band_width) * 1.059 * len(obs) ** (-0.2)
    else:
        # * fixed-constant rule with slower n^(-1/9) decay
        band_width = obs.std() * 0.6973425390765554 * len(obs) ** -0.1111111111111111
def func_cdf(q: torch.Tensor) -> torch.Tensor:
return torch.special.ndtr((q - obs.T) / band_width).nanmean(dim=1)
return func_cdf
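# Usage sketch (hypothetical obs of shape (n, 1)):
#   cdf = cdf_func_kernel(obs)
#   p = cdf(q)  # q of shape (m, 1) -> smoothed CDF values of shape (m,)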
# * Gaussian distribution CDF (p), PPF (q), density (d)
pnorm = torch.special.ndtr
qnorm = torch.special.ndtri
def pbvnorm(obs: torch.Tensor, rho: float) -> torch.Tensor:
"""CDF of (standard) bivariate Gaussian distribution
modified from http://www.math.wsu.edu/faculty/genz/software/matlab/bvnl.m
https://keisan.casio.com/exec/system/1280624821
:param obs: bivariate Gaussian observations, of shape (num_obs, 2)
:type obs: torch.Tensor
:param rho: the rho parameter, also a Pearson's corr coef
:type rho: float
:return: cumulative distribution function (CDF) of shape (num_obs, 1), given the observation
:rtype: torch.Tensor
"""
rho = min(max(rho, _RHO_MIN), _RHO_MAX)
    # Gauss-Legendre weights and nodes, n = 30 (15 symmetric pairs; nodes stored as 1 ∓ ξ)
w = torch.as_tensor(
data=(
0.007968192496166605,
0.01846646831109096,
0.02878470788332337,
0.03879919256962705,
0.04840267283059405,
0.057493156217619065,
0.06597422988218049,
0.0737559747377052,
0.08075589522942021,
0.08689978720108298,
0.09212252223778612,
0.09636873717464425,
0.09959342058679527,
0.1017623897484055,
0.10285265289355884,
0.007968192496166605,
0.01846646831109096,
0.02878470788332337,
0.03879919256962705,
0.04840267283059405,
0.057493156217619065,
0.06597422988218049,
0.0737559747377052,
0.08075589522942021,
0.08689978720108298,
0.09212252223778612,
0.09636873717464425,
0.09959342058679527,
0.1017623897484055,
0.10285265289355884,
),
device=obs.device,
dtype=obs.dtype,
).reshape(-1, 1)
x = torch.as_tensor(
data=(
0.003106515925350495,
0.016331876720252825,
0.03997813503169245,
0.07379995257072569,
0.11743946420794726,
0.17043423761723164,
0.23222256789517381,
0.30214950520668415,
0.3794738170107571,
0.46337585185798014,
0.5529662304619108,
0.6472952744691218,
0.7453630738321102,
0.8461300863914165,
0.9485281574446823,
1.9968934840746495,
1.983668123279747,
1.9600218649683074,
1.9262000474292744,
1.8825605357920527,
1.8295657623827684,
1.7677774321048263,
1.697850494793316,
1.6205261829892428,
1.5366241481420198,
1.447033769538089,
1.3527047255308782,
1.2546369261678898,
1.1538699136085835,
1.0514718425553178,
),
device=obs.device,
dtype=obs.dtype,
).reshape(1, -1)
#
h, k = obs[:, [0]], obs[:, [1]]
asr = math.asin(rho) / 2.0
sn = (x * asr).sin()
return (
(
(
(
(h.square() + k.square()) / -2.0
# ! broadcast: x,sn are rows; w,h,k are columns
+ sn * h * k
)
/ (1.0 - sn.square())
).exp()
@ w
* (asr / 6.283185307179586)
+ (pnorm(h) * pnorm(k))
)
.nan_to_num()
.clamp(min=_CDF_MIN, max=_CDF_MAX)
)
# # * Student's t distribution CDF (p), PPF (q), density (d)
# def inc_beta_reg(vec: torch.Tensor, a: float, b: float) -> torch.Tensor:
# # * regularized incomplete beta integral, with a = 0.5, b = nu / 2, vec in [0,1]
# res = torch.empty_like(vec)
# if (idx := vec > ((a + 1.0) / (2.0 + a + b))).any():
# res[idx] = 1.0 - inc_beta_reg(vec=1.0 - vec[idx], a=b, b=a)
# idx = ~idx
# x = vec[idx]
# qab = a + b
# qap = a + 1.0
# qam = a - 1.0
# c = torch.ones_like(x)
# d = 1.0 / (1.0 - qab * x / qap)
# f = d.clone()
# front = (
# -math.lgamma(a)
# - math.lgamma(b)
# + math.lgamma(a + b)
# - math.log(a)
# + x.log() * a
# + x.neg().log1p() * b
# ).exp()
# for i in range(1, 200):
# i2 = i * 2.0
# ai2 = a + i2
# nmrt = (i * (b - i) / ((qam + i2) * ai2)) * x
# d = (d * nmrt + 1.0).reciprocal()
# c = nmrt / c + 1.0
# f *= c * d
# nmrt = (-(a + i) * (qab + i) / ((qap + i2) * ai2)) * x
# d = (d * nmrt + 1.0).reciprocal()
# c = nmrt / c + 1.0
# cd = c * d
# f *= cd
# if ((1.0 - cd).abs() < 1e-6).all():
# break
# res[idx] = front * f
# return res.nan_to_num().clamp(min=_CDF_MIN, max=_CDF_MAX)
# def inc_beta_reg_inv(vec: torch.Tensor, a: float, b: float) -> torch.Tensor:
# a1, b1 = a - 1.0, b - 1.0
# if a > 1.0 and b > 1.0:
# tmp = vec.clone()
# idx = vec >= 0.5
# tmp[idx] = 1.0 - tmp[idx]
# t = (-2.0 * tmp.log()).sqrt()
# x = (2.30753 + t * 0.27061) / (1.0 + t * (0.99229 + t * 0.04481)) - t
# idx = ~idx
# x[idx] = -x[idx]
# al = (x.square() - 3.0) / 6.0
# h = 2.0 / (1.0 / (2.0 * a - 1.0) + 1.0 / (2.0 * b - 1.0))
# w = (x * math.sqrt(al + h) / h) - (1.0 / (2.0 * b - 1.0) - 1.0 / (2.0 * a - 1.0)) * (
# al + 5.0 / 6.0 - 2.0 / (3.0 * h)
# )
# x = a / (a + b * (2.0 * w).exp())
# else:
# x = vec.clone()
# t = math.exp(a * math.log(a / (a + b))) / a
# w = t + math.exp(b * math.log(b / (a + b))) / b
# idx = vec < t / w
# x[idx] = (a * w * x[idx]).pow(1.0 / a)
# idx = ~idx
# x[idx] = 1.0 - (b * w * (1.0 - x[idx])).pow(1.0 / b)
# afac = math.lgamma(a + b) - math.lgamma(a) - math.lgamma(b)
# for _ in range(10):
# t = (a1 * x.log() + b1 * x.neg().log1p() + afac).exp()
# u = (inc_beta_reg(vec=x, a=a, b=b) - vec) / t
# t = u / (1.0 - 0.5 * (u * (a1 / x - b1 / (1.0 - x))).clamp_max(max=1.0))
# idx = (x > 0.0) & (x < 1.0)
# x[idx] -= t[idx]
# idx = x < 0.0
# x[idx] = 0.5 * (x[idx] + t[idx])
# idx = x > 1.0
# x[idx] = 0.5 * (x[idx] + t[idx] + 1.0)
# x = x.nan_to_num().clamp(min=_CDF_MIN, max=_CDF_MAX)
# return x
# def pt(vec: torch.Tensor, nu: float) -> torch.Tensor:
# if nu == 1:
# res = vec.atan() / 3.141592653589793 + 0.5
# elif nu == 2:
# res = (vec / (vec.square() + 2.0).sqrt() + 1.0) / 2.0
# elif nu == 3:
# res = (
# (vec / 1.7320508075688772).atan() / 3.141592653589793
# + 0.5
# + 0.5513288954217921 * vec / (vec.square() + 3.0)
# )
# elif nu == 4:
# _ = vec.square()
# res = (vec * (_ + 6.0) / (_ + 4.0).pow(1.5) + 1.0) / 2.0
# elif nu == 5:
# res = (
# (vec / 2.23606797749979).atan() / 3.141592653589793
# + (
# (vec.square() * 3.0 + 25.0)
# * 0.23725418113905902
# * vec
# / (vec.square() + 5.0).square()
# )
# + 0.5
# )
# elif nu == 6:
# res = vec.square()
# res = ((res.square() * 2.0 + res * 30.0 + 135.0) * vec / (res + 6.0).pow(2.5)) / 4.0 + 0.5
# else:
# nu = max(min(_NU_MAX, nu), _NU_MIN)
# res = inc_beta_reg(vec=nu / (vec.square() + nu), a=nu / 2.0, b=0.5)
# idx = vec > 0.0
# res[idx] = 2.0 - res[idx]
# res *= 0.5
# return res.nan_to_num().clamp(min=_CDF_MIN, max=_CDF_MAX)
# def qt(vec: torch.Tensor, nu: float) -> torch.Tensor:
# vec2 = vec * 2.0
# nu2 = nu / 2.0
# res = torch.empty_like(input=vec, device=vec.device)
# idx = vec < 0.5
# res[idx] = (inc_beta_reg_inv(vec=vec2[idx], a=nu2, b=0.5).reciprocal() - 1.0).sqrt().neg()
# idx = ~idx
# res[idx] = (inc_beta_reg_inv(vec=2.0 - vec2[idx], a=nu2, b=0.5).reciprocal() - 1.0).sqrt()
# res[vec == 0.5] = 0.0
# res *= math.sqrt(nu)
# return res.nan_to_num()
def pt_scipy(vec: torch.Tensor, nu: float) -> torch.Tensor:
    # Student's t CDF via scipy.special.stdtr (a CPU-only ufunc);
    # round-trip through host memory for tensors on other devices
    if vec.device.type == "cpu":
        return stdtr(nu, vec)
    else:
        return stdtr(nu, vec.cpu()).to(vec.device)


def qt_scipy(vec: torch.Tensor, nu: float) -> torch.Tensor:
    # Student's t PPF via scipy.special.stdtrit, with the same device round-trip
    if vec.device.type == "cpu":
        return stdtrit(nu, vec)
    else:
        return stdtrit(nu, vec.cpu()).to(vec.device)
pt = pt_scipy
qt = qt_scipy
def l_dt(obs: torch.Tensor, nu: float) -> torch.Tensor:
"""log density of Student's t distribution
:param obs: Student's t observations
:type obs: torch.Tensor
:param nu: degrees of freedom
:type nu: float
:return: log density, given the observations
:rtype: torch.Tensor
"""
    nu2 = nu / 2.0
    # 0.5723649429247001 == lgamma(0.5) == log(sqrt(pi))
    return ((obs.square() / nu).log1p() * (-nu2 - 0.5)) + (
        math.lgamma(nu2 + 0.5) - math.lgamma(nu2) - 0.5723649429247001 - 0.5 * math.log(nu)
    )
def pbvt(
obs: torch.Tensor,
rho: float,
nu: float,
) -> torch.Tensor:
"""CDF of (standard) bivariate Student's t distribution
modified from http://www.math.wsu.edu/faculty/genz/software/matlab/bvtl.m
:param obs: bivariate Student's t observations, of shape (num_obs, 2)
:type obs: torch.Tensor
:param rho: the rho parameter
:type rho: float
:param nu: the nu parameter, degrees of freedom
:type nu: float
:return: cumulative distribution function (CDF) of shape (num_obs, 1), given the observations
:rtype: torch.Tensor
"""
h = obs[:, [0]]
k = obs[:, [1]]
rho: torch.Tensor = torch.as_tensor(
data=rho,
dtype=obs.dtype,
device=obs.device,
).clamp(
min=_RHO_MIN,
max=_RHO_MAX,
)
nu: torch.Tensor = torch.as_tensor(
data=nu,
dtype=obs.dtype,
device=obs.device,
).clamp(
min=_NU_MIN,
max=_NU_MAX,
)
    #
    if nu < 1:
        # Genz's bvtl.m treats nu < 1 as the Gaussian limit; unreachable here
        # since nu was clamped to >= _NU_MIN above
        bvt = pbvnorm(obs=obs, rho=rho)
else:
xnhk, xnkh = torch.zeros_like(h), torch.zeros_like(h)
ors = 1.0 - rho**2
snu = nu.sqrt()
hhh, kkk = h - rho * k, k - rho * h
hs, ks = hhh.sign(), kkk.sign()
idx = hhh.abs() + ors > 0.0
xnhk[idx] = hhh[idx].square() / (hhh[idx].square() + ors * (nu + k[idx].square()))
xnkh[idx] = kkk[idx].square() / (kkk[idx].square() + ors * (nu + h[idx].square()))
# gmph as hhh, gmpk as kkk
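        # constants below: 6.283185307179586 == 2*pi, 0.6366197723675814 == 2/pi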
if nu.ceil() % 2 == 0:
bvt = torch.full_like(
input=h,
fill_value=torch.atan2(ors.sqrt(), -rho) / 6.283185307179586,
)
hhh = h / (16.0 * (h.square() + nu)).sqrt()
kkk = k / (16.0 * (k.square() + nu)).sqrt()
btnchk = 0.6366197723675814 * torch.atan2(xnhk.sqrt(), (1.0 - xnhk).sqrt())
btpdhk = 0.6366197723675814 * (xnhk * (1.0 - xnhk)).sqrt()
btnckh = 0.6366197723675814 * torch.atan2(xnkh.sqrt(), (1.0 - xnkh).sqrt())
btpdkh = 0.6366197723675814 * (xnkh * (1.0 - xnkh)).sqrt()
for j in torch.arange(start=1.0, end=nu / 2.0 + 1e-5, step=1):
bvt += hhh * (btnckh * ks + 1.0) + kkk * (btnchk * hs + 1.0)
hhh *= (j - 0.5) / (j * (1.0 + h.square() / nu))
kkk *= (j - 0.5) / (j * (1.0 + k.square() / nu))
btnchk += btpdhk
btpdhk *= j * 2.0 * (1.0 - xnhk) / (2.0 * j + 1.0)
btnckh += btpdkh
btpdkh *= j * 2.0 * (1.0 - xnkh) / (2.0 * j + 1.0)
else:
qhrk = (h.square() + k.square() - 2.0 * rho * h * k + nu * ors).sqrt()
hkrn = h * k + rho * nu
hkn = h * k - nu
bvt = (
torch.atan2(
-snu * (hkn * qhrk + (h + k) * hkrn),
hkn * hkrn - nu * (h + k) * qhrk,
)
/ 6.283185307179586
)
idx = bvt < -1e-8
bvt[idx] += 1.0
hhh = h / (6.283185307179586 * snu * (1.0 + h.square() / nu))
kkk = k / (6.283185307179586 * snu * (1.0 + k.square() / nu))
btnchk, btnckh = xnhk.sqrt(), xnkh.sqrt()
btpdhk, btpdkh = btnchk * 1.0, btnckh * 1.0
if (nu - 1.0) / 2.0 >= 1.0:
for j in torch.arange(start=1, end=(nu - 1.0) / 2.0 + 1e-5, step=1):
bvt += hhh * (1.0 + ks * btnckh) + kkk * (1.0 + hs * btnchk)
hhh *= j / ((j + 0.5) * (1.0 + h.square() / nu))
kkk *= j / ((j + 0.5) * (1.0 + k.square() / nu))
btpdhk *= (j - 0.5) * (1.0 - xnhk) / j
btnchk += btpdhk
btpdkh *= (j - 0.5) * (1.0 - xnkh) / j
btnckh += btpdkh
return bvt
def debye1(x: float) -> float:
"""computes the Debye function of order 1.
https://github.com/openturns/openturns/blob/b5797d7e4a71c71faf86df51f26ad0d8d551ad08/lib/src/Base/Func/SpecFunc/Debye.cxx
:param x: upper limit of the integral
:type x: float
    :return: Debye function of order 1; 0 if x < 0
:rtype: float
"""
if x < 0:
return 0
elif x >= 3:
# ! scalar loop faster than short np.array
k_max = [0, 0, 0, 13, 10, 8, 7, 6, 5, 5, 4, 4, 4, 3][max(min(int(x), 13), 0)]
        res = 1.64493406684822643647241516665  # pi^2 / 6
for k in range(1, 1 + k_max):
xk = x * k
res -= math.exp(-xk) * (1.0 / xk + 1.0 / xk**2) * x**2
res /= x
else:
koeff = (
0.0,
1.289868133696452872944830333292,
1.646464674222763830320073930823e-01,
3.468612396889827942903585958184e-02,
8.154712395888678757370477017305e-03,
1.989150255636170674291917800638e-03,
4.921731066160965972759960954793e-04,
1.224962701174096585170902102707e-04,
3.056451881730374346514297527344e-05,
7.634586529999679712923289243879e-06,
1.907924067745592226304077366899e-06,
4.769010054554659800072963735060e-07,
1.192163781025189592248804158716e-07,
2.980310965673008246931701326140e-08,
7.450668049576914109638408036805e-09,
1.862654864839336365743529470042e-09,
4.656623667353010984002911951881e-10,
1.164154417580540177848737197821e-10,
2.910384378208396847185926449064e-11,
7.275959094757302380474472711747e-12,
1.818989568052777856506623677390e-12,
4.547473691649305030453643155957e-13,
1.136868397525517121855436593505e-13,
2.842170965606321353966861428348e-14,
7.105427382674227346596939068119e-15,
1.776356842186163180619218277278e-15,
4.440892101596083967998640188409e-16,
1.110223024969096248744747318102e-16,
2.775557561945046552567818981300e-17,
6.938893904331845249488542992219e-18,
1.734723476023986745668411013469e-18,
4.336808689994439570027820336642e-19,
1.084202172491329082183740080878e-19,
2.710505431220232916297046799365e-20,
6.776263578041593636171406200902e-21,
1.694065894509399669649398521836e-21,
4.235164736272389463688418879636e-22,
1.058791184067974064762782460584e-22,
2.646977960169798160618902050189e-23,
6.617444900424343177893912768629e-24,
1.654361225106068880734221123349e-24,
4.135903062765153408791935838694e-25,
1.033975765691286264082026643327e-25,
2.584939414228213340076225223666e-26,
6.462348535570530772269628236053e-27,
1.615587133892632406631747637268e-27,
4.038967834731580698317525293132e-28,
1.009741958682895139216954234507e-28,
2.524354896707237808750799932127e-29,
6.310887241768094478219682436680e-30,
1.577721810442023614704107565240e-30,
3.944304526105059031370476640000e-31,
9.860761315262647572437533499000e-32,
2.465190328815661892443976898000e-32,
6.162975822039154730370601500000e-33,
1.540743955509788682510501190000e-33,
3.851859888774471706184973900000e-34,
9.629649721936179265360991000000e-35,
2.407412430484044816328953000000e-35,
6.018531076210112040809600000000e-36,
1.504632769052528010200750000000e-36,
3.761581922631320025497600000000e-37,
9.403954806578300063715000000000e-38,
2.350988701644575015901000000000e-38,
5.877471754111437539470000000000e-39,
1.469367938527859384580000000000e-39,
3.673419846319648458500000000000e-40,
9.183549615799121117000000000000e-41,
2.295887403949780249000000000000e-41,
5.739718509874450320000000000000e-42,
1.434929627468612270000000000000e-42,
)
x2pi = x * 0.159154943091895335768883763373 # 1/(2pi)
res, k = 0.0, 1
while k <= 69:
res_0 = res
res += (koeff[k] + 2.0) * x2pi ** (k * 2.0) / (k * 2.0 + 1.0)
k += 1
res -= (koeff[k] + 2.0) * x2pi ** (k * 2.0) / (k * 2.0 + 1.0)
if abs(res - res_0) < 1e-7:
break
else:
k += 1
res += 1.0 - x / 4.0
return res
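# Note: debye1 arises, e.g., in Kendall's tau of the Frank copula:
# tau(theta) = 1 + 4 * (debye1(theta) - 1) / theta.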
def solve_ITP(
f: callable,
a: float,
b: float,
eps_2: float = 1e-9,
n_0: int = 1,
k_1: float = 0.2,
k_2: float = 2.0,
j_max: int = 31,
) -> float:
"""Solve an arbitrary function for a zero-crossing.
Oliveira, I.F. and Takahashi, R.H., 2020.
An enhancement of the bisection method average performance preserving minmax optimality.
ACM Transactions on Mathematical Software (TOMS), 47(1), pp.1-24.
https://docs.rs/kurbo/0.8.1/kurbo/common/fn.solve_itp.html
https://en.wikipedia.org/wiki/ITP_method
! It is assumed that f(a) < 0 and f(b) > 0, otherwise unexpected results may occur.
The ITP method has tuning parameters. This implementation hardwires k2 to 2.0,
both because it avoids an expensive floating point exponentiation,
and because this value has been tested to work well with curve fitting problems.
The n0 parameter controls the relative impact of the bisection and secant components.
When it is 0, the number of iterations is guaranteed to be no more than the number required by bisection
(thus, this method is strictly superior to bisection). However, when the function is smooth,
a value of 1 gives the secant method more of a chance to engage, so the average number of iterations is likely lower,
though there can be one more iteration than bisection in the worst case.
The k1 parameter is harder to characterize, and interested users are referred to the paper,
    as well as encouraged to do empirical testing. To match the paper, a value of 0.2 / (b - a) is suggested,
and this is confirmed to give good results. When the function is monotonic,
the returned result is guaranteed to be within epsilon of the zero crossing.
"""
y_a, y_b, x_wid = f(a), f(b), b - a
n_max = n_0 + math.ceil(math.log2(x_wid / eps_2))
for j in range(1, j_max + 1):
# Calculating Parameters
if x_wid < eps_2:
break
x_half = (a + b) / 2.0
rho = eps_2 * 2.0 ** (n_max - j) - x_wid / 2.0
delta = k_1 * x_wid**k_2
# Interpolation
x_f = (y_b * a - y_a * b) / (y_b - y_a)
# Truncation
        tmp = x_half - x_f
        # branchless sign (tmp / abs(tmp)) would divide by zero when x_half == x_f
        sign = 1.0 if tmp >= 0.0 else -1.0
        x_t = x_half if delta > abs(tmp) else x_f + sign * delta
# Projection
x_ITP = x_t if (rho >= abs(x_t - x_half)) else x_half - sign * rho
# Updating interval
y_ITP = f(x_ITP)
if y_ITP > 0.0:
b, y_b = x_ITP, y_ITP
elif y_ITP < 0.0:
a, y_a = x_ITP, y_ITP
else:
return x_ITP
x_wid = b - a
return (a + b) / 2.0
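# Usage sketch (hypothetical): root of f(x) = x*x - 0.5 on [0, 1],
# where f(0) < 0 < f(1) as solve_ITP assumes:
#   solve_ITP(f=lambda x: x * x - 0.5, a=0.0, b=1.0)  # ~0.70710678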
def ref_count_hfunc(
dct_tree: dict,
    tpl_first_vs: tuple[tuple[int, frozenset], ...] = tuple(),
    tpl_sim: tuple[int, ...] = tuple(),
) -> tuple[dict, tuple, int]:
"""reference counting for each data vertex during cond-sim workflow,
for garbage collection (memory release) and source vertices selection;
1. when len(tpl_sim) < num_dim: vertices not in tpl_sim are set on the top lv, vertices in tpl_sim are set at deepest lvs
2. when len(tpl_sim) == num_dim: check dct_first_vs to move vertices up
:param dct_tree: dct_tree inside a DataVineCop object, of the form {lv: {(v_l, v_r, s): bidep_func}}
:type dct_tree: dict
:param tpl_first_vs: tuple of vertices (explicitly arranged in conditioned - conditioning set)
that are taken as known at the beginning of a simulation workflow,
only used when len(tpl_sim)==num_dim, defaults to tuple()
    :type tpl_first_vs: tuple[tuple[int, frozenset], ...], optional
:param tpl_sim: tuple of vertices in a full simulation workflow,
gives flexibility to experienced users, defaults to tuple()
    :type tpl_sim: tuple[int, ...], optional
:return: number of visits for each vertex; tuple of source vertices in this simulation workflow
from shallowest to deepest; number of hfunc calls
    :rtype: tuple[dict, tuple, int]
"""
def _visit(v_down: int, s_down: frozenset, is_hinv: bool = False):
"""recursively visits vertices and update their reference counts"""
# * locate the bicop on upper level that generates this pseudo obs
lv_up = len(s_down) - 1
s_bcp = {v_down} | s_down
for (v_l, v_r, s_up), _ in dct_tree[lv_up].items():
if ({v_l, v_r} | s_up) == s_bcp:
break
# * check missing parents
if is_hinv:
l_visit = [(v_l if v_down == v_r else v_r, s_up)]
else:
l_visit = [(v_l, s_up), (v_r, s_up)]
for v, s in l_visit:
            # visit count (int), or None if this pseudo obs is not yet computed
if dct_ref_count.get((v, s), None) is None:
_visit(v_down=v, s_down=s, is_hinv=False)
# ! call hfunc only when parent missing
num_hfunc[0] += 1
# * increment reference counts for all three vertices
l_visit = [(v_l, s_up), (v_r, s_up), (v_down, s_down)]
for v, s in l_visit:
dct_ref_count[v, s] = dct_ref_count.get((v, s), 0) + 1
if is_hinv:
return v_down, s_up
if not len(tpl_sim):
raise ValueError("tpl_sim must be a non-empty tuple")
num_dim = max(dct_tree) + 2
if len(tpl_sim) < num_dim:
# ! when len(tpl_sim) < num_dim:
# ! vertices not in tpl_sim are set on the top lv, vertices in tpl_sim are set at deepest lv
s_cond = frozenset(range(num_dim)) - frozenset(tpl_sim)
tpl_sim = tuple(
(v, s_cond | frozenset(tpl_sim[idx + 1 :])) for idx, v in enumerate(tpl_sim)
) + tuple((v, frozenset()) for v in s_cond)
else:
# ! when len(tpl_sim) == num_dim:
# ! check dct_first_vs to move vertices up
dct_first_vs = {v_s_cond[0]: v_s_cond for v_s_cond in tpl_first_vs}
tpl_sim = tuple(
dct_first_vs.get(v, (v, frozenset(tpl_sim[(idx + 1) :])))
for idx, v in enumerate(tpl_sim)
)
tpl_sim = tpl_sim[::-1]
# ! count in initial sim (pseudo obs that are given at the beginning of each sim path)
dct_ref_count = {_: 1 for _ in tpl_sim}
# * mutable
num_hfunc = [0]
# * process each source vertex from the shallowest to the deepest
for v, s in tpl_sim:
while s:
v, s = _visit(v_down=v, s_down=s, is_hinv=True)
return dct_ref_count, tpl_sim, num_hfunc[0]
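# Usage sketch (hypothetical 3-dim vine; the bidep values are not used here,
# so None placeholders suffice):
#   dct_tree = {
#       0: {(0, 1, frozenset()): None, (1, 2, frozenset()): None},
#       1: {(0, 2, frozenset({1})): None},
#   }
#   dct_count, tpl_src, n_hfunc = ref_count_hfunc(dct_tree, tpl_sim=(0, 1, 2))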