"""
A script to convert the MLTR data files in libsvm format into tfrecords.
"""
# Copyright 2020 Reid Swanson
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Python Modules
import argparse
import logging
import shelve
import sys
from collections import defaultdict
# 3rd Party Modules
import tensorflow as tf
from sklearn.preprocessing import MinMaxScaler, StandardScaler, RobustScaler, PowerTransformer
# Project Modules
from examples.pipeline import load_dataset
# Configure the root logger at import time: every record shows a timestamp,
# the logger name, the source line number and the level.
logging.basicConfig(
    format='%(asctime)-15s [%(name)s]:%(lineno)d %(levelname)s %(message)s',
    level=logging.DEBUG,
)

# Module-wide logger for this script.
log = logging.getLogger('buildrec')
# region Tensorflow Type Converters
# See: https://www.tensorflow.org/tutorials/load_data/tfrecord
def bytes_feature(value):
    """
    Wrap a single string / bytes value in a ``tf.train.Feature``.

    If ``value`` is an eager tensor it is first converted with ``.numpy()``,
    because ``BytesList`` won't unpack a string from an EagerTensor.
    """
    eager_tensor_type = type(tf.constant(0))
    if isinstance(value, eager_tensor_type):
        value = value.numpy()

    payload = tf.train.BytesList(value=[value])
    return tf.train.Feature(bytes_list=payload)
def float_feature(value):
    """Wrap a single float / double in a ``tf.train.Feature`` float_list."""
    payload = tf.train.FloatList(value=[value])
    return tf.train.Feature(float_list=payload)
def int64_feature(value):
    """Wrap a single bool / enum / int / uint in a ``tf.train.Feature`` int64_list."""
    payload = tf.train.Int64List(value=[value])
    return tf.train.Feature(int64_list=payload)
# endregion Tensorflow Type Converters
# region Make Dataset
def svm_generator(fh, current_sequence):
    """
    Group consecutive libsvm formatted ranking rows by their query id.

    Every non-empty line is expected to look like
    ``<target> qid:<id> <feature>:<value> ...``. Anything after a ``#`` is
    treated as a comment and dropped, and blank lines are skipped. Each row
    is parsed into a ``(qid, target, features)`` tuple where ``features``
    maps the feature name (kept as a string) to its float value.

    :param fh: An iterable of text lines, typically an open file. Lines are
        consumed from it as the generator advances.
    :param current_sequence: A list with the rows already read for the group
        in progress (may be empty). It is extended **in place**.
    :return: A generator yielding ``(current_sequence, next_sequence)`` each
        time the first row of a new query id is read. ``current_sequence``
        is the complete previous group and ``next_sequence`` contains the
        single row that started the new one.

    Note: the final group is never yielded. When ``fh`` is exhausted the
    generator simply ends and the remaining rows are left in the list most
    recently handed to the caller: the ``current_sequence`` argument if
    nothing was yielded, otherwise the last yielded ``next_sequence``.
    """
    previous_qid = current_sequence[0][0] if current_sequence else None
    next_sequence = []

    for line in fh:
        # Drop trailing comments (e.g. "#docid = ...") and skip blank lines,
        # which would otherwise fail while indexing the tokens below.
        tokens = line.split('#', 1)[0].split()
        if not tokens:
            continue

        target = int(tokens[0])
        qid = int(tokens[1][4:])  # strip the leading "qid:"
        pairs = (token.split(':') for token in tokens[2:])
        features = {pair[0]: float(pair[1]) for pair in pairs}
        row = (qid, target, features)

        if previous_qid is None or qid == previous_qid:
            current_sequence.append(row)
            previous_qid = qid
        else:
            next_sequence.append(row)
            yield current_sequence, next_sequence

            # If the caller keeps iterating, the row just handed out starts
            # the group in progress. Without this the generator would keep
            # re-yielding the first group for every subsequent row.
            current_sequence, next_sequence = next_sequence, []
            previous_qid = qid
def make_example(sequence):
    """
    Build a ``tf.train.SequenceExample`` from the rows of one query.

    ``sequence`` is a non-empty list of ``(qid, target, features)`` tuples.
    The qid of the first row is stored as the ``qid`` context feature. Each
    feature name gets its own feature list with one float per row that
    contains it, and the per-row targets are stored under ``target``.
    """
    # The qid is taken from the first row only; it is assumed to be the same
    # for every item in the sequence.
    context = tf.train.Features(feature={'qid': int64_feature(sequence[0][0])})

    targets = []
    per_feature = defaultdict(list)
    for _, target, features in sequence:
        targets.append(int64_feature(target))
        for name, value in features.items():
            per_feature[name].append(float_feature(value))

    feature_list = {
        name: tf.train.FeatureList(feature=values)
        for name, values in per_feature.items()
    }
    feature_list['target'] = tf.train.FeatureList(feature=targets)

    return tf.train.SequenceExample(
        context=context,
        feature_lists=tf.train.FeatureLists(feature_list=feature_list),
    )
def make_dataset(args: argparse.Namespace):
    """
    Run the conversion steps in order: ``write_data``, then ``write_scalers``.

    :param args: The parsed command line options, passed unchanged to both
        steps.
    """
    for step in (write_data, write_scalers):
        step(args)
def write_data(args: argparse.Namespace):
    """
    Convert the libsvm formatted input file into a tfrecords file.

    Rows are grouped by query id with ``svm_generator`` and every group is
    serialized as one ``tf.train.SequenceExample`` built by ``make_example``.

    :param args: The parsed command line options. This function reads
        ``input_file``, ``output_file``, ``compression_type``,
        ``compression_level`` and ``limit``. ``limit`` is the maximum number
        of sequences to write; ``None`` or ``0`` means no limit.
    """
    input_file, output_file = args.input_file, args.output_file
    compression_type, compression_level = args.compression_type, args.compression_level
    tfropts = tf.io.TFRecordOptions(compression_type, compression_level=compression_level)
    limit = args.limit or sys.maxsize

    # I believe tensorflow_rank has functions for reading data in libsvm format
    # and it is undoubtedly much more robust than this. However, this was
    # an exercise for me in understanding how `tf.Feature`s and
    # `tf.Example`s worked.
    with open(input_file, 'r') as reader, tf.io.TFRecordWriter(output_file, tfropts) as writer:
        i = 0
        current_sequence = []

        # Checking the limit before writing guarantees that at most `limit`
        # sequences end up in the output file.
        while i < limit:
            try:
                # Only the first yield of each generator is consumed. The
                # reader keeps its position, so a fresh generator resumes
                # right after the row that started `next_sequence`.
                current_sequence, next_sequence = next(svm_generator(reader, current_sequence))
            except StopIteration:
                # End of input: the rows of the final query were appended in
                # place to `current_sequence`, which is empty if there was
                # nothing left to read.
                next_sequence = None

            if current_sequence:
                if i % 1000 == 0:
                    log.info("Writing sequence: %5d", i)

                example = make_example(current_sequence)
                writer.write(example.SerializeToString())
                i += 1

            if next_sequence is None:
                break

            current_sequence = next_sequence

        log.info("Finished writing %5d sequences", i)
def write_scalers(args: argparse.Namespace):
    """
    Fit several sklearn scalers to the written dataset and shelve them.

    Nothing happens unless ``args.scaler_file`` is set. Otherwise the dataset
    is loaded back from ``args.output_file``, each scaler is fit on the
    concatenated ``sequence_dense`` features, and the fitted scalers are
    stored in a new shelf under the keys ``minmax``, ``standard``,
    ``robust`` and ``power``.

    :param args: The parsed command line options; ``output_file`` and
        ``scaler_file`` are read.
    """
    output_file, scaler_file = args.output_file, args.scaler_file
    if not scaler_file:
        return

    log.info("Writing scalers")

    scalers = {
        'minmax': MinMaxScaler(),
        'standard': StandardScaler(),
        'robust': RobustScaler(),
        'power': PowerTransformer()
    }

    # Some preprocessing methods don't have a partial fit method, so
    # we've got to load the whole data into memory. It should only be
    # a couple gigabytes.
    dataset = load_dataset(output_file)
    examples = [src['sequence_dense'] for src, _ in dataset]
    x = tf.concat(examples, axis=0)

    for fitted in scalers.values():
        fitted.fit(x)

    # The 'n' flag always creates a new, empty shelf.
    with shelve.open(scaler_file, 'n') as db:
        for name, fitted in scalers.items():
            db[name] = fitted

    log.info("Done writing scalers")
# endregion Make Dataset
def make_command_line_options():
    """
    Build the command line parser for this script.

    Arguments may also be read from a file by prefixing its path with ``@``.
    The parser's ``func`` default is set to ``make_dataset`` so the caller
    can dispatch with ``args.func(args)``.

    :return: The configured ``argparse.ArgumentParser``.
    """
    cli = argparse.ArgumentParser(fromfile_prefix_chars='@')

    # region Dataset Options
    cli.add_argument(
        '--input-file',
        required=True,
        type=str,
        help="The path where the Microsoft Learning To Rank file is saved."
    )
    cli.add_argument(
        '--output-file',
        required=True,
        type=str,
        help="The path where the tensorflow records file will be saved."
    )
    cli.add_argument(
        '--scaler-file',
        required=False,
        type=str,
        help=(
            "Fit several sklearn scalers to the data and save them to this file. "
            "(minmax: MinMaxScaler, standard: StandardScaler, "
            "robust: RobustScaler, power: PowerTransformer)"
        )
    )
    cli.add_argument(
        '--limit',
        required=False,
        type=int,
        help=(
            "This option takes an integer argument that limits the number of "
            "query sequences (not individual documents) that are converted. "
            "Once this many sequences have been written the script will stop "
            "converting. This can be useful for creating smaller datasets "
            "for debugging."
        )
    )
    cli.add_argument(
        '--compression-type',
        required=False,
        type=str,
        choices=('GZIP', 'ZLIB'),
        help="The compression type to use for storing the tensorflow records."
    )
    cli.add_argument(
        '--compression-level',
        required=False,
        type=int,
        default=6,
        help=(
            "This specifies the level of compression to use when one of the "
            "compression types is specified."
        )
    )
    cli.set_defaults(func=make_dataset)
    # endregion Dataset Options

    return cli
if __name__ == '__main__':
    # Parse the command line and dispatch to the handler stored in the
    # parsed namespace's ``func`` attribute.
    parsed = make_command_line_options().parse_args()
    parsed.func(parsed)