# Copyright 2020 Reid Swanson
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Groupwise scoring function (GSF) ranking models implemented as ``tf.keras`` models.
"""
# Python Modules
import logging
from functools import partial
from typing import Any, Dict
# 3rd Party Modules
import tensorflow as tf
import tensorflow.keras.layers as tf_layers
# Project Modules
from deletor.models.utils import normalize_dropout
log = logging.getLogger(__name__)
class ModelParameter:
    """
    Names of the keys that the models in this module look up in their
    ``params`` dictionary.
    """
    N_FEATURES = 'n_features'
    """
    (**Required**) The number of features in the dataset (e.g., 136 for MSLR)
    """
    N_UNITS = 'n_units'
    """
    (**Required**) A sequence with the number of hidden units for each layer.
    """
    GROUP_SIZE = 'group_size'
    """
    (**Required**) The number of documents in each group.
    """
    USE_AVERAGE = 'use_average'
    """
    (**Optional**) If ``True`` the final output will be an average over
    each document across samples, otherwise it will be the sum.
    (Default: ``True``)
    """
    SHARE_WEIGHTS = 'share_weights'
    """
    (**Optional**) If ``True`` then each document in the input is first
    passed through a dense layer before being concatenated into groups.
    The first entry of ``N_UNITS`` is used as the size of that shared layer
    and the remaining entries size the hidden layers.
    (Default: ``False``)
    """
    DROPOUT_RATE = 'dropout_rate'
    """
    (**Optional**) The dropout rate for each layer. This can be a single number
    in which case the same dropout will be applied for each layer. Or it can
    be a list of the same size as ``N_UNITS``.
    (Default: 0)
    """
    RANDOM_SEED = 'random_seed'
    """
    (**Optional**) A random seed to use for any stochastic operations.
    (Default: ``None``)
    """
# noinspection PySimplifyBooleanCheck
class GroupwiseScoringNetwork(tf.keras.Model):
    """
    An attempt at implementing the GSF model from Ai et al.

    Every sampled group of ``group_size`` documents is flattened into one
    row, passed through a stack of dense / batch-norm / PReLU (and optional
    dropout) layers and finally through a dense layer that emits one score
    per document of the group. The group scores are then scattered back
    onto the full set of query results.
    """
    default_group_size = 16
    default_use_average = True
    default_share_weights = False

    def __init__(self, params: Dict[str, Any], index_pad_value: int = -1, **kwargs):
        """
        :param params: See :class:`.ModelParameter` for the valid parameters.
        :param index_pad_value: Kept on the instance as an ``int32``
            constant (``self.index_pad_value``); it is not read anywhere
            else in this class.
        :param kwargs: Passed on unchanged to ``tf.keras.Model.__init__``.
        """
        super().__init__(**kwargs)

        log.debug(f'model_params: {params}')

        # Model parameters
        n_features = params[ModelParameter.N_FEATURES]
        n_units = params[ModelParameter.N_UNITS]
        n_layers = len(n_units)
        group_size = params[ModelParameter.GROUP_SIZE]
        use_average = params.get(ModelParameter.USE_AVERAGE, self.default_use_average)
        share_weights = params.get(ModelParameter.SHARE_WEIGHTS, self.default_share_weights)
        random_seed = params.get(ModelParameter.RANDOM_SEED)

        self.model_params = params
        self.n_features = n_features
        self.group_size = group_size
        self.use_average = use_average
        self.share_weights = share_weights
        self.random_seed = random_seed
        self.index_pad_value = tf.constant(index_pad_value, tf.int32)

        # Width of one flattened group as it enters the hidden stack: each
        # document contributes either its shared-layer output or its raw
        # features.
        if share_weights is True:
            self.input_layer_shape = [None, group_size * n_units[0]]
        else:
            self.input_layer_shape = [None, group_size * n_features]

        # Model layers
        kernel_initializer = tf.keras.initializers.he_uniform(seed=random_seed)
        dense_layer = partial(tf_layers.Dense, kernel_initializer=kernel_initializer)

        if share_weights is True:
            # The first entry of n_units sizes the per-document shared
            # layer; only the remaining entries become hidden layers.
            self.n_shared_input = n_units[0]
            self.shared_input_layer = dense_layer(n_units[0], name='shared_input')
            self.shared_activation = tf_layers.PReLU(name='shared_activation')
            self.shared_batch_norm = tf_layers.BatchNormalization(name='shared_batch_norm')
            n_units = n_units[1:]
            n_layers -= 1

        # Normalized against the hidden layers only (n_units no longer
        # contains the shared layer's entry at this point).
        dropout_rate = normalize_dropout(params.get(ModelParameter.DROPOUT_RATE), n_units)

        self.hidden = [dense_layer(n) for n in n_units]
        self.activation = [tf_layers.PReLU() for _ in range(n_layers)]
        self.batch_norm = [tf_layers.BatchNormalization() for _ in range(n_layers)]
        self.has_dropout = any(r > 0 for r in dropout_rate)

        # self.dropout only exists when at least one rate is positive;
        # call() checks self.has_dropout before touching it.
        if self.has_dropout is True:
            self.dropout = [tf_layers.Dropout(r) for r in dropout_rate]

        # One output score per document in the group.
        self.scoring_layer = dense_layer(group_size)

    # noinspection DuplicatedCode
    @tf.function(experimental_relax_shapes=True)
    def call(self, inputs, training: bool = True, **kwargs):
        """
        Score every sampled group and scatter the scores back onto the
        documents of each query.

        :param inputs: A mapping with the entries ``'sequence_dense'``
            (shape ``(batch_size, n_docs, n_features)``), ``'sample_dense'``
            (shape ``(batch_size, n_samples, group_size, n_features)``),
            ``'scatter_idx'`` and, only when ``use_average`` is ``True``,
            ``'document_counts'``.
        :param training: Forwarded to every batch normalization and dropout
            layer.
        :param kwargs: Ignored.
        :return: A tensor of shape ``(batch_size, n_docs)`` with the summed
            (or, when ``use_average`` is ``True``, count-normalized) score
            of each document.
        """
        x = inputs

        # Can only deal with dense features for now.

        # The full set of query results for this instance.
        # Shape = (batch_size, n_docs, n_features)
        xd = x['sequence_dense']

        # The current sample to use as inputs.
        # Shape = (batch_size, n_samples, group_size, n_features)
        xs = x['sample_dense']

        # The indices of each sample into the full set of query results.
        # NOTE(review): presumably one (batch, doc) index pair per sampled
        # document, i.e. (batch_size, n_samples, group_size, 2) - confirm
        # against the input pipeline.
        scatter_idx = x['scatter_idx']

        # The batch size (should be the same for all/sampled)
        batch_size = tf.shape(xd)[0]

        # The maximum number of documents in the batch
        n_doc = tf.shape(xd)[1]

        # The number of samples for each document
        n_samples = tf.shape(xs)[1]

        # Keras needs these to be predefined constants apparently, so they
        # are taken from the constructor parameters rather than tf.shape.
        # The group size (how many documents to compare at once)
        group_size = self.group_size

        # The number of features
        n_features = self.n_features

        if self.share_weights is True:
            # Run every document through the shared layer on its own (one
            # row per document), then regroup the outputs into one row per
            # sampled group.
            n_rows = batch_size * n_samples * group_size
            n_cols = n_features
            indata = tf.reshape(xs, [n_rows, n_cols])
            in_lyr = self.shared_input_layer(indata)
            # Pass the training flag explicitly, exactly like the hidden
            # batch-norm layers below, so the mode does not depend on
            # implicit propagation.
            in_lyr = self.shared_batch_norm(in_lyr, training=training)
            in_lyr = self.shared_activation(in_lyr)
            n_out = self.n_shared_input
            input_lyr = tf.reshape(in_lyr, (batch_size * n_samples, group_size * n_out))
        else:
            input_lyr = tf.reshape(xs, (batch_size * n_samples, group_size * n_features))

        lyr = input_lyr
        for i in range(len(self.hidden)):
            lyr = self.hidden[i](lyr)
            lyr = self.batch_norm[i](lyr, training=training)
            lyr = self.activation[i](lyr)

            if self.has_dropout is True:
                lyr = self.dropout[i](lyr, training=training)

        sample_scores = self.scoring_layer(lyr)
        sample_scores = tf.reshape(sample_scores, (batch_size, n_samples, group_size))

        # tf.scatter_nd adds up the updates that land on the same index, so
        # a document sampled several times receives the sum of its scores.
        scores = tf.scatter_nd(scatter_idx, sample_scores, (batch_size, n_doc))

        if self.use_average is True:
            # divide_no_nan yields 0 wherever the count is 0.
            document_counts = x['document_counts']
            scores = tf.math.divide_no_nan(scores, document_counts)

        return scores
class GroupwiseScoringNetwork2(tf.keras.Model):
    """
    A variant of :class:`GroupwiseScoringNetwork` with separate training
    and prediction paths.

    When training, the contents of ``'sequence_dense'`` are treated as a
    single sampled group per batch entry and the raw group scores are
    returned. Otherwise the groups in ``'sample_dense'`` are scored and
    scattered back onto the full set of query results.
    """
    default_group_size = 16
    default_use_average = True
    default_share_weights = False

    def __init__(self, params: Dict[str, Any], index_pad_value: int = -1, **kwargs):
        """
        :param params: See :class:`.ModelParameter` for the valid parameters.
            ``N_FEATURES`` is not read by this class.
        :param index_pad_value: Kept on the instance as an ``int32``
            constant (``self.index_pad_value``); it is not read anywhere
            else in this class.
        :param kwargs: Passed on unchanged to ``tf.keras.Model.__init__``.
        """
        super().__init__(**kwargs)

        # Model parameters
        n_units = params[ModelParameter.N_UNITS]
        n_layers = len(n_units)
        group_size = params[ModelParameter.GROUP_SIZE]
        use_average = params.get(ModelParameter.USE_AVERAGE, self.default_use_average)
        share_weights = params.get(ModelParameter.SHARE_WEIGHTS, self.default_share_weights)
        # NOTE(review): unlike GroupwiseScoringNetwork, the rates are
        # normalized against the full N_UNITS list, i.e. before the shared
        # layer's entry is removed below. Confirm that the resulting rates
        # still line up with self.hidden when share_weights is True.
        dropout_rate = normalize_dropout(params.get(ModelParameter.DROPOUT_RATE), n_units)
        random_seed = params.get(ModelParameter.RANDOM_SEED)

        self.model_params = params
        self.group_size = tf.constant(group_size, tf.int32)
        self.use_average = use_average
        self.share_weights = share_weights
        self.random_seed = random_seed
        self.index_pad_value = tf.constant(index_pad_value, tf.int32)

        # Model layers
        kernel_initializer = tf.keras.initializers.he_uniform(seed=random_seed)
        dense_layer = partial(tf_layers.Dense, kernel_initializer=kernel_initializer)

        if share_weights is True:
            # The first entry of n_units sizes the per-document shared
            # layer; only the remaining entries become hidden layers.
            self.shared_input_layer = dense_layer(n_units[0], name='shared_input')
            self.shared_activation = tf_layers.PReLU(name='shared_activation')
            self.shared_batch_norm = tf_layers.BatchNormalization(name='shared_batch_norm')
            n_units = n_units[1:]
            n_layers -= 1

        self.hidden = [dense_layer(n) for n in n_units]
        self.activation = [tf_layers.PReLU() for _ in range(n_layers)]
        self.batch_norm = [tf_layers.BatchNormalization() for _ in range(n_layers)]
        self.has_dropout = any(r > 0 for r in dropout_rate)

        # self.dropout only exists when at least one rate is positive;
        # call() checks self.has_dropout before touching it.
        if self.has_dropout is True:
            self.dropout = [tf_layers.Dropout(r) for r in dropout_rate]

        # One output score per document in the group.
        self.scoring_layer = dense_layer(group_size)

    # noinspection DuplicatedCode
    @tf.function(experimental_relax_shapes=True)
    def call(self, inputs, training: bool = True, **kwargs):
        """
        Score groups of documents.

        :param inputs: A mapping with the entry ``'sequence_dense'`` and,
            when not training, the entries ``'sample_dense'``,
            ``'scatter_idx'`` and (only if ``use_average`` is ``True``)
            ``'document_counts'``.
        :param training: When ``True`` the contents of ``'sequence_dense'``
            are scored as one group per batch entry and the raw group
            scores are returned. Any other value selects the prediction
            path. The flag is also forwarded to every batch normalization
            and dropout layer.
        :param kwargs: Ignored.
        :return: When training, the output of the scoring layer (one row
            per group). Otherwise a tensor of shape
            ``(batch_size, n_docs)`` with the summed (or, when
            ``use_average`` is ``True``, count-normalized) score of each
            document.
        """
        x = inputs

        # Use one and the same test for every training/prediction decision
        # below so that the selected tensor and the shapes read from it
        # can never disagree.
        is_training = training is True

        # Can only deal with dense features for now.

        # We clobber the data during training in this model, so this is
        # actually the sampled data when training. During evaluation this
        # is the full data.
        # The full set of query results for this instance.
        # Shape = (batch_size, n_docs, n_features)
        all_dense = x['sequence_dense']

        # The current sample to use as inputs.
        # Shape when training = (batch_size, group_size, n_features)
        # Shape otherwise     = (batch_size, n_samples, group_size, n_features)
        sampled_dense = all_dense if is_training else x['sample_dense']

        # The batch size
        batch_size = tf.shape(all_dense)[0]

        # The number of samples for each document (for prediction).
        n_samples = 1 if is_training else tf.shape(sampled_dense)[1]

        # The group size (how many documents to compare at once)
        group_size = tf.shape(sampled_dense)[1] if is_training else tf.shape(sampled_dense)[2]

        # The number of features
        n_features = tf.shape(sampled_dense)[2] if is_training else tf.shape(sampled_dense)[3]

        # NOTE(review): group_size and n_features are dynamic tensors here,
        # whereas GroupwiseScoringNetwork reads static values from self.
        # Confirm that the dense layers can still infer their input width
        # from the reshaped tensors below.
        if self.share_weights is True:
            # If share_weights is True, then pass all the inputs through
            # a common dense layer first. At a minimum it might help training
            # by reducing the number of parameters, but it also might help
            # remove some positional bias. I have no real reason to believe
            # this, but it seems intuitive to me.
            n_rows = batch_size * n_samples * group_size
            n_cols = n_features
            indata = tf.reshape(sampled_dense, [n_rows, n_cols])
            in_lyr = self.shared_input_layer(indata)
            # Pass the training flag explicitly, exactly like the hidden
            # batch-norm layers below, so the mode does not depend on
            # implicit propagation.
            in_lyr = self.shared_batch_norm(in_lyr, training=training)
            in_lyr = self.shared_activation(in_lyr)
            n_out = tf.shape(in_lyr)[1]
            input_lyr = tf.reshape(in_lyr, (batch_size * n_samples, group_size * n_out))
        else:
            input_lyr = tf.reshape(sampled_dense, (batch_size * n_samples, group_size * n_features))

        lyr = input_lyr
        for i in range(len(self.hidden)):
            lyr = self.hidden[i](lyr)
            lyr = self.batch_norm[i](lyr, training=training)
            lyr = self.activation[i](lyr)

            if self.has_dropout is True:
                lyr = self.dropout[i](lyr, training=training)

        sample_scores = self.scoring_layer(lyr)

        # Simply return the sample scores if we are training
        if is_training:
            return sample_scores

        # We are predicting

        # The indices of each sample into the full set of query results.
        # In this version it is only used when not training (for prediction).
        scatter_idx = x['scatter_idx']

        # The maximum number of documents in the batch (for prediction).
        n_doc = tf.shape(all_dense)[1]

        sample_scores = tf.reshape(sample_scores, (batch_size, n_samples, group_size))

        # tf.scatter_nd adds up the updates that land on the same index, so
        # a document sampled several times receives the sum of its scores.
        scores = tf.scatter_nd(scatter_idx, sample_scores, (batch_size, n_doc))

        if self.use_average is True:
            # divide_no_nan yields 0 wherever the count is 0.
            document_counts = x['document_counts']
            scores = tf.math.divide_no_nan(scores, document_counts)

        return scores