# Original Copyright 2020 The TensorFlow Ranking Authors.
# Modified Copyright 2020 Reid Swanson
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Python Modules
from typing import List, Optional
# 3rd Party Modules
import tensorflow as tf
# Project Modules
import deletor.tfutils as tfutils
from deletor.constants import MIN_FLOAT_32
def is_label_valid(labels: tf.Tensor):
    """
    Flag which labels are valid, where "valid" means greater than or equal to zero.

    :param labels: The relevance labels to check.
    :return: A boolean tensor, computed element-wise, that is ``True`` wherever the
        corresponding label is ``>= 0`` and ``False`` elsewhere.
    """
    # Negative labels are treated as invalid entries.
    lowest_valid_label = 0.0
    return tf.greater_equal(labels, lowest_valid_label)
def sort_by_scores(
    scores: tf.Tensor,
    features_list: List[tf.Tensor],
    k: Optional[int] = None,
    shuffle_ties: bool = True,
    seed: Optional[tf.Tensor] = None
):
    """
    Reorder every tensor in ``features_list`` by descending ``scores``.

    :param scores: A rank 2 tensor of scores (cast to ``float32`` internally). The
        second dimension is treated as the list dimension.
    :param features_list: The tensors to reorder. Each one is gathered with the
        same indices, so they are assumed to share their first two dimensions
        with ``scores`` (TODO confirm against callers).
    :param k: The number of top entries to keep. It is capped at the list size;
        ``None`` keeps the entire list.
    :param shuffle_ties: When ``True``, the list is randomly permuted before the
        top-k selection so that tied scores are not always resolved in their
        original order.
    :param seed: Forwarded unchanged as the ``seed`` argument of
        ``tf.random.uniform`` when shuffling.
    :return: A list with one reordered tensor per entry of ``features_list``.
    """
    scores = tf.cast(scores, tf.float32)
    scores.get_shape().assert_has_rank(2)
    list_size = tf.shape(scores)[1]
    top_n = list_size if k is None else tf.minimum(k, list_size)

    if shuffle_ties is not True:
        # Deterministic path: take the top-k directly from the scores.
        _, top_columns = tf.math.top_k(scores, top_n, sorted=True)
        # NOTE(review): to_nd_indices presumably expands per-row column indices
        # into index tuples usable by tf.gather_nd - confirm in tfutils.
        gather_indices = tfutils.to_nd_indices(top_columns)
    else:
        # Sorting uniform noise yields a random permutation of every row.
        noise = tf.random.uniform(tf.shape(scores), seed=seed)
        permutation = tfutils.to_nd_indices(tf.argsort(noise, stable=True))
        shuffled_scores = tf.gather_nd(scores, permutation)
        _, top_columns = tf.math.top_k(shuffled_scores, top_n, sorted=True)
        # The top-k indices refer to the shuffled rows; map them back to the
        # positions in the original (unshuffled) tensors.
        gather_indices = tf.gather_nd(permutation, tfutils.to_nd_indices(top_columns))

    return [tf.gather_nd(feature, gather_indices) for feature in features_list]
def sorted_ranks(
    scores: tf.Tensor,
    shuffle_ties: bool = True,
    seed: Optional[tf.Tensor] = None
):
    """
    Compute the rank of every item when each list is sorted by descending score.

    :param scores: A rank 2 tensor of scores; the second dimension is the list
        dimension.
    :param shuffle_ties: Forwarded to :func:`sort_by_scores`; when ``True`` ties
        are broken randomly.
    :param seed: Forwarded to :func:`sort_by_scores`.
    :return: An integer tensor with the same shape as ``scores`` whose entry
        ``[b, i]`` is the 0-based position of item ``i`` in the sorted order of
        list ``b``.
    """
    batch_size, list_size = tf.unstack(tf.shape(scores))

    # Build [0, 1, ..., list_size - 1] for every row of the batch.
    flat_positions = tf.tile(tf.range(list_size), [batch_size])
    positions = tf.reshape(flat_positions, tf.shape(scores))

    # After sorting, entry [b, r] holds the original index of the item at rank r.
    (sorted_positions,) = sort_by_scores(
        scores, [positions], shuffle_ties=shuffle_ties, seed=seed
    )

    # Inverting that permutation gives the rank of each original index.
    return tf.argsort(sorted_positions)
def compute_ranks(logits: tf.Tensor, is_valid: tf.Tensor):
    """
    Rank the items of each list by their logits, pushing invalid items down.

    :param logits: The predicted scores to rank.
    :param is_valid: A boolean mask selecting which logits are kept as-is.
    :return: The result of :func:`sorted_ranks` applied to the masked logits
        (called with its default arguments).
    """
    # Invalid logits are overwritten with MIN_FLOAT_32 (presumably the lowest
    # float32 value - confirm in deletor.constants). This should already have
    # been done by the caller, but it is repeated here just in case.
    # TODO using this imported python float might prevent fast execution
    #  as a @tf.function
    floor = MIN_FLOAT_32 * tf.ones_like(logits)
    masked_logits = tf.where(is_valid, logits, floor)
    return sorted_ranks(masked_logits)
def approximate_ranks(logits: tf.Tensor, alpha: tf.Tensor):
    """
    Compute a smooth, differentiable approximation of the rank of every item.

    The approximate rank of item ``i`` is
    ``0.5 + sum_j sigmoid(alpha * (s_j - s_i))``. Each item scoring higher than
    ``i`` contributes roughly one to the sum and the item itself contributes
    exactly ``sigmoid(0) = 0.5``, so the best item of a list gets a rank close
    to ``1``.

    :param logits: The scores to rank. They are indexed as ``[batch, list]``.
    :param alpha: The multiplier applied to the pairwise score differences before
        the sigmoid; larger values give a steeper sigmoid. It must broadcast
        against a ``[batch, list, list]`` tensor.
    :return: A tensor shaped like ``logits`` holding the approximate rank of each
        item.
    """
    # Broadcasting [batch, 1, list] against [batch, list, 1] yields the full
    # [batch, list, list] matrix of differences, where entry [b, i, j] is
    # s_j - s_i. This avoids materialising two tiled copies of the logits.
    pairwise_diff = tf.expand_dims(logits, 1) - tf.expand_dims(logits, 2)
    pairs = tf.sigmoid(alpha * pairwise_diff)
    return tf.reduce_sum(pairs, axis=-1) + 0.5
def inverse_max_dcg(
    labels: tf.Tensor,
    gain_fn=None,
    discount_fn=None,
    k: Optional[tf.Tensor] = None
):
    """
    Compute the reciprocal of the maximum (ideal) DCG of every list.

    The labels of each list are sorted in descending order, the gain of each
    label is weighted by the discount of its 1-based rank, and the weighted
    gains are summed per list.

    :param labels: A rank 2 tensor of relevance labels (the rank is asserted by
        :func:`sort_by_scores`).
    :param gain_fn: Maps labels to gains. Defaults to ``2^label - 1``.
    :param discount_fn: Maps 1-based ranks (as ``float32``) to discounts.
        Defaults to ``log(2) / log(1 + rank)``.
    :param k: The cut-off forwarded to :func:`sort_by_scores`; ``None`` uses the
        entire list.
    :return: A tensor with one column per list (the list dimension is kept with
        size 1) holding ``1 / max_dcg``, or ``0`` wherever the maximum DCG is
        not strictly positive.
    """
    def default_gain(x):
        return tf.pow(2.0, x) - 1.0

    def default_discount(x):
        return tf.math.log(2.0) / tf.math.log1p(x)

    gain_fn = default_gain if gain_fn is None else gain_fn
    discount_fn = default_discount if discount_fn is None else discount_fn

    # Sorting the labels by themselves gives the ideal ordering.
    (ideal_labels,) = sort_by_scores(labels, [labels], k=k)

    # Ranks are 1-based: 1, 2, ..., (number of kept items).
    rank = tf.range(tf.shape(ideal_labels)[1]) + 1
    gains = gain_fn(ideal_labels)
    discounts = discount_fn(tf.cast(rank, tf.float32))
    max_dcg = tf.reduce_sum(gains * discounts, axis=1, keepdims=True)

    # Lists without a positive ideal DCG get 0 rather than an inf / negative value.
    has_positive_dcg = tf.greater(max_dcg, 0.0)
    reciprocal = 1.0 / max_dcg
    return tf.where(has_positive_dcg, reciprocal, tf.zeros_like(max_dcg))