Source code for deletor.preprocessing
"""
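TensorFlow ports of data transformers from scikit-learn's preprocessing module.

Provides MinMaxScaler, StandardScaler, RobustScaler and PowerTransformer.
Each one is constructed from precomputed parameters, stores them as
TensorFlow constants, and exposes ``transform`` / ``inverse_transform``
as ``tf.function`` graphs.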
"""
# These are ported from the sklearn preprocessing module.
# Python Modules
import abc
# 3rd Party Modules
import numpy as np
import tensorflow as tf
# Project Modules
from deletor.math.utils import spacing
class MinMaxScaler(DataTransformer):
    """Affine min-max scaling driven by precomputed parameters.

    All array parameters are frozen into float32 TensorFlow constants at
    construction time. Only ``scale`` and ``min_adjust`` are read by the
    methods of this class; the remaining values are stored as-is.

    Args:
        min_adjust: Offset added after the input is multiplied by ``scale``.
        scale: Multiplicative factor applied to the input.
        data_min: Stored as a float32 constant (presumably sklearn's
            ``data_min_`` -- confirm against the caller).
        data_max: Stored as a float32 constant (presumably sklearn's
            ``data_max_`` -- confirm against the caller).
        data_range: Stored as a float32 constant (presumably sklearn's
            ``data_range_`` -- confirm against the caller).
        n_samples: Stored as a constant with an inferred dtype.
    """

    def __init__(
        self,
        min_adjust: np.ndarray,
        scale: np.ndarray,
        data_min: np.ndarray,
        data_max: np.ndarray,
        data_range: np.ndarray,
        n_samples: int
    ):
        def as_float32(values):
            # Every array parameter is stored with the same dtype.
            return tf.constant(values, dtype=tf.float32)

        self.min_adjust = as_float32(min_adjust)
        self.scale = as_float32(scale)
        self.data_min = as_float32(data_min)
        self.data_max = as_float32(data_max)
        self.data_range = as_float32(data_range)
        self.n_samples = tf.constant(n_samples)

    @tf.function
    def transform(self, x: tf.Tensor):
        """Return ``x * scale + min_adjust``. No clipping is applied."""
        scaled = x * self.scale
        return scaled + self.min_adjust

    @tf.function
    def inverse_transform(self, x: tf.Tensor):
        """Undo :meth:`transform`: return ``(x - min_adjust) / scale``."""
        shifted = x - self.min_adjust
        return shifted / self.scale
# noinspection DuplicatedCode
class StandardScaler(DataTransformer):
    """Standardization (centre, then scale) driven by precomputed statistics.

    Args:
        scale: Divisor used when ``with_std`` is true.
        mean: Value subtracted when ``with_mean`` is true.
        var: Stored as a float32 constant; not read by the methods here.
        n_samples: Stored as a constant; not read by the methods here.
        with_mean: Whether ``transform`` subtracts ``mean``.
        with_std: Whether ``transform`` divides by ``scale``.
    """

    def __init__(
        self,
        scale: np.ndarray,
        mean: np.ndarray,
        var: np.ndarray,
        n_samples: int,
        with_mean: bool = True,
        with_std: bool = True
    ):
        # Numeric parameters are frozen as float32 constants.
        self.scale = tf.constant(scale, dtype=tf.float32)
        self.mean = tf.constant(mean, dtype=tf.float32)
        self.var = tf.constant(var, dtype=tf.float32)
        self.n_samples = tf.constant(n_samples)
        # The flags are tensors too, so the branches below are evaluated
        # on tensor values inside the tf.function.
        self.with_mean = tf.constant(with_mean)
        self.with_std = tf.constant(with_std)

    @tf.function
    def transform(self, x: tf.Tensor):
        """Subtract ``mean`` and divide by ``scale``, each only if enabled."""
        result = x
        if self.with_mean:
            result = result - self.mean
        if self.with_std:
            result = result / self.scale
        return result

    @tf.function
    def inverse_transform(self, x: tf.Tensor):
        """Undo :meth:`transform`: rescale first, then add the mean back."""
        result = x
        if self.with_std:
            result = result * self.scale
        if self.with_mean:
            result = result + self.mean
        return result
# noinspection DuplicatedCode
class RobustScaler(DataTransformer):
    """Centre-and-scale transformer driven by precomputed parameters.

    Args:
        center: Value subtracted when ``with_centering`` is true.
        scale: Divisor used when ``with_scaling`` is true.
        with_centering: Whether ``transform`` subtracts ``center``.
        with_scaling: Whether ``transform`` divides by ``scale``.
    """

    def __init__(
        self,
        center: np.ndarray,
        scale: np.ndarray,
        with_centering: bool = True,
        with_scaling: bool = True
    ):
        # Numeric parameters are frozen as float32 constants; the flags are
        # stored as tensors as well.
        self.center = tf.constant(center, dtype=tf.float32)
        self.scale = tf.constant(scale, dtype=tf.float32)
        self.with_centering = tf.constant(with_centering)
        self.with_scaling = tf.constant(with_scaling)

    @tf.function
    def transform(self, x: tf.Tensor):
        """Subtract ``center`` and divide by ``scale``, each only if enabled."""
        result = x
        if self.with_centering:
            result = result - self.center
        if self.with_scaling:
            result = result / self.scale
        return result

    @tf.function
    def inverse_transform(self, x: tf.Tensor):
        """Undo :meth:`transform`: rescale first, then add the centre back."""
        result = x
        if self.with_scaling:
            result = result * self.scale
        if self.with_centering:
            result = result + self.center
        return result
class PowerTransformer(DataTransformer):
    """Column-wise Yeo-Johnson power transform with optional standardization.

    The input is transposed so that ``tf.map_fn`` iterates over its columns,
    and each column is transformed with its own lambda. The masks are cast to
    float32, so the input is expected to be float32 as well.

    Args:
        lambdas: One power parameter per input column; stored as a float32
            constant.
        scaler: Optional object exposing ``scale_``, ``mean_``, ``var_``,
            ``n_samples_seen_``, ``with_mean`` and ``with_std``. When truthy,
            those values are copied into this module's ``StandardScaler``,
            which is applied after the power transform (and undone before the
            inverse power transform).
    """

    def __init__(self, lambdas: np.ndarray, scaler=None):
        self.lambdas = tf.constant(lambdas, dtype=tf.float32)
        if scaler:
            # Re-create the scaler from its attributes so that the
            # standardization step also runs as TensorFlow ops.
            self.scaler = StandardScaler(
                scaler.scale_,
                scaler.mean_,
                scaler.var_,
                scaler.n_samples_seen_,
                scaler.with_mean,
                scaler.with_std
            )
        else:
            self.scaler = None

    # noinspection DuplicatedCode
    @tf.function
    def transform(self, x: tf.Tensor):
        """Apply the Yeo-Johnson transform per column, then the optional scaler.

        Args:
            x: Tensor whose columns are transformed independently.

        Returns:
            Tensor with the same layout as ``x``.
        """
        def yeo_johnson(inputs):
            # inputs[1] is a row filled with this column's lambda; take one.
            col, lmbda = inputs[0], inputs[1][0]
            # Same tolerance as inverse_transform, so both directions pick
            # the same branch for a given lambda.
            eps = spacing(tf.constant(1., dtype=tf.float32))
            mask = tf.cast(tf.math.greater_equal(col, 0), tf.float32)
            inv_mask = tf.cast(tf.math.less(col, 0), tf.float32)
            # When x >= 0
            if tf.abs(lmbda) < eps:
                out_1 = mask * tf.math.log1p(col)
            else:
                out_1 = mask * tf.math.divide_no_nan(tf.math.pow(col + 1, lmbda) - 1, lmbda)
            # Both formulas are evaluated on every element; entries outside
            # the mask can come out NaN/inf (0 * NaN is NaN), so zero them.
            out_1 = tf.where(tf.math.is_finite(out_1), out_1, tf.zeros_like(out_1))
            # When x < 0
            if tf.abs(lmbda - 2) > eps:
                out_2 = -tf.math.divide_no_nan(tf.math.pow(-col + 1, 2 - lmbda) - 1, 2 - lmbda)
                out_2 *= inv_mask
            else:
                # Negated like the power branch above: Yeo-Johnson maps a
                # negative input to -log1p(-x) when lambda == 2.
                out_2 = inv_mask * -tf.math.log1p(-col)
            out_2 = tf.where(tf.math.is_finite(out_2), out_2, tf.zeros_like(out_2))
            # The masks are disjoint, so the sum selects one branch per element.
            return out_1 + out_2

        # Put columns on axis 0 so that map_fn visits one column at a time.
        x = tf.transpose(x)
        # Row i of `lambdas` is lambda_i repeated, matching the shape of x.
        lambdas = tf.reshape(tf.repeat(self.lambdas, tf.shape(x)[1]), tf.shape(x))
        x = tf.map_fn(yeo_johnson, (x, lambdas), dtype=tf.float32)
        x = tf.transpose(x)
        if self.scaler:
            x = self.scaler.transform(x)
        return x

    # noinspection DuplicatedCode
    @tf.function
    def inverse_transform(self, x: tf.Tensor):
        """Undo the optional scaler, then invert the Yeo-Johnson transform.

        Args:
            x: Tensor of transformed values, one column per lambda.

        Returns:
            Tensor with the same layout as ``x``.
        """
        def yeo_johnson(inputs):
            # inputs[1] is a row filled with this column's lambda; take one.
            col, lmbda = inputs[0], inputs[1][0]
            eps = spacing(tf.constant(1., dtype=tf.float32))
            # The masks are computed on the transformed values.
            mask = tf.cast(tf.math.greater_equal(col, 0), tf.float32)
            inv_mask = tf.cast(tf.math.less(col, 0), tf.float32)
            # When x >= 0
            if tf.abs(lmbda) < eps:
                # Parenthesised so that the "- 1" is masked as well; without
                # the parentheses every negative entry would be shifted by -1.
                out_1 = mask * (tf.exp(col) - 1)
            else:
                out_1 = mask * (tf.math.pow(col * lmbda + 1, 1 / lmbda) - 1)
            # Zero NaN/inf produced on elements outside the mask.
            out_1 = tf.where(tf.math.is_finite(out_1), out_1, tf.zeros_like(out_1))
            # When x < 0
            if tf.abs(lmbda - 2) > eps:
                out_2 = inv_mask * (1. - tf.math.pow(-(2. - lmbda) * col + 1, 1. / (2. - lmbda)))
            else:
                out_2 = inv_mask * (1. - tf.math.exp(-col))
            out_2 = tf.where(tf.math.is_finite(out_2), out_2, tf.zeros_like(out_2))
            return out_1 + out_2

        # The scaler was applied last in transform, so it is undone first.
        if self.scaler:
            x = self.scaler.inverse_transform(x)
        x = tf.transpose(x)
        lambdas = tf.reshape(tf.repeat(self.lambdas, tf.shape(x)[1]), tf.shape(x))
        x = tf.map_fn(yeo_johnson, (x, lambdas), dtype=tf.float32)
        x = tf.transpose(x)
        return x