Module minder_utils.models.feature_extractors.keras_autoencoders
Expand source code
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Dense, Input, Conv2D, Flatten
from tensorflow.keras.callbacks import EarlyStopping
import os
from tensorflow import keras
def get_ae_model(model_type='nn', input_dim=(8, 14, 3), encoding_dim=24 * 7):
if model_type == 'nn':
input_layer = Flatten()(Input(shape=(input_dim,)))
encoded = Dense(256, activation='relu')(input_layer)
encoded = Dense(512, activation='relu')(encoded)
encoded = Dense(encoding_dim, activation='relu', name='latent')(encoded)
decoded = Dense(512, activation='relu')(encoded)
decoded = Dense(256, activation='relu')(decoded)
decoded = Dense(input_dim, activation='sigmoid')(decoded)
encoder = Model(input_layer, encoded)
autoencoder = Model(input_layer, decoded)
elif model_type == 'cnn':
input_layer = Input(shape=input_dim)
encoded = Conv2D(4, (3, 3), activation='relu', padding='same')(input_layer)
encoded = Conv2D(3, (3, 3), activation='relu', padding='same')(encoded)
latent = Flatten(name='latent')(encoded)
decoded = Conv2D(4, (3, 3), activation='relu', padding='same')(encoded)
decoded = Conv2D(3, (3, 3), activation='sigmoid', padding='same')(decoded)
encoder = Model(input_layer, latent)
autoencoder = Model(input_layer, decoded)
else:
raise NotImplementedError()
autoencoder.compile(optimizer='adam', loss='mse')
return encoder, autoencoder
class Extractor:
def __init__(self, save_path=None):
self.callbacks = [EarlyStopping(monitor='loss', patience=3)]
self.epochs = 100
self.batch_size = 512
self.existing_models = {}
self.save_path = save_path
def train(self, data, model_type, normalisation=None):
if self.save_path is not None:
try:
encoder = keras.models.load_model(os.path.join(self.save_path, model_type + str(normalisation)))
self.existing_models[model_type + str(normalisation)] = encoder
except (OSError, FileNotFoundError):
pass
if model_type + str(normalisation) in self.existing_models:
return
print('Train Extractor: ', model_type, normalisation)
encoder, model = get_ae_model(model_type)
# data = data.reshape(data.shape[0], -1)
# data = normalise(data, normalisation)
if model_type == 'nn':
data = data.reshape(data.shape[0], -1)
else:
# data = data.reshape([data.shape[0], 8, 14, 3])
data = data.transpose(0, 2, 3, 1)
model.fit(data, data, epochs=self.epochs, batch_size=self.batch_size, callbacks=self.callbacks, verbose=0)
self.existing_models[model_type + str(normalisation)] = encoder
encoder.save(os.path.join(self.save_path, model_type + str(normalisation)))
def transform(self, data, model_type, normalisation=None):
if model_type == 'nn':
data = data.reshape(data.shape[0], -1)
else:
# data = data.reshape([data.shape[0], 8, 14, 3])
data = data.transpose(0, 2, 3, 1)
return self.existing_models[model_type + str(normalisation)].predict(data)
Functions
def get_ae_model(model_type='nn', input_dim=(8, 14, 3), encoding_dim=168)
-
Expand source code
def get_ae_model(model_type='nn', input_dim=(8, 14, 3), encoding_dim=24 * 7): if model_type == 'nn': input_layer = Flatten()(Input(shape=(input_dim,))) encoded = Dense(256, activation='relu')(input_layer) encoded = Dense(512, activation='relu')(encoded) encoded = Dense(encoding_dim, activation='relu', name='latent')(encoded) decoded = Dense(512, activation='relu')(encoded) decoded = Dense(256, activation='relu')(decoded) decoded = Dense(input_dim, activation='sigmoid')(decoded) encoder = Model(input_layer, encoded) autoencoder = Model(input_layer, decoded) elif model_type == 'cnn': input_layer = Input(shape=input_dim) encoded = Conv2D(4, (3, 3), activation='relu', padding='same')(input_layer) encoded = Conv2D(3, (3, 3), activation='relu', padding='same')(encoded) latent = Flatten(name='latent')(encoded) decoded = Conv2D(4, (3, 3), activation='relu', padding='same')(encoded) decoded = Conv2D(3, (3, 3), activation='sigmoid', padding='same')(decoded) encoder = Model(input_layer, latent) autoencoder = Model(input_layer, decoded) else: raise NotImplementedError() autoencoder.compile(optimizer='adam', loss='mse') return encoder, autoencoder
Classes
class Extractor (save_path=None)
-
Expand source code
class Extractor: def __init__(self, save_path=None): self.callbacks = [EarlyStopping(monitor='loss', patience=3)] self.epochs = 100 self.batch_size = 512 self.existing_models = {} self.save_path = save_path def train(self, data, model_type, normalisation=None): if self.save_path is not None: try: encoder = keras.models.load_model(os.path.join(self.save_path, model_type + str(normalisation))) self.existing_models[model_type + str(normalisation)] = encoder except (OSError, FileNotFoundError): pass if model_type + str(normalisation) in self.existing_models: return print('Train Extractor: ', model_type, normalisation) encoder, model = get_ae_model(model_type) # data = data.reshape(data.shape[0], -1) # data = normalise(data, normalisation) if model_type == 'nn': data = data.reshape(data.shape[0], -1) else: # data = data.reshape([data.shape[0], 8, 14, 3]) data = data.transpose(0, 2, 3, 1) model.fit(data, data, epochs=self.epochs, batch_size=self.batch_size, callbacks=self.callbacks, verbose=0) self.existing_models[model_type + str(normalisation)] = encoder encoder.save(os.path.join(self.save_path, model_type + str(normalisation))) def transform(self, data, model_type, normalisation=None): if model_type == 'nn': data = data.reshape(data.shape[0], -1) else: # data = data.reshape([data.shape[0], 8, 14, 3]) data = data.transpose(0, 2, 3, 1) return self.existing_models[model_type + str(normalisation)].predict(data)
Methods
def train(self, data, model_type, normalisation=None)
-
Expand source code
def train(self, data, model_type, normalisation=None): if self.save_path is not None: try: encoder = keras.models.load_model(os.path.join(self.save_path, model_type + str(normalisation))) self.existing_models[model_type + str(normalisation)] = encoder except (OSError, FileNotFoundError): pass if model_type + str(normalisation) in self.existing_models: return print('Train Extractor: ', model_type, normalisation) encoder, model = get_ae_model(model_type) # data = data.reshape(data.shape[0], -1) # data = normalise(data, normalisation) if model_type == 'nn': data = data.reshape(data.shape[0], -1) else: # data = data.reshape([data.shape[0], 8, 14, 3]) data = data.transpose(0, 2, 3, 1) model.fit(data, data, epochs=self.epochs, batch_size=self.batch_size, callbacks=self.callbacks, verbose=0) self.existing_models[model_type + str(normalisation)] = encoder encoder.save(os.path.join(self.save_path, model_type + str(normalisation)))
def transform(self, data, model_type, normalisation=None)
-
Expand source code
def transform(self, data, model_type, normalisation=None): if model_type == 'nn': data = data.reshape(data.shape[0], -1) else: # data = data.reshape([data.shape[0], 8, 14, 3]) data = data.transpose(0, 2, 3, 1) return self.existing_models[model_type + str(normalisation)].predict(data)