Module fcmpy.ml.classification.eltcn

For more information and details about the algorithm, please refer to Pattern classification with Evolving Long-term Cognitive Networks Gonzalo Nápoles a,b,⇑, Agnieszka Jastrze˛bska c, Yamisleydi Salgueiro d

Expand source code
'''
For more information and details about the algorithm, please refer to
Pattern classification with Evolving Long-term Cognitive
Networks
Gonzalo Nápoles a,b,⇑, Agnieszka Jastrze˛bska c, Yamisleydi Salgueiro d
'''

import numpy as np
import pandas as pd
import tensorflow as tf
from sklearn import datasets
from sklearn import model_selection
from sklearn.utils import shuffle
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import accuracy_score
from tensorflow.keras import backend as K
from tensorflow.python.keras.backend import set_session
from tensorflow.keras import regularizers
import matplotlib.pyplot as plt
from scipy.stats import entropy
from math import log, e
import warnings
warnings.filterwarnings("ignore")


class WeightRegularizer(tf.keras.regularizers.Regularizer):
    '''
    regularizing the wieghts
    '''

    def __init__(self, coef, mask):
        self.coef = coef
        self.mask = mask

    def __call__(self, weight_matrix):
        tensor = tf.convert_to_tensor(self.coef, np.float32)
        reg1 = K.sum(K.abs(tf.math.multiply(tf.math.subtract(weight_matrix,tensor), self.mask)))
        reg2 = 0.001 * K.sum(K.square(weight_matrix))
        return reg1 + reg2

    def get_config(self):
        return {'coefficients': self.coef, 'penalization': self.mask}
    

def run_model(file, layers=5, folds=5, epochs=1000, verbose=True, regularize=True):
    '''
    :param file: expects optimize .arff values with features vals [0,1]
    :param layers: numbers of nn layers (def 5)
    :param folds: how many validation folds (def 5)
    :param epochs: number of epochs (def 1000)
    :param verbose:
    :param regularize:
    :return: accuracy and weights matrix
    '''
    X, y, out = read_arff(file)
    hidden = len(X[0])
    
    skf = StratifiedKFold(n_splits=folds)
    skf.get_n_splits(X, y)
    
    acc_arr = []
    ent_arr = []
    for train_index, test_index in skf.split(X, y):
        X_train, X_test = np.asarray(X[train_index]).astype(np.float32),np.asarray(X[test_index]).astype(np.float32) #X[train_index], X[test_index]
        y_train, y_test = np.asarray(y[train_index]).astype(np.float32),np.asarray(y[test_index]).astype(np.float32) #y[train_index], y[test_index]
#         X = np.asarray(X).astype(np.float32)
        if(regularize):
            coef, mask = coefficients(matrix=X_train)
            network = [tf.keras.layers.Flatten()]     
    
        for i in range(layers):
            reg = None if (i == layers-1 or not regularize) else WeightRegularizer(coef, mask)
            dense = tf.keras.layers.Dense(hidden, activation=tf.nn.tanh, kernel_regularizer=reg)
            network.append(dense)

        network.append(tf.keras.layers.Dense(out, activation=tf.nn.softmax))

        model = tf.keras.models.Sequential(network)
        model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
        history = model.fit(X_train, y_train, epochs=epochs, verbose=0)
        
        if(verbose):
            plot_loss_weights(history, model, mask)
        
        weights = model.get_weights()
        relajo = error(weights, coef, mask)
        ent_arr.append(relajo)
        
        loss, acc = model.evaluate(X_test, y_test, verbose=0)
        acc_arr.append(acc)

    return np.mean(acc_arr), np.mean(ent_arr), weights 

def coefficients(matrix):
    n, m = matrix.shape
    temp1 = np.sum(matrix, axis=0)
    temp2 = np.sum(matrix**2, axis=0)
    
    df_data = pd.DataFrame(data=matrix,dtype=float)
    pearson = np.array(df_data.corr())
    
    mask = np.zeros((m, m))
    coef = np.zeros((m, m))
    for i in range(0, m):
        for j in range(0, m):
            den = n * temp2[i] - temp1[i] ** 2
            if (den != 0):
                coef[i,j] = (n * np.sum(matrix[:,i] * matrix[:,j]) - temp1[i] * temp1[j]) / den
                
                if(abs(pearson[i,j]) > 0.5):
                    mask[i,j] = log(n)
 
    return coef, mask

def plot_loss_weights(history, model, mask):
    '''
    plotting bar plots of individual weights for fully connected FCM
    :param history: data used to plot
    :param model: created model
    :param mask:
    :return:
    '''
    fig1, axes1 = plt.subplots(figsize=(15,5))
    fig2, axes2 = plt.subplots(figsize=(15,5))
        
    # plotting the training loss
    axes1.plot(history.history['loss'])
    axes1.set_title('model loss')
    axes1.set(ylabel='loss value', xlabel='epoch')

    # plotting the weights
    weights = model.get_weights()

    data = [xi.flatten().tolist() for xi in weights[:-2:2]]
    df = pd.DataFrame(data)
    
    header = []
    for i in range(len(mask)):
        for j in range(len(mask)):
            header.append('w'+str(i+1)+str(j+1)+"*" if (mask[i,j] != 0) else 'w'+str(i+1)+str(j+1))
            
    df.columns = header
    import seaborn as sns
    axes2 = sns.boxplot(data=df)
    axes2.set_title('weights')
    axes2.set(ylabel='value across layers', xlabel='weight')
    
def error(weights, coef, mask):
    '''
    returns error of the model for predicting correct classes
    :param weights: weight matrix
    :param coef: coefficients
    :param mask:
    :return: Error
    '''
    if(np.sum(mask) == 0):
        return 0
    
    coef = coef.flatten()
    mask = mask.flatten()
    array = []
    
    count = 0
    matrix = [xi.flatten() for xi in weights[:-2:2]]
    for i in range(len(matrix[0])):
        layer = column(matrix, i)
        for j in range(len(layer)):
            if(mask[j] > 0):
                den = max(1, max(layer[j], coef[j]))
                array.append(abs(layer[j] - coef[j]) / den)
                count += 1

    return np.sum(array)/count

def column(matrix, i):
    return [row[i] for row in matrix]
    
def read_arff(file):
    '''
    reading from .arff file
    consistency, in the MP algorithm we take the arff without header, so we cannot req header here
    :param file:
    :return:
    '''

    from scipy.io import arff
    data, meta = arff.loadarff(file)
    frame = pd.DataFrame(data)
    class_att = meta.names()[-1]
    y = frame[class_att]
    labels = np.unique(y)
    mapping = pd.Series([x[0] for x in enumerate(labels)], index = labels)
    y = np.array(y.map(mapping))
    X = np.array(frame)[:,0:-1]
    return X, y, len(labels)    
    
def run(path,folds=5):
    '''
    takes a directory where data file is (in .arff format) and do the whole calculation for you :)
    return fully connected FCM n x n for each fold and weight matrix n x nclasses
    :param path:
    :return:
    '''
    import os
    files = os.listdir(path)
    print(f"file in your data directory {files}. make sure they are .arff files!")
    print("running...")
    results = {}
    for file in files:
        if ".arff" not in file:
            print(f"{file} is not an .arff file")
        acc, ent, weights = run_model(path + "/" + file)
        # optimizing weights values for them to stay between [-1,1]
        for i in range(len(weights)):

            # print(np.abs(weights)[i])
            mx = np.max(np.abs(weights)[i])
            # print(f'max {mx}')
            if mx > 1:
                weights[i] /= mx
                # weights[i] *= mx
                # weights /= mx
        avgW = np.zeros((weights[0].shape[0],weights[0].shape[1]))     
        for i in np.arange(0,folds,step=2):
            avgW += weights[i]
        classW = np.zeros((weights[0].shape[0],weights[0].shape[1]))     
        for i in np.arange(1,folds,step=2):
            classW += weights[i]
        results[file] = {'acc':acc,'ent':ent,'weights':weights,'avgW':avgW/folds, 'classW':classW/folds}
        print(file.replace('.arff','') + "," + str(acc)+ "," + str(ent))
    return results

Functions

def coefficients(matrix)
Expand source code
def coefficients(matrix):
    n, m = matrix.shape
    temp1 = np.sum(matrix, axis=0)
    temp2 = np.sum(matrix**2, axis=0)
    
    df_data = pd.DataFrame(data=matrix,dtype=float)
    pearson = np.array(df_data.corr())
    
    mask = np.zeros((m, m))
    coef = np.zeros((m, m))
    for i in range(0, m):
        for j in range(0, m):
            den = n * temp2[i] - temp1[i] ** 2
            if (den != 0):
                coef[i,j] = (n * np.sum(matrix[:,i] * matrix[:,j]) - temp1[i] * temp1[j]) / den
                
                if(abs(pearson[i,j]) > 0.5):
                    mask[i,j] = log(n)
 
    return coef, mask
def column(matrix, i)
Expand source code
def column(matrix, i):
    return [row[i] for row in matrix]
def error(weights, coef, mask)

returns error of the model for predicting correct classes :param weights: weight matrix :param coef: coefficients :param mask: :return: Error

Expand source code
def error(weights, coef, mask):
    '''
    returns error of the model for predicting correct classes
    :param weights: weight matrix
    :param coef: coefficients
    :param mask:
    :return: Error
    '''
    if(np.sum(mask) == 0):
        return 0
    
    coef = coef.flatten()
    mask = mask.flatten()
    array = []
    
    count = 0
    matrix = [xi.flatten() for xi in weights[:-2:2]]
    for i in range(len(matrix[0])):
        layer = column(matrix, i)
        for j in range(len(layer)):
            if(mask[j] > 0):
                den = max(1, max(layer[j], coef[j]))
                array.append(abs(layer[j] - coef[j]) / den)
                count += 1

    return np.sum(array)/count
def plot_loss_weights(history, model, mask)

plotting bar plots of individual weights for fully connected FCM :param history: data used to plot :param model: created model :param mask: :return:

Expand source code
def plot_loss_weights(history, model, mask):
    '''
    plotting bar plots of individual weights for fully connected FCM
    :param history: data used to plot
    :param model: created model
    :param mask:
    :return:
    '''
    fig1, axes1 = plt.subplots(figsize=(15,5))
    fig2, axes2 = plt.subplots(figsize=(15,5))
        
    # plotting the training loss
    axes1.plot(history.history['loss'])
    axes1.set_title('model loss')
    axes1.set(ylabel='loss value', xlabel='epoch')

    # plotting the weights
    weights = model.get_weights()

    data = [xi.flatten().tolist() for xi in weights[:-2:2]]
    df = pd.DataFrame(data)
    
    header = []
    for i in range(len(mask)):
        for j in range(len(mask)):
            header.append('w'+str(i+1)+str(j+1)+"*" if (mask[i,j] != 0) else 'w'+str(i+1)+str(j+1))
            
    df.columns = header
    import seaborn as sns
    axes2 = sns.boxplot(data=df)
    axes2.set_title('weights')
    axes2.set(ylabel='value across layers', xlabel='weight')
def read_arff(file)

reading from .arff file consistency, in the MP algorithm we take the arff without header, so we cannot req header here :param file: :return:

Expand source code
def read_arff(file):
    '''
    reading from .arff file
    consistency, in the MP algorithm we take the arff without header, so we cannot req header here
    :param file:
    :return:
    '''

    from scipy.io import arff
    data, meta = arff.loadarff(file)
    frame = pd.DataFrame(data)
    class_att = meta.names()[-1]
    y = frame[class_att]
    labels = np.unique(y)
    mapping = pd.Series([x[0] for x in enumerate(labels)], index = labels)
    y = np.array(y.map(mapping))
    X = np.array(frame)[:,0:-1]
    return X, y, len(labels)    
def run(path, folds=5)

takes a directory where data file is (in .arff format) and do the whole calculation for you :) return fully connected FCM n x n for each fold and weight matrix n x nclasses :param path: :return:

Expand source code
def run(path,folds=5):
    '''
    takes a directory where data file is (in .arff format) and do the whole calculation for you :)
    return fully connected FCM n x n for each fold and weight matrix n x nclasses
    :param path:
    :return:
    '''
    import os
    files = os.listdir(path)
    print(f"file in your data directory {files}. make sure they are .arff files!")
    print("running...")
    results = {}
    for file in files:
        if ".arff" not in file:
            print(f"{file} is not an .arff file")
        acc, ent, weights = run_model(path + "/" + file)
        # optimizing weights values for them to stay between [-1,1]
        for i in range(len(weights)):

            # print(np.abs(weights)[i])
            mx = np.max(np.abs(weights)[i])
            # print(f'max {mx}')
            if mx > 1:
                weights[i] /= mx
                # weights[i] *= mx
                # weights /= mx
        avgW = np.zeros((weights[0].shape[0],weights[0].shape[1]))     
        for i in np.arange(0,folds,step=2):
            avgW += weights[i]
        classW = np.zeros((weights[0].shape[0],weights[0].shape[1]))     
        for i in np.arange(1,folds,step=2):
            classW += weights[i]
        results[file] = {'acc':acc,'ent':ent,'weights':weights,'avgW':avgW/folds, 'classW':classW/folds}
        print(file.replace('.arff','') + "," + str(acc)+ "," + str(ent))
    return results
def run_model(file, layers=5, folds=5, epochs=1000, verbose=True, regularize=True)

:param file: expects optimize .arff values with features vals [0,1] :param layers: numbers of nn layers (def 5) :param folds: how many validation folds (def 5) :param epochs: number of epochs (def 1000) :param verbose: :param regularize: :return: accuracy and weights matrix

Expand source code
def run_model(file, layers=5, folds=5, epochs=1000, verbose=True, regularize=True):
    '''
    :param file: expects optimize .arff values with features vals [0,1]
    :param layers: numbers of nn layers (def 5)
    :param folds: how many validation folds (def 5)
    :param epochs: number of epochs (def 1000)
    :param verbose:
    :param regularize:
    :return: accuracy and weights matrix
    '''
    X, y, out = read_arff(file)
    hidden = len(X[0])
    
    skf = StratifiedKFold(n_splits=folds)
    skf.get_n_splits(X, y)
    
    acc_arr = []
    ent_arr = []
    for train_index, test_index in skf.split(X, y):
        X_train, X_test = np.asarray(X[train_index]).astype(np.float32),np.asarray(X[test_index]).astype(np.float32) #X[train_index], X[test_index]
        y_train, y_test = np.asarray(y[train_index]).astype(np.float32),np.asarray(y[test_index]).astype(np.float32) #y[train_index], y[test_index]
#         X = np.asarray(X).astype(np.float32)
        if(regularize):
            coef, mask = coefficients(matrix=X_train)
            network = [tf.keras.layers.Flatten()]     
    
        for i in range(layers):
            reg = None if (i == layers-1 or not regularize) else WeightRegularizer(coef, mask)
            dense = tf.keras.layers.Dense(hidden, activation=tf.nn.tanh, kernel_regularizer=reg)
            network.append(dense)

        network.append(tf.keras.layers.Dense(out, activation=tf.nn.softmax))

        model = tf.keras.models.Sequential(network)
        model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
        history = model.fit(X_train, y_train, epochs=epochs, verbose=0)
        
        if(verbose):
            plot_loss_weights(history, model, mask)
        
        weights = model.get_weights()
        relajo = error(weights, coef, mask)
        ent_arr.append(relajo)
        
        loss, acc = model.evaluate(X_test, y_test, verbose=0)
        acc_arr.append(acc)

    return np.mean(acc_arr), np.mean(ent_arr), weights 

Classes

class WeightRegularizer (coef, mask)

regularizing the wieghts

Expand source code
class WeightRegularizer(tf.keras.regularizers.Regularizer):
    '''
    regularizing the wieghts
    '''

    def __init__(self, coef, mask):
        self.coef = coef
        self.mask = mask

    def __call__(self, weight_matrix):
        tensor = tf.convert_to_tensor(self.coef, np.float32)
        reg1 = K.sum(K.abs(tf.math.multiply(tf.math.subtract(weight_matrix,tensor), self.mask)))
        reg2 = 0.001 * K.sum(K.square(weight_matrix))
        return reg1 + reg2

    def get_config(self):
        return {'coefficients': self.coef, 'penalization': self.mask}

Ancestors

  • tensorflow.python.keras.regularizers.Regularizer

Methods

def get_config(self)

Returns the config of the regularizer.

An regularizer config is a Python dictionary (serializable) containing all configuration parameters of the regularizer. The same regularizer can be reinstantiated later (without any saved state) from this configuration.

This method is optional if you are just training and executing models, exporting to and from SavedModels, or using weight checkpoints.

This method is required for Keras model_to_estimator, saving and loading models to HDF5 formats, Keras model cloning, some visualization utilities, and exporting models to and from JSON.

Returns

Python dictionary.

Expand source code
def get_config(self):
    return {'coefficients': self.coef, 'penalization': self.mask}