Source code for deletor.losses

# Copyright 2020 The TensorFlow Ranking Authors.
# Porting and additional code Copyright 2020 Reid Swanson
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Python Modules
import abc
import logging

from typing import Iterable, Optional, Union, List, Callable, Dict

# 3rd Party Modules
import numpy as np
import tensorflow as tf

# Project Modules
import deletor.ranking.utils as rutils

from deletor.constants import EPSILON, MIN_FLOAT_32

log = logging.getLogger(__name__)


def check_tensor_shapes(tensors: List[tf.Tensor]):
    """
    Verify that every tensor has rank 2 and a shape compatible with the first one.

    The checks are delegated to ``TensorShape.assert_has_rank`` and
    ``TensorShape.assert_is_compatible_with``; nothing is returned. An empty
    list passes trivially because the loop body never runs.

    :param tensors: The tensors to validate. The first entry is the reference
        shape for all the others.
    """
    # TODO I'm not sure if a python list is a good idea with AutoGraph.
    #  I may want to use a TensorArray instead?
    for candidate in tensors:
        candidate_shape = candidate.get_shape()
        candidate_shape.assert_has_rank(2)
        candidate_shape.assert_is_compatible_with(tensors[0].get_shape())
def apply_pairwise_op(op, tensor: tf.Tensor):
    """
    Apply a binary op between every pair of entries within each row.

    The tensor is expanded once at axis 2 and once at axis 1 and the two views
    are handed to ``op``. For a broadcasting op on a ``[batch, n]`` input this
    yields a ``[batch, n, n]`` result whose entry ``[b, i, j]`` is
    ``op(tensor[b, i], tensor[b, j])``.

    :param op: A binary callable taking two tensors (e.g. ``tf.subtract``).
    :param tensor: A rank 2 tensor; the rank is checked before the op is applied.
    :return: Whatever ``op`` returns for the two expanded views.
    """
    check_tensor_shapes([tensor])
    as_column = tf.expand_dims(tensor, 2)
    as_row = tf.expand_dims(tensor, 1)
    return op(as_column, as_row)
def get_valid_pairs_and_clean_labels(labels: tf.Tensor):
    """
    Build the pairwise validity mask and zero out the invalid labels.

    :param labels: A rank 2 tensor of labels.
    :return: A tuple ``(valid_pairs, labels)``. ``valid_pairs`` is the pairwise
        logical AND of the per-item validity mask reported by
        ``rutils.is_label_valid``; ``labels`` is the input with every invalid
        entry replaced by zero.
    """
    # Is this assert necessary? The shape should be validated by check_tensor_shapes already
    labels.get_shape().assert_has_rank(2)

    validity = rutils.is_label_valid(labels)
    pair_validity = apply_pairwise_op(tf.logical_and, validity)
    cleaned_labels = tf.where(validity, labels, tf.zeros_like(labels))

    return pair_validity, cleaned_labels
def masked_softmax(x: tf.Tensor, mask: tf.Tensor):
    """
    Softmax along the last axis that only distributes mass over masked-in entries.

    Each row is shifted by its maximum before exponentiation. Note that the
    maximum is taken over *all* entries of the row, including masked-out ones.
    Rows whose masked exponentials sum to zero produce zeros rather than NaN,
    because the division uses ``tf.math.divide_no_nan``.

    :param x: The scores. The reshapes to ``(-1, 1)`` assume a rank 2 input.
    :param mask: Multiplicative mask applied to the exponentials (1 keeps an
        entry, 0 removes it).
    :return: ``mask * exp(x - max) / sum(mask * exp(x - max))`` per row.
    """
    row_max = tf.reduce_max(x, axis=-1)
    shifted = x - tf.reshape(row_max, (-1, 1))

    masked_exp = mask * tf.math.exp(shifted)
    row_total = tf.reduce_sum(masked_exp, axis=-1)

    return tf.math.divide_no_nan(masked_exp, tf.reshape(row_total, (-1, 1)))
class LambdaWeight(abc.ABC):
    """
    Interface for ranking metric optimization.

    Wraps the weights used by the LambdaLoss framework for ranking metric
    optimization (https://ai.google/research/pubs/pub47258). Concrete lambda
    weight models implement this interface, and an instance is combined with a
    standard loss such as logistic loss or softmax loss.
    """

    @abc.abstractmethod
    def pair_weights(self, labels, ranks):
        """
        Return the weight adjustment `Tensor` for example pairs.

        :param labels: A dense `Tensor` of labels with shape [batch_size, list_size].
        :param ranks: A dense `Tensor` of ranks with the same shape as `labels`
            that are sorted by logits.
        :return: A `Tensor` that can weight example pairs.
        :raises NotImplementedError: Always, when the base implementation is
            invoked directly.
        """
        raise NotImplementedError('Calling an abstract method.')

    # noinspection PyMethodMayBeStatic
    def individual_weights(self, labels, ranks):
        """
        Return the weight `Tensor` for individual examples.

        The default implementation ignores the ranks and uses the labels
        themselves as the weights.

        :param labels: A dense `Tensor` of labels with shape [batch_size, list_size].
        :param ranks: A dense `Tensor` of ranks with the same shape as `labels`
            that are sorted by logits. Unused here.
        :return: A `Tensor` that can weight individual examples.
        """
        del ranks  # Deliberately unused by the default implementation.
        return labels
class DCGLambdaWeight(LambdaWeight):
    """LambdaWeight for Discounted Cumulative Gain metric."""

    def __init__(
        self,
        k: Optional[int] = None,
        gain_fn: Callable = lambda label: label,
        discount_fn: Callable = lambda rank: 1.0 / rank,
        normalized: bool = False,
        smooth_fraction: float = 0.0
    ):
        r"""
        Constructor.

        Ranks are 1-based, not 0-based. Given rank i and j, there are two types
        of pair weights:

        | u = \|rank_discount_fn(\|i-j\|) - rank_discount_fn(\|i-j\| + 1)\|
        | v = \|rank_discount_fn(i) - rank_discount_fn(j)\|

        where u is the newly introduced one in LambdaLoss paper
        (https://ai.google/research/pubs/pub47258) and v is the original one in
        the LambdaMART paper "From RankNet to LambdaRank to LambdaMART: An
        Overview". The final pair weight contribution of ranks is
        (1-smooth_fraction) * u + smooth_fraction * v.

        :param k: The top k for the DCG metric. ``None`` means the whole list.
        :param gain_fn: Transforms labels.
        :param discount_fn: The rank discount function.
        :param normalized: If truthy, normalize weight by the max DCG.
        :param smooth_fraction: parameter to control the contribution from
            LambdaMART.
        :raises ValueError: If ``smooth_fraction`` is outside ``[0, 1]``.
        """
        # Validate first so a rejected argument never leaves a half-built object.
        if smooth_fraction < 0 or smooth_fraction > 1:
            raise ValueError(f"smooth_fraction should be in range [0, 1]. ({smooth_fraction})")

        self.k = k
        self.gain_fn = gain_fn
        self.discount_fn = discount_fn
        self.normalized = normalized
        self.smooth_fraction = smooth_fraction

    def pair_weights(self, labels, ranks):
        """
        Return the DCG based weight for every pair of examples in a list.

        :param labels: A dense `Tensor` of labels with shape [batch_size, list_size].
        :param ranks: A dense `Tensor` of 1-based ranks with the same shape as
            `labels`.
        :return: ``|gain_i - gain_j|`` multiplied by the blended rank discount,
            zeroed for pairs involving an invalid label and, when ``k`` is set,
            for pairs where neither item is ranked within the top ``k``.
        """
        check_tensor_shapes([labels, ranks])
        valid_pairs, labels = get_valid_pairs_and_clean_labels(labels)

        gain = self.gain_fn(labels)
        # Plain truthiness, matching individual_weights. The previous
        # `is True` test silently skipped normalization for truthy non-bool
        # flags such as 1 or numpy.bool_.
        if self.normalized:
            gain *= rutils.inverse_max_dcg(labels, self.gain_fn, self.discount_fn, self.k)

        pair_gain = apply_pairwise_op(tf.subtract, gain)
        pair_gain *= tf.cast(valid_pairs, tf.float32)

        list_size = tf.shape(labels)[1]
        k = list_size if self.k is None else self.k

        u = self.discount_for_relative_rank_diff(ranks, k)
        v = self.discount_for_absolute_rank(ranks, k)
        pair_discount = (1.0 - self.smooth_fraction) * u + self.smooth_fraction * v
        pair_weight = tf.abs(pair_gain) * pair_discount

        if self.k is None:
            return pair_weight

        # Keep only the pairs where at least one of the two items is in the top k.
        pair_mask = apply_pairwise_op(tf.logical_or, tf.less_equal(ranks, k))
        return pair_weight * tf.cast(pair_mask, tf.float32)

    def individual_weights(self, labels, ranks):
        """
        Return the DCG contribution of every individual example.

        :param labels: A dense `Tensor` of labels with shape [batch_size, list_size].
        :param ranks: A dense `Tensor` of 1-based ranks with the same shape as
            `labels`.
        :return: The (optionally max-DCG normalized) gain of each example
            multiplied by the discount of its rank. Invalid labels are treated
            as zero before the gain is computed.
        """
        check_tensor_shapes([labels, ranks])

        labels = tf.where(rutils.is_label_valid(labels), labels, tf.zeros_like(labels))
        gain = self.gain_fn(labels)
        if self.normalized:
            gain *= rutils.inverse_max_dcg(labels, self.gain_fn, self.discount_fn, self.k)

        rank_discount = self.discount_fn(tf.cast(ranks, tf.float32))
        return gain * rank_discount

    # region Utility Methods
    def discount_for_relative_rank_diff(self, ranks, k):
        """
        Rank-based discount in the LambdaLoss paper.

        :param ranks: A dense `Tensor` of 1-based ranks.
        :param k: The top k cut-off (the list size when no cut-off is active).
        :return: Pairwise ``|discount(d) - discount(d + 1)|`` where ``d`` is the
            absolute difference of the capped ranks; zero where ``d`` is zero.
        """
        # The LambdaLoss is not well defined when topn is active and topn <
        # list_size. We cap the rank of examples to topn + 1 so that the rank
        # difference is capped to topn. This is just a convenient upper bound
        # when topn is active. We need to revisit this.
        capped_rank = tf.where(
            tf.greater(ranks, k),
            tf.ones_like(ranks) * (k + 1),
            ranks
        )
        rank_diff = tf.cast(tf.abs(apply_pairwise_op(tf.subtract, capped_rank)), tf.float32)
        pair_discount = tf.where(
            tf.greater(rank_diff, 0),
            tf.abs(self.discount_fn(rank_diff) - self.discount_fn(rank_diff + 1)),
            tf.zeros_like(rank_diff)
        )

        return pair_discount

    def discount_for_absolute_rank(self, ranks, k):
        """
        Standard discount in the LambdaMART paper.

        :param ranks: A dense `Tensor` of 1-based ranks.
        :param k: The top k cut-off (the list size when no cut-off is active).
        :return: Pairwise absolute difference of the per-item rank discounts,
            where items ranked beyond ``k`` get a discount of zero.
        """
        # When the rank discount is (1 / rank) for example, the discount is
        # |1 / r_i - 1 / r_j|. When i or j > k, the discount becomes 0.
        rank_discount = tf.where(
            tf.greater(ranks, k),
            tf.zeros_like(tf.cast(ranks, tf.float32)),
            self.discount_fn(tf.cast(ranks, tf.float32))
        )
        pair_discount = tf.abs(apply_pairwise_op(tf.subtract, rank_discount))

        return pair_discount
    # endregion Utility Methods
class NetworkLoss(tf.keras.losses.Loss, abc.ABC):
    """
    Base class for loss functions.

    Calling an instance first normalises the inputs with
    :meth:`validate_inputs` and then delegates to :meth:`compute`, which
    subclasses must implement.
    """

    def __call__(self, y_true: tf.Tensor, y_pred: tf.Tensor, **kwargs):
        """
        Validate the inputs and evaluate the loss.

        :param y_true: The target values.
        :param y_pred: The predicted values.
        :param kwargs: Accepted for call compatibility; not forwarded to
            :meth:`compute`.
        :return: Whatever :meth:`compute` returns for the validated inputs.
        """
        targets, predictions = self.validate_inputs(y_true, y_pred)
        return self.compute(targets, predictions)

    @abc.abstractmethod
    @tf.function(experimental_relax_shapes=True)
    def compute(self, y_true: tf.Tensor, y_pred: tf.Tensor, **kwargs):
        """
        Compute the loss for already validated inputs.

        :param y_true: The target values.
        :param y_pred: The predicted values.
        :param kwargs: Optional, implementation specific arguments.
        :return: The loss.
        """
        pass

    # noinspection DuplicatedCode
    @staticmethod
    @tf.function(experimental_relax_shapes=True)
    def validate_inputs(y_true: tf.Tensor, y_pred: tf.Tensor):
        """
        Promote rank 1 inputs to a single-row batch and cast both to float32.

        :param y_true: The target values.
        :param y_pred: The predicted values; its rank decides whether both
            tensors are reshaped to ``(1, -1)``.
        :return: The tuple ``(y_true, y_pred)`` as float32 tensors.
        """
        # TODO validating the inputs in graph mode is surprisingly difficult.
        #  An earlier attempt wrapped tf.shape(...) of both inputs in
        #  tf.TensorShape, logged and raised a ValueError when the rank
        #  exceeded 2 or the two shapes differed; it was disabled, so no shape
        #  or rank agreement is enforced here.
        single_row = (1, -1)
        if tf.rank(y_pred) == 1:
            y_true = tf.reshape(y_true, single_row)
            y_pred = tf.reshape(y_pred, single_row)

        true_as_float = tf.cast(y_true, tf.float32)
        pred_as_float = tf.cast(y_pred, tf.float32)
        return true_as_float, pred_as_float

    # noinspection PyMethodMayBeStatic
    def normalize_weights(self, labels, weights):
        """
        Return the weights to apply, defaulting to a constant 1.0.

        :param labels: Unused by this base implementation.
        :param weights: The weights, or ``None``.
        :return: ``weights`` unchanged, or a float32 constant 1.0 when
            ``weights`` is ``None``.
        """
        del labels  # Not needed for the default behaviour.
        if weights is None:
            return tf.constant(1.0, tf.float32)
        return weights
class ListwiseLoss(NetworkLoss, abc.ABC):
    """Base class for losses that are computed over whole ranked lists."""

    def __init__(
        self,
        name: str,
        lambda_weight: Optional[LambdaWeight] = None,
        params: Optional[Dict] = None
    ):
        """
        :param name: The name reported by the :attr:`name` property.
        :param lambda_weight: Optional lambda weight model; stored only.
        :param params: Optional extra parameters; an empty dict when omitted.
        """
        # Forward the name so the Keras base class and this class agree on it.
        # NOTE(review): the Keras Loss initialiser is understood to assign
        # `self.name`, which is why the property below needs a setter.
        super().__init__(name=name)
        self._name = name
        self._lambda_weight = lambda_weight
        self._params = params or dict()

    @property
    def name(self):
        """The name of this loss."""
        return self._name

    @name.setter
    def name(self, value):
        # Without a setter any assignment to `name` (including one made by a
        # base-class initialiser) raises AttributeError.
        self._name = value

    def normalize_weights(self, labels, weights):
        """
        Collapse per-item weights into one label-weighted average per list.

        :param labels: The labels, reduced along axis 1.
        :param weights: Per-item weights with the same shape as ``labels``, or
            ``None``.
        :return: ``1.0`` when ``weights`` is ``None``; otherwise
            ``sum(weights * labels) / sum(labels)`` per row with the reduced
            axis kept, and 0 for rows whose labels sum to zero.
        """
        if weights is None:
            return 1.0

        return tf.math.divide_no_nan(
            tf.reduce_sum(weights * labels, axis=1, keepdims=True),
            tf.reduce_sum(labels, axis=1, keepdims=True)
        )
class MultiLoss(NetworkLoss):
    """Weighted sum of several base losses evaluated on the same inputs."""

    def __init__(
        self,
        base_losses: Iterable[NetworkLoss],
        weights: Optional[Union[float, List[float], np.ndarray]] = None,
        reduce: bool = True
    ):
        """
        :param base_losses: The losses to combine.
        :param weights: A single weight shared by every loss, or one weight per
            loss. ``None`` is equivalent to a single weight of 1.0.
        :param reduce: If True the combined loss is averaged to a scalar.
        :raises ValueError: If ``weights`` has more than one dimension, or if
            it holds more than one value and that count differs from the
            number of base losses.
        """
        # Initialise the Keras base class so its attributes exist on this object.
        super().__init__()

        self.reduce = reduce
        self.base_losses = list(base_losses)

        # Validate with numpy. The old message read `.size` from a tf.Tensor,
        # which has no such attribute, so the intended ValueError was masked.
        # Forcing float32 also keeps float64 / integer weights multipliable
        # with the float32 losses.
        weight_values = np.asarray([1.0] if weights is None else weights, dtype=np.float32)

        if weight_values.ndim > 1:
            raise ValueError(
                "The weights must be a scalar or a 1d array with the same "
                "number of values as the base losses."
            )

        if weight_values.size != 1 and weight_values.size != len(self.base_losses):
            raise ValueError(
                f"There must be the same number of weights as losses: "
                f"{weight_values.size} weights and {len(self.base_losses)} losses"
            )

        self.weights = tf.reshape(tf.constant(weight_values, dtype=tf.float32), (-1, 1))

    def compute(self, y_true: tf.Tensor, y_pred: tf.Tensor, **kwargs):
        """
        Evaluate every base loss and combine them with the configured weights.

        :param y_true: The target values.
        :param y_pred: The predicted values.
        :param kwargs: Unused.
        :return: ``sum_i(weight_i * loss_i)``, averaged to a scalar when
            ``reduce`` is set.
        """
        losses = tf.stack([loss(y_true, y_pred) for loss in self.base_losses])

        # Give the weights one leading "loss" axis plus singleton axes for the
        # remaining dimensions of the stacked losses. The fixed (n, 1) shape
        # broadcast scalar base losses (stacked shape (n,)) into an n x n
        # product, so each loss was scaled by the sum of all the weights.
        static_rank = losses.shape.rank
        if static_rank is None:
            trailing = tf.ones([tf.rank(losses) - 1], dtype=tf.int32)
            weight_shape = tf.concat([tf.constant([-1], dtype=tf.int32), trailing], axis=0)
        else:
            weight_shape = [-1] + [1] * (static_rank - 1)
        weights = tf.reshape(self.weights, weight_shape)

        loss = tf.reduce_sum(weights * losses, axis=0)

        return tf.reduce_mean(loss) if self.reduce else loss
class RankingSoftmax(ListwiseLoss):
    """
    Softmax ranking loss ported from tensorflow_ranking.

    I assume this is the loss from Ai et al. (2019), which I thought could be
    implemented as :class:`.RankingCrossEntropy`.
    """

    def __init__(
        self,
        reduce: bool = True,
        lambda_weights: LambdaWeight = None,
        pad_value=MIN_FLOAT_32,
        epsilon=EPSILON
    ):
        """
        :param reduce: If True the weighted loss is averaged to a scalar.
        :param lambda_weights: Optional lambda weight model, handed to the base
            class.
        :param pad_value: Label value that marks a padded list entry.
        :param epsilon: Label value given to every item of a list whose labels
            sum to zero.
        """
        super().__init__('ranking_softmax', lambda_weights)

        self.reduce = tf.constant(reduce)
        self.pad_value = tf.constant(pad_value, dtype=tf.float32)
        self.epsilon = tf.constant(epsilon, dtype=tf.float32)

    def compute(self, y_true: tf.Tensor, y_pred: tf.Tensor, **kwargs):
        """
        Compute the label-sum weighted softmax cross entropy.

        :param y_true: The labels, with padded entries equal to ``pad_value``.
        :param y_pred: The logits.
        :param kwargs: Unused.
        :return: The per-list weighted loss, or its mean when ``reduce`` is set.
        """
        clean_labels, clean_logits = self.precompute(y_true, y_pred)
        per_list_loss, per_list_weight = self.compute_unreduced(clean_labels, clean_logits)
        weighted_loss = per_list_weight * per_list_loss

        return tf.reduce_mean(weighted_loss) if self.reduce else weighted_loss

    def precompute(self, labels: tf.Tensor, logits: tf.Tensor):
        """
        Neutralise the padded entries of the labels and the logits.

        :param labels: The labels, with padded entries equal to ``pad_value``.
        :param logits: The logits.
        :return: The labels with padded entries set to zero and the logits with
            padded entries set to ``MIN_FLOAT_32``.
        """
        not_padding = tf.not_equal(labels, self.pad_value)
        # ranks = rutils.compute_ranks(logits, is_valid)

        # TODO handle weights
        masked_labels = tf.where(not_padding, labels, tf.zeros_like(labels))
        masked_logits = tf.where(not_padding, logits, MIN_FLOAT_32 * tf.ones_like(logits))

        return masked_labels, masked_logits

    def compute_unreduced(self, labels: tf.Tensor, logits: tf.Tensor):
        """
        Compute the per-list softmax cross entropy and the per-list weights.

        :param labels: The cleaned labels.
        :param logits: The cleaned logits.
        :return: A tuple ``(losses, weights)`` where ``losses`` is the softmax
            cross entropy against the row-normalised labels and ``weights`` is
            the flattened sum of the original labels of each list.
        """
        label_sum = tf.reduce_sum(labels, axis=1, keepdims=True)
        flat_label_sum = tf.reshape(label_sum, [-1])

        # Pad rows where label_sum = 0
        has_relevance = tf.reshape(tf.greater(flat_label_sum, 0.0), (-1, 1))
        padded_labels = tf.where(has_relevance, labels, self.epsilon * tf.ones_like(labels))
        padded_label_sum = tf.reduce_sum(padded_labels, axis=1, keepdims=True)

        # Renormalize the labels
        target_distribution = padded_labels / padded_label_sum

        # Note, this appears to be basically the same thing that I've done in
        # RankingCrossentropy, except I seem to have misinterpreted how the
        # weighting is done.
        losses = tf.nn.softmax_cross_entropy_with_logits(target_distribution, logits)

        return losses, flat_label_sum
class ApproximateNormalizedDiscountedCumulativeGain(ListwiseLoss):
    """Differentiable approximation of (negative) nDCG."""

    def __init__(
        self,
        reduce: bool = True,
        lambda_weights=None,
        alpha: float = 10.0,
        pad_value: float = MIN_FLOAT_32
    ):
        """
        :param reduce: If True the weighted loss is summed to a scalar.
        :param lambda_weights: Optional lambda weight model, handed to the base
            class. It is not used in the computation.
        :param alpha: Passed to ``rutils.approximate_ranks`` as ``alpha``.
        :param pad_value: Label value that marks a padded list entry.
        """
        super().__init__('approximate_ndcg', lambda_weights)

        self.reduce = tf.constant(reduce, tf.bool)
        self.alpha = tf.constant(alpha, tf.float32)
        self.pad_value = tf.constant(pad_value, tf.float32)
        self.epsilon = tf.constant(EPSILON, tf.float32)

    def __name__(self):
        return "approx_ndcg"

    @property
    def name(self):
        """
        The name of this loss (``"approx_ndcg"``).

        This used to be a plain method returning the *bound method*
        ``self.__name__`` rather than the string. It is now a property, which
        matches how ``name`` is exposed on :class:`ListwiseLoss`.
        """
        return self.__name__()

    @name.setter
    def name(self, value):
        # Accept assignments (e.g. from a base-class initialiser) so they do
        # not raise; the reported name stays fixed.
        self._name = value

    def compute_unreduced(self, labels: tf.Tensor, logits: tf.Tensor):
        """
        Compute the negative approximate nDCG of every list.

        :param labels: The labels, with padded entries equal to ``pad_value``.
        :param logits: The logits.
        :return: A tuple ``(cost, weights)``. ``cost`` is ``-dcg`` scaled by
            ``rutils.inverse_max_dcg(labels)``; ``weights`` is a ``[-1, 1]``
            float mask that is 1 for lists whose labels sum to more than zero.
        """
        alpha = self.alpha
        is_valid = tf.not_equal(labels, self.pad_value)
        labels = tf.where(is_valid, labels, tf.zeros_like(labels))
        logits = tf.where(is_valid, logits, MIN_FLOAT_32 * tf.ones_like(logits))

        label_sum = tf.reduce_sum(labels, axis=1, keepdims=True)
        nonzero_mask = tf.greater(tf.reshape(label_sum, [-1]), 0.0)
        # Lists without any positive label get epsilon everywhere. The v1
        # `where` is used because the condition here is a rank 1, per-row mask.
        labels = tf.compat.v1.where(nonzero_mask, labels, self.epsilon * tf.ones_like(labels))
        gains = tf.pow(2., tf.cast(labels, dtype=tf.float32)) - 1.
        ranks = rutils.approximate_ranks(logits, alpha=alpha)
        discounts = tf.math.log(2.0) / tf.math.log1p(ranks)
        dcg = tf.reduce_sum(gains * discounts, axis=-1, keepdims=True)
        cost = -dcg * rutils.inverse_max_dcg(labels)

        # TODO use lambda weights? Note they are not used in tensorflow_ranking.
        return cost, tf.reshape(tf.cast(nonzero_mask, tf.float32), [-1, 1])

    def compute(self, labels: tf.Tensor, logits: tf.Tensor, **kwargs):
        """
        Compute the weighted approximate nDCG loss.

        :param labels: The labels, with padded entries equal to ``pad_value``.
        :param logits: The logits.
        :param kwargs: Unused.
        :return: The per-list weighted loss, or its sum when ``reduce`` is set.
        """
        losses, loss_weights = self.compute_unreduced(labels, logits)
        weights = self.normalize_weights(labels, None) * loss_weights
        weighted_loss = weights * losses

        return tf.reduce_sum(weighted_loss) if self.reduce else weighted_loss
class ApproximateBiDiNormalizedDiscountedCumulativeGain(ListwiseLoss):
    """
    Bi-directional approximate nDCG loss.

    Combines a "forward" approximate nDCG computed on the labels with a
    "backward" approximate nDCG computed on the inverted labels
    (``max_label - label``); the cost is ``backward - beta * forward``.
    """

    def __init__(
        self,
        reduce: bool = True,
        max_label: float = 4.0,
        beta: float = 1.0,
        alpha: float = 10.0,
        pad_value: float = MIN_FLOAT_32
    ):
        """
        :param reduce: If True the weighted loss is summed to a scalar.
        :param max_label: Value the labels are subtracted from to build the
            backward labels. Presumably the largest possible label, so the
            inverted labels stay non-negative -- TODO confirm.
        :param beta: Multiplier applied to the forward nDCG in the cost.
        :param alpha: Passed to ``rutils.approximate_ranks`` as ``alpha``.
        :param pad_value: Label value that marks a padded list entry.
        """
        super().__init__('approximate_bidi_ndcg', None)

        self.reduce = tf.constant(reduce, tf.bool)
        self.max_label = max_label
        self.beta = beta
        self.alpha = tf.constant(alpha, tf.float32)
        self.pad_value = tf.constant(pad_value, tf.float32)
        self.epsilon = tf.constant(EPSILON, tf.float32)

    def compute_unreduced(self, labels: tf.Tensor, logits: tf.Tensor):
        """
        Compute the bi-directional cost of every list.

        :param labels: The labels, with padded entries equal to ``pad_value``.
        :param logits: The logits.
        :return: A tuple ``(cost, weights)``. ``cost`` is the backward nDCG
            minus ``beta`` times the forward nDCG; ``weights`` is a ``[-1, 1]``
            float mask that is 1 for lists whose (forward) labels sum to more
            than zero.
        """
        alpha = self.alpha
        is_valid = tf.not_equal(labels, self.pad_value)

        # It's not clear why to me, but breaking this up into decomposable functions
        # I.e., using the `dcg` function below causes a terrible performance
        # regression.

        # Forward
        f_labels = tf.where(is_valid, labels, tf.zeros_like(labels))
        logits = tf.where(is_valid, logits, MIN_FLOAT_32 * tf.ones_like(logits))
        label_sum = tf.reduce_sum(f_labels, axis=1, keepdims=True)
        f_nonzero_mask = tf.greater(tf.reshape(label_sum, [-1]), 0.0)
        # Lists whose labels do not sum to a positive value get epsilon everywhere.
        f_labels = tf.compat.v1.where(f_nonzero_mask, f_labels, self.epsilon * tf.ones_like(f_labels))
        gains = tf.pow(2., tf.cast(f_labels, dtype=tf.float32)) - 1.
        ranks = rutils.approximate_ranks(logits, alpha=alpha)
        discounts = tf.math.log(2.0) / tf.math.log1p(ranks)
        f_dcg = tf.reduce_sum(gains * discounts, axis=-1, keepdims=True)
        f_ndcg = f_dcg * rutils.inverse_max_dcg(f_labels)

        # Backward
        # The inversion is applied to the raw labels (padding included); the
        # padded entries are zeroed on the next line.
        b_labels = -labels + self.max_label
        b_labels = tf.where(is_valid, b_labels, tf.zeros_like(labels))
        label_sum = tf.reduce_sum(b_labels, axis=1, keepdims=True)
        b_nonzero_mask = tf.greater(tf.reshape(label_sum, [-1]), 0.0)
        b_labels = tf.compat.v1.where(b_nonzero_mask, b_labels, self.epsilon * tf.ones_like(b_labels))
        gains = tf.pow(2., tf.cast(b_labels, dtype=tf.float32)) - 1.
        # The discounts computed from the approximate ranks above are reused.
        b_dcg = tf.reduce_sum(gains * discounts, axis=-1, keepdims=True)
        b_ndcg = b_dcg * rutils.inverse_max_dcg(b_labels)

        cost = b_ndcg - (self.beta * f_ndcg)

        # Only the forward mask is returned as the per-list weight.
        return cost, tf.reshape(tf.cast(f_nonzero_mask, tf.float32), [-1, 1])

    def compute(self, labels: tf.Tensor, logits: tf.Tensor, **kwargs):
        """
        Compute the weighted bi-directional loss.

        :param labels: The labels, with padded entries equal to ``pad_value``.
        :param logits: The logits.
        :param kwargs: Unused.
        :return: The per-list weighted loss, or its sum when ``reduce`` is set.
        """
        losses, loss_weights = self.compute_unreduced(labels, logits)
        # normalize_weights is called with weights=None.
        weights = self.normalize_weights(labels, None) * loss_weights
        weighted_loss = weights * losses

        return tf.reduce_sum(weighted_loss) if self.reduce else weighted_loss

    def dcg(self, labels: tf.Tensor, logits: tf.Tensor):
        """
        Compute the approximate (un-normalised) DCG of every list.

        Not called by ``compute_unreduced``, which inlines the same steps (see
        the performance note there).

        :param labels: The labels, with padded entries equal to ``pad_value``.
        :param logits: The logits.
        :return: A tuple ``(dcg, weights)`` where ``dcg`` keeps the reduced
            axis and ``weights`` is a ``[-1, 1]`` float mask that is 1 for
            lists whose labels sum to more than zero.
        """
        alpha = self.alpha
        is_valid = tf.not_equal(labels, self.pad_value)
        labels = tf.where(is_valid, labels, tf.zeros_like(labels))
        logits = tf.where(is_valid, logits, MIN_FLOAT_32 * tf.ones_like(logits))

        label_sum = tf.reduce_sum(labels, axis=1, keepdims=True)
        nonzero_mask = tf.greater(tf.reshape(label_sum, [-1]), 0.0)
        labels = tf.compat.v1.where(nonzero_mask, labels, self.epsilon * tf.ones_like(labels))
        gains = tf.pow(2., tf.cast(labels, dtype=tf.float32)) - 1.
        ranks = rutils.approximate_ranks(logits, alpha=alpha)
        discounts = tf.math.log(2.0) / tf.math.log1p(ranks)
        dcg = tf.reduce_sum(gains * discounts, axis=-1, keepdims=True)

        return dcg, tf.reshape(tf.cast(nonzero_mask, tf.float32), [-1, 1])
class RankingCrossEntropy(ListwiseLoss):
    """Cross entropy between the labels and a padding-aware softmax of the scores."""

    def __init__(
        self,
        normalize: bool = True,
        reduce: bool = True,
        lambda_weights: LambdaWeight = None,
        pad_value=MIN_FLOAT_32
    ):
        """
        :param normalize: If True divide each list's cross entropy by the sum
            of the absolute values of its non-padded labels.
        :param reduce: If True average the loss to a single number, otherwise
            return one value per list.
        :param lambda_weights: Optional lambda weight model, handed to the base
            class.
        :param pad_value: Label value that marks a padded list entry.
        """
        super().__init__('ranking_cross_entropy', lambda_weights)

        self.normalize = normalize

        # If true reduce the loss to a single number, otherwise reduce along
        # the last axis (for compatibility with keras).
        self.reduce = reduce
        self.pad_value = tf.constant(pad_value, dtype=tf.float32)

    def compute(self, y_true: tf.Tensor, y_pred: tf.Tensor, **kwargs):
        """
        Compute the (optionally normalised) ranking cross entropy.

        :param y_true: The labels, with padded entries equal to ``pad_value``.
        :param y_pred: The scores.
        :param kwargs: Unused.
        :return: One value per list, or their mean when ``reduce`` is set.
        """
        # Most numerically stable
        # log_p = tf.nn.log_softmax(y_pred)

        # Not as numerically stable, but easier to apply mask
        is_real = tf.not_equal(y_true, self.pad_value)
        keep = tf.cast(is_real, tf.float32)
        drop = tf.cast(tf.math.logical_not(is_real), tf.float32)

        probabilities = masked_softmax(y_pred, keep)
        # Adding `drop` turns the padded probabilities into 1 so that their
        # log is 0 instead of -inf.
        log_p = keep * tf.math.log(probabilities + drop)

        if self.normalize:
            denom = tf.reduce_sum(keep * tf.math.abs(y_true), axis=1)
        else:
            denom = 1
        xe = -tf.math.divide_no_nan(tf.reduce_sum(y_true * log_p, axis=1), denom)

        if self.reduce:
            return tf.reduce_mean(xe)
        return xe
class MeanSquaredError(NetworkLoss):
    """Mean squared error over the non-padded entries of each list."""

    def __init__(self, reduce: bool = True, weight_by_labels: bool = True, pad_value=MIN_FLOAT_32):
        """
        :param reduce: If True average the per-list errors to a scalar.
        :param weight_by_labels: If True multiply each list's error by the sum
            of its non-padded labels.
        :param pad_value: Label value that marks a padded list entry.
        """
        # Initialise the Keras base class so its attributes exist on this object.
        super().__init__()

        self.reduce = tf.constant(reduce)
        self.weight_by_labels = tf.constant(weight_by_labels)
        self.pad_value = tf.constant(pad_value, dtype=tf.float32)

    def compute(self, y_true: tf.Tensor, y_pred: tf.Tensor, **kwargs):
        """
        Compute the masked mean squared error.

        :param y_true: The labels, with padded entries equal to ``pad_value``.
        :param y_pred: The predictions.
        :param kwargs: Unused.
        :return: One error per list, or their mean when ``reduce`` is set.
            Lists without any non-padded entry yield 0 because the division
            uses ``tf.math.divide_no_nan``.
        """
        mask = tf.cast(tf.not_equal(y_true, self.pad_value), tf.float32)
        sum_sqr = tf.reduce_sum(tf.square(mask * (y_true - y_pred)), axis=-1)
        # Number of real (non-padded) entries per list.
        n = tf.reduce_sum(mask, axis=1)
        err = tf.math.divide_no_nan(sum_sqr, n)

        if self.weight_by_labels:
            err = err * tf.math.reduce_sum(mask * y_true, axis=1)

        return tf.reduce_mean(err) if self.reduce else err