"""
Documentation
"""
# Copyright 2020 Reid Swanson
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Python Modules
import shelve
from functools import partial
from typing import List, Optional
# 3rd Party Modules
import tensorflow as tf
# Project Modules
from deletor.constants import MIN_FLOAT_32
from deletor.preprocessing import MinMaxScaler, StandardScaler, RobustScaler, PowerTransformer
# There are 136 features in the MLTR dataset.
N_FEATURES = 136
[docs]def make_feature_description(n_features: int):
"""
Specify the feature schemas needed to parse the records.
:param n_features: The number of sequential features in the dataset.
:return: A tuple of dictionaries. The first item of the tuple is a
dictionary containing the context features and the second
item contains the sequence features. In this example there
are no context features and all the sequence features are
dense.
"""
# I store the qid as a context feature in the records file, but it is not
# actually a feature. In some cases it might might make sense to include it
# for debugging purposes, but I don't include it here.
context_features = {'qid': tf.io.FixedLenFeature([], dtype=tf.int64)}
# The input features X
sequence_features = {
f'{k+1}': tf.io.FixedLenSequenceFeature([], dtype=tf.float32)
for k in range(n_features)
}
# The target feature Y
sequence_features['target'] = tf.io.FixedLenSequenceFeature([], dtype=tf.int64)
return context_features, sequence_features
[docs]@tf.function
def parse_example(proto, context_desc, sequence_desc):
"""
Parses a single sequence example using the feature descriptions provided.
:param proto: The raw record from the tfrecords file.
:param context_desc: The schema for the context features.
:param sequence_desc: The schema for the sequential features.
:return:
"""
# Read the parsed example from the file.
example = tf.io.parse_single_sequence_example(proto, context_desc, sequence_desc)
contextual, sequential = example
# Extract the target from the sequential features and delete it from the
# dictionary.
target = sequential['target']
del sequential['target']
# Convert dictionary of features into a tensor.
dense_features = tf.stack(
[a for k, a in sorted(sequential.items(), key=lambda x: int(x[0])) if k != 'target'],
axis=1
)
# If the normalize function is annotated with @tf.function, then I need
# to return a flat list here, because @tf.function won't allow dict
# operations (e.g., mydict['some_key'] = x). Removing the annotation
# seems to have fixed the problem. Any extra performance I might get
# by annotating it should be small since I'll cache the values anyway.
return (
{
'context_meta_qid': contextual['qid'],
'context_one_hot': 0.,
'context_multi_hot': 0.,
'context_dense': 0.,
'sequence_one_hot': 0.,
'sequence_multi_hot': 0.,
'sequence_dense': dense_features
},
tf.cast(target, dtype=tf.float32)
)
[docs]def make_scaler(name: str, s):
if name == 'minmax':
return MinMaxScaler(
s.min_, s.scale_, s.data_min_, s.data_max_, s.data_range_, s.n_samples_seen_
)
if name == 'standard':
return StandardScaler(s.scale_, s.mean_, s.var_, s.n_samples_seen_, s.with_mean, s.with_std)
if name == 'robust':
return RobustScaler(s.center_, s.scale_, s.with_centering, s.with_scaling)
if name == 'power':
# noinspection PyProtectedMember
return PowerTransformer(s.lambdas_, s._scaler if hasattr(s, '_scaler') else None)
[docs]def make_padded_shapes(
list_size: Optional[int] = None,
n_features: int = N_FEATURES,
sample_pre_batch: bool = False,
with_weights: bool = False
):
if with_weights is True:
shapes = (
{
'context_meta_qid': (),
'context_one_hot': (),
'context_multi_hot': (),
'context_dense': (),
'sequence_one_hot': (),
'sequence_multi_hot': (),
'sequence_dense': tf.TensorShape([list_size, n_features])
},
tf.TensorShape([list_size]),
()
)
else:
shapes = (
{
'context_meta_qid': (),
'context_one_hot': (),
'context_multi_hot': (),
'context_dense': (),
'sequence_one_hot': (),
'sequence_multi_hot': (),
'sequence_dense': tf.TensorShape([list_size, n_features])
},
tf.TensorShape([list_size])
)
if sample_pre_batch:
shapes[0]['sample_dense'] = tf.TensorShape([list_size, n_features])
return shapes
[docs]def make_padding_values(sample_pre_batch: bool = False, with_weights: bool = False):
if with_weights is True:
values = (
{
'context_meta_qid': tf.cast(0, tf.int64),
'context_one_hot': 0.,
'context_multi_hot': 0.,
'context_dense': 0.,
'sequence_one_hot': 0.,
'sequence_multi_hot': 0.,
'sequence_dense': 0.
},
MIN_FLOAT_32,
0.0
)
else:
values = (
{
'context_meta_qid': tf.cast(0, tf.int64),
'context_one_hot': 0.,
'context_multi_hot': 0.,
'context_dense': 0.,
'sequence_one_hot': 0.,
'sequence_multi_hot': 0.,
'sequence_dense': 0.
},
MIN_FLOAT_32
)
if sample_pre_batch:
values[0]['sample_dense'] = 0.
return values
[docs]def load_dataset(dataset_filename: str, scalers: List[str] = None, n_features: int = N_FEATURES):
"""
Load a tensorflow records dataset file into tf.Dataset object and optionally apply a
scaling method to the dense features.
:param dataset_filename: The path where the dataset file is located.
:param scalers: A two item list containing the information necessary to scale the data.
The first element is the path to the scalers shelve file.
The second element is the name of the scaler to use.
It should be one of [minmax, standard, robust, power].
:param n_features: The number of features in the dataset.
:return: A tensorflow Dataset object.
"""
# Try to infer the compression type from the filename suffix.
compression_type = (
'GZIP' if dataset_filename.endswith('.gz') else
'ZLIB' if dataset_filename.endswith('.zip') else
None
)
context_desc, sequence_desc = make_feature_description(n_features)
dataset = tf.data.TFRecordDataset(
dataset_filename,
compression_type,
int(1e6),
tf.data.experimental.AUTOTUNE
)
parse_fn = partial(parse_example, context_desc=context_desc, sequence_desc=sequence_desc)
dataset = dataset.map(parse_fn, num_parallel_calls=tf.data.experimental.AUTOTUNE)
if scalers:
scaler_filename, scaler_name = scalers
with shelve.open(scaler_filename, 'r') as db:
scaler = make_scaler(scaler_name, db[scaler_name])
# @tf.function
def normalize(x, y):
x['sequence_dense'] = scaler.transform(x['sequence_dense'])
return x, y
dataset = dataset.map(normalize, num_parallel_calls=tf.data.experimental.AUTOTUNE)
return dataset
[docs]def apply_map(fn, *datasets, **kwargs):
return [dataset.map(lambda x, y: fn(x, y, **kwargs)) for dataset in datasets]
[docs]def is_valid_query(_, y):
n_docs = tf.shape(y)[0]
n_rel = tf.math.reduce_sum(tf.cast(tf.math.greater(y, 0), tf.float32))
return n_docs > 1 and n_rel > 0
[docs]def truncate_document_list(x, y, list_size: int):
dense_features = x['sequence_dense']
n_docs = tf.shape(dense_features)[0]
keep_docs = tf.minimum(n_docs, list_size)
x['sequence_dense'] = dense_features[:keep_docs]
y = y[:keep_docs]
return x, y
[docs]def shuffle_documents(x, y, seed: Optional[int] = None):
argsort = tf.argsort(tf.random.uniform(tf.shape(y), seed=seed))
shuffle_idx = tf.reshape(argsort, (-1, 1))
dense_features = x['sequence_dense']
x['sequence_dense'] = tf.gather_nd(dense_features, shuffle_idx)
y = tf.gather_nd(y, shuffle_idx)
return x, y
[docs]def expand_dims_for_unbatch(x, y):
keys = ['context_one_hot', 'context_multi_hot', 'context_dense', 'sequence_one_hot',
'sequence_multi_hot']
for k in keys:
x[k] = tf.expand_dims(tf.repeat(x[k], tf.shape(y)[0]), 1)
y = tf.expand_dims(y, 1)
return x, y
[docs]def squeeze_for_unbatch(x, y):
for k in x:
x[k] = tf.squeeze(x[k])
y = tf.squeeze(y)
return x, y
[docs]def pad_groups(x, y, group_size: int):
xs = x['sample_dense']
yd, ys = y[0], y[1]
gs = tf.shape(ys)[2]
padlen = tf.cast(group_size - gs, tf.int32)
if padlen > 0:
x_paddings = tf.zeros([4, 2], dtype=tf.int32)
y_paddings = tf.zeros([3, 2], dtype=tf.int32)
x_paddings = tf.tensor_scatter_nd_add(x_paddings, [[2, 1]], [padlen])
y_paddings = tf.tensor_scatter_nd_add(y_paddings, [[2, 1]], [padlen])
xs = tf.pad(xs, x_paddings, constant_values=0.0)
ys = tf.pad(ys, y_paddings, constant_values=MIN_FLOAT_32)
x['sample_dense'] = xs
return x, (yd, ys)
[docs]def select_features(x, y, indices):
"""
Select a subset of the (sequential) features and overwrite the
`sequence_dense` field of the input (`x`) dictionary.
:param x: The input data containing a dictionary of tensors.
:param y: The target values.
:param indices: The indices of the features to select.
:return:
"""
x['sequence_dense'] = tf.gather(x['sequence_dense'], indices, axis=-1)
return x, y