src.FeatureSelectionMethods.GrangerCausality
# ************************************************************************************************************************* #
# UTC Header #
# :::::::::::::::::::: ::: ::: ::::::::::: :::::::: #
# GrangerCausality.py :::::::::::::::::::: :+: :+: :+: :+: :+: #
# ::::::::::::::+++#####+++ +:+ +:+ +:+ +:+ #
# By: branlyst and ismailkad < > ::+++##############+++ +:+ +:+ +:+ +:+ #
# +++##############+++:::: +#+ +:+ +#+ +#+ #
# +++##+++:::::::::::::: +#+ +:+ +#+ +#+ #
# :::::::::::::::::::: +#+ +#+ +#+ +#+ #
# :::::::::::::::::::: #+# #+# #+# #+# #+# #
# Update: 2022/06/16 17:51:15 by branlyst and ismai :::::::::::::::::::: ######## ### ######## .fr #
# #
# ************************************************************************************************************************* #

from src.FeatureSelectionMethods.TemplateMethod import TemplateMethod
from src.scripts.utils import stationary_dataframe, symmetrize

from statsmodels.tsa.stattools import grangercausalitytests
from statsmodels.tsa.api import VAR

from sklearn_extra.cluster import KMedoids

import numpy as np
import pandas as pd


class GrangerCausality(TemplateMethod):
    """
    GrangerCausality implements the TemplateMethod interface to provide the Granger Causality feature selection
    Explained in the paper [GFSM: a Feature Selection Method for Improving Time Series Forecasting](https://hal.archives-ouvertes.fr/hal-02448277/document)
    """

    def __init__(self):
        TemplateMethod.__init__(self, "GrangerCausality")

    def select(self, dataframe, target_columns, number_of_target_to_keep=1):
        """
        Run the Granger-causality feature selection and store the results on the instance.

        The input goes through `stationary_dataframe`, a pairwise Granger matrix is
        computed with the "ssr_ftest" statistic, the matrix goes through `symmetrize`
        and is handed to KMedoids as a precomputed metric. `gfsm_features` then picks
        one feature per cluster for every target.

        Args:
            dataframe (DataFrame) : data used for the selection, one column per feature
            target_columns (str[]) : names of the target columns
            number_of_target_to_keep (int) : forwarded to KMedoids as `n_clusters`

        Sets `self._selected_features` (dict: target -> list of features) and
        `self._score` (the `target_columns` columns of the symmetrized matrix).
        """
        # the second value returned by stationary_dataframe is not used here
        stationary_df, _ = stationary_dataframe(dataframe)
        feature_names = stationary_df.columns

        causality = self.grangers_causation_matrix(
            stationary_df, feature_names, test="ssr_ftest"
        )

        symmetric = symmetrize(causality)
        symmetric_df = pd.DataFrame(
            symmetric, columns=feature_names, index=feature_names
        )

        medoids = KMedoids(
            n_clusters=number_of_target_to_keep,
            metric="precomputed",
            init="k-medoids++",
        )
        cluster_labels = medoids.fit(symmetric).labels_

        self._selected_features = dict()
        for name in target_columns:
            picked = self.gfsm_features(symmetric_df, cluster_labels, name)
            self._selected_features[name] = picked
        self._score = symmetric_df[target_columns]

    def grangers_causation_matrix(
        self, data, variables, test="ssr_ftest", maxlag=10, verbose=False
    ):
        """Build the pairwise Granger-causality p-value matrix of the time series.

        Rows are the response variables (suffix `_y`), columns are the predictors
        (suffix `_x`). Each off-diagonal cell holds the smallest p-value of `test`
        over lags 1..lag, where lag is chosen per pair by `var_lag_order`; the
        diagonal is set to 1. A p-value lower than the significance level (0.05)
        means the null hypothesis "X does not cause Y" can be rejected.

        Args:
            data (DataFrame) : pandas dataframe containing the time series variables
            variables : list containing names of the time series variables.
            test (str) : key of the statistic read from the `grangercausalitytests` result
            maxlag (int) : currently not used by the computation; kept for existing callers
            verbose (bool) : when True, print the p-values of every pair

        Returns:
            DataFrame : square matrix of p-values rounded to 4 decimals
        """

        # TODO: assert dataframe is stationary
        matrix = pd.DataFrame(
            np.zeros((len(variables), len(variables))),
            columns=variables,
            index=variables,
        )

        for c in matrix.columns:
            for r in matrix.index:
                if c == r:
                    matrix.loc[r, c] = 1
                    continue

                df_c_r, _ = stationary_dataframe(data[[r, c]])
                # The order selection may return 0, which grangercausalitytests
                # does not accept as maxlag: always test at least one lag.
                lag = max(1, int(self.var_lag_order(df_c_r)))
                test_result = grangercausalitytests(
                    df_c_r, maxlag=lag, verbose=False
                )
                p_values = [
                    round(test_result[i][0][test][1], 4) for i in range(1, lag + 1)
                ]
                min_p_value = np.min(p_values)
                if verbose:
                    print(f"Y = {r}, X = {c}, P Values = {p_values}")
                matrix.loc[r, c] = min_p_value

        matrix.columns = [var + "_x" for var in variables]
        matrix.index = [var + "_y" for var in variables]

        return matrix

    def var_lag_order(self, dataframe, criterion="aic"):
        """
        Return the lag order selected for a VAR model built on `dataframe`.

        Args:
            dataframe (DataFrame) : pandas dataframe
            criterion (str) : "aic" (default), "bic", "hqic" or "fpe"

        Returns:
            the lag order stored under `criterion` in the result of `VAR.select_order()`

        Raises:
            ValueError : if `criterion` is not one of the supported names
        """
        # TODO: assert n_columns = 2
        # TODO: assert dataframe stationary
        supported = ("aic", "bic", "hqic", "fpe")
        key = str(criterion).lower()
        if key not in supported:
            raise ValueError(
                f"Unknown criterion {criterion!r}; expected one of {supported}"
            )
        select_order = VAR(dataframe).select_order()
        return getattr(select_order, key)

    def gfsm_features(self, matrix, labels, target):
        """
        Return, for each cluster, the feature with the largest value in the target column.

        Args:
            matrix (DataFrame) : the granger Matrix, with a column named `target`
            labels : cluster label of each row of `matrix`, in row order (any array-like)
            target (str) : target name

        Returns:
            list : one index label of `matrix` per distinct cluster, in ascending label order
        """
        # NOTE(review): if `matrix` holds p-values, idxmax picks the largest
        # p-value of each cluster; confirm this matches the intended "max causality".
        labels = np.asarray(labels)  # a plain list would break the comparison below
        target_scores = matrix[target]
        features = []
        for label in np.unique(labels):
            members = np.flatnonzero(labels == label)
            features.append(target_scores.iloc[members].idxmax())
        return features
class GrangerCausality(TemplateMethod):
    """
    GrangerCausality implements the TemplateMethod interface to provide the Granger Causality feature selection
    Explained in the paper [GFSM: a Feature Selection Method for Improving Time Series Forecasting](https://hal.archives-ouvertes.fr/hal-02448277/document)
    """

    def __init__(self):
        TemplateMethod.__init__(self, "GrangerCausality")

    def select(self, dataframe, target_columns, number_of_target_to_keep=1):
        """
        Run the Granger-causality feature selection and store the results on the instance.

        The input goes through `stationary_dataframe`, a pairwise Granger matrix is
        computed with the "ssr_ftest" statistic, the matrix goes through `symmetrize`
        and is handed to KMedoids as a precomputed metric. `gfsm_features` then picks
        one feature per cluster for every target.

        Args:
            dataframe (DataFrame) : data used for the selection, one column per feature
            target_columns (str[]) : names of the target columns
            number_of_target_to_keep (int) : forwarded to KMedoids as `n_clusters`

        Sets `self._selected_features` (dict: target -> list of features) and
        `self._score` (the `target_columns` columns of the symmetrized matrix).
        """
        # the second value returned by stationary_dataframe is not used here
        stationary_df, _ = stationary_dataframe(dataframe)
        feature_names = stationary_df.columns

        causality = self.grangers_causation_matrix(
            stationary_df, feature_names, test="ssr_ftest"
        )

        symmetric = symmetrize(causality)
        symmetric_df = pd.DataFrame(
            symmetric, columns=feature_names, index=feature_names
        )

        medoids = KMedoids(
            n_clusters=number_of_target_to_keep,
            metric="precomputed",
            init="k-medoids++",
        )
        cluster_labels = medoids.fit(symmetric).labels_

        self._selected_features = dict()
        for name in target_columns:
            picked = self.gfsm_features(symmetric_df, cluster_labels, name)
            self._selected_features[name] = picked
        self._score = symmetric_df[target_columns]

    def grangers_causation_matrix(
        self, data, variables, test="ssr_ftest", maxlag=10, verbose=False
    ):
        """Build the pairwise Granger-causality p-value matrix of the time series.

        Rows are the response variables (suffix `_y`), columns are the predictors
        (suffix `_x`). Each off-diagonal cell holds the smallest p-value of `test`
        over lags 1..lag, where lag is chosen per pair by `var_lag_order`; the
        diagonal is set to 1. A p-value lower than the significance level (0.05)
        means the null hypothesis "X does not cause Y" can be rejected.

        Args:
            data (DataFrame) : pandas dataframe containing the time series variables
            variables : list containing names of the time series variables.
            test (str) : key of the statistic read from the `grangercausalitytests` result
            maxlag (int) : currently not used by the computation; kept for existing callers
            verbose (bool) : when True, print the p-values of every pair

        Returns:
            DataFrame : square matrix of p-values rounded to 4 decimals
        """

        # TODO: assert dataframe is stationary
        matrix = pd.DataFrame(
            np.zeros((len(variables), len(variables))),
            columns=variables,
            index=variables,
        )

        for c in matrix.columns:
            for r in matrix.index:
                if c == r:
                    matrix.loc[r, c] = 1
                    continue

                df_c_r, _ = stationary_dataframe(data[[r, c]])
                # The order selection may return 0, which grangercausalitytests
                # does not accept as maxlag: always test at least one lag.
                lag = max(1, int(self.var_lag_order(df_c_r)))
                test_result = grangercausalitytests(
                    df_c_r, maxlag=lag, verbose=False
                )
                p_values = [
                    round(test_result[i][0][test][1], 4) for i in range(1, lag + 1)
                ]
                min_p_value = np.min(p_values)
                if verbose:
                    print(f"Y = {r}, X = {c}, P Values = {p_values}")
                matrix.loc[r, c] = min_p_value

        matrix.columns = [var + "_x" for var in variables]
        matrix.index = [var + "_y" for var in variables]

        return matrix

    def var_lag_order(self, dataframe, criterion="aic"):
        """
        Return the lag order selected for a VAR model built on `dataframe`.

        Args:
            dataframe (DataFrame) : pandas dataframe
            criterion (str) : "aic" (default), "bic", "hqic" or "fpe"

        Returns:
            the lag order stored under `criterion` in the result of `VAR.select_order()`

        Raises:
            ValueError : if `criterion` is not one of the supported names
        """
        # TODO: assert n_columns = 2
        # TODO: assert dataframe stationary
        supported = ("aic", "bic", "hqic", "fpe")
        key = str(criterion).lower()
        if key not in supported:
            raise ValueError(
                f"Unknown criterion {criterion!r}; expected one of {supported}"
            )
        select_order = VAR(dataframe).select_order()
        return getattr(select_order, key)

    def gfsm_features(self, matrix, labels, target):
        """
        Return, for each cluster, the feature with the largest value in the target column.

        Args:
            matrix (DataFrame) : the granger Matrix, with a column named `target`
            labels : cluster label of each row of `matrix`, in row order (any array-like)
            target (str) : target name

        Returns:
            list : one index label of `matrix` per distinct cluster, in ascending label order
        """
        # NOTE(review): if `matrix` holds p-values, idxmax picks the largest
        # p-value of each cluster; confirm this matches the intended "max causality".
        labels = np.asarray(labels)  # a plain list would break the comparison below
        target_scores = matrix[target]
        features = []
        for label in np.unique(labels):
            members = np.flatnonzero(labels == label)
            features.append(target_scores.iloc[members].idxmax())
        return features
GrangerCausality is a class which implements the TemplateMethod interface in order to provide the Granger Causality feature selection, as explained in the paper "GFSM: a Feature Selection Method for Improving Time Series Forecasting".
def select(self, dataframe, target_columns, number_of_target_to_keep=1):
    """
    Run the Granger-causality feature selection and store the results on the instance.

    The input goes through `stationary_dataframe`, a pairwise Granger matrix is
    computed with the "ssr_ftest" statistic, the matrix goes through `symmetrize`
    and is handed to KMedoids as a precomputed metric. `gfsm_features` then picks
    one feature per cluster for every target.

    Args:
        dataframe (DataFrame) : data used for the selection, one column per feature
        target_columns (str[]) : names of the target columns
        number_of_target_to_keep (int) : forwarded to KMedoids as `n_clusters`

    Sets `self._selected_features` (dict: target -> list of features) and
    `self._score` (the `target_columns` columns of the symmetrized matrix).
    """
    # the second value returned by stationary_dataframe is not used here
    stationary_df, _ = stationary_dataframe(dataframe)
    feature_names = stationary_df.columns

    causality = self.grangers_causation_matrix(
        stationary_df, feature_names, test="ssr_ftest"
    )

    symmetric = symmetrize(causality)
    symmetric_df = pd.DataFrame(
        symmetric, columns=feature_names, index=feature_names
    )

    medoids = KMedoids(
        n_clusters=number_of_target_to_keep,
        metric="precomputed",
        init="k-medoids++",
    )
    cluster_labels = medoids.fit(symmetric).labels_

    self._selected_features = dict()
    for name in target_columns:
        picked = self.gfsm_features(symmetric_df, cluster_labels, name)
        self._selected_features[name] = picked
    self._score = symmetric_df[target_columns]
Select abstract method. Must be implemented.
Args
- dataframe (DataFrame) : dataframe which contains the data used to apply the feature selection. 1 column by feature and 1 line by entry
- target_columns (str[]) : array of the target column names used to apply the feature selection
- number_of_target_to_keep (int | None) : number of target to keep to select features. If None, algorithm will try to find the best compromise
def grangers_causation_matrix(
    self, data, variables, test="ssr_ftest", maxlag=10, verbose=False
):
    """Build the pairwise Granger-causality p-value matrix of the time series.

    Rows are the response variables (suffix `_y`), columns are the predictors
    (suffix `_x`). Each off-diagonal cell holds the smallest p-value of `test`
    over lags 1..lag, where lag is chosen per pair by `var_lag_order`; the
    diagonal is set to 1. A p-value lower than the significance level (0.05)
    means the null hypothesis "X does not cause Y" can be rejected.

    Args:
        data (DataFrame) : pandas dataframe containing the time series variables
        variables : list containing names of the time series variables.
        test (str) : key of the statistic read from the `grangercausalitytests` result
        maxlag (int) : currently not used by the computation; kept for existing callers
        verbose (bool) : when True, print the p-values of every pair

    Returns:
        DataFrame : square matrix of p-values rounded to 4 decimals
    """

    # TODO: assert dataframe is stationary
    matrix = pd.DataFrame(
        np.zeros((len(variables), len(variables))),
        columns=variables,
        index=variables,
    )

    for c in matrix.columns:
        for r in matrix.index:
            if c == r:
                matrix.loc[r, c] = 1
                continue

            df_c_r, _ = stationary_dataframe(data[[r, c]])
            # The order selection may return 0, which grangercausalitytests
            # does not accept as maxlag: always test at least one lag.
            lag = max(1, int(self.var_lag_order(df_c_r)))
            test_result = grangercausalitytests(
                df_c_r, maxlag=lag, verbose=False
            )
            p_values = [
                round(test_result[i][0][test][1], 4) for i in range(1, lag + 1)
            ]
            min_p_value = np.min(p_values)
            if verbose:
                print(f"Y = {r}, X = {c}, P Values = {p_values}")
            matrix.loc[r, c] = min_p_value

    matrix.columns = [var + "_x" for var in variables]
    matrix.index = [var + "_y" for var in variables]

    return matrix
Check Granger Causality of all possible combinations of the time series. The rows are the response variables and the columns are the predictors. The values in the table are the p-values. A p-value lower than the significance level (0.05) implies that the null hypothesis (the coefficients of the corresponding past values are zero, that is, X does not cause Y) can be rejected.
Args
- data (DataFrame) : pandas dataframe containing the time series variables
- variables : list containing names of the time series variables.
def var_lag_order(self, dataframe, criterion="aic"):
    """
    Return the lag order selected for a VAR model built on `dataframe`.

    Args:
        dataframe (DataFrame) : pandas dataframe
        criterion (str) : "aic" (default), "bic", "hqic" or "fpe"

    Returns:
        the lag order stored under `criterion` in the result of `VAR.select_order()`

    Raises:
        ValueError : if `criterion` is not one of the supported names
    """
    # TODO: assert n_columns = 2
    # TODO: assert dataframe stationary
    supported = ("aic", "bic", "hqic", "fpe")
    key = str(criterion).lower()
    if key not in supported:
        raise ValueError(
            f"Unknown criterion {criterion!r}; expected one of {supported}"
        )
    select_order = VAR(dataframe).select_order()
    return getattr(select_order, key)
Pass in a dataframe Returns the optimal lag order given the criterion input for a VAR model
Args
- dataframe (DataFrame) : pandas dataframe
- criterion (str) : criterion, `aic` by default
def gfsm_features(self, matrix, labels, target):
    """
    Return, for each cluster, the feature with the largest value in the target column.

    Args:
        matrix (DataFrame) : the granger Matrix, with a column named `target`
        labels : cluster label of each row of `matrix`, in row order (any array-like)
        target (str) : target name

    Returns:
        list : one index label of `matrix` per distinct cluster, in ascending label order
    """
    # NOTE(review): if `matrix` holds p-values, idxmax picks the largest
    # p-value of each cluster; confirm this matches the intended "max causality".
    labels = np.asarray(labels)  # a plain list would break the comparison below
    target_scores = matrix[target]
    features = []
    for label in np.unique(labels):
        members = np.flatnonzero(labels == label)
        features.append(target_scores.iloc[members].idxmax())
    return features
Returns the features in matrix having the max causality with the target for each cluster
Args
- matrix (DataFrame) : the granger Matrix
- labels
- target (str) : target name