#!/usr/bin/env python3

import numpy as np
import pandas as pd
import tensorflow as tf
import matplotlib.pyplot as plt
from matplotlib.colors import LogNorm
from sklearn.metrics import mean_squared_error

# Daniel Lersch (dlersch@jlab.org)
# Simple script to train an autoencoder neural network to reconstruct y=x^2 data
# and detect anomalies
# Note: Some of the following lines were inspired by the great autoencoder tutorial
# provided by TensorFlow (see: https://www.tensorflow.org/tutorials/generative/autoencoder?hl=en)

print(" ")
print("Detect y=x^2 Data with an Autoencoder Neural Network")
print(" ")

K = tf.keras
np.random.seed(123)

n_events_stored = 100000
test_data_compression = False #--> If this is set to True, two .csv files with 'n_events_stored' events each are generated and locally stored
                              # 1. File: data before encoding
                              # 2. File: data after encoding

# Create the data set:
print("Create y=x^2 (signal) and y=x^3 (background) data...")

n_channels = 50
x = np.linspace(-5.0,5.0,n_channels)
n_events = 100
rel_noise = 0.05

data = (np.ones((n_events,1))*x*x) * np.random.normal(1.0,rel_noise,(n_events,n_channels))
bkg_data = (np.ones((n_events,1))*x*x*x*0.1) * np.random.normal(1.0,rel_noise,(n_events,n_channels))
add_noise = np.random.normal(1.0,0.05,data.shape)

plt.rcParams.update({'font.size': 30})

fig_d,ax_d = plt.subplots(figsize=(10,8))
#++++++++++++++++++++++++++++++++++++
for ev in range(n_events):
    ax_d.plot(x,data[ev,:],'ko')
#++++++++++++++++++++++++++++++++++++
ax_d.set_xlabel('x')
ax_d.set_ylabel('y')
ax_d.grid(True)

fig_d.savefig('ae_data.png')
plt.close(fig_d)

print("...done!")
print(" ")

#Set up the ae-model:
print("Set up autoencoder model...")

latent_dim = 4
coding_activation = 'linear'
output_activation = 'linear'
layer_activation = 'relu'

# Encoder: compress the n_channels inputs down to latent_dim values
encoder = K.Sequential([
    K.layers.Input(shape=(n_channels,)),
    K.layers.Dense(100,activation=layer_activation),
    K.layers.Dense(50,activation=layer_activation),
    K.layers.Dense(25,activation=layer_activation),
    K.layers.Dense(latent_dim,activation=coding_activation)
])

# Decoder: reconstruct the n_channels inputs from the latent_dim values
decoder = K.Sequential([
    K.layers.Input(shape=(latent_dim,)),
    K.layers.Dense(25,activation=layer_activation),
    K.layers.Dense(50,activation=layer_activation),
    K.layers.Dense(100,activation=layer_activation),
    K.layers.Dense(n_channels,activation=output_activation)
])

ae_input = K.layers.Input(shape=(n_channels,))
ae_model = K.Sequential([ae_input,encoder,decoder])
ae_model.compile(optimizer=K.optimizers.Adam(1e-3),loss=K.losses.MSE)

ae_model.summary()

print("...done!")
print(" ")

#Train the model on the provided data:
history = ae_model.fit(x=data,y=data,epochs=100,validation_split=0.25)

print(" ")
print("Get model predictions for y=x^2 and y=x^3 data...")

ae_pred = ae_model.predict(data)
ae_pred_bkg = ae_model.predict(bkg_data)
encoder_out = K.backend.get_value(encoder(data))

print("...done!")
print(" ")

#And plot the results:
print("Visualize results...")

fig_t,ax_t = plt.subplots(figsize=(10,8))

ax_t.plot(history.history['loss'],linewidth=2.0,label='Training Set')
ax_t.plot(history.history['val_loss'],linewidth=2.0,label='Validation Set')
ax_t.legend()
ax_t.grid(True)
ax_t.set_xlabel('Epoch')
ax_t.set_ylabel('Loss')

fig_t.savefig('ae_training_curve.png')
plt.close(fig_t)

fig_l,ax_l = plt.subplots(2,3,figsize=(25,10))
fig_l.subplots_adjust(wspace=0.5)
fig_l.subplots_adjust(hspace=0.5)

ax_l[0,0].plot(encoder_out[:,0],encoder_out[:,1],'bs')
ax_l[0,1].plot(encoder_out[:,0],encoder_out[:,2],'bs')
ax_l[0,2].plot(encoder_out[:,0],encoder_out[:,3],'bs')
ax_l[1,0].plot(encoder_out[:,1],encoder_out[:,2],'bs')
ax_l[1,1].plot(encoder_out[:,1],encoder_out[:,3],'bs')
ax_l[1,2].plot(encoder_out[:,2],encoder_out[:,3],'bs')

ax_l[0,0].set_xlabel('Encoder Output 1')
ax_l[0,0].set_ylabel('Encoder Output 2')
ax_l[0,1].set_xlabel('Encoder Output 1')
ax_l[0,1].set_ylabel('Encoder Output 3')
ax_l[0,2].set_xlabel('Encoder Output 1')
ax_l[0,2].set_ylabel('Encoder Output 4')
ax_l[1,0].set_xlabel('Encoder Output 2')
ax_l[1,0].set_ylabel('Encoder Output 3')
ax_l[1,1].set_xlabel('Encoder Output 2')
ax_l[1,1].set_ylabel('Encoder Output 4')
ax_l[1,2].set_xlabel('Encoder Output 3')
ax_l[1,2].set_ylabel('Encoder Output 4')

fig_l.savefig('ae_latent_space.png')
plt.close(fig_l)

fig_r,ax_r = plt.subplots(1,3,figsize=(30,8))
fig_r.subplots_adjust(wspace=0.25)
#++++++++++++++++++++++++++++++++++++
for ev in range(n_events):
    if ev == 0:
        ax_r[0].plot(x,data[ev,:],'ko',label='Sig. Data')
        ax_r[1].plot(x,bkg_data[ev,:],'ko',label='Bkg. Data')
    else:
        ax_r[0].plot(x,data[ev,:],'ko')
        ax_r[1].plot(x,bkg_data[ev,:],'ko')
#++++++++++++++++++++++++++++++++++++
ax_r[0].plot(x,np.mean(ae_pred,0),'r-',linewidth=3.0,label='Avg. AE Prediction')
ax_r[1].plot(x,np.mean(ae_pred_bkg,0),'b--',linewidth=3.0,label='Avg. AE Prediction')

ax_r[0].set_xlabel('x')
ax_r[0].set_ylabel('y')
ax_r[1].set_xlabel('x')
ax_r[1].set_ylabel('y')
ax_r[0].legend()
ax_r[0].grid(True)
ax_r[1].legend()
ax_r[1].grid(True)

ax_r[2].plot(x,np.std(ae_pred,0),'r-',linewidth=3.0,label='AE')
ax_r[2].plot(x,np.std(data,0),'k-',linewidth=3.0,label='Sig. Data')
ax_r[2].set_xlabel('x')
ax_r[2].set_ylabel('Std. Dev.')
ax_r[2].legend()
ax_r[2].grid(True)

fig_r.savefig('ae_predictions.png')
plt.close(fig_r)

mse_data = np.mean((data - ae_pred)**2,1)
mse_bkg = np.mean((bkg_data - ae_pred_bkg)**2,1)

fig_a,ax_a = plt.subplots(figsize=(10,8))

ax_a.hist(mse_data,20,facecolor='r',label='Sig. Data')
ax_a.hist(mse_bkg,20,facecolor='b',label='Bkg. Data')
ax_a.set_xlabel('Mean Squared Error')
ax_a.set_ylabel('Counts')
ax_a.legend()

fig_a.savefig('ae_anomaly_det.png')
plt.close(fig_a)

print("...done! Have a great day!")
print(" ")

if test_data_compression:
    print("But wait, there is more! Run a test on data compression.")
    print("Generate new data with " + str(n_events_stored) + " events...")

    data = (np.ones((n_events_stored,1))*x*x) * np.random.normal(1.0,rel_noise,(n_events_stored,n_channels))

    print("...done!")
    print(" ")

    name_before_encoder = "ae_data_before_encoding.csv"
    print("Store un-encoded data at: " + name_before_encoder + "...")

    feature_names = ['y_' + str(i) for i in range(n_channels)]
    data_df = pd.DataFrame(data,columns=feature_names)
    data_df.to_csv(name_before_encoder,index=False)

    print("...done!")
    print(" ")

    name_after_encoder = "ae_data_after_encoding.csv"
    print("Pass data through encoder and store it at: " + name_after_encoder + "...")

    encoded_data = K.backend.get_value(encoder(data))
    feature_names = ['enc_' + str(i) for i in range(latent_dim)]
    enc_data_df = pd.DataFrame(encoded_data,columns=feature_names)
    enc_data_df.to_csv(name_after_encoder,index=False)

    print("...done! Have a beautiful day!")
    print(" ")
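
# --------------------------------------------------------------------------
# Optional add-on (illustrative sketch, not part of the original workflow):
# turn the per-event reconstruction error computed above (mse_data, mse_bkg)
# into a simple anomaly flag. The cut value, mean + 5*std of the signal MSE
# distribution, is an assumed choice for illustration only and should be
# tuned for real data.
mse_cut = np.mean(mse_data) + 5.0*np.std(mse_data)
sig_flagged = float(np.sum(mse_data > mse_cut)) / mse_data.shape[0]
bkg_flagged = float(np.sum(mse_bkg > mse_cut)) / mse_bkg.shape[0]

print("Optional check: flag events with reconstruction MSE > " + str(mse_cut))
print("Fraction of signal events flagged as anomalous: " + str(sig_flagged))
print("Fraction of background events flagged as anomalous: " + str(bkg_flagged))
print(" ")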