#
#
# Copyright 2020 Reid Swanson
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Python Modules
import argparse
import logging
import os
from functools import partial, reduce
# 3rd Party Modules
from typing import Optional
import numpy as np
import tensorflow as tf
# Project Modules
import examples.pipeline as pipeline
import examples.utils as utils
import deletor.tfutils as tfutils
from deletor.metrics import NormalizedDiscountedCumulativeGain
from deletor.models.boost import ModelParameter, WeakGroupwiseScoringNetwork
from deletor.models.gsf import GroupwiseScoringNetwork
from deletor.random.sample import IndependentMultiOutputSampler
tfutils.grow_memory()
np.set_printoptions(precision=6, suppress=True, edgeitems=100, linewidth=10000)
log = logging.getLogger('boost/mltr30k')
[docs]def prepare_data(args: argparse.Namespace):
list_size = args.list_size
scaler = args.scaler
train_file, valid_file, test_file = args.train_file, args.valid_file, args.test_file
datasets = [pipeline.load_dataset(f, scaler) for f in [train_file, valid_file, test_file]]
datasets = [d.filter(pipeline.is_valid_query) for d in datasets]
if list_size:
truncate_documents = partial(pipeline.truncate_document_list, list_size=list_size)
datasets = [d.map(lambda x, y: truncate_documents(x, y)) for d in datasets]
else:
# If the list size is not set we need to set it to the maximum number
# of documents in the training data.
list_size = datasets[0].reduce(
0,
lambda x, y: tf.maximum(x, tf.shape(y[0]['sequence_dense']))[0]
)
train_data = datasets[0].shard(2, 0), datasets[0].shard(2, 1)
valid_data = datasets[1].shard(2, 0), datasets[1].shard(2, 1)
train_data = list(map(lambda d: d.cache(), train_data))
valid_data = list(map(lambda d: d.cache(), valid_data))
test_data = datasets[2].cache()
n_docs = [d.reduce(0, lambda x, _: x + 1) for d in train_data]
# Do not shuffle instances so that it is easier to update instance weights
return list_size, n_docs, train_data, valid_data, test_data
[docs]def prepare_weak_data(train_data, valid_data, features, weights, args):
list_size = args.list_size
n_sampled_features = tf.shape(features)[0]
group_size = args.group_size
multiples = args.multiples
train_bsz = args.weak_training_batch_size
eval_bsz = args.weak_evaluation_batch_size
drop_remainder = args.drop_remainder
with_weights = weights is not None
tshapes = pipeline.make_padded_shapes(list_size, n_sampled_features, with_weights=with_weights)
tvalues = pipeline.make_padding_values(with_weights=with_weights)
vshapes = pipeline.make_padded_shapes(list_size, n_sampled_features)
vvalues = pipeline.make_padding_values()
select_features = pipeline.select_features
valid_data = [d.map(lambda a, b: select_features(a, b, features)) for d in valid_data]
if with_weights is True:
weights_data = tf.data.Dataset.from_tensor_slices(weights.numpy())
# Add the weights to the dataset. This has the effect of returning a tuple
# where the first element is a tuple of (x, y) and the second element is
# the weights. Note: we don't need weights for the second training
# dataset.
train_data[0] = tf.data.Dataset.zip((train_data[0], weights_data))
# This will unpack the weights to return a tuple of three elements:
# (x, y, w)
train_data[0] = train_data[0].map(lambda a, b: (a[0], a[1], b))
sampler = IndependentMultiOutputSampler(group_size, multiple=multiples)
train_data[0] = train_data[0].padded_batch(train_bsz, tshapes, tvalues, drop_remainder)
train_data[1] = train_data[1].padded_batch(train_bsz, vshapes, vvalues, drop_remainder)
valid_data = [d.padded_batch(eval_bsz, vshapes, vvalues, drop_remainder) for d in valid_data]
train_data = [d.cache() for d in train_data]
valid_data = [d.cache() for d in valid_data]
train_data = [d.map(sampler) for d in train_data]
valid_data = [d.map(sampler) for d in valid_data]
train_data = [d.prefetch(tf.data.experimental.AUTOTUNE) for d in train_data]
valid_data = [d.prefetch(tf.data.experimental.AUTOTUNE) for d in valid_data]
return train_data, valid_data
[docs]def prepare_test_data(data: tf.data.Dataset, features, args):
n_sampled_features = tf.shape(features)[0]
group_size = args.group_size
multiples = args.multiples
drop_remainder = args.drop_remainder
pad_shapes = pipeline.make_padded_shapes(args.list_size, n_sampled_features)
pad_values = pipeline.make_padding_values()
select_features = pipeline.select_features
data = data.map(lambda a, b: select_features(a, b, features))
data = data.padded_batch(1, pad_shapes, pad_values, drop_remainder)
data = data.cache()
sampler = IndependentMultiOutputSampler(group_size, multiple=multiples)
data = data.map(sampler)
data = data.prefetch(tf.data.experimental.AUTOTUNE)
return data
# def prepare_strong_data(orig_data, pred_data, args, is_training: bool):
[docs]def prepare_strong_data(new_data, args, is_training: bool):
"""
Create a new dataset that merges the original data with the predictions
from the weak models.
:param orig_data: A dataset created using the standard pipeline.
:param pred_data: A list of prediction datasets. Each prediction dataset
should contain data points with a single 1D tensor with predicted
scores for each document. Note: the number of documents in each
data point may be different.
:param args:
:param is_training:
:return:
"""
# First create a single dataset with all of the predictions.
# pred_data = tf.data.Dataset.zip(tuple(pred_data))
#
# # Create a new dataset that has a single tensor (as opposed to a list/tuple)
# # of all the values. Stacking the weights for each individual model will
# # result in a tensor of shape [n_models, n_documents], but we want it the
# # other way around, so we apply a transpose opperation.
# pred_data = pred_data.map(lambda *x1: tf.transpose(tf.stack(x1)))
#
# # Merge the original dataset with the prediction data using zip.
# new_data = tf.data.Dataset.zip((orig_data, pred_data))
#
# # A simple function for concatenating the predictions onto the dense
# # sequential features.
# def merge_weak_predictions(a, z):
# x = a[0]
# y = a[1]
# x['sequence_dense'] = tf.concat([x['sequence_dense'], z], axis=-1)
#
# return x, y
#
# # Concatenate the predictions onto the dense features using the function
# # above.
# new_data = new_data.map(merge_weak_predictions)
# Now do the other normal data processing stuff (i.e., padding and sampling)
pad_shapes = pipeline.make_padded_shapes(n_features=pipeline.N_FEATURES + args.n_models)
pad_values = pipeline.make_padding_values()
sampler = IndependentMultiOutputSampler(args.group_size, multiple=args.multiples)
bsz = args.strong_training_batch_size if is_training else args.strong_evaluation_batch_size
new_data = new_data.padded_batch(bsz, pad_shapes, pad_values, args.drop_remainder)
new_data = new_data.cache()
new_data = new_data.map(sampler)
return new_data.prefetch(tf.data.experimental.AUTOTUNE)
[docs]def setup_model(args: argparse.Namespace, n_model_features: int = 0):
n_units = args.n_weak_units if n_model_features == 0 else args.n_strong_units
model_params = {
ModelParameter.N_FEATURES: pipeline.N_FEATURES + n_model_features,
ModelParameter.N_SAMPLED_FEATURES: args.n_sampled_features,
ModelParameter.N_UNITS: n_units,
ModelParameter.GROUP_SIZE: args.group_size,
ModelParameter.USE_AVERAGE: args.use_average,
ModelParameter.SHARE_WEIGHTS: args.share_weights,
ModelParameter.DROPOUT_RATE: args.dropout_rate,
}
if n_model_features == 0:
model = WeakGroupwiseScoringNetwork(model_params)
else:
model = GroupwiseScoringNetwork(model_params)
optimizer = utils.make_optimizer(args)
loss = utils.make_loss(args, reduce=False)
metrics = [
NormalizedDiscountedCumulativeGain(k=1),
NormalizedDiscountedCumulativeGain(k=5),
NormalizedDiscountedCumulativeGain(k=10),
]
model.compile(optimizer=optimizer, loss=loss, metrics=metrics)
return model
[docs]def make_callbacks(args, model_num: Optional[int] = None):
model_name = 'strong' if model_num is None else 'weak'
model_num = model_num or 0
monitor = 'val_ndcg_at_005'
checkpoint_path = os.path.join(args.checkpoint_dir, f'{model_name}_model_{model_num:02d}.mdl')
csv_path = f'/tmp/gsf_{model_name}_{model_num:02d}.csv'
callbacks = [
tf.keras.callbacks.EarlyStopping(
monitor=monitor,
min_delta=1e-4,
patience=10,
mode='max',
restore_best_weights=True
),
tf.keras.callbacks.ModelCheckpoint(
checkpoint_path,
monitor=monitor,
save_best_only=True,
mode='max',
),
tf.keras.callbacks.CSVLogger(csv_path)
]
return callbacks
# noinspection PyTypeChecker
[docs]def update_weights(weak_model, data: tf.data.Dataset, weights: tf.Tensor, ndcg_interval: float = 1):
metric = NormalizedDiscountedCumulativeGain(k=5)
ndcg = []
for x, (yd, ys), w in data:
y_pred = weak_model(x, training=False)
ndcg.append(metric(yd, y_pred))
ndcg = tf.concat(ndcg, axis=0)
def rescale(min_value=-ndcg_interval, max_value=ndcg_interval):
min_ndcg = tf.math.reduce_min(ndcg)
max_ndcg = tf.math.reduce_max(ndcg)
return (
(max_value - min_value)
* (ndcg - min_ndcg) / (max_ndcg - min_ndcg)
+ min_value
)
errors = 1.0 - ndcg
epsilon = tf.reduce_sum(weights * errors) / tf.reduce_sum(weights)
alpha = 0.5 * tf.math.log((1.0 - epsilon) / epsilon)
new_weights = weights * tf.math.exp(-2.0 * alpha * rescale(ndcg))
return new_weights / tf.reduce_sum(new_weights)
[docs]def main(args: argparse.Namespace):
"""
Split the training dataset `T` and validation dataset `V` into 2 parts:
1. Training/validation for the "weak" learners.
2. Training/validating an ensemble from the weak learners.
Assign uniform weights to every query/instance in T_{1}.
For computational efficiency assign the same hyper-parameters to all
the weak learners.
for i...N (until stopping criteria is met):
* Construct a weak neural NN_{i} that uses a random (but reproducible)
subset `F` of available features.
* Train NN_{i} on T_{1} until performance on V_{1} plateaus.
* Calculate the error on T_{1}. What this means is TBD but should
be based on the NDCG.
* Use the error to update the weights for each instance
Construct an ensemble model.
To train this model, each weak learner will make a prediction for each
data point (i.e., each document in a query) and these predictions will
be used as additional sequential input features to the ensemble model
(in addition to the full set of features for the data).
:param args:
:return:
"""
logging.basicConfig(
level=args.log_level,
format='%(asctime)-15s [%(name)s]:%(lineno)d %(levelname)s %(message)s'
)
list_size, n_documents, train_data, valid_data, test_data = prepare_data(args)
# This has to be after prepare_data
tf.config.experimental_run_functions_eagerly(args.run_eagerly)
# train the weak learners
# NOTE: The weights array could potentially have the wrong
# dimension / number of values if `drop_remainder` is true.
n_docs_0, n_docs_1 = n_documents
weights = tf.ones([n_docs_0], tf.float32) / tf.cast(n_docs_0, tf.float32)
log.debug(f"n_docs_0: {n_docs_0.numpy()} n_docs_1: {n_docs_1.numpy()}")
weak_models = []
tf.get_logger().setLevel(logging.ERROR)
for t in range(args.n_models):
log.info(f"Start training weak model {t:02d}")
log.debug(f"Using weights: {weights.numpy()}")
weak_models += [setup_model(args)]
model = weak_models[-1]
features = model.feature_indices
tdata, vdata = prepare_weak_data(train_data, valid_data, features, weights, args)
# Note the custom training loop doesn't work for this model, because
# there's a problem with the way weights are handled.
callbacks = make_callbacks(args, t)
model.fit(
tdata[0],
validation_data=vdata[0],
epochs=args.max_weak_epochs,
callbacks=callbacks,
verbose=0
)
weights = update_weights(model, tdata[0], weights)
def apply_model(a, b, m):
a[f'weak_model'] = m(a, training=False)
return a, b
log.info("Using the weak models to make predictions on the second half of the split data.")
(_, train_data_1), (_, valid_data_1) = train_data, valid_data
train_pred_datasets = []
valid_pred_datasets = []
test_pred_datasets = []
for i, weak_model in enumerate(weak_models):
log.debug(f"Making predictions for model: {i}")
features = weak_model.feature_indices
new_args = argparse.Namespace(**vars(args))
new_args.weak_training_batch_size = 1
new_args.weak_evaluation_batch_size = 1
tdata, vdata = prepare_weak_data(train_data, valid_data, features, None, new_args)
edata = prepare_test_data(test_data, features, args)
tdata[1] = tdata[1].map(lambda x1, y1: apply_model(x1, y1, weak_model))
vdata[1] = vdata[1].map(lambda x1, y1: apply_model(x1, y1, weak_model))
edata = edata.map(lambda x1, y1: apply_model(x1, y1, weak_model))
# The following code appears to make a dataset such that each row is a
# query (presumably in the original order) with the predictions for
# each document.
def train_gen():
for x1, y1 in tdata[1]:
yield x1['weak_model'][0]
def valid_gen():
for x1, y1 in vdata[1]:
yield x1['weak_model'][0]
def test_gen():
for x1, y1 in edata:
yield x1['weak_model'][0]
datasets = tf.data.Dataset.from_generator(train_gen, output_types=tf.float32)
train_pred_datasets.append(datasets)
datasets = tf.data.Dataset.from_generator(valid_gen, output_types=tf.float32)
valid_pred_datasets.append(datasets)
datasets = tf.data.Dataset.from_generator(test_gen, output_types=tf.float32)
test_pred_datasets.append(datasets)
def merge_datasets(orig_data, pred_data):
pred_data = tf.data.Dataset.zip(tuple(pred_data))
# Create a new dataset that has a single tensor (as opposed to a list/tuple)
# of all the values. Stacking the weights for each individual model will
# result in a tensor of shape [n_models, n_documents], but we want it the
# other way around, so we apply a transpose opperation.
pred_data = pred_data.map(lambda *x1: tf.transpose(tf.stack(x1)))
# Merge the original dataset with the prediction data using zip.
new_data = tf.data.Dataset.zip((orig_data, pred_data))
# A simple function for concatenating the predictions onto the dense
# sequential features.
def merge_weak_predictions(a, z):
x = a[0]
y = a[1]
x['sequence_dense'] = tf.concat([x['sequence_dense'], z], axis=-1)
return x, y
# Concatenate the predictions onto the dense features using the function
# above.
new_data = new_data.map(merge_weak_predictions)
return new_data
ens_train_data = merge_datasets(train_data_1, train_pred_datasets)
ens_valid_data = merge_datasets(valid_data_1, valid_pred_datasets)
ens_test_data = merge_datasets(test_data, test_pred_datasets)
ens_train_data = prepare_strong_data(ens_train_data, args, True)
ens_valid_data = prepare_strong_data(ens_valid_data, args, False)
ens_test_data = prepare_strong_data(ens_test_data, args, False)
strong_model = setup_model(args, n_model_features=args.n_models)
callbacks = make_callbacks(args)
log.info("Start training the ensemble model.")
strong_model.fit(
ens_train_data,
validation_data=ens_valid_data,
epochs=args.max_strong_epochs,
callbacks=callbacks,
verbose=1
)
strong_model.evaluate(ens_test_data)
# noinspection DuplicatedCode
if __name__ == '__main__':
# The current implementation does not work well on GPUs
cli = argparse.ArgumentParser(fromfile_prefix_chars='@')
cli.add_argument(
'--log-level',
required=False,
type=utils.log_level_type,
choices=[logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR],
default=logging.INFO
)
cli.add_argument(
'--train-file',
required=True,
type=str,
help="The training tfrecords file."
)
cli.add_argument(
'--valid-file',
required=True,
type=str,
help="The validation tfrecords file."
)
cli.add_argument(
'--test-file',
required=True,
type=str,
help="The test tfrecords file."
)
cli.add_argument(
'--checkpoint-dir',
required=True,
type=str,
help="The directory where model checkpoints will be saved."
)
cli.add_argument(
'--scaler',
required=False,
type=str,
nargs=2,
help=(
"This argument requires two parameters. The first is the path to "
"a scaler file created with the build dataset script. The second "
"is the name of the scaler to use. Choose one of: "
"minmax, standard, robust, power."
)
)
cli.add_argument(
'--shuffle-buffer',
required=False,
type=int,
default=5000
)
cli.add_argument(
'--run-eagerly',
required=False,
action='store_true'
)
cli.add_argument(
'--n-models',
required=False,
type=int,
default=10
)
cli.add_argument(
'--n-sampled-features',
required=False,
type=int,
default=8
)
cli.add_argument(
'--max-weak-epochs',
required=False,
type=int,
default=500,
help=(
"The maximum number of epochs before the training terminates no matter what for the "
"weak models."
)
)
cli.add_argument(
'--max-strong-epochs',
required=False,
type=int,
default=500,
help=(
"The maximum number of epochs before the training terminates no matter what for the "
"strong models."
)
)
cli.add_argument(
'--optimizer',
required=False,
type=str,
default='adagrad',
choices=['adagrad', 'adam', 'sgd', 'nesterov', 'rmsprop']
)
cli.add_argument(
'--learning-rate',
required=False,
type=float,
default=0.001
)
cli.add_argument(
'--loss',
required=False,
type=str,
choices=['ndcg', 'bidi_ndcg', 'softmax', 'cross_entropy', 'mse'],
default='ndcg'
)
cli.add_argument(
'--list-size',
required=False,
type=int,
default=None,
help="The maximum number of documents per query or no maximum if not set."
)
cli.add_argument(
'--group-size',
required=False,
type=int,
default=16,
help="The group size to use."
)
cli.add_argument(
'--multiples',
required=False,
type=int,
default=1,
help="The sampling multiplier."
)
cli.add_argument(
'--weak-training-batch-size',
required=False,
type=int,
default=128
)
cli.add_argument(
'--weak-evaluation-batch-size',
required=False,
type=int,
default=128
)
cli.add_argument(
'--strong-training-batch-size',
required=False,
type=int,
default=128
)
cli.add_argument(
'--strong-evaluation-batch-size',
required=False,
type=int,
default=128
)
cli.add_argument(
'--use-average',
required=False,
action='store_true',
default=False,
help=(
"According to the paper, when a document is sampled more than once its scores are "
"summed. When this option is set the scores are averaged over the number of times "
"each document is seen instead."
)
)
cli.add_argument(
'--share-weights',
required=False,
action='store_true',
default=False,
help="Apply each document through a shared dense layer before concatenating them."
)
cli.add_argument(
'--n-weak-units',
required=False,
type=int,
nargs='+',
default=[64, 32, 16]
)
cli.add_argument(
'--n-strong-units',
required=False,
type=int,
nargs='+',
default=[64, 32, 16]
)
cli.add_argument(
'--dropout-rate',
required=False,
type=float,
default=0.0
)
cli.add_argument(
'--drop-remainder',
action='store_true',
default=False,
help="This is necessary when using the keras training/eval loops."
)
cli.add_argument(
'--random-seed',
required=False,
type=int,
help="The random seed to use for sampling query results."
)
cli.set_defaults(func=main)
cli_args = cli.parse_args()
cli_args.func(cli_args)