Source code for xrf.base

"""ExPLAINABLE rANDOM fORESTS (xrf)

Classes implementing random forest classifiers and regressors with
example attribution, i.e., each prediction is associated with a weight
distribution over the training examples. The examples used in forming
a prediction can be limited by their number (k) or by their cumulative
weight (c).

Author: Henrik Boström (bostromh@kth.se)

Copyright 2024 Henrik Boström

License: BSD 3 clause
"""

__version__ = "0.1.1"

import numpy as np
from scipy.sparse import csr_array
from joblib import Parallel, delayed, cpu_count
import time
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
from sklearn.preprocessing import normalize

class XRandomForestClassifier:
    """Explainable Random Forest Classifier.

    An explainable random forest classifier is generated in the same way
    as a standard random forest classifier, but provides example
    attributions, i.e., each prediction is associated with a weight
    distribution over the training examples, and allows for selecting a
    subset of the examples with the highest weight when forming
    predictions.

    The same set of parameters are available as for
    `sklearn.ensemble.RandomForestClassifier`. Note that ``oob_score`` is
    always set to True, overriding any value passed by the caller.
    """

    def __init__(self, **kwargs):
        kwargs["oob_score"] = True
        # Pass the parameters through the estimator's constructor instead of
        # writing into its __dict__, so misspelled names fail immediately.
        self.model = RandomForestClassifier(**kwargs)
        self.fitted = False
        self.time_fit = None
        self.classes_ = None
        self.y = None
        self.coverage = None

    def __repr__(self):
        return f"XRandomForestClassifier(model={self.model})"

    def fit(self, X, y):
        """
        Fit explainable random forest classifier.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            training objects
        y : array-like of shape (n_samples,)
            training class labels

        Returns
        -------
        self : object
            Fitted XRandomForestClassifier.
        """
        tic = time.time()
        self.model.fit(X, y)
        self.coverage = self._leaf_coverage(X)
        # Work on a plain array so positional access does not depend on the
        # container type (e.g. a pandas Series with a non-default index).
        labels = np.asarray(y)
        if labels.ndim == 2 and labels.shape[1] == 1:
            labels = labels.ravel()
        classes = self.model.classes_
        # classes_ holds the sorted unique labels, so a binary search gives
        # the column of each label in the one-hot matrix.
        class_indexes = np.searchsorted(classes, labels)
        self.y = np.zeros((len(labels), len(classes)))
        self.y[np.arange(len(labels)), class_indexes] = 1
        self.fitted = True
        self.classes_ = classes
        toc = time.time()
        self.time_fit = toc - tic
        return self

    def _leaf_coverage(self, X):
        """Build, for each tree, a row-normalized sparse matrix of shape
        (n_nodes, n_samples) holding the in-bag counts of the training
        examples in the leaf they fall into."""
        leaves = self.model.apply(X)
        n_samples = leaves.shape[0]
        sample_ids = np.arange(n_samples)
        coverage = []
        for i, tree in enumerate(self.model.estimators_):
            # NOTE(review): the bag of each tree is re-created by drawing
            # n_samples indexes from a RandomState seeded with the tree's
            # random_state; presumably this mirrors the forest's own
            # bootstrap sampling with max_samples unset -- confirm.
            rng = np.random.RandomState(tree.random_state)
            bag = np.bincount(rng.randint(0, n_samples, n_samples),
                              minlength=n_samples)
            in_bag = bag > 0
            counts = csr_array(
                (bag[in_bag], (leaves[in_bag, i], sample_ids[in_bag])),
                shape=(tree.tree_.node_count, n_samples))
            coverage.append(normalize(counts, norm="l1"))
        return coverage

    def _example_weights(self, X):
        """Return a dense (n_test, n_train) array with, for each test
        object, the l1-normalized sum over all trees of the coverage rows
        of the leaves it falls into."""
        leaves = self.model.apply(X)
        per_tree = [cov[leaves[:, i], :]
                    for i, cov in enumerate(self.coverage)]
        n_trees = len(per_tree)
        # Never use more jobs than trees, so that no chunk is empty.
        n_jobs = max(1, min(cpu_count(), n_trees))
        size, extra = divmod(n_trees, n_jobs)
        bounds = [(j * size + min(j, extra),
                   (j + 1) * size + min(j + 1, extra))
                  for j in range(n_jobs)]
        partial_sums = Parallel(n_jobs=n_jobs, require="sharedmem")(
            delayed(sum)(per_tree[lo:hi]) for lo, hi in bounds)
        return normalize(sum(partial_sums), norm="l1").toarray()

    @staticmethod
    def _top_examples(weights, k, c):
        """Return, per test object, the indexes of the selected training
        examples in order of decreasing weight.

        With k given, a 2-D integer array of shape (n_test, k) is returned;
        otherwise a 1-D object array of integer index arrays whose
        lengths are determined by the cumulative weight c."""
        order = np.flip(np.argsort(weights, axis=1), axis=1)
        if k is not None:
            return order[:, :k]
        cum_weights = np.cumsum(np.take_along_axis(weights, order, axis=1),
                                axis=1)
        # Positions where the cumulative weight has reached c become inf, so
        # argmax returns the first of them; if c is never reached, argmax
        # returns the first position of the largest cumulative weight.
        masked = np.where(cum_weights < c, cum_weights, np.inf)
        counts = np.argmax(masked, axis=1) + 1
        # Fill element by element: the entries must stay integer arrays even
        # when all rows happen to select the same number of examples
        # (np.array(..., dtype=object) would then yield object-dtype rows,
        # which cannot be used as indexes).
        top = np.empty(len(weights), dtype=object)
        for i, n_keep in enumerate(counts):
            top[i] = order[i, :n_keep]
        return top

    @staticmethod
    def _top_weights(weights, top_indexes, normalize_weights):
        """Return the positive weights of the selected examples per test
        object, as a 2-D float array if all rows have equal length and as
        a 1-D object array of float arrays otherwise."""
        rows = []
        for i, idx in enumerate(top_indexes):
            w = weights[i, idx]
            w = w[w > 0]
            if normalize_weights:
                w = w / np.sum(w)
            rows.append(w)
        if len({len(w) for w in rows}) == 1:
            return np.array(rows, dtype=float)
        packed = np.empty(len(rows), dtype=object)
        for i, w in enumerate(rows):
            packed[i] = w
        return packed

    def predict_proba(self, X, k=None, c=None, return_examples=False,
                      return_weights=False, normalize_weights=True):
        """
        Predict class probabilities for X.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            test objects
        k : int, default=None
            no. of top-weighted training examples to use when forming
            predictions; takes precedence over c if both are given
        c : float, default=None
            cumulative weight of top-weighted training examples to use
            when forming predictions
        return_examples : Boolean, default=False
            whether or not to output the indexes of training examples
            that are used when forming predictions
        return_weights : Boolean, default=False
            whether or not to output the weights of the training examples
            that are used when forming predictions (in decreasing order)
        normalize_weights : Boolean, default=True
            whether returned weights should be normalized or not

        Returns
        -------
        probabilities : ndarray of (n_samples, n_classes) with real values
            class probability distributions
        examples : ndarray of (n_samples, k) or (n_samples, ) of arrays
            indexes of training examples used when forming predictions
            Only returned if return_examples == True.
        weights : ndarray of (n_samples, k) or (n_samples, ) of arrays
            example weights used when forming predictions; when k or c is
            given, zero weights are left out, so a row may be shorter
            than the corresponding row of examples
            Only returned if return_weights == True.

        A list is returned if more than one output is requested,
        otherwise only the probabilities.
        """
        weights = self._example_weights(X)
        y_train = self.y
        use_all = k is None and c is None
        if use_all:
            top_indexes = None
            probabilities = weights @ y_train
        else:
            top_indexes = self._top_examples(weights, k, c)
            probabilities = normalize(
                [y_train[idx].T @ weights[i, idx]
                 for i, idx in enumerate(top_indexes)], norm="l1")
        results = [probabilities]
        if return_examples:
            if use_all:
                results.append(np.tile(np.arange(len(y_train)),
                                       (len(weights), 1)))
            else:
                results.append(top_indexes)
        if return_weights:
            if use_all:
                results.append(weights)
            else:
                results.append(self._top_weights(weights, top_indexes,
                                                 normalize_weights))
        return results[0] if len(results) == 1 else results

    def predict(self, X, k=None, c=None, return_examples=False,
                return_weights=False, normalize_weights=True):
        """
        Predict class for X.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            test objects
        k : int, default=None
            no. of top-weighted training examples to use when forming
            predictions; takes precedence over c if both are given
        c : float, default=None
            cumulative weight of top-weighted training examples to use
            when forming predictions
        return_examples : Boolean, default=False
            whether or not to output the indexes of training examples
            that are used when forming predictions
        return_weights : Boolean, default=False
            whether or not to output the weights of the training examples
            that are used when forming predictions (in decreasing order)
        normalize_weights : Boolean, default=True
            whether returned weights should be normalized or not

        Returns
        -------
        labels : ndarray of (n_samples,) with class labels
            predicted classes
        examples : ndarray of (n_samples, k) or (n_samples, ) of arrays
            indexes of training examples used when forming predictions
            Only returned if return_examples == True.
        weights : ndarray of (n_samples, k) or (n_samples, ) of arrays
            example weights used when forming predictions
            Only returned if return_weights == True.

        A list is returned if more than one output is requested,
        otherwise only the labels.
        """
        results = self.predict_proba(X, k, c, return_examples,
                                     return_weights, normalize_weights)
        classes = self.model.classes_
        if isinstance(results, list):
            results[0] = classes[np.argmax(results[0], axis=1)]
        else:
            results = classes[np.argmax(results, axis=1)]
        return results
class XRandomForestRegressor:
    """Explainable Random Forest Regressor.

    An explainable random forest regressor is generated in the same way
    as a standard random forest regressor, but provides example
    attributions, i.e., each prediction is associated with a weight
    distribution over the training examples, and allows for selecting a
    subset of the examples with the highest weight when forming
    predictions.

    The same set of parameters are available as for
    `sklearn.ensemble.RandomForestRegressor`. Note that ``oob_score`` is
    always set to True, overriding any value passed by the caller.
    """

    def __init__(self, **kwargs):
        kwargs["oob_score"] = True
        # Pass the parameters through the estimator's constructor instead of
        # writing into its __dict__, so misspelled names fail immediately.
        self.model = RandomForestRegressor(**kwargs)
        self.fitted = False
        self.time_fit = None
        self.y = None
        self.coverage = None

    def __repr__(self):
        return f"XRandomForestRegressor(model={self.model})"

    def fit(self, X, y):
        """
        Fit explainable random forest regressor.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            training objects
        y : array-like of shape (n_samples,)
            training labels (numerical values)

        Returns
        -------
        self : object
            Fitted XRandomForestRegressor.
        """
        tic = time.time()
        self.model.fit(X, y)
        self.coverage = self._leaf_coverage(X)
        # Keep the targets as a plain array: predict() selects them by
        # position, which a list does not support and which a pandas Series
        # would interpret as label-based lookup.
        targets = np.asarray(y)
        if targets.ndim == 2 and targets.shape[1] == 1:
            targets = targets.ravel()
        self.y = targets
        self.fitted = True
        toc = time.time()
        self.time_fit = toc - tic
        return self

    def _leaf_coverage(self, X):
        """Build, for each tree, a row-normalized sparse matrix of shape
        (n_nodes, n_samples) holding the in-bag counts of the training
        examples in the leaf they fall into."""
        leaves = self.model.apply(X)
        n_samples = leaves.shape[0]
        sample_ids = np.arange(n_samples)
        coverage = []
        for i, tree in enumerate(self.model.estimators_):
            # NOTE(review): the bag of each tree is re-created by drawing
            # n_samples indexes from a RandomState seeded with the tree's
            # random_state; presumably this mirrors the forest's own
            # bootstrap sampling with max_samples unset -- confirm.
            rng = np.random.RandomState(tree.random_state)
            bag = np.bincount(rng.randint(0, n_samples, n_samples),
                              minlength=n_samples)
            in_bag = bag > 0
            counts = csr_array(
                (bag[in_bag], (leaves[in_bag, i], sample_ids[in_bag])),
                shape=(tree.tree_.node_count, n_samples))
            coverage.append(normalize(counts, norm="l1"))
        return coverage

    def _example_weights(self, X):
        """Return a dense (n_test, n_train) array with, for each test
        object, the l1-normalized sum over all trees of the coverage rows
        of the leaves it falls into."""
        leaves = self.model.apply(X)
        per_tree = [cov[leaves[:, i], :]
                    for i, cov in enumerate(self.coverage)]
        n_trees = len(per_tree)
        # Never use more jobs than trees, so that no chunk is empty.
        n_jobs = max(1, min(cpu_count(), n_trees))
        size, extra = divmod(n_trees, n_jobs)
        bounds = [(j * size + min(j, extra),
                   (j + 1) * size + min(j + 1, extra))
                  for j in range(n_jobs)]
        partial_sums = Parallel(n_jobs=n_jobs, require="sharedmem")(
            delayed(sum)(per_tree[lo:hi]) for lo, hi in bounds)
        return normalize(sum(partial_sums), norm="l1").toarray()

    @staticmethod
    def _top_examples(weights, k, c):
        """Return, per test object, the indexes of the selected training
        examples in order of decreasing weight.

        With k given, a 2-D integer array of shape (n_test, k) is returned;
        otherwise a list of integer index arrays whose lengths are
        determined by the cumulative weight c."""
        order = np.flip(np.argsort(weights, axis=1), axis=1)
        if k is not None:
            return order[:, :k]
        cum_weights = np.cumsum(np.take_along_axis(weights, order, axis=1),
                                axis=1)
        # Positions where the cumulative weight has reached c become inf, so
        # argmax returns the first of them; if c is never reached, argmax
        # returns the first position of the largest cumulative weight.
        masked = np.where(cum_weights < c, cum_weights, np.inf)
        counts = np.argmax(masked, axis=1) + 1
        return [order[i, :n_keep] for i, n_keep in enumerate(counts)]

    @staticmethod
    def _top_weights(weights, top_indexes, normalize_weights):
        """Return the positive weights of the selected examples per test
        object, as a 2-D float array if all rows have equal length and as
        a 1-D object array of float arrays otherwise."""
        rows = []
        for i, idx in enumerate(top_indexes):
            w = weights[i, idx]
            w = w[w > 0]
            if normalize_weights:
                w = w / np.sum(w)
            rows.append(w)
        if len({len(w) for w in rows}) == 1:
            return np.array(rows, dtype=float)
        packed = np.empty(len(rows), dtype=object)
        for i, w in enumerate(rows):
            packed[i] = w
        return packed

    def predict(self, X, k=None, c=None, return_examples=False,
                return_weights=False, normalize_weights=True):
        """
        Predict regression target for X.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            test objects
        k : int, default=None
            no. of top-weighted training examples to use when forming
            predictions; takes precedence over c if both are given
        c : float, default=None
            cumulative weight of top-weighted training examples to use
            when forming predictions
        return_examples : Boolean, default=False
            whether or not to output the indexes of training examples
            that are used when forming predictions
        return_weights : Boolean, default=False
            whether or not to output the weights of the training examples
            that are used when forming predictions (in decreasing order)
        normalize_weights : Boolean, default=True
            whether returned weights should be normalized or not

        Returns
        -------
        predictions : ndarray of (n_samples,) with real values
            point predictions
        examples : ndarray of (n_samples, k) or list of arrays
            indexes of training examples used when forming predictions
            Only returned if return_examples == True.
        weights : ndarray of (n_samples, k) or (n_samples, ) of arrays
            example weights used when forming predictions; when k or c is
            given, zero weights are left out, so a row may be shorter
            than the corresponding row of examples
            Only returned if return_weights == True.

        A list is returned if more than one output is requested,
        otherwise only the predictions.
        """
        weights = self._example_weights(X)
        y_train = self.y
        use_all = k is None and c is None
        if use_all:
            top_indexes = None
            predictions = np.sum(weights * y_train, axis=1)
        else:
            top_indexes = self._top_examples(weights, k, c)
            # Weighted mean of the selected targets, with the selected
            # weights re-normalized to sum to one.
            predictions = np.array([
                np.sum(weights[i, idx] * y_train[idx]
                       / np.sum(weights[i, idx]))
                for i, idx in enumerate(top_indexes)])
        results = [predictions]
        if return_examples:
            if use_all:
                results.append(np.tile(np.arange(len(y_train)),
                                       (len(weights), 1)))
            else:
                results.append(top_indexes)
        if return_weights:
            if use_all:
                results.append(weights)
            else:
                results.append(self._top_weights(weights, top_indexes,
                                                 normalize_weights))
        return results[0] if len(results) == 1 else results