Source code for deletor.models.gsf

# Copyright 2020 Reid Swanson
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
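"""
Groupwise scoring network models built on ``tf.keras`` (see
:class:`GroupwiseScoringNetwork` for the GSF model from Ai et al.).
"""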
# Python Modules
import logging

from functools import partial
from typing import Any, Dict

# 3rd Party Modules
import tensorflow as tf
import tensorflow.keras.layers as tf_layers

# Project Modules
from deletor.models.utils import normalize_dropout


log = logging.getLogger(__name__)


class ModelParameter:
    """
    Names of the keys recognised in the ``params`` dictionary that is handed
    to the groupwise models defined in this module.
    """

    N_FEATURES = 'n_features'
    """
    (**Required**) How many features each document in the dataset has
    (e.g., 136 for MSLR).
    """

    N_UNITS = 'n_units'
    """
    (**Required**) The number of hidden units in each layer.
    """

    GROUP_SIZE = 'group_size'
    """
    (**Required**) How many documents make up one group.
    """

    USE_AVERAGE = 'use_average'
    """
    (**Optional**) When ``True`` the final output for a document is the
    average of its scores across samples; otherwise it is their sum.
    (Default: ``True``)
    """

    SHARE_WEIGHTS = 'share_weights'
    """
    (**Optional**) When ``True`` every document in the input first goes
    through a dense layer, and only afterwards are documents concatenated
    into groups. (Default: ``False``)
    """

    DROPOUT_RATE = 'dropout_rate'
    """
    (**Optional**) The dropout rate for each layer. Either a single number,
    which is then applied to every layer, or a list of the same size as
    `N_UNITS`. (Default: 0)
    """

    RANDOM_SEED = 'random_seed'
    """
    (**Optional**) Seed used for any stochastic operations.
    (Default: ``None``)
    """
# noinspection PySimplifyBooleanCheck
class GroupwiseInputNetwork(tf.keras.Model):
    """
    Feed-forward network that takes a whole group of documents as one input
    row and produces a single score per sampled group (the scoring layer has
    one output unit). The per-sample scores are then scattered back onto the
    full document list.
    """
    default_group_size = 16
    default_share_weights = False
    default_use_average = True

    def __init__(self, params: Dict[str, Any], index_pad_value: int = -1, **kwargs):
        """
        Read the model parameters and create (but do not build) the layers.

        :param params: See :class:`.ModelParameter` for the valid parameters.
            ``N_UNITS`` and ``GROUP_SIZE`` are read with ``[]`` and must be
            present; the remaining keys fall back to defaults.
        :param index_pad_value: Stored on the model as an int32 constant. It
            is not used anywhere else in this class.
        :param kwargs: Forwarded unchanged to ``tf.keras.Model``.
        """
        super().__init__(**kwargs)

        # Model parameters
        n_units = params[ModelParameter.N_UNITS]
        n_layers = len(n_units)
        group_size = params[ModelParameter.GROUP_SIZE]
        share_weights = params.get(ModelParameter.SHARE_WEIGHTS, self.default_share_weights)
        use_average = params.get(ModelParameter.USE_AVERAGE, self.default_use_average)
        random_seed = params.get(ModelParameter.RANDOM_SEED)

        self.model_params = params
        self.group_size = tf.constant(group_size, tf.int32)
        self.share_weights = share_weights
        self.use_average = use_average
        self.random_seed = random_seed
        self.index_pad_value = tf.constant(index_pad_value, tf.int32)

        # Model layers
        kernel_initializer = tf.keras.initializers.he_uniform(seed=random_seed)
        dense_layer = partial(tf_layers.Dense, kernel_initializer=kernel_initializer)

        # The flags are compared with ``is True`` on purpose: only the literal
        # boolean enables a feature, other truthy values do not.
        if share_weights is True:
            # The first entry of N_UNITS is consumed by the shared per-document
            # layer; only the remaining entries become hidden layers.
            self.shared_input_layer = dense_layer(n_units[0], name='shared_input')
            self.shared_activation = tf_layers.PReLU(name='shared_activation')
            self.shared_batch_norm = tf_layers.BatchNormalization(name='shared_batch_norm')
            n_units = n_units[1:]
            n_layers -= 1

        # Normalised against the hidden layers that remain after the shared
        # layer (if any) has been split off.
        dropout_rate = normalize_dropout(params.get(ModelParameter.DROPOUT_RATE), n_units)

        self.hidden = [dense_layer(n) for n in n_units]
        self.activation = [tf_layers.PReLU() for _ in range(n_layers)]
        self.batch_norm = [tf_layers.BatchNormalization() for _ in range(n_layers)]

        has_dropout = any(r > 0 for r in dropout_rate)
        self.has_dropout = has_dropout
        # ``self.dropout`` only exists when at least one rate is positive.
        if has_dropout is True:
            self.dropout = [tf_layers.Dropout(r) for r in dropout_rate]

        # One score per group.
        self.scoring_layer = dense_layer(1)

    # noinspection DuplicatedCode
    @tf.function(experimental_relax_shapes=True)
    def call(self, inputs, training: bool = True, **kwargs):
        """
        Score every sampled group and scatter the scores onto the documents.

        :param inputs: Mapping that provides ``'sequence_dense'``,
            ``'sample_dense'``, ``'scatter_idx'`` and, when averaging is
            enabled, ``'document_counts'``.
        :param training: Passed to every batch normalisation and dropout
            layer.
        :param kwargs: Accepted but not used.
        :return: Tensor of shape ``(batch_size, n_doc)`` with the summed (or,
            if ``use_average`` is ``True``, count-normalised) scores.
        """
        x = inputs

        # Can only deal with dense features for now.

        # The full set of query results for this instance.
        # Shape = (batch_size, n_docs, n_features)
        all_dense = x['sequence_dense']
        # The current sample to use as inputs.
        # Shape = (batch_size, n_samples, group_size, n_features)
        sampled_dense = x['sample_dense']
        # The indices of each sample into the full set of query results.
        # presumably shaped (batch_size, n_samples, 1, 2) so that it matches
        # the reshaped scores below — TODO confirm against the data pipeline.
        scatter_idx = x['scatter_idx']

        # The batch size (should be the same for all/sampled)
        batch_size = tf.shape(all_dense)[0]
        # The maximum number of documents in the batch
        n_doc = tf.shape(all_dense)[1]
        # The number of samples for each document
        n_samples = tf.shape(sampled_dense)[1]
        # The group size (how many documents to compare at once)
        group_size = tf.shape(sampled_dense)[2]
        # The number of features
        n_features = tf.shape(sampled_dense)[3]
        # NOTE(review): these dimensions are dynamic tensors here; confirm the
        # Dense layers can be built from inputs reshaped with them.

        if self.share_weights is True:
            # Flatten to one row per document so each document passes through
            # the same dense layer before being regrouped.
            n_rows = batch_size * n_samples * group_size
            indata = tf.reshape(sampled_dense, [n_rows, n_features])
            in_lyr = self.shared_input_layer(indata)
            # Pass the training flag explicitly, exactly like the hidden
            # batch-norm layers below, instead of relying on Keras' implicit
            # call context.
            in_lyr = self.shared_batch_norm(in_lyr, training=training)
            in_lyr = self.shared_activation(in_lyr)
            n_out = tf.shape(in_lyr)[1]
            input_lyr = tf.reshape(in_lyr, (batch_size * n_samples, group_size * n_out))
        else:
            # One row per sampled group: the group's documents are concatenated.
            input_lyr = tf.reshape(sampled_dense, (batch_size * n_samples, group_size * n_features))

        lyr = input_lyr
        layer_stack = zip(self.hidden, self.batch_norm, self.activation)
        for i, (dense, batch_norm, activation) in enumerate(layer_stack):
            lyr = dense(lyr)
            lyr = batch_norm(lyr, training=training)
            lyr = activation(lyr)
            if self.has_dropout is True:
                lyr = self.dropout[i](lyr, training=training)

        sample_scores = self.scoring_layer(lyr)
        sample_scores = tf.reshape(sample_scores, (batch_size, n_samples, 1))
        # tf.scatter_nd sums updates that land on the same index.
        scores = tf.scatter_nd(scatter_idx, sample_scores, (batch_size, n_doc))

        if self.use_average is True:
            # presumably the number of times each document was sampled, shaped
            # like ``scores`` — TODO confirm. divide_no_nan yields 0 where the
            # count is 0.
            document_counts = x['document_counts']
            scores = tf.math.divide_no_nan(scores, document_counts)

        return scores
class GroupwiseScoringNetwork(tf.keras.Model):
    """
    This class (tries to) implements the GSF model from Ai et al.

    Each sampled group of documents is fed to the network as one row, the
    scoring layer emits one score per document in the group, and the scores
    are scattered back onto the full document list.
    """
    default_group_size = 16
    default_use_average = True
    default_share_weights = False

    def __init__(self, params: Dict[str, Any], index_pad_value: int = -1, **kwargs):
        """
        Read the model parameters and create (but do not build) the layers.

        :param params: See :class:`.ModelParameter` for the valid parameters.
            ``N_FEATURES``, ``N_UNITS`` and ``GROUP_SIZE`` are read with
            ``[]`` and must be present; the remaining keys fall back to
            defaults.
        :param index_pad_value: Stored on the model as an int32 constant. It
            is not used anywhere else in this class.
        :param kwargs: Forwarded unchanged to ``tf.keras.Model``.
        """
        super().__init__(**kwargs)

        # Lazy %-formatting: the dict is only rendered if DEBUG is enabled.
        log.debug('model_params: %s', params)

        # Model parameters
        n_features = params[ModelParameter.N_FEATURES]
        n_units = params[ModelParameter.N_UNITS]
        n_layers = len(n_units)
        group_size = params[ModelParameter.GROUP_SIZE]
        use_average = params.get(ModelParameter.USE_AVERAGE, self.default_use_average)
        share_weights = params.get(ModelParameter.SHARE_WEIGHTS, self.default_share_weights)
        random_seed = params.get(ModelParameter.RANDOM_SEED)

        self.model_params = params
        # Kept as plain Python values (not tensors) so that ``call`` can
        # reshape to statically known dimensions.
        self.n_features = n_features
        self.group_size = group_size
        self.use_average = use_average
        self.share_weights = share_weights
        self.random_seed = random_seed
        self.index_pad_value = tf.constant(index_pad_value, tf.int32)

        # The flags are compared with ``is True`` on purpose: only the literal
        # boolean enables a feature, other truthy values do not.
        if share_weights is True:
            self.input_layer_shape = [None, group_size * n_units[0]]
        else:
            self.input_layer_shape = [None, group_size * n_features]

        # Model layers
        kernel_initializer = tf.keras.initializers.he_uniform(seed=random_seed)
        dense_layer = partial(tf_layers.Dense, kernel_initializer=kernel_initializer)

        if share_weights is True:
            # The first entry of N_UNITS is consumed by the shared per-document
            # layer; only the remaining entries become hidden layers.
            self.n_shared_input = n_units[0]
            self.shared_input_layer = dense_layer(n_units[0], name='shared_input')
            self.shared_activation = tf_layers.PReLU(name='shared_activation')
            self.shared_batch_norm = tf_layers.BatchNormalization(name='shared_batch_norm')
            n_units = n_units[1:]
            n_layers -= 1

        # Normalised against the hidden layers that remain after the shared
        # layer (if any) has been split off.
        dropout_rate = normalize_dropout(params.get(ModelParameter.DROPOUT_RATE), n_units)

        self.hidden = [dense_layer(n) for n in n_units]
        self.activation = [tf_layers.PReLU() for _ in range(n_layers)]
        self.batch_norm = [tf_layers.BatchNormalization() for _ in range(n_layers)]

        self.has_dropout = any(r > 0 for r in dropout_rate)
        # ``self.dropout`` only exists when at least one rate is positive.
        if self.has_dropout is True:
            self.dropout = [tf_layers.Dropout(r) for r in dropout_rate]

        # One score per document in the group.
        self.scoring_layer = dense_layer(group_size)

    # noinspection DuplicatedCode
    @tf.function(experimental_relax_shapes=True)
    def call(self, inputs, training: bool = True, **kwargs):
        """
        Score every document of every sampled group and scatter the scores
        onto the full document list.

        :param inputs: Mapping that provides ``'sequence_dense'``,
            ``'sample_dense'``, ``'scatter_idx'`` and, when averaging is
            enabled, ``'document_counts'``.
        :param training: Passed to every batch normalisation and dropout
            layer.
        :param kwargs: Accepted but not used.
        :return: Tensor of shape ``(batch_size, n_doc)`` with the summed (or,
            if ``use_average`` is ``True``, count-normalised) scores.
        """
        x = inputs

        # Can only deal with dense features for now.

        # The full set of query results for this instance.
        # Shape = (batch_size, n_docs, n_features)
        xd = x['sequence_dense']
        # The current sample to use as inputs.
        # Shape = (batch_size, n_samples, group_size, n_features)
        xs = x['sample_dense']
        # The indices of each sample into the full set of query results.
        # presumably shaped (batch_size, n_samples, group_size, 2) so that it
        # matches the reshaped scores below — TODO confirm.
        scatter_idx = x['scatter_idx']

        # The batch size (should be the same for all/sampled)
        batch_size = tf.shape(xd)[0]
        # The maximum number of documents in the batch
        n_doc = tf.shape(xd)[1]
        # The number of samples for each document
        n_samples = tf.shape(xs)[1]

        # Keras needs these to be predefined constants apparently, so the
        # values configured in __init__ are used rather than tf.shape(xs).
        # The group size (how many documents to compare at once)
        group_size = self.group_size
        # The number of features
        n_features = self.n_features

        if self.share_weights is True:
            # Flatten to one row per document so each document passes through
            # the same dense layer before being regrouped.
            n_rows = batch_size * n_samples * group_size
            indata = tf.reshape(xs, [n_rows, n_features])
            in_lyr = self.shared_input_layer(indata)
            # Pass the training flag explicitly, exactly like the hidden
            # batch-norm layers below, instead of relying on Keras' implicit
            # call context.
            in_lyr = self.shared_batch_norm(in_lyr, training=training)
            in_lyr = self.shared_activation(in_lyr)
            # Static width of the shared layer (see the constants note above).
            n_out = self.n_shared_input
            input_lyr = tf.reshape(in_lyr, (batch_size * n_samples, group_size * n_out))
        else:
            # One row per sampled group: the group's documents are concatenated.
            input_lyr = tf.reshape(xs, (batch_size * n_samples, group_size * n_features))

        lyr = input_lyr
        layer_stack = zip(self.hidden, self.batch_norm, self.activation)
        for i, (dense, batch_norm, activation) in enumerate(layer_stack):
            lyr = dense(lyr)
            lyr = batch_norm(lyr, training=training)
            lyr = activation(lyr)
            if self.has_dropout is True:
                lyr = self.dropout[i](lyr, training=training)

        sample_scores = self.scoring_layer(lyr)
        sample_scores = tf.reshape(sample_scores, (batch_size, n_samples, group_size))
        # tf.scatter_nd sums updates that land on the same index.
        scores = tf.scatter_nd(scatter_idx, sample_scores, (batch_size, n_doc))

        if self.use_average is True:
            # presumably the number of times each document was sampled, shaped
            # like ``scores`` — TODO confirm. divide_no_nan yields 0 where the
            # count is 0.
            document_counts = x['document_counts']
            scores = tf.math.divide_no_nan(scores, document_counts)

        return scores
class GroupwiseScoringNetwork2(tf.keras.Model):
    """
    Groupwise scoring network whose behaviour depends on the ``training``
    flag of :meth:`call`: when training it scores ``'sequence_dense'``
    directly as a single group per batch row and returns the raw group
    scores; otherwise it scores the sampled groups in ``'sample_dense'`` and
    scatters them back onto the full document list.
    """
    default_group_size = 16
    default_use_average = True
    default_share_weights = False

    def __init__(self, params: Dict[str, Any], index_pad_value: int = -1, **kwargs):
        """
        Read the model parameters and create (but do not build) the layers.

        :param params: See :class:`.ModelParameter` for the valid parameters.
            ``N_UNITS`` and ``GROUP_SIZE`` are read with ``[]`` and must be
            present; the remaining keys fall back to defaults.
        :param index_pad_value: Stored on the model as an int32 constant. It
            is not used anywhere else in this class.
        :param kwargs: Forwarded unchanged to ``tf.keras.Model``.
        """
        super().__init__(**kwargs)

        # Model parameters
        n_units = params[ModelParameter.N_UNITS]
        n_layers = len(n_units)
        group_size = params[ModelParameter.GROUP_SIZE]
        use_average = params.get(ModelParameter.USE_AVERAGE, self.default_use_average)
        share_weights = params.get(ModelParameter.SHARE_WEIGHTS, self.default_share_weights)
        # NOTE(review): the rates are normalised against the full N_UNITS
        # list, i.e. before the shared layer is split off below. With
        # share_weights enabled this yields one more rate than there are
        # hidden layers, and rate[i] is paired with hidden layer i. The other
        # groupwise models normalise after the split — confirm which
        # alignment is intended.
        dropout_rate = normalize_dropout(params.get(ModelParameter.DROPOUT_RATE), n_units)
        random_seed = params.get(ModelParameter.RANDOM_SEED)

        self.model_params = params
        self.group_size = tf.constant(group_size, tf.int32)
        self.use_average = use_average
        self.share_weights = share_weights
        self.random_seed = random_seed
        self.index_pad_value = tf.constant(index_pad_value, tf.int32)

        # Model layers
        kernel_initializer = tf.keras.initializers.he_uniform(seed=random_seed)
        dense_layer = partial(tf_layers.Dense, kernel_initializer=kernel_initializer)

        # The flags are compared with ``is True`` on purpose: only the literal
        # boolean enables a feature, other truthy values do not.
        if share_weights is True:
            # The first entry of N_UNITS is consumed by the shared per-document
            # layer; only the remaining entries become hidden layers.
            self.shared_input_layer = dense_layer(n_units[0], name='shared_input')
            self.shared_activation = tf_layers.PReLU(name='shared_activation')
            self.shared_batch_norm = tf_layers.BatchNormalization(name='shared_batch_norm')
            n_units = n_units[1:]
            n_layers -= 1

        self.hidden = [dense_layer(n) for n in n_units]
        self.activation = [tf_layers.PReLU() for _ in range(n_layers)]
        self.batch_norm = [tf_layers.BatchNormalization() for _ in range(n_layers)]

        self.has_dropout = any(r > 0 for r in dropout_rate)
        # ``self.dropout`` only exists when at least one rate is positive.
        if self.has_dropout is True:
            self.dropout = [tf_layers.Dropout(r) for r in dropout_rate]

        # One score per document in the group.
        self.scoring_layer = dense_layer(group_size)

    # noinspection DuplicatedCode
    @tf.function(experimental_relax_shapes=True)
    def call(self, inputs, training: bool = True, **kwargs):
        """
        Score groups of documents.

        :param inputs: Mapping that provides ``'sequence_dense'``. When not
            training it must also provide ``'sample_dense'``,
            ``'scatter_idx'`` and, when averaging is enabled,
            ``'document_counts'``.
        :param training: Selects the mode (evaluated for truthiness) and is
            passed to every batch normalisation and dropout layer.
        :param kwargs: Accepted but not used.
        :return: When training, the raw output of the scoring layer, shape
            ``(batch_size, group_size)``. Otherwise a tensor of shape
            ``(batch_size, n_doc)`` with the summed (or, if ``use_average``
            is ``True``, count-normalised) scores.
        """
        x = inputs

        # Can only deal with dense features for now.

        # We clobber the data during training in this model, so this is
        # actually the sampled data when training. During evaluation this
        # is the full data.
        # The full set of query results for this instance.
        # Shape = (batch_size, n_docs, n_features)
        all_dense = x['sequence_dense']

        # The batch size
        batch_size = tf.shape(all_dense)[0]

        # Every mode-dependent value is derived from one and the same test of
        # ``training`` so the input tensor and the dimensions read from it
        # can never disagree.
        if training:
            # The current sample to use as inputs.
            # Shape = (batch_size, group_size, n_features)
            sampled_dense = all_dense
            # Exactly one group per batch row.
            n_samples = 1
            # The group size (how many documents to compare at once)
            group_size = tf.shape(sampled_dense)[1]
            # The number of features
            n_features = tf.shape(sampled_dense)[2]
        else:
            # Shape = (batch_size, n_samples, group_size, n_features)
            sampled_dense = x['sample_dense']
            # The number of samples for each document (for prediction).
            n_samples = tf.shape(sampled_dense)[1]
            # The group size (how many documents to compare at once)
            group_size = tf.shape(sampled_dense)[2]
            # The number of features
            n_features = tf.shape(sampled_dense)[3]
        # NOTE(review): these dimensions are dynamic tensors here; confirm the
        # Dense layers can be built from inputs reshaped with them.

        if self.share_weights is True:
            # If share_weights is True, then pass all the inputs through
            # a common dense layer first. At a minimum it might help training
            # by reducing the number of parameters, but it also might help
            # remove some positional bias. I have no real reason to believe
            # this, but it seems intuitive to me.
            n_rows = batch_size * n_samples * group_size
            indata = tf.reshape(sampled_dense, [n_rows, n_features])
            in_lyr = self.shared_input_layer(indata)
            # Pass the training flag explicitly, exactly like the hidden
            # batch-norm layers below, instead of relying on Keras' implicit
            # call context.
            in_lyr = self.shared_batch_norm(in_lyr, training=training)
            in_lyr = self.shared_activation(in_lyr)
            n_out = tf.shape(in_lyr)[1]
            input_lyr = tf.reshape(in_lyr, (batch_size * n_samples, group_size * n_out))
        else:
            # One row per group: the group's documents are concatenated.
            input_lyr = tf.reshape(sampled_dense, (batch_size * n_samples, group_size * n_features))

        lyr = input_lyr
        layer_stack = zip(self.hidden, self.batch_norm, self.activation)
        for i, (dense, batch_norm, activation) in enumerate(layer_stack):
            lyr = dense(lyr)
            lyr = batch_norm(lyr, training=training)
            lyr = activation(lyr)
            if self.has_dropout is True:
                lyr = self.dropout[i](lyr, training=training)

        sample_scores = self.scoring_layer(lyr)

        # Simply return the sample scores if we are training
        if training:
            return sample_scores

        # We are predicting

        # The indices of each sample into the full set of query results.
        # In this version it is only used when not training (for prediction).
        # presumably shaped (batch_size, n_samples, group_size, 2) so that it
        # matches the reshaped scores below — TODO confirm.
        scatter_idx = x['scatter_idx']
        # The maximum number of documents in the batch (for prediction).
        n_doc = tf.shape(all_dense)[1]

        sample_scores = tf.reshape(sample_scores, (batch_size, n_samples, group_size))
        # tf.scatter_nd sums updates that land on the same index.
        scores = tf.scatter_nd(scatter_idx, sample_scores, (batch_size, n_doc))

        if self.use_average is True:
            # presumably the number of times each document was sampled, shaped
            # like ``scores`` — TODO confirm. divide_no_nan yields 0 where the
            # count is 0.
            document_counts = x['document_counts']
            scores = tf.math.divide_no_nan(scores, document_counts)

        return scores