Source code for examples.gsf.mltr30k_boost

#
#
#  Copyright 2020 Reid Swanson
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#  http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
#

# Python Modules
import argparse
import logging
import os

from functools import partial, reduce

# 3rd Party Modules
from typing import Optional

import numpy as np
import tensorflow as tf

# Project Modules
import examples.pipeline as pipeline
import examples.utils as utils
import deletor.tfutils as tfutils

from deletor.metrics import NormalizedDiscountedCumulativeGain
from deletor.models.boost import ModelParameter, WeakGroupwiseScoringNetwork
from deletor.models.gsf import GroupwiseScoringNetwork

from deletor.random.sample import IndependentMultiOutputSampler

tfutils.grow_memory()
np.set_printoptions(precision=6, suppress=True, edgeitems=100, linewidth=10000)

log = logging.getLogger('boost/mltr30k')


[docs]def prepare_data(args: argparse.Namespace): list_size = args.list_size scaler = args.scaler train_file, valid_file, test_file = args.train_file, args.valid_file, args.test_file datasets = [pipeline.load_dataset(f, scaler) for f in [train_file, valid_file, test_file]] datasets = [d.filter(pipeline.is_valid_query) for d in datasets] if list_size: truncate_documents = partial(pipeline.truncate_document_list, list_size=list_size) datasets = [d.map(lambda x, y: truncate_documents(x, y)) for d in datasets] else: # If the list size is not set we need to set it to the maximum number # of documents in the training data. list_size = datasets[0].reduce( 0, lambda x, y: tf.maximum(x, tf.shape(y[0]['sequence_dense']))[0] ) train_data = datasets[0].shard(2, 0), datasets[0].shard(2, 1) valid_data = datasets[1].shard(2, 0), datasets[1].shard(2, 1) train_data = list(map(lambda d: d.cache(), train_data)) valid_data = list(map(lambda d: d.cache(), valid_data)) test_data = datasets[2].cache() n_docs = [d.reduce(0, lambda x, _: x + 1) for d in train_data] # Do not shuffle instances so that it is easier to update instance weights return list_size, n_docs, train_data, valid_data, test_data
[docs]def prepare_weak_data(train_data, valid_data, features, weights, args): list_size = args.list_size n_sampled_features = tf.shape(features)[0] group_size = args.group_size multiples = args.multiples train_bsz = args.weak_training_batch_size eval_bsz = args.weak_evaluation_batch_size drop_remainder = args.drop_remainder with_weights = weights is not None tshapes = pipeline.make_padded_shapes(list_size, n_sampled_features, with_weights=with_weights) tvalues = pipeline.make_padding_values(with_weights=with_weights) vshapes = pipeline.make_padded_shapes(list_size, n_sampled_features) vvalues = pipeline.make_padding_values() select_features = pipeline.select_features valid_data = [d.map(lambda a, b: select_features(a, b, features)) for d in valid_data] if with_weights is True: weights_data = tf.data.Dataset.from_tensor_slices(weights.numpy()) # Add the weights to the dataset. This has the effect of returning a tuple # where the first element is a tuple of (x, y) and the second element is # the weights. Note: we don't need weights for the second training # dataset. train_data[0] = tf.data.Dataset.zip((train_data[0], weights_data)) # This will unpack the weights to return a tuple of three elements: # (x, y, w) train_data[0] = train_data[0].map(lambda a, b: (a[0], a[1], b)) sampler = IndependentMultiOutputSampler(group_size, multiple=multiples) train_data[0] = train_data[0].padded_batch(train_bsz, tshapes, tvalues, drop_remainder) train_data[1] = train_data[1].padded_batch(train_bsz, vshapes, vvalues, drop_remainder) valid_data = [d.padded_batch(eval_bsz, vshapes, vvalues, drop_remainder) for d in valid_data] train_data = [d.cache() for d in train_data] valid_data = [d.cache() for d in valid_data] train_data = [d.map(sampler) for d in train_data] valid_data = [d.map(sampler) for d in valid_data] train_data = [d.prefetch(tf.data.experimental.AUTOTUNE) for d in train_data] valid_data = [d.prefetch(tf.data.experimental.AUTOTUNE) for d in valid_data] return train_data, valid_data
[docs]def prepare_test_data(data: tf.data.Dataset, features, args): n_sampled_features = tf.shape(features)[0] group_size = args.group_size multiples = args.multiples drop_remainder = args.drop_remainder pad_shapes = pipeline.make_padded_shapes(args.list_size, n_sampled_features) pad_values = pipeline.make_padding_values() select_features = pipeline.select_features data = data.map(lambda a, b: select_features(a, b, features)) data = data.padded_batch(1, pad_shapes, pad_values, drop_remainder) data = data.cache() sampler = IndependentMultiOutputSampler(group_size, multiple=multiples) data = data.map(sampler) data = data.prefetch(tf.data.experimental.AUTOTUNE) return data
# def prepare_strong_data(orig_data, pred_data, args, is_training: bool):
[docs]def prepare_strong_data(new_data, args, is_training: bool): """ Create a new dataset that merges the original data with the predictions from the weak models. :param orig_data: A dataset created using the standard pipeline. :param pred_data: A list of prediction datasets. Each prediction dataset should contain data points with a single 1D tensor with predicted scores for each document. Note: the number of documents in each data point may be different. :param args: :param is_training: :return: """ # First create a single dataset with all of the predictions. # pred_data = tf.data.Dataset.zip(tuple(pred_data)) # # # Create a new dataset that has a single tensor (as opposed to a list/tuple) # # of all the values. Stacking the weights for each individual model will # # result in a tensor of shape [n_models, n_documents], but we want it the # # other way around, so we apply a transpose opperation. # pred_data = pred_data.map(lambda *x1: tf.transpose(tf.stack(x1))) # # # Merge the original dataset with the prediction data using zip. # new_data = tf.data.Dataset.zip((orig_data, pred_data)) # # # A simple function for concatenating the predictions onto the dense # # sequential features. # def merge_weak_predictions(a, z): # x = a[0] # y = a[1] # x['sequence_dense'] = tf.concat([x['sequence_dense'], z], axis=-1) # # return x, y # # # Concatenate the predictions onto the dense features using the function # # above. # new_data = new_data.map(merge_weak_predictions) # Now do the other normal data processing stuff (i.e., padding and sampling) pad_shapes = pipeline.make_padded_shapes(n_features=pipeline.N_FEATURES + args.n_models) pad_values = pipeline.make_padding_values() sampler = IndependentMultiOutputSampler(args.group_size, multiple=args.multiples) bsz = args.strong_training_batch_size if is_training else args.strong_evaluation_batch_size new_data = new_data.padded_batch(bsz, pad_shapes, pad_values, args.drop_remainder) new_data = new_data.cache() new_data = new_data.map(sampler) return new_data.prefetch(tf.data.experimental.AUTOTUNE)
[docs]def setup_model(args: argparse.Namespace, n_model_features: int = 0): n_units = args.n_weak_units if n_model_features == 0 else args.n_strong_units model_params = { ModelParameter.N_FEATURES: pipeline.N_FEATURES + n_model_features, ModelParameter.N_SAMPLED_FEATURES: args.n_sampled_features, ModelParameter.N_UNITS: n_units, ModelParameter.GROUP_SIZE: args.group_size, ModelParameter.USE_AVERAGE: args.use_average, ModelParameter.SHARE_WEIGHTS: args.share_weights, ModelParameter.DROPOUT_RATE: args.dropout_rate, } if n_model_features == 0: model = WeakGroupwiseScoringNetwork(model_params) else: model = GroupwiseScoringNetwork(model_params) optimizer = utils.make_optimizer(args) loss = utils.make_loss(args, reduce=False) metrics = [ NormalizedDiscountedCumulativeGain(k=1), NormalizedDiscountedCumulativeGain(k=5), NormalizedDiscountedCumulativeGain(k=10), ] model.compile(optimizer=optimizer, loss=loss, metrics=metrics) return model
[docs]def make_callbacks(args, model_num: Optional[int] = None): model_name = 'strong' if model_num is None else 'weak' model_num = model_num or 0 monitor = 'val_ndcg_at_005' checkpoint_path = os.path.join(args.checkpoint_dir, f'{model_name}_model_{model_num:02d}.mdl') csv_path = f'/tmp/gsf_{model_name}_{model_num:02d}.csv' callbacks = [ tf.keras.callbacks.EarlyStopping( monitor=monitor, min_delta=1e-4, patience=10, mode='max', restore_best_weights=True ), tf.keras.callbacks.ModelCheckpoint( checkpoint_path, monitor=monitor, save_best_only=True, mode='max', ), tf.keras.callbacks.CSVLogger(csv_path) ] return callbacks
# noinspection PyTypeChecker
[docs]def update_weights(weak_model, data: tf.data.Dataset, weights: tf.Tensor, ndcg_interval: float = 1): metric = NormalizedDiscountedCumulativeGain(k=5) ndcg = [] for x, (yd, ys), w in data: y_pred = weak_model(x, training=False) ndcg.append(metric(yd, y_pred)) ndcg = tf.concat(ndcg, axis=0) def rescale(min_value=-ndcg_interval, max_value=ndcg_interval): min_ndcg = tf.math.reduce_min(ndcg) max_ndcg = tf.math.reduce_max(ndcg) return ( (max_value - min_value) * (ndcg - min_ndcg) / (max_ndcg - min_ndcg) + min_value ) errors = 1.0 - ndcg epsilon = tf.reduce_sum(weights * errors) / tf.reduce_sum(weights) alpha = 0.5 * tf.math.log((1.0 - epsilon) / epsilon) new_weights = weights * tf.math.exp(-2.0 * alpha * rescale(ndcg)) return new_weights / tf.reduce_sum(new_weights)
[docs]def main(args: argparse.Namespace): """ Split the training dataset `T` and validation dataset `V` into 2 parts: 1. Training/validation for the "weak" learners. 2. Training/validating an ensemble from the weak learners. Assign uniform weights to every query/instance in T_{1}. For computational efficiency assign the same hyper-parameters to all the weak learners. for i...N (until stopping criteria is met): * Construct a weak neural NN_{i} that uses a random (but reproducible) subset `F` of available features. * Train NN_{i} on T_{1} until performance on V_{1} plateaus. * Calculate the error on T_{1}. What this means is TBD but should be based on the NDCG. * Use the error to update the weights for each instance Construct an ensemble model. To train this model, each weak learner will make a prediction for each data point (i.e., each document in a query) and these predictions will be used as additional sequential input features to the ensemble model (in addition to the full set of features for the data). :param args: :return: """ logging.basicConfig( level=args.log_level, format='%(asctime)-15s [%(name)s]:%(lineno)d %(levelname)s %(message)s' ) list_size, n_documents, train_data, valid_data, test_data = prepare_data(args) # This has to be after prepare_data tf.config.experimental_run_functions_eagerly(args.run_eagerly) # train the weak learners # NOTE: The weights array could potentially have the wrong # dimension / number of values if `drop_remainder` is true. n_docs_0, n_docs_1 = n_documents weights = tf.ones([n_docs_0], tf.float32) / tf.cast(n_docs_0, tf.float32) log.debug(f"n_docs_0: {n_docs_0.numpy()} n_docs_1: {n_docs_1.numpy()}") weak_models = [] tf.get_logger().setLevel(logging.ERROR) for t in range(args.n_models): log.info(f"Start training weak model {t:02d}") log.debug(f"Using weights: {weights.numpy()}") weak_models += [setup_model(args)] model = weak_models[-1] features = model.feature_indices tdata, vdata = prepare_weak_data(train_data, valid_data, features, weights, args) # Note the custom training loop doesn't work for this model, because # there's a problem with the way weights are handled. callbacks = make_callbacks(args, t) model.fit( tdata[0], validation_data=vdata[0], epochs=args.max_weak_epochs, callbacks=callbacks, verbose=0 ) weights = update_weights(model, tdata[0], weights) def apply_model(a, b, m): a[f'weak_model'] = m(a, training=False) return a, b log.info("Using the weak models to make predictions on the second half of the split data.") (_, train_data_1), (_, valid_data_1) = train_data, valid_data train_pred_datasets = [] valid_pred_datasets = [] test_pred_datasets = [] for i, weak_model in enumerate(weak_models): log.debug(f"Making predictions for model: {i}") features = weak_model.feature_indices new_args = argparse.Namespace(**vars(args)) new_args.weak_training_batch_size = 1 new_args.weak_evaluation_batch_size = 1 tdata, vdata = prepare_weak_data(train_data, valid_data, features, None, new_args) edata = prepare_test_data(test_data, features, args) tdata[1] = tdata[1].map(lambda x1, y1: apply_model(x1, y1, weak_model)) vdata[1] = vdata[1].map(lambda x1, y1: apply_model(x1, y1, weak_model)) edata = edata.map(lambda x1, y1: apply_model(x1, y1, weak_model)) # The following code appears to make a dataset such that each row is a # query (presumably in the original order) with the predictions for # each document. def train_gen(): for x1, y1 in tdata[1]: yield x1['weak_model'][0] def valid_gen(): for x1, y1 in vdata[1]: yield x1['weak_model'][0] def test_gen(): for x1, y1 in edata: yield x1['weak_model'][0] datasets = tf.data.Dataset.from_generator(train_gen, output_types=tf.float32) train_pred_datasets.append(datasets) datasets = tf.data.Dataset.from_generator(valid_gen, output_types=tf.float32) valid_pred_datasets.append(datasets) datasets = tf.data.Dataset.from_generator(test_gen, output_types=tf.float32) test_pred_datasets.append(datasets) def merge_datasets(orig_data, pred_data): pred_data = tf.data.Dataset.zip(tuple(pred_data)) # Create a new dataset that has a single tensor (as opposed to a list/tuple) # of all the values. Stacking the weights for each individual model will # result in a tensor of shape [n_models, n_documents], but we want it the # other way around, so we apply a transpose opperation. pred_data = pred_data.map(lambda *x1: tf.transpose(tf.stack(x1))) # Merge the original dataset with the prediction data using zip. new_data = tf.data.Dataset.zip((orig_data, pred_data)) # A simple function for concatenating the predictions onto the dense # sequential features. def merge_weak_predictions(a, z): x = a[0] y = a[1] x['sequence_dense'] = tf.concat([x['sequence_dense'], z], axis=-1) return x, y # Concatenate the predictions onto the dense features using the function # above. new_data = new_data.map(merge_weak_predictions) return new_data ens_train_data = merge_datasets(train_data_1, train_pred_datasets) ens_valid_data = merge_datasets(valid_data_1, valid_pred_datasets) ens_test_data = merge_datasets(test_data, test_pred_datasets) ens_train_data = prepare_strong_data(ens_train_data, args, True) ens_valid_data = prepare_strong_data(ens_valid_data, args, False) ens_test_data = prepare_strong_data(ens_test_data, args, False) strong_model = setup_model(args, n_model_features=args.n_models) callbacks = make_callbacks(args) log.info("Start training the ensemble model.") strong_model.fit( ens_train_data, validation_data=ens_valid_data, epochs=args.max_strong_epochs, callbacks=callbacks, verbose=1 ) strong_model.evaluate(ens_test_data)
# noinspection DuplicatedCode if __name__ == '__main__': # The current implementation does not work well on GPUs cli = argparse.ArgumentParser(fromfile_prefix_chars='@') cli.add_argument( '--log-level', required=False, type=utils.log_level_type, choices=[logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR], default=logging.INFO ) cli.add_argument( '--train-file', required=True, type=str, help="The training tfrecords file." ) cli.add_argument( '--valid-file', required=True, type=str, help="The validation tfrecords file." ) cli.add_argument( '--test-file', required=True, type=str, help="The test tfrecords file." ) cli.add_argument( '--checkpoint-dir', required=True, type=str, help="The directory where model checkpoints will be saved." ) cli.add_argument( '--scaler', required=False, type=str, nargs=2, help=( "This argument requires two parameters. The first is the path to " "a scaler file created with the build dataset script. The second " "is the name of the scaler to use. Choose one of: " "minmax, standard, robust, power." ) ) cli.add_argument( '--shuffle-buffer', required=False, type=int, default=5000 ) cli.add_argument( '--run-eagerly', required=False, action='store_true' ) cli.add_argument( '--n-models', required=False, type=int, default=10 ) cli.add_argument( '--n-sampled-features', required=False, type=int, default=8 ) cli.add_argument( '--max-weak-epochs', required=False, type=int, default=500, help=( "The maximum number of epochs before the training terminates no matter what for the " "weak models." ) ) cli.add_argument( '--max-strong-epochs', required=False, type=int, default=500, help=( "The maximum number of epochs before the training terminates no matter what for the " "strong models." ) ) cli.add_argument( '--optimizer', required=False, type=str, default='adagrad', choices=['adagrad', 'adam', 'sgd', 'nesterov', 'rmsprop'] ) cli.add_argument( '--learning-rate', required=False, type=float, default=0.001 ) cli.add_argument( '--loss', required=False, type=str, choices=['ndcg', 'bidi_ndcg', 'softmax', 'cross_entropy', 'mse'], default='ndcg' ) cli.add_argument( '--list-size', required=False, type=int, default=None, help="The maximum number of documents per query or no maximum if not set." ) cli.add_argument( '--group-size', required=False, type=int, default=16, help="The group size to use." ) cli.add_argument( '--multiples', required=False, type=int, default=1, help="The sampling multiplier." ) cli.add_argument( '--weak-training-batch-size', required=False, type=int, default=128 ) cli.add_argument( '--weak-evaluation-batch-size', required=False, type=int, default=128 ) cli.add_argument( '--strong-training-batch-size', required=False, type=int, default=128 ) cli.add_argument( '--strong-evaluation-batch-size', required=False, type=int, default=128 ) cli.add_argument( '--use-average', required=False, action='store_true', default=False, help=( "According to the paper, when a document is sampled more than once its scores are " "summed. When this option is set the scores are averaged over the number of times " "each document is seen instead." ) ) cli.add_argument( '--share-weights', required=False, action='store_true', default=False, help="Apply each document through a shared dense layer before concatenating them." ) cli.add_argument( '--n-weak-units', required=False, type=int, nargs='+', default=[64, 32, 16] ) cli.add_argument( '--n-strong-units', required=False, type=int, nargs='+', default=[64, 32, 16] ) cli.add_argument( '--dropout-rate', required=False, type=float, default=0.0 ) cli.add_argument( '--drop-remainder', action='store_true', default=False, help="This is necessary when using the keras training/eval loops." ) cli.add_argument( '--random-seed', required=False, type=int, help="The random seed to use for sampling query results." ) cli.set_defaults(func=main) cli_args = cli.parse_args() cli_args.func(cli_args)