Source code for deletor.random.sample

"""
Documentation
"""
# Python Modules
import abc

from typing import Dict, Optional

# 3rd Party Modules
import tensorflow as tf

# Project Modules
from deletor.constants import MIN_FLOAT_32


# Methods of sampling
# Note this analysis assumes that the number of samples is an exact multiple
# of the number of documents. If it is not, then some of the statements
# regarding whether every document is guaranteed to be sampled and how
# frequently relative to others will no longer hold.
# * Random: For each of `N` samples indices are drawn at random (with
#   replacement) from the range 0 - `n_documents`. There are no guarantees
#   that all documents will be sampled, some documents may be sampled more
#   (or less than others), and some documents may appear more than once
#   in a given sample.
# * Independent: For each of `N` samples the the indices for all of the
#   documents are shuffled and then only the top `sample_size` are kept.
#   There are no guarantees that all documents will be sampled, some
#   documents may be sampled more (or less) than others, however,
#   all documents will be unique within a sample.
# * RandomGuaranteed: Each of the documents is guaranteed to be in at least
#   one sample, but the other caveats about the `Random` method still apply.
#   [Implementation: Use `range` to determine the first value of each sample,
#    then randomly assign the other indices using random values.]
# * IndependentGuaranteed: Each of the documents is guaranteed to be in at
#   least one sample, but is otherwise similar to `Independent`.
#   [Implementation: Use `range` to determine the first value of each sample,
#    then assign the rest using the `Independent` method. This might be a
#    little tricky removing the first value from the set of indices to
#    shuffle.]
# * RoundRobin: A set of indices that is guaranteed to have all of the documents
#   in the same frequency is generated first and then shuffled. This will
#   ensure that all of the documents are included and have the same frequency
#   overall. However, the uniqueness of documents within a sample is not
#   guaranteed.


[docs]class DocumentSampler(abc.ABC): """ A base class for various methods of sampling a set of documents from a batch. """ def __init__( self, sample_size: int, n_samples: Optional[int] = None, multiple: Optional[int] = None, sample_pre_batch: bool = False, pad_value: float = MIN_FLOAT_32 ): if not (n_samples or multiple): raise ValueError("Either n_samples or multiple must be set.") if n_samples and multiple: raise ValueError("Either n_samples or multiple must be set, but not both.") # I'm not sure if @tf.function can handle None values so it's safer # to just use integer code values (i.e. > 0) to determine which to # use. n_samples = n_samples or 0 multiple = multiple or 0 self.n_samples = tf.constant(n_samples, tf.int32) self.multiple = tf.constant(multiple, tf.int32) self.sample_size = tf.constant(sample_size, tf.int32) self.sample_pre_batch = sample_pre_batch self.pad_value = tf.constant(pad_value, tf.float32) def __call__(self, x: Dict[str, tf.Tensor], y: tf.Tensor, w=None, **kwargs): exp_x_rank = 2 if self.sample_pre_batch else 3 exp_y_rank = 1 if self.sample_pre_batch else 2 x['sequence_dense'].get_shape().assert_has_rank(exp_x_rank) y.get_shape().assert_has_rank(exp_y_rank) return self.sample(x, y, w, **kwargs)
[docs] def sample(self, x: Dict[str, tf.Tensor], y: tf.Tensor, w=None, **kwargs): if self.sample_pre_batch is True: return self.sample_before_batching(x, y, w, **kwargs) else: return self.sample_after_batching(x, y, w, **kwargs)
[docs] @abc.abstractmethod def sample_before_batching(self, x: Dict[str, tf.Tensor], y: tf.Tensor, w=None, **kwargs): pass
[docs] @abc.abstractmethod def sample_after_batching(self, x: Dict[str, tf.Tensor], y: tf.Tensor, w=None, **kwargs): pass
[docs] @abc.abstractmethod def make_gather_indices(self, **kwargs): pass
[docs] @abc.abstractmethod def make_scatter_indices(self, **kwargs): pass
[docs] @abc.abstractmethod def count_sampled_documents(self, **kwargs): pass
# region MultiOutputSamplers
[docs]class IndependentMultiOutputSampler(DocumentSampler): def __init__( self, sample_size: int, n_samples: Optional[int] = None, multiple: Optional[int] = None, sample_pre_batch: bool = False, pad_value: float = MIN_FLOAT_32 ): """ A sampling method that generates samples independently from the others and guarantees that no document is included in a sample more than once as long as the ``sample_size`` (i.e., group size) is less than the number of documents. However, the frequency of each document is not guaranteed to be the same and there is the potential for some documents to be excluded completely. After applying this sampler, the input data (:math:`X`) dictionary will have 3 new entries and the :math:`y` value will be a 2 element tuple. The new :math:`X` entries are: * sample_dense Contains the sampled documents (the original feature tensor(s) are preserved in the ``sequential_dense`` entry). * scatter_idx Contains a set of indexes for use with :func:`tensorflow.scatter_nd` (or :func:`tensorflow.gather_nd`) to map the order of sampled documents back to their original order in the query. This is useful/essential for aggregating the scores of each document in a query. * document_counts Contains a tensor that keeps track of how many times each document has been sampled. This is useful if we want to average the scores over documents in a query instead of summing them. :param sample_size: The number of documents in each sample (i.e., the group size). :param n_samples: The number of samples to generate. :param multiple: The class will generate :math:`n H_{n}` samples by default. See the `coupon collector's problem <https://en.wikipedia.org/wiki/Coupon_collector%27s_problem>`_ for the meaning of :math:`n H_{n}`. You can increase the number of samples by a multiple of this value with this parameter. :param sample_pre_batch: Whether the sampler assumes the input has been padded and batched already or not. You almost certainly want this to be ``False``. :param pad_value: The value used to pad entries in the tensor (from :meth:`tensorflow.data.Dataset.padded_batch`). """ super().__init__(sample_size, n_samples, multiple, sample_pre_batch, pad_value) self.min_float = tf.constant(MIN_FLOAT_32) self.gamma = tf.constant(0.5772157, tf.float32)
[docs] def sample_before_batching(self, x: Dict[str, tf.Tensor], y: tf.Tensor, w=None, **kwargs): """ Each sample is generated independently of the other samples. Each sample is guaranteed to have unique documents, however, not every document is guaranteed to be included in the output and the frequency of some documents may be more (or less) than others. :param x: :param y: :param w: :return: """ # **For debugging and testing only** # Read the random values from the keyword arguments to ensure the # samples are deterministic during unit tests. random_values = kwargs.get('random_values') xd = x['sequence_dense'] y_shape = tf.shape(y) # (n_document,) n_documents = y_shape[0] multiple = self.multiple sample_size = self.sample_size # I should not need a mask, because the inputs are not batched/padded. if random_values is None: # There might be cases when we want more samples than we have documents. There are # quite a few ways to handle this (one natural way would be to pad). Instead I'll # use a simple scheme for sampling with replacement. # I'll generate at least as many random numbers to satisfy the sample size # (or more for the usual case when n_documents is larger). Then (below) I'll # take the floor mode of the shuffle index to make sure that none of the # indexes are out of range for the number of documents we actually have. group_size = tf.maximum(sample_size, n_documents) n_samples = self.get_number_of_samples(multiple, n_documents, sample_size) random_values = tf.random.uniform((n_samples, group_size)) else: n_samples = tf.shape(random_values)[0] # Use argsort to get a shuffled array of indices shuffle_idx = tf.argsort(random_values, direction='DESCENDING', stable=True) shuffle_idx = tf.math.floormod(shuffle_idx, n_documents) shuffle_idx = shuffle_idx[..., :sample_size] gather_indices = self.make_gather_indices gather_idx = gather_indices(None, n_samples, shuffle_idx, sample_size) x_sample = tf.gather_nd(xd, gather_idx) y_sample = tf.gather_nd(y, gather_idx) # In order for unbatching to work it is necessary (easiest) to clobber # the existing values rather than add them as new entries to the # dictionary. This shouldn't matter during training, but care should be # used if it is used for other purposes. x['sequence_dense'] = x_sample if w is None: return x, y_sample else: return x, y_sample, w
[docs] def sample_after_batching(self, x: Dict[str, tf.Tensor], y: tf.Tensor, w=None, **kwargs): """ Each sample is generated independently of the other samples. Each sample is guaranteed to have unique documents, however, not every document is guaranteed to be included in the output and the frequency of some documents may be more (or less) than others. :param x: :param y: :param w: :return: """ # **For debugging and testing only** # Read the random values from the keyword arguments to ensure the # samples are deterministic during unit tests. random_values = kwargs.get('random_values') xd = x['sequence_dense'] y_shape = tf.shape(y) # (batch_size, n_document) batch_size = y_shape[0] n_documents = y_shape[1] n_features = tf.shape(xd)[2] multiple = self.multiple # sample_size = tf.minimum(self.sample_size, n_documents) sample_size = self.sample_size # Create a mask of the non-padded/padded values mask = tf.cast(tf.greater_equal(y, 0), tf.float32) # (batch_size, n_documents) padlen = sample_size - n_documents def pad_mask(): paddings = tf.zeros([2, 2], tf.int32) paddings = tf.tensor_scatter_nd_add(paddings, [[1, 1]], [padlen]) return tf.pad(mask, paddings) mask = tf.cond(tf.greater(padlen, 0), pad_mask, lambda: mask) # A better estimate of the number of samples needed to see all values # at least once is given by the "coupon collectors problem". # So far, this website gives the most intuitive explanation, but # there are lots of more detailed explanations on the web. # http://www.cs.tufts.edu/comp/250P/classpages/coupon.html # See the Wikipedia page for details about the probability bounds # https://en.wikipedia.org/wiki/Coupon_collector%27s_problem # This links seems to have the formula for calculating the probability # that P(T > n). # https://math.stackexchange.com/a/1454749/382190 if random_values is None: # There might be cases when we want more samples than we have documents. There are # quite a few ways to handle this (one natural way would be to pad). Instead I'll # use a simple scheme for sampling with replacement. # I'll generate at least as many random numbers to satisfy the sample size # (or more for the usual case when n_documents is larger). Then (below) I'll # take the floor mod of the shuffle index to make sure that none of the # indexes are out of range for the number of documents we actually have. group_size = tf.maximum(sample_size, n_documents) n_samples = self.get_number_of_samples(multiple, n_documents, sample_size) random_values = tf.random.uniform((n_samples, group_size)) else: n_samples = tf.shape(random_values)[1] # Make sure the index of any padded value stays at the back of the list random_values = tf.expand_dims(mask, 1) * random_values # Use argsort to get a shuffled array of indices shuffle_idx = tf.argsort(random_values, direction='DESCENDING', stable=True) shuffle_idx = tf.math.floormod(shuffle_idx, n_documents) shuffle_idx = shuffle_idx[..., :sample_size] gather_indices = self.make_gather_indices scatter_indices = self.make_scatter_indices count_documents = self.count_sampled_documents gather_idx = gather_indices(batch_size, n_samples, shuffle_idx, sample_size) scatter_idx = scatter_indices(gather_idx) counts = count_documents(scatter_idx, batch_size, n_samples, n_documents, sample_size) y_sample = tf.gather_nd(y, gather_idx) x_sample = tf.gather_nd(xd, gather_idx) x['sample_dense'] = tf.reshape(x_sample, [batch_size, n_samples, sample_size, n_features]) x['scatter_idx'] = scatter_idx x['document_counts'] = counts if w is None: return x, (y, y_sample) else: return x, (y, y_sample), w
# region Utility Methods # noinspection DuplicatedCode
[docs] def get_number_of_samples(self, multiple, n_documents, sample_size): if multiple > 0: # A rough approximation of the expected number of samples needed to # see every document at least once. n = tf.cast(n_documents, tf.float32) nh_samples = n * tf.math.log(n) + self.gamma * n + 0.5 nh_samples = tf.math.ceil(nh_samples) / tf.cast(sample_size, tf.float32) nh_samples = tf.cast(nh_samples, tf.int32) n_samples = multiple * nh_samples else: n_samples = self.n_samples return n_samples
[docs] def make_gather_indices(self, batch_size, n_samples, shuffle_idx, sample_size, **kwargs): if batch_size is None: gather_idx = tf.reshape(shuffle_idx, [n_samples, sample_size, 1]) return gather_idx else: batch_idx = tf.repeat(tf.range(batch_size), n_samples * sample_size) shuffle_idx = tf.reshape(shuffle_idx, [batch_size * n_samples * sample_size]) gather_idx = tf.stack([batch_idx, shuffle_idx], axis=-1) return tf.reshape(gather_idx, (batch_size, n_samples, sample_size, 2))
[docs] def make_scatter_indices(self, gather_idx, **kwargs): return gather_idx
[docs] def count_sampled_documents(self, indices, batch_size, n_samples, n_documents, sample_size): if batch_size is None: updates = tf.ones([n_samples, sample_size]) counts = tf.scatter_nd(indices, updates, [n_documents]) else: updates = tf.ones((batch_size, n_samples, sample_size)) counts = tf.scatter_nd(indices, updates, (batch_size, n_documents)) return tf.cast(counts, tf.float32)
# endregion Utility Methods # endregion MultiOutputSamplers # region SingleOutputSamplers
[docs]class IndependentSingleOutputSampler(IndependentMultiOutputSampler): def __init__( self, sample_size: int, n_samples: Optional[int] = None, multiple: Optional[int] = None, sample_pre_batch: bool = False, pad_value: float = MIN_FLOAT_32 ): super().__init__(sample_size, n_samples, multiple, sample_pre_batch, pad_value) # noinspection DuplicatedCode
[docs] def get_number_of_samples(self, multiple, n_documents, sample_size): if multiple > 0: # A rough approximation of the expected number of samples needed to # see every document at least once. n = tf.cast(n_documents, tf.float32) nh_samples = n * tf.math.log(n) + self.gamma * n + 0.5 nh_samples = tf.math.round(nh_samples) nh_samples = tf.cast(nh_samples, tf.int32) n_samples = multiple * nh_samples else: n_samples = self.n_samples return n_samples
[docs] def make_scatter_indices(self, gather_idx, **kwargs): return gather_idx[:, :, ::self.sample_size, :]
[docs] def count_sampled_documents(self, indices, batch_size, n_samples, n_documents, sample_size): updates = tf.ones((batch_size, n_samples, 1)) counts = tf.scatter_nd(indices, updates, (batch_size, n_documents)) return tf.cast(counts, tf.float32)
# endregion SingleOutputSamplers