# Source code for test.examples.gsf.mltr30k.test_dataset

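"""
Unit tests for the example dataset pipeline.

The tests convert the ``test_data.svm`` file stored next to this module into
a tfrecords file and then check dataset loading, truncation of document
lists, document shuffling and the ``flat_indices`` / ``windowed`` document
sampling methods.
"""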
# Python Modules
import argparse
import itertools
import os
import tempfile
import unittest

# 3rd Party Modules
import numpy as np
import tensorflow as tf

# Project Modules
import deletor.tfutils as tfutils

import examples.build_tfrecords as build
import examples.pipeline as pipeline
from deletor.constants import MIN_FLOAT_32

# Project helper; judging by its name it keeps TensorFlow off the GPU for
# these tests -- NOTE(review): confirm against deletor.tfutils.
tfutils.disable_gpu()

# Run `tf.function`s eagerly. `experimental_run_functions_eagerly` is the
# deprecated spelling; prefer `run_functions_eagerly` when this TensorFlow
# version provides it and fall back to the old name otherwise.
if hasattr(tf.config, 'run_functions_eagerly'):
    tf.config.run_functions_eagerly(True)
else:
    tf.config.experimental_run_functions_eagerly(True)

# Print arrays in full (no summarisation, no line wrapping, no scientific
# notation) so that assertion failure messages are readable.
np.set_printoptions(precision=6, suppress=True, edgeitems=200, linewidth=1000000)


def pairwise(iterable):
    """Return an iterator over consecutive, overlapping pairs of *iterable*.

    ``pairwise([s0, s1, s2, ...])`` yields ``(s0, s1), (s1, s2), ...``. An
    iterable with fewer than two elements yields nothing.

    :param iterable: Any iterable; it is consumed lazily.
    :return: A ``zip`` iterator of 2-tuples.
    """
    # See: https://stackoverflow.com/a/5434936/4971706
    a, b = itertools.tee(iterable)
    # Advance the second copy by one element so that zip pairs every item
    # with its successor.
    next(b, None)
    return zip(a, b)
class TestDataset(unittest.TestCase):
    """Tests for loading, truncating, shuffling and sampling the dataset.

    The shared fixture is built from the ``test_data.svm`` file that lives in
    the same directory as this module.
    """

    data_file = 'data/mltr30k/valid.tfrecords.gz'
    stats_file = 'data/mltr30k/train.stats.pkl'
    # Cached dataset shared by the tests; populated in `setUpClass`.
    dataset = None

    @staticmethod
    def _write_tfrecords(output_file):
        """Convert the raw ``test_data.svm`` file into an uncompressed
        tfrecords file.

        :param output_file: Path the tfrecords file is written to.
        """
        # The raw data is stored next to this test module
        directory = os.path.dirname(os.path.realpath(__file__))
        svmpath = os.path.join(directory, 'test_data.svm')

        # Create a Namespace with the necessary arguments to write the
        # tfrecords file to disk.
        args = argparse.Namespace(
            input_file=svmpath,
            output_file=output_file,
            compression_type=None,
            compression_level=None,
        )

        # Write the tfrecords file
        build.write_data(args)

    # noinspection DuplicatedCode
    @classmethod
    def setUpClass(cls) -> None:
        """Build the cached dataset that is shared by the tests."""
        # Write a tfrecords file based on the raw data
        with tempfile.NamedTemporaryFile() as tfrfile:
            cls._write_tfrecords(tfrfile.name)

            dataset = pipeline.load_dataset(tfrfile.name, n_features=6).cache()

            # Load the dataset into the cache. This has to happen while the
            # temporary file still exists.
            for _ in dataset:
                pass

        cls.dataset = dataset

    def test_dataset(self):
        """The loaded dataset holds the expected features and labels."""
        dataset = self.dataset
        n_queries = sum(1 for _ in dataset)
        self.assertEqual(5, n_queries)

        expected_x = [
            [[3., 3., 0., 0., 3., 1.],
             [3., 0., 3., 0., 3., 1.],
             [3., 0., 2., 0., 3., 1.],
             [3., 0., 3., 0., 3., 1.]],
            [[3., 0., 3., 1., 3., 1.],
             [3., 0., 3., 1., 3., 1.],
             [3., 0., 3., 0., 3., 1.]],
            [[0., 0., 0., 0., 0., 0.],
             [0., 0., 0., 2., 2., 0.]],
            [[3., 0., 3., 3., 3., 0.428571],
             [7., 0., 2., 0., 7., 1.],
             [5., 0., 3., 3., 5., 0.714286],
             [6., 0., 3., 3., 6., 0.857143],
             [7., 1., 3., 2., 7., 1.]],
            [[4., 0., 1., 0., 4., 1.],
             [4., 0., 1., 0., 4., 1.],
             [4., 0., 1., 0., 4., 1.],
             [1., 0., 0., 0., 1., 0.25]],
        ]
        expected_y = [
            [2., 2., 0., 2.],
            [0., 1., 3.],
            [2, 1],
            [0., 1., 0., 2., 0.],
            [2., 0., 0., 0.],
        ]

        for i, (x, y) in enumerate(dataset):
            np.testing.assert_array_almost_equal(expected_x[i], x['sequence_dense'], decimal=5)
            np.testing.assert_array_equal(expected_y[i], y)

    def test_truncate_list_size(self):
        """Truncated and padded batches have exactly `list_size` documents."""
        # Write a tfrecords file based on the raw data
        with tempfile.NamedTemporaryFile() as tfrfile:
            self._write_tfrecords(tfrfile.name)

            dataset = pipeline.load_dataset(tfrfile.name, n_features=6)

            list_size = 3
            padded_shapes = pipeline.make_padded_shapes(list_size)
            padding_values = pipeline.make_padding_values()
            dataset = dataset.map(lambda a, b: pipeline.truncate_document_list(a, b, list_size))
            dataset = dataset.padded_batch(2, padded_shapes, padding_values)

            # The dataset is not cached, so it has to be consumed while the
            # temporary file still exists.
            n_batches = 0
            for x, y in dataset:
                n_batches += 1
                self.assertEqual(x['sequence_dense'].numpy().shape[1], list_size)
                self.assertEqual(y.numpy().shape[1], list_size)

            # Guard against passing vacuously on an empty dataset
            self.assertGreater(n_batches, 0)

    def test_shuffle_documents(self):
        """`shuffle_documents` permutes features and labels consistently."""
        x = {'sequence_dense': tf.reshape(tf.range(24), (4, 6)) * 2}
        y = tf.range(4) * 3

        # Expected order of the four documents for the argument `1` passed
        # below: 2, 0, 3, 1
        exp_x = [[24, 26, 28, 30, 32, 34],
                 [0, 2, 4, 6, 8, 10],
                 [36, 38, 40, 42, 44, 46],
                 [12, 14, 16, 18, 20, 22]]
        exp_y = [6, 0, 9, 3]

        act_x, act_y = pipeline.shuffle_documents(x, y, 1)

        np.testing.assert_array_almost_equal(act_x['sequence_dense'], exp_x)
        np.testing.assert_array_almost_equal(act_y, exp_y)

    def test_sample_documents_flat_indices(self):
        """Check the `flat_indices` sampling method on the first batch and
        that sampling differs between epochs."""
        batch_size = 2
        group_size = 3
        multiples = 2
        padded_shapes = pipeline.make_padded_shapes()
        padding_values = pipeline.make_padding_values()
        sample_documents = pipeline.sample_documents

        dataset = self.dataset.padded_batch(batch_size, padded_shapes, padding_values)

        # Expected output for sampling with the `flat_indices` method
        exp_x = [
            [[[3., 3., 0., 0., 3., 1.],
              [3., 0., 3., 0., 3., 1.],
              [3., 0., 2., 0., 3., 1.]],
             [[3., 0., 3., 0., 3., 1.],
              [3., 3., 0., 0., 3., 1.],
              [3., 0., 3., 0., 3., 1.]],
             [[3., 0., 2., 0., 3., 1.],
              [3., 0., 3., 0., 3., 1.],
              [3., 3., 0., 0., 3., 1.]],
             [[3., 0., 3., 0., 3., 1.],
              [3., 0., 2., 0., 3., 1.],
              [3., 0., 3., 0., 3., 1.]],
             [[3., 3., 0., 0., 3., 1.],
              [3., 0., 3., 0., 3., 1.],
              [3., 0., 2., 0., 3., 1.]],
             [[3., 0., 3., 0., 3., 1.],
              [3., 3., 0., 0., 3., 1.],
              [3., 0., 3., 0., 3., 1.]],
             [[3., 0., 2., 0., 3., 1.],
              [3., 0., 3., 0., 3., 1.],
              [3., 3., 0., 0., 3., 1.]],
             [[3., 0., 3., 0., 3., 1.],
              [3., 0., 2., 0., 3., 1.],
              [3., 0., 3., 0., 3., 1.]]],
            [[[3., 0., 3., 1., 3., 1.],
              [3., 0., 3., 1., 3., 1.],
              [3., 0., 3., 0., 3., 1.]],
             [[0., 0., 0., 0., 0., 0.],
              [3., 0., 3., 1., 3., 1.],
              [3., 0., 3., 1., 3., 1.]],
             [[3., 0., 3., 0., 3., 1.],
              [0., 0., 0., 0., 0., 0.],
              [3., 0., 3., 1., 3., 1.]],
             [[3., 0., 3., 1., 3., 1.],
              [3., 0., 3., 0., 3., 1.],
              [0., 0., 0., 0., 0., 0.]],
             [[3., 0., 3., 1., 3., 1.],
              [3., 0., 3., 1., 3., 1.],
              [3., 0., 3., 0., 3., 1.]],
             [[0., 0., 0., 0., 0., 0.],
              [3., 0., 3., 1., 3., 1.],
              [3., 0., 3., 1., 3., 1.]],
             [[3., 0., 3., 0., 3., 1.],
              [0., 0., 0., 0., 0., 0.],
              [3., 0., 3., 1., 3., 1.]],
             [[3., 0., 3., 1., 3., 1.],
              [3., 0., 3., 0., 3., 1.],
              [0., 0., 0., 0., 0., 0.]]],
        ]

        exp_y = [
            [[2, 2, 0], [2, 2, 2], [0, 2, 2], [2, 0, 2],
             [2, 2, 0], [2, 2, 2], [0, 2, 2], [2, 0, 2]],
            [[0, 1, 3], [MIN_FLOAT_32, 0, 1], [3, MIN_FLOAT_32, 0], [1, 3, MIN_FLOAT_32],
             [0, 1, 3], [MIN_FLOAT_32, 0, 1], [3, MIN_FLOAT_32, 0], [1, 3, MIN_FLOAT_32]],
        ]

        # Expected result of scattering the values 0..47 (shaped (2, 8, 3))
        # back onto the (2, 4) documents using the returned `scatter_idx`.
        exp_scores = [
            [0 + 4 + 8 + 12 + 16 + 20,
             1 + 5 + 9 + 13 + 17 + 21,
             2 + 6 + 10 + 14 + 18 + 22,
             3 + 7 + 11 + 15 + 19 + 23],
            [24 + 28 + 32 + 36 + 40 + 44,
             25 + 29 + 33 + 37 + 41 + 45,
             26 + 30 + 34 + 38 + 42 + 46,
             27 + 31 + 35 + 39 + 43 + 47],
        ]

        for i, (x, y) in enumerate(dataset):
            # NOTE(review): `seed=-1` is expected to give the deterministic
            # output listed above -- confirm against `sample_documents`.
            x, labels = sample_documents(x, y, group_size, multiples,
                                         method='flat_indices', seed=-1)
            _, act_y = labels
            act_x = x['sample_dense']

            # The expected values above only describe the first batch
            if i == 0:
                updates = tf.reshape(tf.range(2 * 8 * 3), (2, 8, 3))
                scatter_idx = x['scatter_idx']
                act_scores = tf.scatter_nd(scatter_idx, updates, (2, 4))

                np.testing.assert_array_equal(act_x, exp_x)
                np.testing.assert_array_almost_equal(act_y, exp_y, decimal=5)
                np.testing.assert_array_equal(act_scores, exp_scores)

        # Check that the shuffling is different across epochs
        kwargs = {'method': 'flat_indices'}
        dataset = dataset.map(lambda a, b: sample_documents(a, b, group_size, multiples, **kwargs))

        # One list of sampled batches per epoch
        epochs = [[x['sample_dense'].numpy() for x, _ in dataset]
                  for _ in range(2)]

        is_different = [not np.isclose(first, second).all()
                        for first, second in zip(*epochs)]
        self.assertTrue(any(is_different))

    def test_sample_windowed(self):
        """Check the `windowed` sampling method on a synthetic batch."""
        batch_size = 2
        n_docs = 5
        n_features = 2
        group_size = 3

        x = {'sequence_dense': tf.reshape(tf.range(batch_size * n_docs * n_features),
                                          (batch_size, n_docs, n_features))}
        y = tf.reshape(tf.range(batch_size * n_docs), (batch_size, n_docs))

        x['sequence_dense'] = tf.cast(x['sequence_dense'], tf.float32)
        y = tf.cast(y, tf.float32)

        # Every row is one group of `group_size` consecutive documents; the
        # window wraps around at the end of the list and the five windows of
        # a query appear three times.
        exp_x = [
            [[[0, 1], [2, 3], [4, 5]],
             [[2, 3], [4, 5], [6, 7]],
             [[4, 5], [6, 7], [8, 9]],
             [[6, 7], [8, 9], [0, 1]],
             [[8, 9], [0, 1], [2, 3]],
             [[0, 1], [2, 3], [4, 5]],
             [[2, 3], [4, 5], [6, 7]],
             [[4, 5], [6, 7], [8, 9]],
             [[6, 7], [8, 9], [0, 1]],
             [[8, 9], [0, 1], [2, 3]],
             [[0, 1], [2, 3], [4, 5]],
             [[2, 3], [4, 5], [6, 7]],
             [[4, 5], [6, 7], [8, 9]],
             [[6, 7], [8, 9], [0, 1]],
             [[8, 9], [0, 1], [2, 3]]],
            [[[10, 11], [12, 13], [14, 15]],
             [[12, 13], [14, 15], [16, 17]],
             [[14, 15], [16, 17], [18, 19]],
             [[16, 17], [18, 19], [10, 11]],
             [[18, 19], [10, 11], [12, 13]],
             [[10, 11], [12, 13], [14, 15]],
             [[12, 13], [14, 15], [16, 17]],
             [[14, 15], [16, 17], [18, 19]],
             [[16, 17], [18, 19], [10, 11]],
             [[18, 19], [10, 11], [12, 13]],
             [[10, 11], [12, 13], [14, 15]],
             [[12, 13], [14, 15], [16, 17]],
             [[14, 15], [16, 17], [18, 19]],
             [[16, 17], [18, 19], [10, 11]],
             [[18, 19], [10, 11], [12, 13]]],
        ]

        x, (y, y_sample) = pipeline.sample_documents(x, y, group_size, method='windowed', seed=-1)
        act_x = x['sample_dense']
        act_y = y_sample

        np.testing.assert_array_equal(act_x, exp_x)
        # NOTE(review): `y` was rebound above to the first label tensor
        # returned by `sample_documents`, so this compares the two returned
        # label tensors with each other -- confirm that this is intended.
        np.testing.assert_array_equal(act_y, y)

        updates = tf.reshape(
            tf.range(batch_size * n_docs * group_size),
            (batch_size, group_size * n_docs)
        )
        scatter_idx = x['scatter_idx']

        exp_scores = [
            [15, 18, 21, 24, 27],
            [60, 63, 66, 69, 72]
        ]
        act_scores = tf.scatter_nd(scatter_idx, updates, (batch_size, n_docs))
        np.testing.assert_array_equal(act_scores, exp_scores)
# Run the tests when this module is executed as a script
if __name__ == '__main__':
    unittest.main()