"""
Documentation
"""
# Python Modules
import argparse
import itertools
import os
import tempfile
import unittest
# 3rd Party Modules
import numpy as np
import tensorflow as tf
# Project Modules
import deletor.tfutils as tfutils
import examples.build_tfrecords as build
import examples.pipeline as pipeline
from deletor.constants import MIN_FLOAT_32
# Project helper; presumably hides the GPUs so the tests run on CPU only --
# TODO confirm against deletor.tfutils. Kept first so it runs before any other
# TensorFlow configuration call.
tfutils.disable_gpu()
# Execute tf.function-decorated code eagerly so intermediate values can be
# inspected and failures point at the offending Python line.
tf.config.experimental_run_functions_eagerly(True)
# Print arrays (nearly) in full and without scientific notation so that
# assertion failure messages show complete, readable values.
np.set_printoptions(precision=6, suppress=True, edgeitems=200, linewidth=1000000)
def pairwise(iterable):
    """Return an iterator over consecutive, overlapping pairs of *iterable*.

    ``s -> (s0, s1), (s1, s2), (s2, s3), ...``

    An input with fewer than two items produces no pairs. The first item of
    *iterable* is consumed as soon as this function is called.

    See: https://stackoverflow.com/a/5434936/4971706

    :param iterable: Any iterable (sequences, generators, ...).
    :return: A ``zip`` iterator of 2-tuples.
    """
    a, b = itertools.tee(iterable)
    # Advance the second copy by one item so that zip() pairs every element
    # with its successor; the default avoids StopIteration on empty input.
    next(b, None)
    return zip(a, b)
class TestDataset(unittest.TestCase):
    """Tests for the dataset helpers in ``examples.pipeline``.

    The fixture is ``test_data.svm`` (located next to this file). It is
    converted to a temporary tfrecords file with ``examples.build_tfrecords``
    and then parsed with ``pipeline.load_dataset``.
    """

    # Not referenced by any test in this class.
    data_file = 'data/mltr30k/valid.tfrecords.gz'
    stats_file = 'data/mltr30k/train.stats.pkl'
    # Parsed and cached fixture dataset; populated once by setUpClass().
    dataset = None

    @staticmethod
    def _write_tfrecords(output_file):
        """Convert the raw ``test_data.svm`` fixture into a tfrecords file.

        :param output_file: Path of the (uncompressed) tfrecords file to write.
        """
        # The raw data is stored next to this test module.
        directory = os.path.dirname(os.path.realpath(__file__))
        # ``build.write_data`` expects an argparse-style namespace.
        args = argparse.Namespace(
            input_file=os.path.join(directory, 'test_data.svm'),
            output_file=output_file,
            compression_type=None,
            compression_level=None,
        )
        build.write_data(args)

    @classmethod
    def setUpClass(cls) -> None:
        """Build the fixture dataset once and keep it cached for all tests."""
        with tempfile.NamedTemporaryFile() as tfrfile:
            cls._write_tfrecords(tfrfile.name)
            dataset = pipeline.load_dataset(tfrfile.name, n_features=6).cache()
            # Iterate once while the temporary file still exists so that every
            # record ends up in the cache; afterwards the file may be removed.
            for _ in dataset:
                pass
        cls.dataset = dataset

    def test_dataset(self):
        """The fixture parses into five queries with the expected values."""
        dataset = self.dataset
        n_queries = sum(1 for _ in dataset)
        self.assertEqual(5, n_queries)
        expected_x = [[[3., 3., 0., 0., 3., 1.],
                       [3., 0., 3., 0., 3., 1.],
                       [3., 0., 2., 0., 3., 1.],
                       [3., 0., 3., 0., 3., 1.]],
                      [[3., 0., 3., 1., 3., 1.],
                       [3., 0., 3., 1., 3., 1.],
                       [3., 0., 3., 0., 3., 1.]],
                      [[0., 0., 0., 0., 0., 0.],
                       [0., 0., 0., 2., 2., 0.]],
                      [[3., 0., 3., 3., 3., 0.428571],
                       [7., 0., 2., 0., 7., 1.],
                       [5., 0., 3., 3., 5., 0.714286],
                       [6., 0., 3., 3., 6., 0.857143],
                       [7., 1., 3., 2., 7., 1.]],
                      [[4., 0., 1., 0., 4., 1.],
                       [4., 0., 1., 0., 4., 1.],
                       [4., 0., 1., 0., 4., 1.],
                       [1., 0., 0., 0., 1., 0.25]]]
        expected_y = [[2., 2., 0., 2.],
                      [0., 1., 3.],
                      [2, 1],
                      [0., 1., 0., 2., 0.],
                      [2., 0., 0., 0.]]
        for i, (x, y) in enumerate(dataset):
            np.testing.assert_array_almost_equal(expected_x[i], x['sequence_dense'], decimal=5)
            np.testing.assert_array_equal(expected_y[i], y)

    def test_truncate_list_size(self):
        """Truncated and padded batches have exactly ``list_size`` documents."""
        with tempfile.NamedTemporaryFile() as tfrfile:
            self._write_tfrecords(tfrfile.name)
            dataset = pipeline.load_dataset(tfrfile.name, n_features=6)
            list_size = 3
            padded_shapes = pipeline.make_padded_shapes(list_size)
            padding_values = pipeline.make_padding_values()
            dataset = dataset.map(lambda a, b: pipeline.truncate_document_list(a, b, list_size))
            dataset = dataset.padded_batch(2, padded_shapes, padding_values)
            # The dataset is read lazily, so it has to be consumed while the
            # temporary file still exists.
            n_batches = 0
            for x, y in dataset:
                n_batches += 1
                self.assertEqual(x['sequence_dense'].numpy().shape[1], list_size)
                self.assertEqual(y.numpy().shape[1], list_size)
        # Guard against the loop above passing vacuously on an empty dataset.
        self.assertGreater(n_batches, 0)

    def test_shuffle_documents(self):
        """Documents and their labels are permuted together."""
        x = {'sequence_dense': tf.reshape(tf.range(24), (4, 6)) * 2}
        y = tf.range(4) * 3
        exp_x = [[24, 26, 28, 30, 32, 34],
                 [0, 2, 4, 6, 8, 10],
                 [36, 38, 40, 42, 44, 46],
                 [12, 14, 16, 18, 20, 22]]
        exp_y = [6, 0, 9, 3]
        # The third argument is presumably the random seed that fixes the
        # expected permutation -- TODO confirm against pipeline.shuffle_documents.
        act_x, act_y = pipeline.shuffle_documents(x, y, 1)
        np.testing.assert_array_almost_equal(act_x['sequence_dense'], exp_x)
        np.testing.assert_array_almost_equal(act_y, exp_y)

    def test_sample_documents_flat_indices(self):
        """Check ``sample_documents`` with ``method='flat_indices'``.

        The first batch is compared against hand-written expectations (with
        ``seed=-1``); afterwards two epochs without an explicit seed must not
        produce identical samples.
        """
        batch_size = 2
        group_size = 3
        multiples = 2
        padded_shapes = pipeline.make_padded_shapes()
        padding_values = pipeline.make_padding_values()
        sample_documents = pipeline.sample_documents
        dataset = self.dataset.padded_batch(batch_size, padded_shapes, padding_values)
        # Expected output for sampling with the `flat_indices` method
        exp_x = [
            [[[3., 3., 0., 0., 3., 1.],
              [3., 0., 3., 0., 3., 1.],
              [3., 0., 2., 0., 3., 1.]],
             [[3., 0., 3., 0., 3., 1.],
              [3., 3., 0., 0., 3., 1.],
              [3., 0., 3., 0., 3., 1.]],
             [[3., 0., 2., 0., 3., 1.],
              [3., 0., 3., 0., 3., 1.],
              [3., 3., 0., 0., 3., 1.]],
             [[3., 0., 3., 0., 3., 1.],
              [3., 0., 2., 0., 3., 1.],
              [3., 0., 3., 0., 3., 1.]],
             [[3., 3., 0., 0., 3., 1.],
              [3., 0., 3., 0., 3., 1.],
              [3., 0., 2., 0., 3., 1.]],
             [[3., 0., 3., 0., 3., 1.],
              [3., 3., 0., 0., 3., 1.],
              [3., 0., 3., 0., 3., 1.]],
             [[3., 0., 2., 0., 3., 1.],
              [3., 0., 3., 0., 3., 1.],
              [3., 3., 0., 0., 3., 1.]],
             [[3., 0., 3., 0., 3., 1.],
              [3., 0., 2., 0., 3., 1.],
              [3., 0., 3., 0., 3., 1.]]],
            [[[3., 0., 3., 1., 3., 1.],
              [3., 0., 3., 1., 3., 1.],
              [3., 0., 3., 0., 3., 1.]],
             [[0., 0., 0., 0., 0., 0.],
              [3., 0., 3., 1., 3., 1.],
              [3., 0., 3., 1., 3., 1.]],
             [[3., 0., 3., 0., 3., 1.],
              [0., 0., 0., 0., 0., 0.],
              [3., 0., 3., 1., 3., 1.]],
             [[3., 0., 3., 1., 3., 1.],
              [3., 0., 3., 0., 3., 1.],
              [0., 0., 0., 0., 0., 0.]],
             [[3., 0., 3., 1., 3., 1.],
              [3., 0., 3., 1., 3., 1.],
              [3., 0., 3., 0., 3., 1.]],
             [[0., 0., 0., 0., 0., 0.],
              [3., 0., 3., 1., 3., 1.],
              [3., 0., 3., 1., 3., 1.]],
             [[3., 0., 3., 0., 3., 1.],
              [0., 0., 0., 0., 0., 0.],
              [3., 0., 3., 1., 3., 1.]],
             [[3., 0., 3., 1., 3., 1.],
              [3., 0., 3., 0., 3., 1.],
              [0., 0., 0., 0., 0., 0.]]]
        ]
        # MIN_FLOAT_32 appears where the all-zero document of the shorter query
        # is sampled; presumably it is the label padding value -- TODO confirm
        # against pipeline.make_padding_values.
        exp_y = [
            [[2, 2, 0],
             [2, 2, 2],
             [0, 2, 2],
             [2, 0, 2],
             [2, 2, 0],
             [2, 2, 2],
             [0, 2, 2],
             [2, 0, 2]],
            [[0, 1, 3],
             [MIN_FLOAT_32, 0, 1],
             [3, MIN_FLOAT_32, 0],
             [1, 3, MIN_FLOAT_32],
             [0, 1, 3],
             [MIN_FLOAT_32, 0, 1],
             [3, MIN_FLOAT_32, 0],
             [1, 3, MIN_FLOAT_32]]
        ]
        # Each entry is the sum of the update values (0..47, see `updates`
        # below) that are scattered onto the same document position.
        exp_scores = [
            [0 + 4 + 8 + 12 + 16 + 20,
             1 + 5 + 9 + 13 + 17 + 21,
             2 + 6 + 10 + 14 + 18 + 22,
             3 + 7 + 11 + 15 + 19 + 23],
            [24 + 28 + 32 + 36 + 40 + 44,
             25 + 29 + 33 + 37 + 41 + 45,
             26 + 30 + 34 + 38 + 42 + 46,
             27 + 31 + 35 + 39 + 43 + 47]
        ]
        for i, (x, y) in enumerate(dataset):
            # Every batch is sampled, but only the first one has hand-written
            # expectations to compare against.
            x, y = sample_documents(x, y, group_size, multiples, method='flat_indices', seed=-1)
            _, act_y = y
            act_x = x['sample_dense']
            if i == 0:
                updates = tf.reshape(tf.range(2 * 8 * 3), (2, 8, 3))
                scatter_idx = x['scatter_idx']
                act_scores = tf.scatter_nd(scatter_idx, updates, (2, 4))
                np.testing.assert_array_equal(act_x, exp_x)
                np.testing.assert_array_almost_equal(act_y, exp_y, decimal=5)
                np.testing.assert_array_equal(act_scores, exp_scores)
        # Check that the shuffling is different across epochs
        kwargs = {'method': 'flat_indices'}
        sampled = dataset.map(lambda a, b: sample_documents(a, b, group_size, multiples, **kwargs))
        # One list of per-batch samples for each of the two epochs.
        epochs = [
            [x['sample_dense'].numpy() for x, _ in sampled]
            for _ in range(2)
        ]
        is_different = [
            not np.isclose(first, second).all()
            for first, second in zip(*epochs)
        ]
        self.assertTrue(any(is_different))

    def test_sample_windowed(self):
        """Check ``sample_documents`` with ``method='windowed'``.

        Every expected group is a window of ``group_size`` consecutive
        documents that wraps around at the end of the list.
        """
        batch_size = 2
        n_docs = 5
        n_features = 2
        group_size = 3
        x = {'sequence_dense': tf.reshape(tf.range(20), (batch_size, n_docs, n_features))}
        y = tf.reshape(tf.range(batch_size * n_docs), (batch_size, n_docs))
        x['sequence_dense'] = tf.cast(x['sequence_dense'], tf.float32)
        y = tf.cast(y, tf.float32)
        exp_x = [
            [[[0, 1], [2, 3], [4, 5]],
             [[2, 3], [4, 5], [6, 7]],
             [[4, 5], [6, 7], [8, 9]],
             [[6, 7], [8, 9], [0, 1]],
             [[8, 9], [0, 1], [2, 3]],
             [[0, 1], [2, 3], [4, 5]],
             [[2, 3], [4, 5], [6, 7]],
             [[4, 5], [6, 7], [8, 9]],
             [[6, 7], [8, 9], [0, 1]],
             [[8, 9], [0, 1], [2, 3]],
             [[0, 1], [2, 3], [4, 5]],
             [[2, 3], [4, 5], [6, 7]],
             [[4, 5], [6, 7], [8, 9]],
             [[6, 7], [8, 9], [0, 1]],
             [[8, 9], [0, 1], [2, 3]]],
            [[[10, 11], [12, 13], [14, 15]],
             [[12, 13], [14, 15], [16, 17]],
             [[14, 15], [16, 17], [18, 19]],
             [[16, 17], [18, 19], [10, 11]],
             [[18, 19], [10, 11], [12, 13]],
             [[10, 11], [12, 13], [14, 15]],
             [[12, 13], [14, 15], [16, 17]],
             [[14, 15], [16, 17], [18, 19]],
             [[16, 17], [18, 19], [10, 11]],
             [[18, 19], [10, 11], [12, 13]],
             [[10, 11], [12, 13], [14, 15]],
             [[12, 13], [14, 15], [16, 17]],
             [[14, 15], [16, 17], [18, 19]],
             [[16, 17], [18, 19], [10, 11]],
             [[18, 19], [10, 11], [12, 13]]]
        ]
        x, (y, y_sample) = pipeline.sample_documents(x, y, group_size, method='windowed', seed=-1)
        act_x = x['sample_dense']
        act_y = y_sample
        np.testing.assert_array_equal(act_x, exp_x)
        # Compared against the first target returned by sample_documents, as in
        # the original test.
        np.testing.assert_array_equal(act_y, y)
        updates = tf.reshape(
            tf.range(batch_size * n_docs * group_size),
            (batch_size, group_size * n_docs)
        )
        scatter_idx = x['scatter_idx']
        # Sums of the update values that are scattered onto each document.
        exp_scores = [
            [15, 18, 21, 24, 27],
            [60, 63, 66, 69, 72]
        ]
        act_scores = tf.scatter_nd(scatter_idx, updates, (batch_size, n_docs))
        np.testing.assert_array_equal(act_scores, exp_scores)
# Run the test suite when this file is executed as a script.
if __name__ == '__main__':
    unittest.main()