Spaces:
Runtime error
Runtime error
File size: 3,091 Bytes
fe74ce1 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 |
import numpy as np
from scipy.sparse.csgraph import connected_components
from scipy.special import softmax
import logging
logger = logging.getLogger(__name__)
def degree_centrality_scores(
similarity_matrix,
threshold=None,
increase_power=True,
):
if not (
threshold is None
or isinstance(threshold, float)
and 0 <= threshold < 1
):
raise ValueError(
'\'threshold\' should be a floating-point number '
'from the interval [0, 1) or None',
)
if threshold is None:
markov_matrix = create_markov_matrix(similarity_matrix)
else:
markov_matrix = create_markov_matrix_discrete(
similarity_matrix,
threshold,
)
scores = stationary_distribution(
markov_matrix,
increase_power=increase_power,
normalized=False,
)
return scores
def _power_method(transition_matrix, increase_power=True, max_iter=10000):
eigenvector = np.ones(len(transition_matrix))
if len(eigenvector) == 1:
return eigenvector
transition = transition_matrix.transpose()
for _ in range(max_iter):
eigenvector_next = np.dot(transition, eigenvector)
if np.allclose(eigenvector_next, eigenvector):
return eigenvector_next
eigenvector = eigenvector_next
if increase_power:
transition = np.dot(transition, transition)
logger.warning("Maximum number of iterations for power method exceeded without convergence!")
return eigenvector_next
def connected_nodes(matrix):
_, labels = connected_components(matrix)
groups = []
for tag in np.unique(labels):
group = np.where(labels == tag)[0]
groups.append(group)
return groups
def create_markov_matrix(weights_matrix):
n_1, n_2 = weights_matrix.shape
if n_1 != n_2:
raise ValueError('\'weights_matrix\' should be square')
row_sum = weights_matrix.sum(axis=1, keepdims=True)
# normalize probability distribution differently if we have negative transition values
if np.min(weights_matrix) <= 0:
return softmax(weights_matrix, axis=1)
return weights_matrix / row_sum
def create_markov_matrix_discrete(weights_matrix, threshold):
discrete_weights_matrix = np.zeros(weights_matrix.shape)
ixs = np.where(weights_matrix >= threshold)
discrete_weights_matrix[ixs] = 1
return create_markov_matrix(discrete_weights_matrix)
def stationary_distribution(
transition_matrix,
increase_power=True,
normalized=True,
):
n_1, n_2 = transition_matrix.shape
if n_1 != n_2:
raise ValueError('\'transition_matrix\' should be square')
distribution = np.zeros(n_1)
grouped_indices = connected_nodes(transition_matrix)
for group in grouped_indices:
t_matrix = transition_matrix[np.ix_(group, group)]
eigenvector = _power_method(t_matrix, increase_power=increase_power)
distribution[group] = eigenvector
if normalized:
distribution /= n_1
return distribution
|