# Author: Yue Zhao <zhaoy@cmu.edu>
# License: MIT
from sklearn.base import clone
import numpy as np
from scipy.stats import rankdata
from joblib import effective_n_jobs
from sklearn.utils import check_array
from sklearn.utils.validation import check_is_fitted
from pyod.utils.utility import score_to_label
from .jl_projection import jl_fit_transform, jl_transform
from ..utils.utility import raw_score_to_proba
def indices_to_one_hot(data, nb_classes):
    """Turn class indices into one-hot encoded rows.

    Parameters
    ----------
    data : array-like
        Class indices (assumed integer). Any shape is accepted; the input is
        flattened before encoding.

    nb_classes : int
        Number of classes, i.e. the width of every one-hot row.

    Returns
    -------
    one_hot : numpy.ndarray of shape (n_indices, nb_classes)
        The rows of ``np.eye(nb_classes)`` selected by the flattened indices.
    """
    flat_indices = np.ravel(np.array(data))
    identity = np.eye(nb_classes)
    return identity[flat_indices]
def balanced_scheduling(time_cost_pred, n_estimators, n_jobs, verbose=False):
    """Conduct balanced scheduling based on the sum of rank, for both train
    and prediction. The algorithm tries to equalize the sum of (rescaled)
    cost ranks among workers.

    Base detectors keep their given order and are cut into contiguous
    chunks, one per worker. A chunk is closed greedily as soon as its rank
    sum reaches ``total_rank_sum / n_jobs``.

    Parameters
    ----------
    time_cost_pred : list
        The list of time cost by the cost predictor. The length is equal to
        the number of base detectors.

    n_estimators : int
        The number of base estimators.

    n_jobs : int
        The number of jobs to run in parallel for both `fit` and
        `predict`. If -1, then the number of jobs is set to the
        number of cores. The value is capped at ``n_estimators``.

    verbose : bool, optional (default=False)
        If True, print the rank sum of every worker and the final split.

    Returns
    -------
    n_estimators_list : list
        The number of estimators for each worker. An entry can be 0 when
        the greedy split closes fewer chunks than there are workers.

    starts : list
        Chunk boundaries: worker ``k`` is assigned the base detectors
        ``starts[k]:starts[k + 1]``. Trailing entries may repeat
        ``n_estimators``.

    n_jobs : int
        The actual usable number of jobs to run in parallel.
    """
    # get the number of usable workers
    n_jobs = min(effective_n_jobs(n_jobs), n_estimators)

    # Rescale cost ranks into (1, 2] so the most expensive detector weighs
    # at most twice as much as the cheapest one.
    # TODO: expose the rank strength as a parameter.
    ranks = 1 + rankdata(time_cost_pred) / n_estimators
    rank_sum = np.sum(ranks)
    chunk_sum = rank_sum / n_jobs

    # Greedy split: close the current chunk at index i as soon as the rank
    # sum of ranks[starts[-1]:i] reaches the per-worker target.
    starts = [0]
    for i in range(len(ranks) + 1):
        if np.sum(ranks[starts[-1]:i]) >= chunk_sum:
            starts.append(i)
    starts.append(len(ranks))

    # Every greedy chunk overshoots the target. With few estimators per
    # worker, fewer than n_jobs - 1 cuts can therefore be made, and
    # starts[j + 1] below would raise IndexError (e.g. 6 estimators on 5
    # jobs). Pad with empty trailing chunks so every worker has a slice.
    starts.extend([len(ranks)] * (n_jobs + 1 - len(starts)))

    n_estimators_list = []  # number of estimators for each worker
    sum_check = []  # rank sum per worker, used by the sanity check below
    for j in range(n_jobs):
        sum_check.append(np.sum(ranks[starts[j]:starts[j + 1]]))
        if verbose:
            print('Worker', j + 1, 'sum of ranks:', sum_check[j])
        n_estimators_list.append(starts[j + 1] - starts[j])
    if verbose:
        print()

    # Confirm the length of the estimators is consistent
    assert (np.sum(n_estimators_list) == n_estimators)
    assert (np.abs(rank_sum - np.sum(sum_check)) < 0.1)

    xdiff = [starts[n] - starts[n - 1] for n in range(1, len(starts))]
    if verbose:
        print("Split among workers BPS:", starts, xdiff)
    return n_estimators_list, starts, n_jobs
def _partition_estimators(n_estimators, n_jobs, verbose=False):
    """Private function used to partition estimators between jobs.

    Estimators are split into contiguous, near-equal chunks. The first
    ``n_estimators % n_jobs`` workers receive one extra estimator.

    Parameters
    ----------
    n_estimators : int
        The number of base estimators.

    n_jobs : int
        Requested number of jobs; resolved with ``effective_n_jobs`` and
        capped at ``n_estimators``.

    verbose : bool, optional (default=False)
        If True, print the boundaries and the per-worker counts.

    Returns
    -------
    n_estimators_per_job : list of int
        The number of estimators for each worker.

    starts : list of int
        ``n_jobs + 1`` boundaries; worker ``k`` gets estimators
        ``starts[k]:starts[k + 1]``.

    n_jobs : int
        The actual number of jobs used.
    """
    # Compute the number of jobs
    n_jobs = min(effective_n_jobs(n_jobs), n_estimators)
    if n_jobs == 0:
        # No estimators to schedule: return an empty partition instead of
        # failing with ZeroDivisionError in n_estimators // n_jobs.
        return [], [0], 0

    # Partition estimators between jobs
    n_estimators_per_job = np.full(n_jobs, n_estimators // n_jobs, dtype=int)
    n_estimators_per_job[:n_estimators % n_jobs] += 1
    starts = [0] + np.cumsum(n_estimators_per_job).tolist()

    if verbose:
        # Diff the full boundary list (including the leading 0) so that the
        # first worker's count is reported as well.
        xdiff = [starts[n] - starts[n - 1] for n in range(1, len(starts))]
        print("Split among workers default:", starts, xdiff)
    return n_estimators_per_job.tolist(), starts, n_jobs
def _parallel_fit(n_estimators, clfs, X, total_n_estimators,
                  rp_flags, objective_dim, rp_method, verbose):
    """Fit a chunk of base estimators (the work of one parallel job).

    Parameters
    ----------
    n_estimators : int
        Number of estimators handled by this job; ``clfs`` and ``rp_flags``
        are indexed ``0 .. n_estimators - 1``.

    clfs : list
        Base estimators. Each one is cloned before fitting, so the objects
        in ``clfs`` are not fitted themselves.

    X : array-like
        Training data, validated with ``check_array``.

    total_n_estimators : int
        Total number of estimators across all jobs (progress messages only).

    rp_flags : list
        ``rp_flags[i] == 1`` fits estimator ``i`` on the output of
        ``jl_fit_transform``; any other value fits it on ``X`` unchanged.

    objective_dim
        Passed through to ``jl_fit_transform``.

    rp_method
        Passed through to ``jl_fit_transform``.

    verbose : int or bool
        Progress messages are printed when ``verbose > 1``.

    Returns
    -------
    estimators : list
        The fitted clones, in order.

    rp_transformers : list
        One transformer per estimator, to be reused with ``jl_transform``
        at prediction time.
    """
    X = check_array(X)
    n_features = X.shape[1]

    # Build estimators
    estimators = []
    rp_transformers = []
    for i in range(n_estimators):
        estimator = clone(clfs[i])
        if verbose > 1:
            print("Building estimator %d of %d for this parallel run "
                  "(total %d)..." % (i + 1, n_estimators, total_n_estimators))

        if rp_flags[i] == 1:
            X_fit, jlt_transformer = jl_fit_transform(X, objective_dim,
                                                      rp_method)
        else:
            # If projection is not used, store an identity matrix to keep
            # the shape and leave the features unchanged. The former
            # np.ones matrix is NOT an identity: assuming jl_transform is a
            # matrix product, it replaced every feature with the row sum at
            # prediction time, while the estimator was fitted on raw X.
            X_fit = X
            jlt_transformer = np.eye(n_features)
        rp_transformers.append(jlt_transformer)
        estimator.fit(X_fit)
        estimators.append(estimator)

    return estimators, rp_transformers
def _parallel_predict(n_estimators, clfs, approximators, X, total_n_estimators,
                      rp_transformers, approx_flags, contamination, verbose):
    """Predict outlier labels with a chunk of fitted estimators.

    For each estimator ``idx`` the input is first mapped with
    ``jl_transform(X, rp_transformers[idx])``. When ``approx_flags[idx]`` is
    1, the approximator's predicted scores are turned into labels by
    ``score_to_label`` with ``contamination`` as the outlier fraction.
    Otherwise the estimator's own ``predict`` is used.

    Parameters
    ----------
    n_estimators : int
        Number of estimators handled by this job.

    clfs : list
        Fitted base estimators.

    approximators : list
        Fitted approximators; only read where ``approx_flags`` is 1.

    X : array-like
        Input data, validated with ``check_array``.

    total_n_estimators : int
        Total number of estimators across all jobs (progress messages only).

    rp_transformers : list
        Per-estimator transformers passed to ``jl_transform``.

    approx_flags : list
        Selects the approximator (1) or the estimator (other values).

    contamination : float
        Outlier fraction handed to ``score_to_label``.

    verbose : int or bool
        Progress messages are printed when ``verbose > 1``.

    Returns
    -------
    pred : list
        One prediction per estimator, in order.
    """
    X = check_array(X)

    labels_per_estimator = []
    for idx in range(n_estimators):
        model = clfs[idx]
        if verbose > 1:
            print("predicting with estimator %d of %d for this parallel run "
                  "(total %d)..." % (idx + 1, n_estimators, total_n_estimators))

        X_proj = jl_transform(X, rp_transformers[idx])
        if approx_flags[idx] != 1:
            labels = model.predict(X_proj)
        else:
            # NOTE(review): score_to_label presumably thresholds this batch's
            # approximated scores by the contamination percentile rather than
            # using the detector's training threshold - confirm intended.
            approx_scores = approximators[idx].predict(X_proj)
            labels = score_to_label(approx_scores,
                                    outliers_fraction=contamination)
        labels_per_estimator.append(labels)

    return labels_per_estimator
def _parallel_decision_function(n_estimators, clfs, approximators, X,
                                total_n_estimators, rp_transformers,
                                approx_flags, verbose):
    """Compute raw outlier scores with a chunk of fitted estimators.

    Parameters
    ----------
    n_estimators : int
        Number of estimators handled by this job.

    clfs : list
        Fitted base estimators.

    approximators : list
        Fitted approximators; only read where ``approx_flags`` is 1.

    X : array-like
        Input data, validated with ``check_array``.

    total_n_estimators : int
        Total number of estimators across all jobs (progress messages only).

    rp_transformers : list
        Per-estimator transformers passed to ``jl_transform``.

    approx_flags : list
        1 uses the approximator's ``predict``; any other value uses the
        estimator's ``decision_function``.

    verbose : int or bool
        Progress messages are printed when ``verbose > 1``.

    Returns
    -------
    pred : list
        One score result per estimator, in order.
    """
    X = check_array(X)

    scores_per_estimator = []
    for idx in range(n_estimators):
        model = clfs[idx]
        if verbose > 1:
            print("predicting with estimator %d of %d for this parallel run "
                  "(total %d)..." % (idx + 1, n_estimators, total_n_estimators))

        X_proj = jl_transform(X, rp_transformers[idx])
        use_approximator = approx_flags[idx] == 1
        scores = (approximators[idx].predict(X_proj) if use_approximator
                  else model.decision_function(X_proj))
        scores_per_estimator.append(scores)

    return scores_per_estimator
def _parallel_predict_proba(n_estimators, clfs, approximators, X,
                            total_n_estimators, rp_transformers,
                            approx_flags, verbose):
    """Compute column 1 of the probability output for a chunk of estimators.

    Parameters
    ----------
    n_estimators : int
        Number of estimators handled by this job.

    clfs : list
        Fitted base estimators; ``decision_scores_`` is read from the ones
        replaced by an approximator.

    approximators : list
        Fitted approximators; only read where ``approx_flags`` is 1.

    X : array-like
        Input data, validated with ``check_array``.

    total_n_estimators : int
        Total number of estimators across all jobs (progress messages only).

    rp_transformers : list
        Per-estimator transformers passed to ``jl_transform``.

    approx_flags : list
        1 converts approximated scores with ``raw_score_to_proba``; any
        other value uses the estimator's ``predict_proba``.

    verbose : int or bool
        Progress messages are printed when ``verbose > 1``.

    Returns
    -------
    pred : list
        ``probabilities[:, 1]`` for every estimator, in order.
    """
    X = check_array(X)

    outlier_probas = []
    for idx in range(n_estimators):
        model = clfs[idx]
        if verbose > 1:
            print("predicting with estimator %d of %d for this parallel run "
                  "(total %d)..." % (idx + 1, n_estimators, total_n_estimators))

        X_proj = jl_transform(X, rp_transformers[idx])
        if approx_flags[idx] != 1:
            proba = model.predict_proba(X_proj)
        else:
            approx_raw = approximators[idx].predict(X_proj)
            # Presumably rescales the approximated scores relative to the
            # detector's decision_scores_; assumed to return a 2-column
            # array like predict_proba - confirm.
            proba = raw_score_to_proba(model.decision_scores_, approx_raw)
        # Keep column 1 only (presumably the outlier-class probability).
        outlier_probas.append(proba[:, 1])

    return outlier_probas
def _parallel_approx_estimators(n_estimators, clfs, X, total_n_estimators,
                                approx_flags, approximator, rp_transformers,
                                verbose):
    """Fit approximators that regress a chunk of detectors' scores.

    For every estimator ``i`` with ``approx_flags[i] == 1``, a clone of
    ``approximator`` is fitted on ``jl_transform(X, rp_transformers[i])``
    with the detector's ``decision_scores_`` as targets. Other estimators
    get ``None``.

    Parameters
    ----------
    n_estimators : int
        Number of estimators handled by this job.

    clfs : list
        Fitted base detectors; each must expose ``decision_scores_``
        (enforced with ``check_is_fitted``).

    X : array-like
        Input data, validated with ``check_array``. It needs one row per
        entry of each detector's ``decision_scores_``; presumably this is
        the training data - confirm with the caller.

    total_n_estimators : int
        Total number of estimators across all jobs (progress messages only).

    approx_flags : list
        1 trains an approximator for that estimator; other values skip it.

    approximator : estimator
        Template regressor; it is cloned for every approximated detector.

    rp_transformers : list
        Per-estimator transformers passed to ``jl_transform``.

    verbose : int or bool
        Progress messages are printed when ``verbose > 1``.

    Returns
    -------
    approximators : list
        Fitted approximator clones, or ``None`` where approximation is off.
    """
    X = check_array(X)

    # Build approximators
    approximators = []
    # TODO: approximators can be different
    for i in range(n_estimators):
        estimator = clfs[i]
        check_is_fitted(estimator, ['decision_scores_'])
        if verbose > 1:
            print("Building estimator %d of %d for this parallel run "
                  "(total %d)..." % (i + 1, n_estimators, total_n_estimators))

        if approx_flags[i] != 1:
            approximators.append(None)
            continue

        # Project only when an approximator is actually trained; the
        # projection used to be computed (and discarded) for every estimator.
        X_scaled = jl_transform(X, rp_transformers[i])
        # Pseudo-supervision: the detector's stored scores are the targets.
        pseudo_scores = estimator.decision_scores_
        # use the same type of approximator for all models
        base_approximator = clone(approximator)
        base_approximator.fit(X_scaled, pseudo_scores)
        approximators.append(base_approximator)

    return approximators
# Lookup table from integer codes to base detector names.
# NOTE(review): 'UNK' presumably stands for an unknown/other detector type -
# confirm.
idx_clf_mapping = {
    1: 'ABOD',
    2: 'CBLOF',
    3: 'FeatureBagging',
    4: 'HBOS',
    5: 'IForest',
    6: 'KNN',
    7: 'LOF',
    8: 'MCD',
    9: 'OCSVM',
    10: 'PCA',
    11: 'UNK',
}

# Inverse table (name -> code), derived from idx_clf_mapping so the two
# cannot drift apart; key order matches the code order 1..11.
clf_idx_mapping = {name: idx for idx, name in idx_clf_mapping.items()}