Module fcmpy.ml.classification.FCM_MP
Inverse learning and topology post-optimization in Long-term Cognitive Networks.
Reference: Nápoles, G., Jastrzębska, A., Mosquera, C., Vanhoof, K., & Homenda, W. "Deterministic learning of hybrid Fuzzy Cognitive Maps and network reduction approaches."
"""
New Idea: Inverse learning and topology post-optimization in Long-term Cognitive Network
reference:Deterministic learning of hybrid Fuzzy Cognitive Maps and network
reduction approaches
Gonzalo Nápoles a,b,∗, Agnieszka Jastrzębska c, Carlos Mosquera a, Koen Vanhoof a,
Władysław Homenda c
"""
from scipy.io import arff
from sklearn.model_selection import StratifiedKFold
from datetime import datetime
import numpy as np
import pandas as pd
import os
import csv
import sys
import gc
def read_arff(f, **kwargs):
    '''
    Reads a dataset from an ARFF file.
    :param f: file name
    :param kwargs: dictionary of parameters
    :return: X (features), Y (integer-encoded targets) and labels (original classes)
    '''
data, meta = arff.loadarff(f)
df = pd.DataFrame(data) # dataset
class_att = meta.names()[-1]
Y = df[class_att]
    # encode class labels as integers
    labels = np.unique(Y)
    mapping = pd.Series(np.arange(len(labels)), index=labels)
    Y = Y.map(mapping)  # integer-encoded target
X = np.array(df.iloc[:, :-1], dtype=np.float64)
return X, Y, labels
L, U = .1, .9  # lower/upper bounds used to keep targets away from the sigmoid's asymptotes
def logit(X, slope=1.0, h=0.0, q=1.0, v=1.0, **kwargs):
    # Generalised logistic (sigmoid) function; note the name is swapped
    # relative to SciPy's convention, where expit denotes the sigmoid.
    return 1.0 / np.power(1.0 + q * np.exp(-slope * (X - h)), 1.0 / v)
def expit(Y, slope=1.0, h=0.0, q=1.0, v=1.0, **kwargs):
    # Inverse of logit() above, used in the pseudo-inverse learning step.
    return h - np.log((np.power(Y, -v) - 1.0) / q) / slope
class Model(object):
    '''
    Hybrid FCM model trained deterministically with pseudo-inverse learning.
    '''
    def __init__(self, T=None, p=None, rule=0, b1=1.0, L=0, **kwargs):
        # avoid a mutable default argument; fall back to the sigmoid defaults
        if p is None:
            p = [1.0, 0.0, 1.0, 1.0]  # [slope, h, q, v], matching logit()'s defaults
        self.T, self.p = T, list(p)
        self.weights = None
        self.p_out = list(p)  # output-layer sigmoid parameters, adjusted in train()
        if rule == 0:
            # rule 0: standard reasoning, a(t) = f(a(t-1) W)
            self.rule = lambda a, W, out: logit(np.dot(a, W), *p)
        elif rule == 1:
            # rule 1: blend the standard update with the initial activation
            self.rule = lambda a, W, out: b1 * logit(np.dot(a, W), *p) + (1. - b1) * out[0]
        else:
            # rule 2: blend with a self-feedback term over the last L states
            self.rule = lambda a, W, out: b1 * logit(np.dot(a, W), *p) + (1. - b1) * logit(
                W.diagonal() * np.sum(out[-L - 1:-1], axis=0), *p)
def train(self, X_train, Y_train, verbose=False, callback=None, **kwargs):
        '''
        Learns the weight matrices and stores them in self.weights.
        :param X_train: features
        :param Y_train: labels
        :param verbose: verbosity flag (currently unused)
        :param callback: optional callback (currently unused)
        :param kwargs: dictionary of parameters
        '''
        W = self.map(X_train)
        W[np.isnan(W)] = 0
        # temporary random output weights, only so test() can produce the hidden states
        self.weights = W, np.random.random((X_train.shape[1], Y_train.shape[1]))
        _, out = self.test(X_train)
        # pseudo-inverse learning: rescale targets into [L, U], map them through the
        # inverse sigmoid, and solve the output layer with the Moore-Penrose pseudo-inverse
        W_out = np.dot(np.linalg.pinv(out[-1]), expit(Y_train * (U - L) + L, *self.p))
        # normalize the output weights (feature importance) into [-1, 1],
        # compensating with the output sigmoid's slope and offset
        mx = np.max(np.abs(W_out))
        if mx > 1:
            W_out /= mx
            self.p_out[0] *= mx
            self.p_out[1] /= mx
        # normalize the inner weight values into [-1, 1]
        mxW = np.max(np.abs(W))
        if mxW > 1:
            W /= mxW
        self.weights = W, W_out
    def test(self, X, **kwargs):
        '''
        Runs the reasoning process with the learned weight matrices.
        :param X: features
        :param kwargs: dictionary of parameters
        :return: predictions rescaled back to [0, 1], and the list of activation states
        '''
        W, W_out = self.weights
        out = [X]
        for t in range(1, self.T + 1):
            out.append(self.rule(out[t - 1], W, out))
        # use p_out, which carries the slope/offset compensation applied in train()
        z = logit(np.dot(out[-1], W_out), *self.p_out)
        return (z - L) / (U - L), out
    def map(self, X_train, **kwargs):
        '''
        Computes the inter-concept weight matrix: W[i, j] is the least-squares
        slope of regressing concept j on concept i.
        :param X_train: features
        :param kwargs: dictionary of parameters
        :return: weight matrix W
        '''
        n, m = X_train.shape
        T1 = np.sum(X_train, axis=0)
        T3 = np.sum(X_train ** 2, axis=0)
        # random initialization remains wherever the denominator (variance term) is zero
        W = np.random.random((m, m))
        for i in range(m):
            d = n * T3[i] - T1[i] ** 2
            if d == 0:
                continue
            for j in range(m):
                W[i, j] = (n * np.sum(X_train[:, i] * X_train[:, j]) - T1[i] * T1[j]) / d
        return W
def cross_val(f, folds=10, **kwargs):
    '''
    Stratified k-fold cross-validation.
    :param f: path to an ARFF file
    :param folds: number of folds (default 10)
    :param kwargs: dictionary of parameters
    :return: dictionary with the arguments used and the resulting metrics
    '''
    X, Y, labels = read_arff(f)
    n_outputs = kwargs["M"]  # number of output variables
    if X.min() < 0. or X.max() > 1.:
        raise ValueError("Numerical values need to be normalized to [0, 1].")
if kwargs["T"] is None:
kwargs["T"] = X.shape[1] - n_outputs # n_inputs
    seed = kwargs.get('seed', 1)
    np.random.seed(seed)
    skf = StratifiedKFold(n_splits=folds, shuffle=True, random_state=seed)
train_error = 0
test_error = 0
start = datetime.now()
for train_index, test_index in skf.split(X, Y):
X_train, X_test, Y_train, Y_test = X[train_index], X[test_index], Y[train_index], Y[test_index]
model = Model(**kwargs)
        # the last M columns are the prediction targets; reshape(-1, 1) assumes M == 1
        model.train(X_train[:, :-n_outputs], X_train[:, -n_outputs:].reshape(-1, 1), **kwargs)
        X_hat, _ = model.test(X_train[:, :-n_outputs])  # training predictions
        X_true = X_train[:, -n_outputs:].reshape(-1, 1)
train_error += fold_train_error / folds
y_hat, _ = model.test(X_test[:, :-n_outputs])
y_true = X_test[:, -n_outputs:]
fold_test_error = np.sum(np.power(y_true - y_hat, 2) / n_outputs) / len(y_true)
test_error += fold_test_error / folds
elapsed = datetime.now() - start
slope, h, _, _ = kwargs["p"]
return {
# arguments
"b1": "%.2f" % kwargs["b1"], "L": kwargs["L"], "slope": "%.2f" % slope, "h": "%.2f" % h,
# metrics
"train_error": train_error,
"test_error": test_error,
"training_time": elapsed.total_seconds() / folds,
"weights":model.weights[0],
"importance":model.weights[1]
}
def run(**params):
    '''
    Runs the whole pipeline; most parameters can be left at their defaults.
    :param params:
        params = {
            'L': 0,                        # window length for the third reasoning rule (rule=2), default 0
            'M': 1,                        # number of output variables, default 1
            'T': None,                     # number of FCM iterations; None means "number of input concepts"
            'b1': 1.0,                     # blending coefficient for rules 1 and 2, default 1.0
            'folds': 10,                   # number of folds in a StratifiedKFold
            'output': './output.csv',      # output CSV file, default "./output.csv"
            'p': [1.0, 1.0, 1.0, 1.0],     # sigmoid parameters [slope, h, q, v]
            'rule': 0,                     # reasoning rule, one of {0, 1, 2}, default 0
            'sources': ['irisnorm.arff'],  # list of ARFF files to process
            'verbose': False               # verbosity flag
        }
    :return: list of result dictionaries, one per dataset
    '''
data_sources = [f for f in params['sources'] if f.endswith('.arff')]
results = []
    with open(params.get('output', './output.csv'), mode='w', newline='') as csv_file:
        writer = csv.writer(csv_file, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
headers = None
MSE_hist = []
for f in data_sources:
print("Processing {}".format(f))
# here the model is being created and called
result = cross_val(f, **params)
print(result)
result['filename'] = f
results.append(result)
if headers is None:
headers = list(result.keys())
writer.writerow(["name"] + headers)
MSE_hist += [result["test_error"]]
                writer.writerow([os.path.basename(f)[:-5]] + [result[k] for k in headers])  # strip the ".arff" suffix
if params['verbose']:
print("MSE: %.4f" % MSE_hist[-1])
csv_file.flush()
sys.stdout.flush()
gc.collect()
print("MSE Average of the model across the %d datasets: %.4f" % (len(data_sources), np.average(MSE_hist)))
return results
Functions
def cross_val(f, folds=10, **kwargs)
-
Stratified k-fold cross-validation. :param f: path to an ARFF file :param folds: number of folds (default 10) :param kwargs: dictionary of parameters :return: dictionary with the arguments used and the resulting metrics
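A minimal usage sketch (the file name "irisnorm.arff" is taken from the run() defaults and is illustrative; any normalized ARFF file with the class attribute last will do):

result = cross_val('irisnorm.arff', M=1, T=None, b1=1.0, L=0,
                   p=[1.0, 1.0, 1.0, 1.0], rule=0)
print(result['train_error'], result['test_error'])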
def expit(Y, slope=1.0, h=0.0, q=1.0, v=1.0, **kwargs)
-
Inverse of the generalised logistic function logit(); used in the pseudo-inverse learning step.
def logit(X, slope=1.0, h=0.0, q=1.0, v=1.0, **kwargs)
-
Generalised logistic (sigmoid) activation function; note the name is swapped relative to SciPy's convention.
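Despite the names, logit() is the forward (sigmoid) activation and expit() its inverse. A quick numerical round-trip check, with arbitrarily chosen parameters:

import numpy as np
x = np.linspace(-3.0, 3.0, 7)
y = logit(x, slope=2.0, h=0.5)              # generalised sigmoid, y in (0, 1)
assert np.allclose(expit(y, slope=2.0, h=0.5), x)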
def read_arff(f, **kwargs)
-
Reads a dataset from an ARFF file. :param f: file name :param kwargs: dictionary of parameters :return: X (features), Y (integer-encoded targets) and labels (original classes)
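A short usage sketch (the file name is illustrative):

X, Y, labels = read_arff('irisnorm.arff')
print(X.shape, labels)  # feature matrix and the original class labels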
def run(**params)
-
Runs the whole pipeline; most parameters can be left at their defaults. See the source docstring for the full parameter dictionary. :return: list of result dictionaries, one per dataset
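A minimal invocation sketch using the documented defaults (the data file is illustrative):

results = run(L=0, M=1, T=None, b1=1.0, folds=10, output='./output.csv',
              p=[1.0, 1.0, 1.0, 1.0], rule=0, sources=['irisnorm.arff'],
              verbose=True)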
Classes
class Model (T=None, p=[], rule=0, b1=1.0, L=0, **kwargs)
-
Hybrid FCM model trained deterministically with pseudo-inverse learning.
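An end-to-end sketch on synthetic data (the shapes and the target construction are assumptions for illustration; inputs and targets must lie in [0, 1]):

import numpy as np
np.random.seed(0)
X = np.random.random((100, 4))       # 4 input concepts, values in [0, 1]
y = X.mean(axis=1, keepdims=True)    # synthetic target in [0, 1]
model = Model(T=4, p=[1.0, 0.0, 1.0, 1.0], rule=0)
model.train(X, y)
y_hat, states = model.test(X)
print(np.mean((y - y_hat) ** 2))     # training MSE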
Methods
def map(self, X_train, **kwargs)
-
Computes the inter-concept weight matrix via least squares. :param X_train: features :param kwargs: dictionary of parameters :return: weight matrix W
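For reference, the entry computed by map() is the closed-form ordinary least-squares slope of regressing concept j on concept i over the n training rows:

$$w_{ij} = \frac{n \sum_{k=1}^{n} x_{ki} x_{kj} - \left(\sum_{k=1}^{n} x_{ki}\right)\left(\sum_{k=1}^{n} x_{kj}\right)}{n \sum_{k=1}^{n} x_{ki}^{2} - \left(\sum_{k=1}^{n} x_{ki}\right)^{2}}$$

Entries whose denominator vanishes (zero-variance concepts) keep their random initialization.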
def test(self, X, **kwargs)
-
Runs the reasoning process with the learned weight matrices. :param X: features :param kwargs: dictionary of parameters :return: predictions rescaled back to [0, 1], and the list of activation states
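In equation form, with f the generalised sigmoid and the default reasoning rule (rule=0), test() iterates the reasoning process and then decodes the output:

$$a^{(t)} = f\left(a^{(t-1)} W\right), \quad t = 1, \dots, T, \qquad z = \frac{f\left(a^{(T)} W_{out}\right) - L}{U - L}$$

where L = 0.1 and U = 0.9 are the module-level bounds.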
def train(self, X_train, Y_train, verbose=False, callback=None, **kwargs)
-
Learns the weight matrices and stores them in self.weights. :param X_train: features :param Y_train: labels :param verbose: verbosity flag (currently unused) :param callback: optional callback (currently unused) :param kwargs: dictionary of parameters
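The pseudo-inverse learning step solves the output layer in closed form. With H = a^(T) the final activation matrix and H^+ its Moore-Penrose pseudo-inverse:

$$W_{out} = H^{+} \, f^{-1}\big(Y (U - L) + L\big)$$

where f^{-1} is expit(), and the targets Y in [0, 1] are first rescaled into [L, U] to keep them away from the sigmoid's asymptotes.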