freemt
Update before sent-align
4c04f50
raw
history blame
5.35 kB
"""Gne pset from cmat. Find pairs for a given cmat.
tinybee.find_pairs.py with fixed estimator='dbscan' eps=eps, min_samples=min_samples
"""
# pylint: disable=too-many-locals, unused-import, invalid-name
from typing import List, Tuple, Union
import numpy as np
import pandas as pd
from sklearn.cluster import DBSCAN
import logzero
from logzero import logger
from radiobee.cmat2tset import cmat2tset
from radiobee.interpolate_pset import interpolate_pset
def _gen_pset(
cmat1: Union[List[List[float]], np.ndarray, pd.DataFrame],
eps: float = 10,
min_samples: int = 6,
delta: float = 7,
verbose: Union[bool, int] = False,
# ) -> List[Tuple[int, int, Union[float, str]]]:
) -> List[Tuple[Union[float, str], Union[float, str], Union[float, str]]]:
"""Gen pset from cmat.
Find pairs for a given cmat.
Args:
cmat: correlation/similarity matrix
eps: min epsilon for DBSCAN (10)
min_samples: minimum # of samples for DBSCAN (6)
delta: tolerance (7)
Returns:
pairs + "" or metric (float)
dbscan_pairs' setup
if eps is None:
eps = src_len * .01
if eps < 3:
eps = 3
if min_samples is None:
min_samples = tgt_len / 100 * 0.5
if min_samples < 3:
min_samples = 3
def gen_eps_minsamples(src_len, tgt_len):
eps = src_len * .01
if eps < 3:
eps = 3
min_samples = tgt_len / 100 * 0.5
if min_samples < 3:
min_samples = 3
return {"eps": eps, "min_samples": min_samples}
"""
if isinstance(verbose, bool):
if verbose:
verbose = 10
else:
verbose = 20
logzero.loglevel(verbose)
# if isinstance(cmat, list):
cmat = np.array(cmat1)
src_len, tgt_len = cmat.shape
# tset = cmat2tset(cmat)
tset = cmat2tset(cmat).tolist()
logger.debug("tset: %s", tset)
# iset = gen_iset(cmat, verbose=verbose, estimator=estimator)
labels = DBSCAN(eps=eps, min_samples=min_samples).fit(tset).labels_
df_tset = pd.DataFrame(tset, columns=["x", "y", "cos"])
cset = df_tset[labels > -1].to_numpy()
# sort cset
_ = sorted(cset.tolist(), key=lambda x: x[0])
iset = interpolate_pset(_, tgt_len)
# *_, ymax = zip(*tset)
# ymax = list(ymax)
# low_ = np.min(ymax) - 1 # reset to minimum_value - 1
buff = [(-1, -1, ""), (tgt_len, src_len, "")]
# for idx, tset_elm in enumerate(tset):
for tset_elm in tset:
logger.debug("buff: %s", buff)
# postion max in ymax and insert in buff
# if with range given by iset+-delta and
# it's valid (do not exceed constraint
# by neighboring points
# argmax = int(np.argmax(ymax))
# logger.debug("=== %s,%s === %s", _, argmax, tset[_])
logger.debug("=== %s === %s", _, tset_elm)
# ymax[_] = low_
# elm = tset[argmax]
# elm0, *_ = elm
elm0, *_ = tset_elm
# position elm in buff
idx = -1 # for making pyright happy
for idx, loc in enumerate(buff):
if loc[0] > elm0:
break
else:
idx += 1 # last
# insert elm in for valid elm
# (within range inside two neighboring points)
# pos = int(tset[argmax][0])
pos = int(tset_elm[0])
logger.debug(" %s <=> %s ", tset_elm, iset[pos])
# if abs(tset[argmax][1] - iset[pos][1]) <= delta:
if abs(tset_elm[1] - iset[pos][1]) <= delta:
if tset_elm[1] > buff[idx - 1][1] and tset_elm[1] < buff[idx][1]:
buff.insert(idx, tset_elm)
logger.debug("idx: %s, tset_elm: %s", idx, tset_elm)
else:
logger.debug("\t***\t idx: %s, tset_elm: %s", idx, tset_elm)
_ = """
if abs(tset[loc][1] - iset[loc][1]) <= delta:
if tset[loc][1] > buff[idx][1] and tset[loc][1] < buff[idx + 1][1]:
buff.insert(idx + 1, tset[loc])
# """
# remove first and last entry in buff
buff.pop(0)
buff.pop()
# return [(1, 1, "")]
return [(int(elm0), int(elm1), elm2) for elm0, elm1, elm2 in buff]
def gen_pset(
cmat1: Union[List[List[float]], np.ndarray, pd.DataFrame],
eps: float = 10,
min_samples: int = 6,
delta: float = 7,
verbose: Union[bool, int] = False,
) -> List[Tuple[Union[float, str], Union[float, str], Union[float, str]]]:
"""Gen pset.
Refer to _gen_pset.
"""
del verbose
gen_pset.min_samples = min_samples
for min_s in range(min_samples):
logger.debug(" min_samples, try %s", min_samples - min_s)
try:
pset = _gen_pset(
cmat1,
eps=eps,
min_samples=min_samples - min_s,
delta=delta,
)
break
except ValueError:
logger.debug(" decrease min_samples by %s", min_s + 1)
continue
except Exception as e:
logger.error(e)
continue
else:
# break should happen above when min_samples = 2
raise Exception("bummer, this shouldn't happen, probably another bug")
# store new min_samples
gen_pset.min_samples = min_samples - min_s
return pset