import numpy as np
import matplotlib.pyplot as plt
from matplotlib.colors import LogNorm
from sklearn.neural_network import MLPClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import confusion_matrix
import tensorflow as tf

#We define our own class that contains a few functions which will be used in our
#single-track analysis. This helps to prevent scripts that are 1000 lines long and makes
#debugging much easier. Plus, if you use a procedure more than once, it is worthwhile to
#turn it into a function with a few arguments that can be called multiple times.
class analysis_utils(object):

    def __init__(self):
        self.data = []

    #Plot correlations between the drift-chamber, calorimeter and momentum
    #information
    #***********************************************************************
    def show_correlation(self,input_data,plot_conditions,plot_title,n_bins):
        fig,ax = plt.subplots(2,3,figsize=(25,10))
        fig.subplots_adjust(wspace=0.75)
        fig.subplots_adjust(hspace=0.25)
        fig.suptitle(plot_title)

        xticks_corr = np.linspace(0.0,8.0,3) #--> Set number of ticks for the x-axis in the correlation plots
        xticks_ep = np.linspace(0.0,2.0,3)   #--> Set number of ticks for the x-axis in the e/p plot

        #++++++++++++++++++++++++++++
        for i in range(2):
            #i == 0: Forward part
            #i == 1: Central part
            if i == 0:
                drift_chamber = 'ddEdx_FDC'
                calorimeter = 'dE_FCAL'
                e_over_p = 'e_over_p_fcal'
            else:
                drift_chamber = 'ddEdx_CDC'
                calorimeter = 'dE_BCAL'
                e_over_p = 'e_over_p_bcal'

            #Apply the pid-conditions first, if any were provided:
            data = input_data[i] if plot_conditions is None else input_data[i][plot_conditions[i]]

            ax[i,0].hist2d(data['p'],data[drift_chamber],bins=n_bins,norm=LogNorm(),range=((0.0,8.0),(0.0,1e-5)))
            ax[i,0].set_xticks(xticks_corr)
            ax[i,1].hist2d(data['p'],data[calorimeter],bins=n_bins,norm=LogNorm(),range=((0.0,8.0),(0.0,8.0)))
            ax[i,1].set_xticks(xticks_corr)
            ax[i,2].hist(data[e_over_p],n_bins,range=(0.0,2.0))
            ax[i,2].set_xticks(xticks_ep)
        #++++++++++++++++++++++++++++

        ax[0,0].set_ylabel('dE/dx (FDC)')
        ax[0,1].set_ylabel('E (FCAL) ' + r'$[GeV]$')
        ax[1,0].set_xlabel('Momentum ' + r'$[GeV/c]$')
        ax[1,0].set_ylabel('dE/dx (CDC)')
        ax[1,1].set_xlabel('Momentum ' + r'$[GeV/c]$')
        ax[1,1].set_ylabel('E (BCAL) ' + r'$[GeV]$')
        ax[0,2].set_ylabel('Entries [a.u.]')
        ax[1,2].set_ylabel('Entries [a.u.]')
        ax[1,2].set_xlabel('E/p')

        return fig,ax
    #***********************************************************************
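
    #A hypothetical usage sketch (the dataframe names and cut values below are made up;
    #any forward/central dataframes with the columns referenced above will do,
    #and get_pid_criteria is defined right below):
    #
    #   utils = analysis_utils()
    #   fig,ax = utils.show_correlation([df_forward,df_central],None,'Before PID cuts',100)
    #   cut_fwd,cut_cen = utils.get_pid_criteria([df_forward,df_central],0.8,1.2,1e-6,5e-6,1e-6,5e-6)
    #   fig,ax = utils.show_correlation([df_forward,df_central],[cut_fwd,cut_cen],'After PID cuts',100)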
    #Get particle identification (pid) criteria:
    #***********************************************************************
    def get_pid_criteria(self,input_data,e_over_p_min,e_over_p_max,fdc_min,fdc_max,cdc_min,cdc_max):
        criteria_forward = ((input_data[0]['e_over_p_fcal'] > e_over_p_min) & (input_data[0]['e_over_p_fcal'] < e_over_p_max)
                            & (input_data[0]['ddEdx_FDC'] > fdc_min) & (input_data[0]['ddEdx_FDC'] < fdc_max))
        criteria_central = ((input_data[1]['e_over_p_bcal'] > e_over_p_min) & (input_data[1]['e_over_p_bcal'] < e_over_p_max)
                            & (input_data[1]['ddEdx_CDC'] > cdc_min) & (input_data[1]['ddEdx_CDC'] < cdc_max))

        return criteria_forward, criteria_central
    #***********************************************************************

    #Set up a neural network (scikit):
    #***********************************************************************
    def set_scikit_mlp_classifier(self,architecture,act_func,n_epochs,val_split,init_learn_rate):
        my_mlp = MLPClassifier(
            hidden_layer_sizes=architecture,   #--> e.g. (10,) for one hidden layer with 10 neurons
            activation=act_func,               #--> e.g. 'relu' for the rectified linear unit
            solver='sgd',                      #--> stochastic gradient descent optimizer to minimize the error
            warm_start=True,
            max_iter=n_epochs,                 #--> maximum number of learning epochs
            shuffle=True,                      #--> shuffle the data
            validation_fraction=val_split,
            early_stopping=True,
            random_state=0,
            learning_rate_init=init_learn_rate #--> step size for the gradient descent
        )
        return my_mlp
    #***********************************************************************

    #Set up keras network:
    #***********************************************************************
    def set_keras_mlp_classifier(self,n_inputs,n_outputs,architecture,act_func_hl,act_func_out,init_learn_rate,optimizer='',loss_function='',print_summary=False):
        #Set up the sequential model:
        my_mlp = tf.keras.Sequential(name='keras_mlp')

        #Define the input layer:
        input_layer = tf.keras.layers.Input(shape=(n_inputs,),name='input')
        my_mlp.add(input_layer)

        #Set the hidden layers:
        n_hidden_layers = len(architecture)
        #+++++++++++++++++++++++++++++
        for l in range(n_hidden_layers):
            hl_name = 'dense_' + str(l)
            current_layer = tf.keras.layers.Dense(architecture[l],act_func_hl[l],name=hl_name)
            my_mlp.add(current_layer)
        #+++++++++++++++++++++++++++++

        #Set the output layer:
        output_layer = tf.keras.layers.Dense(n_outputs,act_func_out,name='output')
        my_mlp.add(output_layer)

        #Define the optimizer (default is sgd):
        mlp_optimizer = tf.keras.optimizers.SGD(init_learn_rate)
        #----------------------------------------------------------
        if optimizer == 'adam':
            mlp_optimizer = tf.keras.optimizers.Adam(init_learn_rate)
        elif optimizer == 'rms_prop':
            mlp_optimizer = tf.keras.optimizers.RMSprop(init_learn_rate)
        #----------------------------------------------------------

        #Define the loss function (default is mse):
        mlp_loss = tf.keras.losses.mean_squared_error
        #----------------------------------------------------------
        if loss_function == 'binary_xentropy':
            mlp_loss = tf.keras.losses.binary_crossentropy
        elif loss_function == 'categorical_xentropy':
            mlp_loss = tf.keras.losses.categorical_crossentropy
        elif loss_function == 'mae':
            mlp_loss = tf.keras.losses.mean_absolute_error
        #----------------------------------------------------------

        my_mlp.compile(optimizer=mlp_optimizer,loss=mlp_loss,metrics=['accuracy','mse'])

        if print_summary:
            print(" ")
            my_mlp.summary()
            print(" ")

        return my_mlp
    #***********************************************************************

    #Train the keras network:
    #***********************************************************************
    def train_keras_mlp(self,mlp_model,X,Y,n_epochs,val_split,early_stopping_pars):
        early_stopping = tf.keras.callbacks.EarlyStopping(
            monitor=early_stopping_pars[0],   #--> quantity to watch, e.g. 'val_loss'
            min_delta=early_stopping_pars[1], #--> minimum change that counts as an improvement
            mode=early_stopping_pars[2],      #--> 'min' / 'max' / 'auto'
            patience=early_stopping_pars[3],  #--> number of epochs without improvement before stopping
            restore_best_weights=True
        )
        history = mlp_model.fit(x=X,y=Y,epochs=n_epochs,validation_split=val_split,callbacks=[early_stopping],verbose=0)
        return history
    #***********************************************************************
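
    #A hypothetical training call, assuming utils = analysis_utils() as in the sketch
    #above, plus a feature matrix X and binary labels Y (all numbers below are
    #illustrative only):
    #
    #   mlp = utils.set_keras_mlp_classifier(n_inputs=4,n_outputs=1,architecture=[20,10],
    #                                        act_func_hl=['relu','relu'],act_func_out='sigmoid',
    #                                        init_learn_rate=0.01,optimizer='adam',
    #                                        loss_function='binary_xentropy')
    #   history = utils.train_keras_mlp(mlp,X,Y,n_epochs=100,val_split=0.25,
    #                                   early_stopping_pars=['val_loss',0.0,'min',5])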
    #Set up the random forest classifier (scikit):
    #***********************************************************************
    def set_scikit_random_forest_classifier(self,n_trees,depth):
        my_rf = RandomForestClassifier(
            n_estimators=n_trees, #--> Number of trees in your forest
            warm_start=True,
            max_depth=depth,      #--> Maximum depth of each tree
            random_state=0
        )
        return my_rf
    #***********************************************************************

    #Get optimum from the roc-curve:
    #***********************************************************************
    def get_optimum_from_roc(self,false_pos_rate,true_pos_rate,threshold):
        roc_optimum = 10.0      #--> start value; the squared distance below can never exceed 2
        optimum_threshold = 0.0 #--> threshold at optimum
        optimum_tpr = 0.0       #--> true positive rate at optimum
        optimum_fpr = 0.0       #--> false positive rate at optimum
        scanned_distances = []
        #+++++++++++++++++++++++++++++++++++
        for fpr,tpr,th in zip(false_pos_rate,true_pos_rate,threshold):
            #Squared distance to the ideal point (fpr,tpr) = (0,1):
            dist_to_optimum = fpr*fpr + (1.0-tpr)*(1.0-tpr)
            scanned_distances.append(dist_to_optimum)

            if dist_to_optimum < roc_optimum:
                roc_optimum = dist_to_optimum
                optimum_threshold = th
                optimum_tpr = tpr
                optimum_fpr = fpr
        #+++++++++++++++++++++++++++++++++++
        return roc_optimum, optimum_fpr, optimum_tpr, optimum_threshold, scanned_distances
    #***********************************************************************

    #Get the confusion matrix:
    #***********************************************************************
    def get_confusion_matrix(self,data,true_label_name,pred_label_name,labels,matrix_title):
        #Calculate the confusion matrix:
        my_confusion_matrix = confusion_matrix(data[true_label_name].values,data[pred_label_name].values,labels=labels)
        #and normalize it row-wise, so that each true-label row sums to one:
        my_confusion_matrix = np.transpose(
            np.transpose(my_confusion_matrix) / my_confusion_matrix.astype(float).sum(axis=1)
        )

        textFormat = '.2f'
        fig,ax = plt.subplots()
        plt.rcParams['font.size'] = 20
        plt.subplots_adjust(bottom=0.25,top=0.9)

        im = ax.imshow(my_confusion_matrix,interpolation='nearest')
        ax.set_xticks(np.arange(my_confusion_matrix.shape[1]))
        ax.set_yticks(np.arange(my_confusion_matrix.shape[0]))
        ax.set_xticklabels(labels)
        ax.set_yticklabels(labels)
        ax.set_xticks(np.arange(my_confusion_matrix.shape[1]+1)-.5,minor=True)
        ax.set_yticks(np.arange(my_confusion_matrix.shape[0]+1)-.5,minor=True)
        ax.tick_params(axis='both',which='major',labelsize=30)
        ax.set_xlabel('Predicted Label',fontsize=30,labelpad=15)
        ax.set_ylabel('True Label',fontsize=30,labelpad=25)
        ax.set_title(matrix_title,y=1.03)
        ax.figure.colorbar(im,ax=ax)

        colorThresh = my_confusion_matrix.max() / 2.0
        nDim = len(labels)
        #++++++++++++++++++++++++++++++++
        for i in range(0,nDim):
            #++++++++++++++++++++++++++++++++
            for j in range(0,nDim):
                ax.text(j,i,
                        format(my_confusion_matrix[i][j],textFormat),
                        ha='center',
                        va='center',
                        color="black" if my_confusion_matrix[i,j] > colorThresh else "white")
            #++++++++++++++++++++++++++++++++
        #++++++++++++++++++++++++++++++++
        return fig,ax
    #***********************************************************************
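
#A minimal, self-contained smoke test of the pieces above, using toy data.
#Everything below (the gaussian features, label names, network settings) is made up
#for illustration; the real analysis feeds the drift-chamber/calorimeter dataframes
#into these functions instead.
if __name__ == '__main__':
    import pandas as pd
    from sklearn.metrics import roc_curve

    utils = analysis_utils()

    #Toy binary-classification sample: one gaussian feature per class, shuffled so
    #that the keras validation split (which takes the last fraction) sees both classes.
    rng = np.random.default_rng(0)
    n_per_class = 1000
    x = np.concatenate([rng.normal(-1.0,1.0,n_per_class),rng.normal(1.0,1.0,n_per_class)]).reshape(-1,1)
    y = np.concatenate([np.zeros(n_per_class),np.ones(n_per_class)])
    shuffle_idx = rng.permutation(2*n_per_class)
    x = x[shuffle_idx]
    y = y[shuffle_idx]

    #Train a small keras mlp and pick the working point closest to (fpr,tpr) = (0,1):
    mlp = utils.set_keras_mlp_classifier(1,1,[10],['relu'],'sigmoid',0.01,optimizer='adam',loss_function='binary_xentropy')
    utils.train_keras_mlp(mlp,x,y,20,0.25,['val_loss',0.0,'min',3])
    scores = mlp.predict(x).flatten()

    fpr,tpr,th = roc_curve(y,scores)
    _,opt_fpr,opt_tpr,opt_th,_ = utils.get_optimum_from_roc(fpr,tpr,th)
    print('Optimal threshold: ' + str(opt_th))

    #Confusion matrix at the chosen threshold:
    df = pd.DataFrame({'true_label': y,'pred_label': (scores > opt_th).astype(float)})
    utils.get_confusion_matrix(df,'true_label','pred_label',[0.0,1.0],'Toy confusion matrix')
    plt.show()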