Module hipe4ml.analysis_utils
Module containing the analysis utils.
Expand source code
"""
Module containing the analysis utils.
"""
import numpy as np
import pandas as pd
from scipy.interpolate import InterpolatedUnivariateSpline
from sklearn.preprocessing import label_binarize
from sklearn.model_selection import train_test_split
def bdt_efficiency_array(y_truth, y_score, n_points=50, keep_lower=False):
    """
    Calculate the model efficiency as a function of the score
    threshold. The candidates for each class should be labeled
    with 0, ..., N. In case of binary classification, 0 should
    correspond to the background while 1 to the signal

    Parameters
    ------------------------------------------------
    y_truth: array
        Training or test set labels. The candidates for each
        class should be labeled with 0, ..., N.
        In case of binary classification, 0 should
        correspond to the background while 1 to the signal

    y_score: array
        Estimated probabilities or decision function. In the
        multi-class case column i is read as the score of class i.

    n_points: int
        Number of points to be sampled

    keep_lower: bool
        If True compute the efficiency using the candidates with
        score lower than the score threshold; otherwise using the
        candidates with score higher than the score threshold.

    Returns
    ------------------------------------------------
    out: numpy array
        Efficiency as a function of the threshold value.
        2D numpy array (one row per class) in case of
        multi-classification

    out: numpy array
        Threshold values, n_points values evenly spaced between the
        minimum and the maximum of y_score
    """
    # Lists and pandas objects are accepted too: the boolean-mask and
    # column indexing used below need plain numpy arrays.
    y_truth = np.asarray(y_truth)
    y_score = np.asarray(y_score)

    # strict comparison: candidates sitting exactly on the threshold are rejected
    passes_cut = np.less if keep_lower else np.greater

    n_classes = len(np.unique(y_truth))
    threshold = np.linspace(np.min(y_score), np.max(y_score), n_points)

    if n_classes <= 2:
        # with 0/1 labels, summing the labels counts the signal candidates
        n_sig = np.sum(y_truth)
        n_sig_selected = np.array(
            [np.sum(y_truth[passes_cut(y_score, thr)]) for thr in threshold])
        return n_sig_selected / n_sig, threshold

    efficiencies = np.empty((n_classes, n_points))
    for clas in range(n_classes):
        # considering signal only the class for the same BDT output
        is_clas = y_truth == clas
        score_clas = y_score[:, clas]
        n_sig = np.sum(is_clas)
        n_sig_selected = np.array(
            [np.sum(is_clas[passes_cut(score_clas, thr)]) for thr in threshold])
        efficiencies[clas] = n_sig_selected / n_sig
    return efficiencies, threshold
def score_from_efficiency_array(y_truth, y_score, efficiency_selected, keep_lower=False):
    """
    Return the score array corresponding to an external fixed efficiency
    array.

    The efficiency is sampled on 1000 thresholds with
    bdt_efficiency_array and interpolated with a spline; for each requested
    efficiency the first root of (efficiency - requested value) is taken.

    Parameters
    -----------------------------------------------
    y_truth: array
        Training or test set labels. The candidates for each
        class should be labeled with 0, ..., N.
        In case of binary classification, 0 should
        correspond to the background while 1 to the signal

    y_score: array
        Estimated probabilities or decision function.

    efficiency_selected: list or array
        Efficiency array along which calculate
        the corresponding score array

    keep_lower: bool
        If True compute the efficiency using the candidates with
        score lower than the score threshold; otherwise using the
        candidates with score higher than the score threshold.

    Returns
    -----------------------------------------------
    out: numpy array
        Score array corresponding to efficiency_selected

    Raises
    -----------------------------------------------
    IndexError
        If a requested efficiency lies outside the range of efficiencies
        reachable on the given sample.
    """
    # NOTE(review): the spline takes a 1D efficiency array, so this presumably
    # only works for binary classification -- confirm before using it with
    # multi-class scores.
    eff, score = bdt_efficiency_array(
        y_truth, y_score, n_points=1000, keep_lower=keep_lower)
    eff_min, eff_max = np.min(eff), np.max(eff)

    score_list = []
    for eff_val in efficiency_selected:
        roots = InterpolatedUnivariateSpline(score, eff - eff_val).roots()
        if len(roots) > 0:
            score_list.append(roots[0])
        elif eff_min <= eff_val <= eff_max:
            # The spline can miss a crossing that sits on the edge of the
            # sampled range (e.g. the requested value equals the extreme
            # efficiency): use the closest sampled threshold instead of failing.
            score_list.append(score[np.argmin(np.abs(eff - eff_val))])
        else:
            # IndexError is what the bare roots()[0] lookup used to raise;
            # the type is kept so that existing handlers still work.
            raise IndexError(
                "efficiency {} cannot be reached: the efficiencies available "
                "on this sample span [{}, {}]".format(eff_val, eff_min, eff_max))
    return np.array(score_list)
def _split_labelled_frames(frames, labels_list, split_kwds):
    """
    Concatenate the dataframes, build one label per row and run
    sklearn's train_test_split on the result.
    """
    # one label per row: the i-th label is repeated len(frames[i]) times
    labels = np.repeat(labels_list, [len(frame) for frame in frames])
    df_tot_train_test = pd.concat(frames, sort=True)
    train_test = train_test_split(df_tot_train_test, labels, **split_kwds)
    # swap for ModelHandler compatibility:
    # [df_train, df_test, y_train, y_test] -> [df_train, y_train, df_test, y_test]
    train_test[1], train_test[2] = train_test[2], train_test[1]
    return train_test


def train_test_generator(data_list, labels_list, sliced_df=False, **kwds):
    """
    Return a list containing respectively training set dataframe,
    training label array, test set dataframe, test label array
    computed from a list of TreeHandler objects. If sliced_df == True,
    the method performs the train-test split for each slice

    Parameters
    -----------------------------------------------
    data_list: list
        List of TreeHandler objects. For example: if you perform binary
        classification the list should contain the TreeHandlers corresponding
        to the signal and the background candidates

    labels_list: list
        List containing the labels associated to each handler, in the same
        order as data_list. For example: if you perform binary
        classification, the list should be [1,0]

    sliced_df: bool
        If True, the function searches for the slices stored in the handlers
        and performs the train_test_split for each slice. The number of slices
        is taken from the projection binning of the first handler.

    **kwds
        Extra arguments are passed on to sklearn.model_selection.train_test_split:
        https://scikit-learn.org/stable/modules/generated/sklearn.model_selection.train_test_split.html

    Returns
    -----------------------------------------------
    out: list
        List containing respectively training set dataframe,
        training label array, test set dataframe, test label array. If sliced_df==True
        returns a list containing the above cited list for each slice

    Raises
    -----------------------------------------------
    ValueError
        If data_list and labels_list do not have the same length.
    """
    # zip() used to truncate silently here, dropping handlers or labels
    if len(data_list) != len(labels_list):
        raise ValueError(
            "data_list and labels_list must have the same length, got {} and {}".format(
                len(data_list), len(labels_list)))

    # truthiness test so that 0 / numpy.False_ also select the unsliced path
    if not sliced_df:
        frames = [data.get_data_frame() for data in data_list]
        return _split_labelled_frames(frames, labels_list, kwds)

    n_slices = len(data_list[0].get_projection_binning())
    train_test_slices = []
    for slice_ind in range(n_slices):
        frames = [data.get_slice(slice_ind) for data in data_list]
        train_test_slices.append(_split_labelled_frames(frames, labels_list, kwds))
    return train_test_slices
Functions
def bdt_efficiency_array(y_truth, y_score, n_points=50, keep_lower=False)
-
Calculate the model efficiency as a function of the score threshold. The candidates for each class should be labeled with 0, …, N. In case of binary classification, 0 should correspond to the background while 1 to the signal
Parameters
y_truth
:array
- Training or test set labels. The candidates for each class should be labeled with 0, …, N. In case of binary classification, 0 should correspond to the background while 1 to the signal
y_score
:array
- Estimated probabilities or decision function.
n_points
:int
- Number of points to be sampled
keep_lower
:bool
- If True compute the efficiency using the candidates with score lower than the score threshold; otherwise using the candidates with score higher than the score threshold.
Returns
out
:numpy array
- Efficiency as a function of the threshold value. Numpy array of numpy arrays in case of multi-classification
out
:numpy array
- Threshold values
Expand source code
def bdt_efficiency_array(y_truth, y_score, n_points=50, keep_lower=False):
    """
    Calculate the model efficiency as a function of the score
    threshold. The candidates for each class should be labeled
    with 0, ..., N. In case of binary classification, 0 should
    correspond to the background while 1 to the signal

    Parameters
    ------------------------------------------------
    y_truth: array
        Training or test set labels. The candidates for each
        class should be labeled with 0, ..., N.
        In case of binary classification, 0 should
        correspond to the background while 1 to the signal

    y_score: array
        Estimated probabilities or decision function. In the
        multi-class case column i is read as the score of class i.

    n_points: int
        Number of points to be sampled

    keep_lower: bool
        If True compute the efficiency using the candidates with
        score lower than the score threshold; otherwise using the
        candidates with score higher than the score threshold.

    Returns
    ------------------------------------------------
    out: numpy array
        Efficiency as a function of the threshold value.
        2D numpy array (one row per class) in case of
        multi-classification

    out: numpy array
        Threshold values, n_points values evenly spaced between the
        minimum and the maximum of y_score
    """
    # Lists and pandas objects are accepted too: the boolean-mask and
    # column indexing used below need plain numpy arrays.
    y_truth = np.asarray(y_truth)
    y_score = np.asarray(y_score)

    # strict comparison: candidates sitting exactly on the threshold are rejected
    passes_cut = np.less if keep_lower else np.greater

    n_classes = len(np.unique(y_truth))
    threshold = np.linspace(np.min(y_score), np.max(y_score), n_points)

    if n_classes <= 2:
        # with 0/1 labels, summing the labels counts the signal candidates
        n_sig = np.sum(y_truth)
        n_sig_selected = np.array(
            [np.sum(y_truth[passes_cut(y_score, thr)]) for thr in threshold])
        return n_sig_selected / n_sig, threshold

    efficiencies = np.empty((n_classes, n_points))
    for clas in range(n_classes):
        # considering signal only the class for the same BDT output
        is_clas = y_truth == clas
        score_clas = y_score[:, clas]
        n_sig = np.sum(is_clas)
        n_sig_selected = np.array(
            [np.sum(is_clas[passes_cut(score_clas, thr)]) for thr in threshold])
        efficiencies[clas] = n_sig_selected / n_sig
    return efficiencies, threshold
def score_from_efficiency_array(y_truth, y_score, efficiency_selected, keep_lower=False)
-
Return the score array corresponding to an external fixed efficiency array.
Parameters
y_truth
:array
- Training or test set labels. The candidates for each class should be labeled with 0, …, N. In case of binary classification, 0 should correspond to the background while 1 to the signal
y_score
:array
- Estimated probabilities or decision function.
keep_lower
:bool
- If True compute the efficiency using the candidates with score lower than the score threshold; otherwise using the candidates with score higher than the score threshold.
efficiency_selected
:list
or array
- Efficiency array along which calculate the corresponding score array
Returns
out
:numpy array
- Score array corresponding to efficiency_selected
Expand source code
def score_from_efficiency_array(y_truth, y_score, efficiency_selected, keep_lower=False):
    """
    Return the score array corresponding to an external fixed efficiency
    array.

    The efficiency is sampled on 1000 thresholds with
    bdt_efficiency_array and interpolated with a spline; for each requested
    efficiency the first root of (efficiency - requested value) is taken.

    Parameters
    -----------------------------------------------
    y_truth: array
        Training or test set labels. The candidates for each
        class should be labeled with 0, ..., N.
        In case of binary classification, 0 should
        correspond to the background while 1 to the signal

    y_score: array
        Estimated probabilities or decision function.

    efficiency_selected: list or array
        Efficiency array along which calculate
        the corresponding score array

    keep_lower: bool
        If True compute the efficiency using the candidates with
        score lower than the score threshold; otherwise using the
        candidates with score higher than the score threshold.

    Returns
    -----------------------------------------------
    out: numpy array
        Score array corresponding to efficiency_selected

    Raises
    -----------------------------------------------
    IndexError
        If a requested efficiency lies outside the range of efficiencies
        reachable on the given sample.
    """
    # NOTE(review): the spline takes a 1D efficiency array, so this presumably
    # only works for binary classification -- confirm before using it with
    # multi-class scores.
    eff, score = bdt_efficiency_array(
        y_truth, y_score, n_points=1000, keep_lower=keep_lower)
    eff_min, eff_max = np.min(eff), np.max(eff)

    score_list = []
    for eff_val in efficiency_selected:
        roots = InterpolatedUnivariateSpline(score, eff - eff_val).roots()
        if len(roots) > 0:
            score_list.append(roots[0])
        elif eff_min <= eff_val <= eff_max:
            # The spline can miss a crossing that sits on the edge of the
            # sampled range (e.g. the requested value equals the extreme
            # efficiency): use the closest sampled threshold instead of failing.
            score_list.append(score[np.argmin(np.abs(eff - eff_val))])
        else:
            # IndexError is what the bare roots()[0] lookup used to raise;
            # the type is kept so that existing handlers still work.
            raise IndexError(
                "efficiency {} cannot be reached: the efficiencies available "
                "on this sample span [{}, {}]".format(eff_val, eff_min, eff_max))
    return np.array(score_list)
def train_test_generator(data_list, labels_list, sliced_df=False, **kwds)
-
Return a list containing respectively training set dataframe, training label array, test set dataframe, test label array computed from a list of TreeHandler objects. If sliced_df == True, the method performs the train-test split for each slice
Parameters
data_list
:list
- List of TreeHandler models. For example: if you perform binary classification the list should contain the TreeHandlers corresponding to the signal and the background candidates
labels_list
:list
- List containing the labels associated to each DataHandler. For example: if you perform binary classification, the list should be [1,0]
sliced_df
:bool
- If True, the function searches for the slices stored in the DataHandler and performs the train_test_split for each slice
**kwds
- Extra arguments are passed on to sklearn.model_selection.train_test_split: https://scikit-learn.org/stable/modules/generated/sklearn.model_selection.train_test_split.html
Returns
out
:list
- List containing respectively training set dataframe, training label array, test set dataframe, test label array. If sliced_df==True returns a list containing the above cited list for each slice
Expand source code
def _split_labelled_frames(frames, labels_list, split_kwds):
    """
    Concatenate the dataframes, build one label per row and run
    sklearn's train_test_split on the result.
    """
    # one label per row: the i-th label is repeated len(frames[i]) times
    labels = np.repeat(labels_list, [len(frame) for frame in frames])
    df_tot_train_test = pd.concat(frames, sort=True)
    train_test = train_test_split(df_tot_train_test, labels, **split_kwds)
    # swap for ModelHandler compatibility:
    # [df_train, df_test, y_train, y_test] -> [df_train, y_train, df_test, y_test]
    train_test[1], train_test[2] = train_test[2], train_test[1]
    return train_test


def train_test_generator(data_list, labels_list, sliced_df=False, **kwds):
    """
    Return a list containing respectively training set dataframe,
    training label array, test set dataframe, test label array
    computed from a list of TreeHandler objects. If sliced_df == True,
    the method performs the train-test split for each slice

    Parameters
    -----------------------------------------------
    data_list: list
        List of TreeHandler objects. For example: if you perform binary
        classification the list should contain the TreeHandlers corresponding
        to the signal and the background candidates

    labels_list: list
        List containing the labels associated to each handler, in the same
        order as data_list. For example: if you perform binary
        classification, the list should be [1,0]

    sliced_df: bool
        If True, the function searches for the slices stored in the handlers
        and performs the train_test_split for each slice. The number of slices
        is taken from the projection binning of the first handler.

    **kwds
        Extra arguments are passed on to sklearn.model_selection.train_test_split:
        https://scikit-learn.org/stable/modules/generated/sklearn.model_selection.train_test_split.html

    Returns
    -----------------------------------------------
    out: list
        List containing respectively training set dataframe,
        training label array, test set dataframe, test label array. If sliced_df==True
        returns a list containing the above cited list for each slice

    Raises
    -----------------------------------------------
    ValueError
        If data_list and labels_list do not have the same length.
    """
    # zip() used to truncate silently here, dropping handlers or labels
    if len(data_list) != len(labels_list):
        raise ValueError(
            "data_list and labels_list must have the same length, got {} and {}".format(
                len(data_list), len(labels_list)))

    # truthiness test so that 0 / numpy.False_ also select the unsliced path
    if not sliced_df:
        frames = [data.get_data_frame() for data in data_list]
        return _split_labelled_frames(frames, labels_list, kwds)

    n_slices = len(data_list[0].get_projection_binning())
    train_test_slices = []
    for slice_ind in range(n_slices):
        frames = [data.get_slice(slice_ind) for data in data_list]
        train_test_slices.append(_split_labelled_frames(frames, labels_list, kwds))
    return train_test_slices