# Source code for deletor.ranking.utils

# Original Copyright 2020 The TensorFlow Ranking Authors.
# Modified Copyright 2020 Reid Swanson
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Python Modules
from typing import List, Optional

# 3rd Party Modules
import tensorflow as tf

# Project Modules
import deletor.tfutils as tfutils

from deletor.constants import MIN_FLOAT_32


def is_label_valid(labels: tf.Tensor):
    """
    Build an element-wise mask marking which labels are valid.

    A label is considered valid when it is greater than or equal to zero.

    :param labels: The tensor of labels to test.
    :return: A boolean tensor that is ``True`` wherever ``labels >= 0``.
    """
    lowest_valid_label = 0.0
    return tf.math.greater_equal(labels, lowest_valid_label)
def sort_by_scores(
        scores: tf.Tensor,
        features_list: List[tf.Tensor],
        k: Optional[int] = None,
        shuffle_ties: bool = True,
        seed: Optional[tf.Tensor] = None
):
    """
    Reorder every tensor in ``features_list`` by descending ``scores``.

    The scores are cast to ``float32`` and must be a rank 2 tensor. Ordering
    is done independently for each row along the second dimension.

    :param scores: The rank 2 tensor of scores that determines the ordering.
    :param features_list: The tensors to reorder. Each one is gathered with
        the indices derived from ``scores``.
    :param k: Keep only the ``k`` highest scoring items of each row. It is
        capped at the list size, and the whole list is kept when it is
        ``None``.
    :param shuffle_ties: When exactly ``True``, the items of each row are
        randomly permuted before selecting the top items, so that tied scores
        are not always returned in their original order.
    :param seed: The seed forwarded to ``tf.random.uniform`` when shuffling.
    :return: A list with one reordered tensor per entry of ``features_list``.
    """
    scores = tf.cast(scores, tf.float32)
    scores.get_shape().assert_has_rank(2)

    num_items = tf.shape(scores)[1]
    top_n = num_items if k is None else tf.minimum(k, num_items)

    if shuffle_ties is True:
        # Arg-sorting uniform noise gives a random permutation of each row.
        # NOTE(review): tfutils.to_nd_indices presumably turns per-row column
        # indices into (row, column) pairs for tf.gather_nd - confirm.
        noise = tf.random.uniform(tf.shape(scores), seed=seed)
        permutation = tfutils.to_nd_indices(tf.argsort(noise, stable=True))
        permuted_scores = tf.gather_nd(scores, permutation)

        _, top_positions = tf.math.top_k(permuted_scores, top_n, sorted=True)

        # The selected positions refer to the permuted rows, so translate
        # them back into indices of the original (unshuffled) rows.
        gather_indices = tf.gather_nd(
            permutation, tfutils.to_nd_indices(top_positions)
        )
    else:
        _, top_positions = tf.math.top_k(scores, top_n, sorted=True)
        gather_indices = tfutils.to_nd_indices(top_positions)

    return [tf.gather_nd(feature, gather_indices) for feature in features_list]
def sorted_ranks(
        scores: tf.Tensor,
        shuffle_ties: bool = True,
        seed: Optional[tf.Tensor] = None
):
    """
    Compute the rank of every item when each row is sorted by descending score.

    The ranks are 0-based: the item that sorts first in a row has rank ``0``.

    :param scores: The tensor of scores. Its shape must unstack into exactly
        a batch size and a list size.
    :param shuffle_ties: Forwarded to :func:`sort_by_scores`.
    :param seed: Forwarded to :func:`sort_by_scores`.
    :return: A tensor with the same shape as ``scores`` holding the rank of
        each item within its row.
    """
    score_shape = tf.shape(scores)
    batch_size, list_size = tf.unstack(score_shape)

    # Every row holds the original column index of each item: 0 .. list_size-1.
    item_index = tf.range(list_size)
    positions = tf.reshape(tf.tile(item_index, [batch_size]), score_shape)

    reordered = sort_by_scores(
        scores, [positions], shuffle_ties=shuffle_ties, seed=seed
    )

    # The sorted positions map rank -> original index. Arg-sorting that
    # permutation inverts it into original index -> rank.
    return tf.argsort(reordered[0])
def compute_ranks(logits: tf.Tensor, is_valid: tf.Tensor):
    """
    Rank the logits of each row after masking out the invalid entries.

    Invalid entries are replaced with ``MIN_FLOAT_32`` before ranking. The
    ranking itself is delegated to :func:`sorted_ranks` with its default
    arguments.

    :param logits: The tensor of logits to rank.
    :param is_valid: A boolean tensor selecting the logits to keep as is.
    :return: The ranks produced by :func:`sorted_ranks` for the masked logits.
    """
    # Invalid logits should already hold the minimum float value, but
    # overwrite them again just in case.
    # TODO using this imported python float might prevent fast execution
    #  as a @tf.function
    masked_value = MIN_FLOAT_32 * tf.ones_like(logits)
    masked_logits = tf.where(is_valid, logits, masked_value)

    return sorted_ranks(masked_logits)
def approximate_ranks(logits: tf.Tensor, alpha: tf.Tensor):
    """
    Compute a smooth, differentiable approximation of the rank of each item.

    The approximate rank of item ``i`` in a row is::

        0.5 + sum_j sigmoid(alpha * (logits[j] - logits[i]))

    The sum runs over every item ``j`` of the row, including ``i`` itself.
    The self comparison contributes ``sigmoid(0) = 0.5``, so an item whose
    logit is far above all the others gets a rank close to ``1``.

    :param logits: The tensor of logits, indexed as ``[batch, list]``.
    :param alpha: The scale applied to the pairwise differences before the
        sigmoid.
    :return: A tensor with the same shape as ``logits`` holding the
        approximate rank of each item.
    """
    # pairwise_diffs[b, i, j] = logits[b, j] - logits[b, i].
    # Broadcasting the two expanded views yields the [batch, list, list]
    # differences directly, without first tiling two full copies of the
    # logits.
    pairwise_diffs = tf.expand_dims(logits, 1) - tf.expand_dims(logits, 2)
    pairs = tf.sigmoid(alpha * pairwise_diffs)

    return tf.reduce_sum(pairs, axis=-1) + 0.5
def inverse_max_dcg(
        labels: tf.Tensor,
        gain_fn=None,
        discount_fn=None,
        k: Optional[tf.Tensor] = None
):
    """
    Compute the inverse of the maximum attainable DCG for each row of labels.

    The maximum DCG is obtained by ordering the labels of each row by their
    own value using :func:`sort_by_scores`, then summing the discounted gains
    of that ordering.

    :param labels: The tensor of relevance labels.
    :param gain_fn: Maps a label to its gain. Defaults to ``2^label - 1``.
    :param discount_fn: Maps a 1-based rank (as ``float32``) to its discount.
        Defaults to ``log(2) / log(1 + rank)``.
    :param k: Only the top ``k`` labels of each row contribute. It is passed
        on to :func:`sort_by_scores`.
    :return: A tensor with one value per row (the summed dimension is kept
        with size 1) that holds ``1 / max_dcg``, or ``0`` wherever the
        maximum DCG is not greater than zero.
    """
    def _exponential_gain(label):
        return tf.pow(2.0, label) - 1.0

    def _logarithmic_discount(position):
        return tf.math.log(2.0) / tf.math.log1p(position)

    gain = _exponential_gain if gain_fn is None else gain_fn
    discount = _logarithmic_discount if discount_fn is None else discount_fn

    # Sorting the labels by themselves gives the ideal ordering.
    ideal_labels = sort_by_scores(labels, [labels], k=k)[0]

    # 1-based rank of every column in the ideal ordering.
    ideal_ranks = tf.range(1, tf.shape(ideal_labels)[1] + 1)
    ideal_ranks = tf.cast(ideal_ranks, tf.float32)

    max_dcg = tf.reduce_sum(
        gain(ideal_labels) * discount(ideal_ranks), axis=1, keepdims=True
    )

    # Rows without any positive gain get an inverse of 0.
    has_positive_dcg = tf.greater(max_dcg, 0.0)
    return tf.where(has_positive_dcg, 1.0 / max_dcg, tf.zeros_like(max_dcg))