Source code for learners.features

# Copyright 2011 Hugo Larochelle. All rights reserved.
# 
# Redistribution and use in source and binary forms, with or without modification, are
# permitted provided that the following conditions are met:
# 
#    1. Redistributions of source code must retain the above copyright notice, this list of
#       conditions and the following disclaimer.
# 
#    2. Redistributions in binary form must reproduce the above copyright notice, this list
#       of conditions and the following disclaimer in the documentation and/or other materials
#       provided with the distribution.
# 
# THIS SOFTWARE IS PROVIDED BY Hugo Larochelle ``AS IS'' AND ANY EXPRESS OR IMPLIED
# WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
# FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL Hugo Larochelle OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
# ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
# ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
# 
# The views and conclusions contained in the software and documentation are those of the
# authors and should not be interpreted as representing official policies, either expressed
# or implied, of Hugo Larochelle.

"""
The ``learners.features`` module contains FeatureLearner objects,
meant for feature or representation learning. The MLProblems for
these Learners should be iterators over inputs. Their output should
be a new feature representation of the input.

The currently implemented algorithms are:

* FeatureLearner:      The general interface for learners of features.
* CenterAndNormalize:  Removes the input's mean and divides by its standard deviation, for each input.
* PCA:                 Principal Component Analysis learner.
* ZCA:                 ZCA whitening learner.
* RBM:                 Restricted Boltzmann Machine learner for feature extraction.
* k_means:             The k-means clustering algorithm.
* FeaturePipeline:     A learner made from a pipeline of simpler FeatureLearner objects.

"""

from generic import Learner,OnlineLearner
import mlpython.mlproblems.generic as mlpb
import mlpython.mathutils.nonlinear as mlnonlin
import mlpython.mathutils.linalg as mllin
import numpy as np

class FeatureLearner(Learner):
    """
    Interface for all Learner objects that learn features.

    The only additional requirement from Learner is to define a method
    ``compute_features(example)`` that outputs the feature representation
    for some given example (normally a single input).
    """
    def compute_features(self,example):
        """
        Return the feature representation of some given example.

        A general implementation is provided here, but it is recommended
        that classes inheriting from FeatureLearner override it.
        """
        # This should work in general, but is a bit more complicated than
        # it can be...
        return self.use(mlpb.MLProblem([example],metadata={'input_size':len(example)})).__iter__().next()

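# Illustrative sketch (not part of the original module): a minimal
# FeatureLearner that simply rescales its input by a fixed constant. It
# shows the methods a subclass is expected to provide; the exact Learner
# interface (train/forget/use/test) is assumed from its use in this file.
class _ScalingFeatureExample(FeatureLearner):

    def __init__(self, scale=2.0):
        self.scale = scale

    def train(self, trainset):
        pass  # nothing to learn for this toy example

    def forget(self):
        pass

    def compute_features(self, example):
        return self.scale * np.asarray(example)

    def use(self, dataset):
        return [ self.compute_features(input) for input in dataset ]
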
class CenterAndNormalize(FeatureLearner):
    """
    Removes the input's mean and divides by its standard deviation, for
    each input. Note that the mean and standard deviation are computed
    for each input vector individually, not on the dataset.

    Option ``regularizer`` is a small constant to add to the standard
    deviation, to avoid divisions by 0.

    **Required metadata:**

    * ``'input_size'``: Size of the inputs.

    """

    def __init__( self, regularizer=10 ):
        self.regularizer = regularizer
    def train(self,trainset):
        """
        Does nothing: no training needed.
        """
    def forget(self):
        # Nothing is learned by train(), so there is nothing to forget.
        pass

    def compute_features(self,example):
        return (example - example.mean())/(example.std()+self.regularizer)
    def use(self,dataset):
        """
        Outputs the centered and normalized inputs.
        """
        return [ self.compute_features(input) for input in dataset ]
    def test(self,dataset):
        """
        Outputs the squared difference between the processed and original input.
        """
        outputs = self.use(dataset)
        costs = np.zeros((len(dataset),1))
        for input,output,cost in zip(dataset,outputs,costs):
            cost[0] = np.sum((input-self.compute_features(input))**2)
        return outputs,costs

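# Illustrative sketch (not part of the original module): CenterAndNormalize
# works on each input vector individually, so features can be computed
# without any training. The numbers below are made up for the example.
def _center_and_normalize_example():
    preproc = CenterAndNormalize(regularizer=1e-8)
    x = np.array([1., 2., 3., 4.])
    y = preproc.compute_features(x)  # (x - x.mean()) / (x.std() + 1e-8)
    return y.mean(), y.std()         # approximately 0 and 1
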
class PCA(FeatureLearner):
    """
    Principal Component Analysis.

    Outputs the input's projection on the principal components, so as to
    obtain a representation with mean zero and identity covariance.

    Option ``n_components`` is the number of principal components to compute.

    Option ``regularizer`` is a small constant to add to the diagonal of
    the estimated covariance matrix (default=1e-10).

    **Required metadata:**

    * ``'input_size'``: Size of the inputs.

    """

    def __init__( self, n_components, regularizer=1e-10 ):
        self.n_components = n_components
        self.regularizer = regularizer
    def train(self,trainset):
        """
        Extract principal components.
        """
        # Put data in a Numpy matrix
        input_size = trainset.metadata['input_size']
        trainmat = np.zeros((len(trainset),input_size))
        t = 0
        for input in trainset:
            trainmat[t,:] = input
            t+=1

        # Compute mean and covariance
        self.mean = trainmat.mean(axis=0)
        train_cov = np.cov(trainmat,rowvar=0)

        # Add a small constant on the diagonal, to regularize
        train_cov += np.diag(self.regularizer*np.ones(input_size))

        # Compute principal components
        w,v = np.linalg.eigh(train_cov)
        s = (-w).argsort()
        w = w[s]
        v = v[:,s]
        self.pca_projection = v[:,:self.n_components]
        self.pca_scaling = 1./np.sqrt(w[:self.n_components])
        self.transform = (self.pca_scaling).reshape((1,-1))*self.pca_projection
    def forget(self):
        del self.transform
        del self.mean

    def compute_features(self,example):
        # Project the centered input on the scaled principal components.
        return np.dot(example-self.mean,self.transform)
    def use(self,dataset):
        """
        Outputs the projection on the principal components, so as to obtain
        a representation with mean zero and identity covariance.
        """
        return [ self.compute_features(input) for input in dataset ]
    def test(self,dataset):
        """
        Outputs the squared error of the reconstructed inputs.
        """
        outputs = self.use(dataset)
        costs = np.zeros((len(dataset),1))
        for input,output,cost in zip(dataset,outputs,costs):
            # Map the projection back to the input space before comparing.
            cost[0] = np.sum((input-self.mean
                              -np.dot(output*(1./self.pca_scaling),self.pca_projection.T))**2)
        return outputs,costs

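# Illustrative sketch (not part of the original module): training PCA on a
# small random dataset wrapped in an MLProblem. The MLProblem constructor
# arguments are assumed from their use in FeatureLearner.compute_features.
def _pca_example():
    rng = np.random.mtrand.RandomState(1234)
    data = [ rng.randn(5) for t in range(100) ]
    trainset = mlpb.MLProblem(data, metadata={'input_size': 5})
    pca = PCA(n_components=2)
    pca.train(trainset)
    features = pca.compute_features(data[0])  # 2-dimensional projection
    outputs, costs = pca.test(trainset)       # squared reconstruction errors
    return features, costs
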
class ZCA(FeatureLearner):
    """
    ZCA whitening preprocessing.

    Outputs the whitened input, which has the same dimensionality as the
    original input but with mean zero and identity covariance.

    Option ``regularizer`` is a small constant to add to the diagonal of
    the estimated covariance matrix (default=1e-10).

    **Required metadata:**

    * ``'input_size'``: Size of the inputs.

    """

    def __init__( self, regularizer=1e-10 ):
        self.regularizer = regularizer
    def train(self,trainset):
        """
        Extract principal components required for ZCA whitening.
        """
        # Put data in a Numpy matrix
        input_size = trainset.metadata['input_size']
        trainmat = np.zeros((len(trainset),input_size))
        t = 0
        for input in trainset:
            trainmat[t,:] = input
            t+=1

        # Compute mean and covariance
        self.mean = trainmat.mean(axis=0)
        train_cov = np.cov(trainmat,rowvar=0)

        # Add a small constant on the diagonal, to regularize
        train_cov += np.diag(self.regularizer*np.ones(input_size))

        # Compute principal components
        w,v = np.linalg.eigh(train_cov)
        s = (-w).argsort()
        w = w[s]
        v = v[:,s]
        self.projection = v
        self.scaling = 1./np.sqrt(w)
        self.transform = (self.scaling).reshape((1,-1))*self.projection
    def forget(self):
        del self.transform
        del self.mean

    def compute_features(self,example):
        return np.dot(np.dot(example-self.mean,self.transform),self.projection.T)
    def use(self,dataset):
        """
        Outputs the whitened inputs.
        """
        return [ self.compute_features(input) for input in dataset ]
    def test(self,dataset):
        """
        Outputs the squared error between the inputs and whitened inputs.
        """
        outputs = self.use(dataset)
        costs = np.zeros((len(dataset),1))
        for input,output,cost in zip(dataset,outputs,costs):
            cost[0] = np.sum((input-self.mean-output)**2)
        return outputs,costs

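# Illustrative sketch (not part of the original module): ZCA keeps the
# original input dimensionality, so whitened outputs can be compared
# directly to the inputs; after training, their covariance should be
# close to the identity matrix.
def _zca_example():
    rng = np.random.mtrand.RandomState(1234)
    data = [ rng.randn(3) for t in range(500) ]
    trainset = mlpb.MLProblem(data, metadata={'input_size': 3})
    zca = ZCA(regularizer=1e-10)
    zca.train(trainset)
    whitened = np.array(zca.use(trainset))
    return np.cov(whitened, rowvar=0)  # approximately the 3x3 identity
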
class RBM(OnlineLearner,FeatureLearner):
    """
    Restricted Boltzmann Machine for feature learning.

    Option ``n_stages`` is the number of training iterations.

    Options ``learning_rate`` and ``decrease_constant`` correspond to the
    learning rate and decrease constant used for stochastic gradient descent.

    Option ``hidden_size`` should be a positive integer specifying the
    number of hidden units (features).

    Option ``l1_regularization`` is the weight of L1 regularization on the
    connection matrix.

    Option ``seed`` determines the seed for randomly initializing the weights.

    **Required metadata:**

    * ``'input_size'``: Size of the inputs.

    """

    def __init__(self, n_stages,
                 learning_rate = 0.01,
                 decrease_constant = 0,
                 hidden_size = 100,
                 l1_regularization = 0,
                 seed = 1234
                 ):
        self.n_stages = n_stages
        self.stage = 0
        self.learning_rate = learning_rate
        self.decrease_constant = decrease_constant
        self.hidden_size = hidden_size
        self.l1_regularization = l1_regularization
        self.seed = seed

    def initialize_learner(self,metadata):
        self.rng = np.random.mtrand.RandomState(self.seed)
        self.input_size = metadata['input_size']
        if self.hidden_size <= 0:
            raise ValueError('hidden_size should be > 0')

        self.W = (2*self.rng.rand(self.hidden_size,self.input_size)-1)/self.input_size
        self.c = np.zeros((self.hidden_size))
        self.b = np.zeros((self.input_size))
        self.deltaW = np.zeros((self.hidden_size,self.input_size))
        self.deltac = np.zeros((self.hidden_size))
        self.deltab = np.zeros((self.input_size))

        self.input = np.zeros((self.input_size))
        self.hidden = np.zeros((self.hidden_size))
        self.hidden_act = np.zeros((self.hidden_size))
        self.hidden_prob = np.zeros((self.hidden_size))

        self.neg_input = np.zeros((self.input_size))
        self.neg_input_act = np.zeros((self.input_size))
        self.neg_input_prob = np.zeros((self.input_size))
        self.neg_hidden_act = np.zeros((self.hidden_size))
        self.neg_hidden_prob = np.zeros((self.hidden_size))

        self.neg_stats = np.zeros((self.hidden_size,self.input_size))

        self.n_updates = 0

    def update_learner(self,example):
        self.input[:] = example

        # Performing CD-1
        mllin.product_matrix_vector(self.W,self.input,self.hidden_act)
        self.hidden_act += self.c
        mlnonlin.sigmoid(self.hidden_act,self.hidden_prob)
        np.less(self.rng.rand(self.hidden_size),self.hidden_prob,self.hidden)

        mllin.product_matrix_vector(self.W.T,self.hidden,self.neg_input_act)
        self.neg_input_act += self.b
        mlnonlin.sigmoid(self.neg_input_act,self.neg_input_prob)
        np.less(self.rng.rand(self.input_size),self.neg_input_prob,self.neg_input)

        mllin.product_matrix_vector(self.W,self.neg_input,self.neg_hidden_act)
        self.neg_hidden_act += self.c
        mlnonlin.sigmoid(self.neg_hidden_act,self.neg_hidden_prob)

        mllin.outer(self.hidden_prob,self.input,self.deltaW)
        mllin.outer(self.neg_hidden_prob,self.neg_input,self.neg_stats)
        self.deltaW -= self.neg_stats

        np.subtract(self.input,self.neg_input,self.deltab)
        np.subtract(self.hidden_prob,self.neg_hidden_prob,self.deltac)

        self.deltaW *= self.learning_rate/(1.+self.decrease_constant*self.n_updates)
        self.deltab *= self.learning_rate/(1.+self.decrease_constant*self.n_updates)
        self.deltac *= self.learning_rate/(1.+self.decrease_constant*self.n_updates)

        self.W += self.deltaW
        self.b += self.deltab
        self.c += self.deltac

        if self.l1_regularization > 0:
            self.W *= (np.abs(self.W) > (self.l1_regularization * self.learning_rate/(1.+self.decrease_constant*self.n_updates)))

        self.n_updates += 1

    def use_learner(self,example):
        return [self.compute_features(example)]

    def compute_features(self,example):
        output = np.zeros((self.hidden_size))
        mllin.product_matrix_vector(self.W,example,self.hidden_act)
        self.hidden_act += self.c
        mlnonlin.sigmoid(self.hidden_act,output)
        return output

    def cost(self,outputs,example):
        hidden = outputs[0]
        mllin.product_matrix_vector(self.W.T,hidden,self.neg_input_act)
        self.neg_input_act += self.b
        mlnonlin.sigmoid(self.neg_input_act,self.neg_input_prob)
        return [ np.sum((example-self.neg_input_prob)**2) ]

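# Illustrative sketch (not part of the original module): the RBM is an
# OnlineLearner, so its inherited train() is assumed to call
# initialize_learner() once and then update_learner() on each training
# example for n_stages passes. Binary inputs are assumed here, as suggested
# by the sigmoid reconstruction used in cost().
def _rbm_example():
    rng = np.random.mtrand.RandomState(1234)
    data = [ (rng.rand(10) > 0.5).astype('double') for t in range(200) ]
    trainset = mlpb.MLProblem(data, metadata={'input_size': 10})
    rbm = RBM(n_stages=5, learning_rate=0.05, hidden_size=20)
    rbm.train(trainset)
    return rbm.compute_features(data[0])  # vector of 20 hidden unit probabilities
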
class k_means(FeatureLearner):
    """
    The k-means clustering algorithm.

    We use the first few examples in the training set to initialize the
    cluster means. For a given input, the Learner outputs a vector in which
    the component at the index of the selected cluster is 1, and all others 0.

    Option ``n_stages`` is the number of iterations over the training set
    (default=50).

    Option ``n_clusters`` is the number of clusters (default=10).

    Option ``use_triangle_activation`` is True if the triangle activation
    function should be used to compute features. If False, then a hard
    one-hot feature representation is used (default=False).

    Option ``seed`` is the seed for the random number generator (only used
    when some clusters are initially empty, to spawn new clusters).

    **Required metadata:**

    * ``'input_size'``: Size of the inputs.

    """

    def __init__( self, n_stages = 50,
                  n_clusters = 10,
                  use_triangle_activation = False,
                  seed = 1234
                  ):
        self.n_stages = n_stages
        self.n_clusters = n_clusters
        self.use_triangle_activation = use_triangle_activation
        self.seed = seed
        self.rng = np.random.mtrand.RandomState(self.seed) # Used only when there are empty clusters
        self.stage = 0
    def train(self,trainset):
        """
        Extract clusters.
        """
        self.input_size = trainset.metadata['input_size']
        initialize = False
        if self.stage == 0:
            #self.cluster_means = self.rng.randn(self.n_clusters,self.input_size)
            #self.cluster_mean_squared_norms = (self.cluster_means**2).sum(1)
            #initialize = False
            self.cluster_means = np.zeros((self.n_clusters,self.input_size))
            initialize = True

        cluster_sizes = np.zeros((self.n_clusters,))
        new_means = np.zeros((self.n_clusters,self.input_size))
        for it in range(self.stage,self.n_stages):
            t=0
            cluster_sizes[:] = 0
            new_means[:] = 0
            for input in trainset:
                if initialize and t < self.n_clusters:
                    # Initialize to first few examples
                    self.cluster_means[t,:] = input
                    cluster_sizes[t] += 1
                    new_means[t,:] += input
                    if t == self.n_clusters-1:
                        # Initialization done
                        self.cluster_mean_squared_norms = (self.cluster_means**2).sum(1)
                        initialize = False
                else:
                    idx = (self.cluster_mean_squared_norms - 2*np.dot(self.cluster_means,input)).argmin()
                    cluster_sizes[idx] += 1
                    new_means[idx,:] += input
                t+=1

            empty_clusters = cluster_sizes == 0
            nonempty_clusters = cluster_sizes != 0
            cluster_sizes[empty_clusters] = -1 # avoid dividing by 0
            self.cluster_means[:] = new_means / cluster_sizes.reshape((-1,1))
            if np.sum(empty_clusters) > 0:
                # Empty clusters transformed into a random convex combination of other clusters
                comb = self.rng.rand(int(empty_clusters.sum()),int(nonempty_clusters.sum()))
                comb = comb / comb.sum(axis=1).reshape((-1,1))
                self.cluster_means[empty_clusters,:] = np.dot(comb, self.cluster_means[nonempty_clusters,:])
            self.cluster_mean_squared_norms = (self.cluster_means**2).sum(1)

        self.stage = self.n_stages
    def forget(self):
        del self.cluster_means
        del self.cluster_mean_squared_norms
        self.stage = 0

    def compute_cluster(self,input):
        return (self.cluster_mean_squared_norms - 2*np.dot(self.cluster_means,input)).argmin()

    def compute_features(self,input):
        if self.use_triangle_activation:
            output = np.sqrt(np.sum((self.cluster_means-input)**2,axis=1))
            output = np.mean(output) - output
            output = np.maximum(output,0)
        else:
            output = np.zeros((self.n_clusters,))
            idx = (self.cluster_mean_squared_norms - 2*np.dot(self.cluster_means,input)).argmin()
            output[idx] = 1
        return output
    def use(self,dataset):
        """
        For a given input, the Learner outputs a vector in which the
        component at the index of the selected cluster is 1, and all others
        0 (or the triangle activation if ``use_triangle_activation`` is True).
        """
        outputs = np.zeros((len(dataset),self.n_clusters))
        t=0
        for input in dataset:
            outputs[t,:] = self.compute_features(input)
            t+=1
        return outputs
    def test(self,dataset):
        """
        Outputs the squared error of the reconstructed inputs.
        """
        outputs = self.use(dataset)
        costs = np.zeros((len(dataset),1))
        for input,output,cost in zip(dataset,outputs,costs):
            cost[0] = np.sum((input-np.dot(output,self.cluster_means))**2)
        return outputs,costs

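# Illustrative sketch (not part of the original module): with
# use_triangle_activation=False each feature vector is a one-hot cluster
# indicator, and compute_cluster() returns the index of the nearest mean.
def _k_means_example():
    rng = np.random.mtrand.RandomState(1234)
    data = [ rng.randn(2) + 5.*rng.randint(2) for t in range(100) ]
    trainset = mlpb.MLProblem(data, metadata={'input_size': 2})
    km = k_means(n_stages=10, n_clusters=2)
    km.train(trainset)
    return km.compute_cluster(data[0]), km.compute_features(data[0])
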
class FeaturePipeline(FeatureLearner):
    """
    Learns a pipeline of FeatureLearners. Outputs the result of applying
    each trained feature learner sequentially (i.e. stacked features).

    Option ``feature_learners`` is the list of FeatureLearner objects to
    train, corresponding to the successive stages of the pipeline.

    **Required metadata:**

    * ``'input_size'``: Size of the inputs.

    """

    def __init__( self, feature_learners ):
        self.feature_learners = feature_learners
    def train(self,trainset):
        """
        Trains the pipeline of feature learners.
        """
        feature_trainset = trainset
        n_trained_features = 0
        for fl in self.feature_learners:
            fl.train(feature_trainset)
            n_trained_features += 1
            # Create a PreprocessedProblem that applies the feature learners
            # trained so far, for training the next stage of the pipeline.
            def new_representation(input,metadata):
                for fl_preproc in self.feature_learners[:n_trained_features]:
                    input = fl_preproc.compute_features(input)
                metadata['input_size'] = len(input)
                return input
            feature_trainset = mlpb.PreprocessedProblem(trainset,preprocess=new_representation)
    def forget(self):
        for fl in self.feature_learners:
            fl.forget()

    def compute_features(self,example):
        for fl in self.feature_learners:
            example = fl.compute_features(example)
        return example
    def use(self,dataset):
        """
        Outputs the result of applying each FeatureLearner sequentially.
        """
        return [ self.compute_features(input) for input in dataset ]
    def test(self,dataset):
        """
        Returns the outputs and costs based on the last FeatureLearner
        of the pipeline.
        """
        def previous_to_last_features(input,metadata):
            for fl in self.feature_learners[:-1]:
                input = fl.compute_features(input)
            metadata['input_size'] = len(input)
            return input
        dataset = mlpb.PreprocessedProblem(dataset,preprocess=previous_to_last_features)
        return self.feature_learners[-1].test(dataset)

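# Illustrative sketch (not part of the original module): chaining
# CenterAndNormalize with PCA. Each stage is trained on the previous stage's
# output, and compute_features() applies all stages in sequence.
def _feature_pipeline_example():
    rng = np.random.mtrand.RandomState(1234)
    data = [ rng.randn(6) for t in range(100) ]
    trainset = mlpb.MLProblem(data, metadata={'input_size': 6})
    pipeline = FeaturePipeline([ CenterAndNormalize(regularizer=1e-8),
                                 PCA(n_components=3) ])
    pipeline.train(trainset)
    return pipeline.compute_features(data[0])  # 3-dimensional feature vector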