Source code for examples.build_tfrecords

"""
A script to convert the MLTR data files in libsvm format into tfrecords.
"""
# Copyright 2020 Reid Swanson
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Python Modules
import argparse
import logging
import shelve
import sys

from collections import defaultdict

# 3rd Party Modules
import tensorflow as tf

from sklearn.preprocessing import MinMaxScaler, StandardScaler, RobustScaler, PowerTransformer

# Project Modules
from examples.pipeline import load_dataset


# Root logging is configured at import time so that every message emitted by
# this script carries a timestamp, the logger name, and the source line.
_LOG_FORMAT = '%(asctime)-15s [%(name)s]:%(lineno)d %(levelname)s %(message)s'
logging.basicConfig(format=_LOG_FORMAT, level=logging.DEBUG)

# Module-wide logger used by the functions below.
log = logging.getLogger('buildrec')


# region Tensorflow Type Converters
# See: https://www.tensorflow.org/tutorials/load_data/tfrecord
def bytes_feature(value):
    """Wrap a string / bytes value in a ``tf.train.Feature`` backed by a ``BytesList``.

    :param value: The value to wrap. If it is an eager tensor it is converted
        with ``.numpy()`` first, because ``BytesList`` won't unpack a string
        from an ``EagerTensor``.
    :return: A ``tf.train.Feature`` whose ``bytes_list`` holds the single value.
    """
    eager_tensor_type = type(tf.constant(0))
    raw = value.numpy() if isinstance(value, eager_tensor_type) else value

    payload = tf.train.BytesList(value=[raw])
    return tf.train.Feature(bytes_list=payload)
def float_feature(value):
    """Wrap a float / double in a ``tf.train.Feature`` backed by a ``FloatList``.

    :param value: The single numeric value to wrap.
    :return: A ``tf.train.Feature`` whose ``float_list`` holds the single value.
    """
    payload = tf.train.FloatList(value=[value])
    return tf.train.Feature(float_list=payload)
def int64_feature(value):
    """Wrap a bool / enum / int / uint in a ``tf.train.Feature`` backed by an ``Int64List``.

    :param value: The single integer-like value to wrap.
    :return: A ``tf.train.Feature`` whose ``int64_list`` holds the single value.
    """
    payload = tf.train.Int64List(value=[value])
    return tf.train.Feature(int64_list=payload)
# endregion Tensorflow Type Converters # region Make Dataset
def svm_generator(fh, current_sequence):
    """
    Group consecutive libsvm-style lines that share a query id.

    Each non-blank line is expected to look like
    ``<target> qid:<id> <name>:<value> ...`` and is parsed into a
    ``(qid, target, features)`` tuple, where ``features`` maps the feature
    name (kept as a string) to its float value. Anything after a ``#`` on a
    line is treated as a comment and ignored; blank lines are skipped.

    Parsed items are appended **in place** to ``current_sequence`` for as long
    as their qid matches the qid of the group being collected. When a line with
    a different qid is read, that item becomes the first element of a fresh
    list and the pair ``(current_sequence, next_sequence)`` is yielded. If the
    caller keeps iterating, ``next_sequence`` becomes the group being collected
    and the process repeats.

    Note: the trailing group of the input is never yielded. When the input is
    exhausted the generator simply stops; the trailing items have been appended
    to the list that was being collected (the list passed in, or the most
    recently yielded ``next_sequence``), so the caller must pick them up there.

    :param fh: An iterable of text lines (e.g. an open file handle).
    :param current_sequence: A list of already parsed items that starts the
        first group; may be empty. It is mutated in place.
    :return: A generator of ``(current_sequence, next_sequence)`` list pairs.
    """
    previous_qid = current_sequence[0][0] if current_sequence else None
    next_sequence = []

    for line in fh:
        # Drop a trailing "# ..." comment, then tokenize on whitespace.
        tokens = line.split('#', 1)[0].split()
        if not tokens:
            # Blank (or comment-only) lines carry no example.
            continue

        target = int(tokens[0])
        qid = int(tokens[1][4:])  # strip the leading "qid:" prefix
        pairs = (token.split(':') for token in tokens[2:])
        raw_features = {pair[0]: float(pair[1]) for pair in pairs}
        item = (qid, target, raw_features)

        if previous_qid is None or qid == previous_qid:
            current_sequence.append(item)
            previous_qid = qid
        else:
            next_sequence.append(item)
            yield current_sequence, next_sequence

            # Advance the state so that continued iteration keeps grouping
            # correctly instead of re-yielding the finished group.
            current_sequence, next_sequence, previous_qid = next_sequence, [], qid
def make_example(sequence):
    """
    Build a ``tf.train.SequenceExample`` from one group of parsed items.

    The context holds a single ``'qid'`` int64 feature taken from the first
    item. The feature lists hold one float ``FeatureList`` per feature name
    seen in the items, plus a ``'target'`` int64 ``FeatureList`` with one entry
    per item.

    :param sequence: A non-empty list of ``(qid, target, features)`` tuples,
        where ``features`` maps a feature name to a numeric value.
    :return: The assembled ``tf.train.SequenceExample``.
    """
    # The qid is the same for every item in the sequence, so read it once
    # from the first item.
    shared_qid = sequence[0][0]
    context = tf.train.Features(feature={'qid': int64_feature(shared_qid)})

    targets = []
    values_by_name = defaultdict(list)
    for _, label, item_features in sequence:
        targets.append(int64_feature(label))
        for name, number in item_features.items():
            values_by_name[name].append(float_feature(number))

    feature_list = {
        name: tf.train.FeatureList(feature=values)
        for name, values in values_by_name.items()
    }
    feature_list['target'] = tf.train.FeatureList(feature=targets)

    return tf.train.SequenceExample(
        context=context,
        feature_lists=tf.train.FeatureLists(feature_list=feature_list)
    )
def make_dataset(args: argparse.Namespace):
    """
    Run the two dataset building steps in order: write the tensorflow records
    and then write the fitted scalers.

    :param args: The parsed command line options, passed unchanged to each step.
    """
    for step in (write_data, write_scalers):
        step(args)
def write_data(args: argparse.Namespace):
    """
    Convert the libsvm formatted input file into a tensorflow records file.

    Lines are grouped into sequences by query id with ``svm_generator`` and
    each sequence is serialized as one ``tf.train.SequenceExample``.

    :param args: The parsed command line options. The following attributes are
        read: ``input_file``, ``output_file``, ``compression_type``,
        ``compression_level`` and ``limit``. ``limit`` is the maximum number
        of sequences to write; ``None`` or ``0`` means no limit.
    """
    input_file, output_file = args.input_file, args.output_file
    compression_type, compression_level = args.compression_type, args.compression_level
    tfropts = tf.io.TFRecordOptions(compression_type, compression_level=compression_level)
    limit = args.limit or sys.maxsize

    # I believe tensorflow_rank has functions for reading data in libsvm format
    # and it is undoubtedly much more robust than this. However, this was
    # an exercise for me in understanding how `tf.Feature`s and
    # `tf.Example`s worked.
    written = 0
    with open(input_file, 'r') as reader, tf.io.TFRecordWriter(output_file, tfropts) as writer:
        current_sequence = []

        while written < limit:
            try:
                current_sequence, next_sequence = next(svm_generator(reader, current_sequence))
            except StopIteration:
                # The input is exhausted. The generator never yields the
                # trailing group, but it appended those items in place to the
                # list we passed in, so `current_sequence` now holds them.
                next_sequence = None

            if not current_sequence:
                # Nothing was read at all (e.g. an empty input file).
                break

            if written % 1000 == 0:
                log.info(f"Writing sequence: {written:5d}")

            example = make_example(current_sequence)
            writer.write(example.SerializeToString())
            written += 1

            if next_sequence is None:
                break

            # The first item of the following group was already consumed from
            # the file; it seeds the next round.
            current_sequence = next_sequence

        log.info(f"Finished writing {written:5d} sequences")
def write_scalers(args: argparse.Namespace):
    """
    Fit several scikit-learn scalers on the written dataset and store them in
    a shelve database.

    Nothing happens unless ``args.scaler_file`` is set. The scalers are stored
    under the keys ``'minmax'``, ``'standard'``, ``'robust'`` and ``'power'``.

    :param args: The parsed command line options. ``output_file`` is the
        records file that is loaded back with ``load_dataset`` and
        ``scaler_file`` is the path of the shelve database to create.
    """
    output_file, scaler_file = args.output_file, args.scaler_file
    if not scaler_file:
        return

    log.info("Writing scalers")
    scalers = {
        'minmax': MinMaxScaler(),
        'standard': StandardScaler(),
        'robust': RobustScaler(),
        'power': PowerTransformer()
    }
    dataset = load_dataset(output_file)

    # Some preprocessing methods don't have a partial fit method, so
    # we've got to load the whole data into memory. It should only be
    # a couple gigabytes.
    # NOTE(review): presumably each `sequence_dense` entry is a 2-D
    # (items, features) tensor so that concatenating on axis 0 stacks the
    # items -- confirm against `load_dataset`.
    dense_blocks = [src['sequence_dense'] for src, _ in dataset]
    x = tf.concat(dense_blocks, axis=0)

    for fitted in scalers.values():
        fitted.fit(x)

    # The 'n' flag always creates a new, empty database.
    with shelve.open(scaler_file, 'n') as db:
        for scaler_name, scaler in scalers.items():
            db[scaler_name] = scaler

    log.info("Done writing scalers")
# endregion Make Dataset
def make_command_line_options():
    """
    Build the command line parser for this script.

    Arguments may also be read from a file by prefixing its path with ``@``.
    The parser's ``func`` default is set to ``make_dataset`` so the caller can
    dispatch with ``args.func(args)``.

    :return: The configured ``argparse.ArgumentParser``.
    """
    cli = argparse.ArgumentParser(fromfile_prefix_chars='@')

    # region Dataset Options
    cli.add_argument(
        '--input-file',
        required=True,
        type=str,
        help="The path where the Microsoft Learning To Rank file is saved."
    )
    cli.add_argument(
        '--output-file',
        required=True,
        type=str,
        help="The path where the tensorflow records file will be saved."
    )
    cli.add_argument(
        '--scaler-file',
        required=False,
        type=str,
        help=(
            # The names listed here must match the keys used when the scalers
            # are stored, since that is how users look them up.
            "Fit several sklearn scalers to the data and save them to this file. "
            "(minmax: MinMaxScaler, standard: StandardScaler, "
            "robust: RobustScaler, power: PowerTransformer)"
        )
    )
    cli.add_argument(
        '--limit',
        required=False,
        type=int,
        help=(
            "This option takes an integer argument that limits the number of "
            "query sequences (groups of documents sharing a qid) converted. "
            "Once this many sequences have been processed the script will "
            "stop. This can be useful for creating smaller datasets for "
            "debugging."
        )
    )
    cli.add_argument(
        '--compression-type',
        required=False,
        type=str,
        choices=('GZIP', 'ZLIB'),
        help="The compression type to use for storing the tensorflow records."
    )
    cli.add_argument(
        '--compression-level',
        required=False,
        type=int,
        default=6,
        help=(
            "This specifies the level of compression to use when one of the "
            "compression types is specified "
        )
    )
    cli.set_defaults(func=make_dataset)
    # endregion Dataset Options

    return cli
if __name__ == '__main__':
    # Parse the command line and dispatch to the handler registered as the
    # parser's `func` default.
    cli_args = make_command_line_options().parse_args()
    cli_args.func(cli_args)