# Copyright 2020 The TensorFlow Ranking Authors.
# Porting and additional code Copyright 2020 Reid Swanson
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Python Modules
import abc
import logging
from typing import Iterable, Optional, Union, List, Callable, Dict
# 3rd Party Modules
import numpy as np
import tensorflow as tf
# Project Modules
import deletor.ranking.utils as rutils
from deletor.constants import EPSILON, MIN_FLOAT_32
log = logging.getLogger(__name__)
def check_tensor_shapes(tensors: List[tf.Tensor]):
    """
    Assert that every tensor is rank 2 and shape-compatible with the first one.

    :param tensors: The tensors to validate. An empty list passes trivially
        because nothing is inspected.
    :return: None. The static-shape assertion helpers raise when a check fails.
    """
    # TODO I'm not sure if a python list is a good idea with AutoGraph.
    #  I may want to use a TensorArray instead?
    for candidate in tensors:
        static_shape = candidate.get_shape()
        static_shape.assert_has_rank(2)
        # Every tensor (including the first) is compared against the first.
        static_shape.assert_is_compatible_with(tensors[0].get_shape())
def apply_pairwise_op(op, tensor: tf.Tensor):
    """
    Apply a binary op between two expanded views of a rank 2 tensor.

    :param op: A binary callable. It is invoked once as ``op(a, b)`` where
        ``a`` is ``tensor`` with a new axis inserted at position 2 and ``b`` is
        ``tensor`` with a new axis inserted at position 1.
    :param tensor: A rank 2 tensor (enforced through ``check_tensor_shapes``).
    :return: Whatever ``op`` returns. For a broadcasting element-wise op the
        entry ``[b, i, j]`` combines ``tensor[b, i]`` with ``tensor[b, j]``.
    """
    check_tensor_shapes([tensor])
    column_view = tf.expand_dims(tensor, 2)
    row_view = tf.expand_dims(tensor, 1)
    return op(column_view, row_view)
def get_valid_pairs_and_clean_labels(labels: tf.Tensor):
    """
    Build the pairwise validity mask and zero out invalid labels.

    :param labels: A rank 2 tensor of labels.
    :return: A tuple ``(valid_pairs, labels)``. ``valid_pairs`` is the pairwise
        logical AND of the per-entry validity reported by
        ``rutils.is_label_valid``; ``labels`` is the input with every invalid
        entry replaced by zero.
    """
    # Is this assert necessary? The shape should be validated by check_tensor_shapes already
    labels.get_shape().assert_has_rank(2)
    valid_entries = rutils.is_label_valid(labels)
    cleaned_labels = tf.where(valid_entries, labels, tf.zeros_like(labels))
    # A pair is valid only when both of its members are valid.
    pair_validity = apply_pairwise_op(tf.logical_and, valid_entries)
    return pair_validity, cleaned_labels
def masked_softmax(x: tf.Tensor, mask: tf.Tensor):
    """
    Softmax along the last axis that only distributes mass over masked-in entries.

    :param x: The scores. The reshapes to ``(-1, 1)`` mean the row statistics
        broadcast as intended for a 2-d input.
    :param mask: A multiplicative mask applied to ``exp(x)``; entries
        multiplied by zero receive zero probability.
    :return: ``mask * exp(x - max) / sum(mask * exp(x - max))`` per row. Rows
        whose masked sum is zero yield zeros (``divide_no_nan``).
    """
    # The row maximum is taken over all entries, masked or not, and subtracted
    # before exponentiating.
    row_max = tf.reduce_max(x, axis=-1)
    shifted = x - tf.reshape(row_max, (-1, 1))
    masked_exp = mask * tf.math.exp(shifted)
    normalizer = tf.reshape(tf.reduce_sum(masked_exp, axis=-1), (-1, 1))
    return tf.math.divide_no_nan(masked_exp, normalizer)
class LambdaWeight(abc.ABC):
    """
    Interface for ranking metric optimization.

    Wraps the weights used by the LambdaLoss framework
    (https://ai.google/research/pubs/pub47258). Concrete lambda weight models
    implement this interface; an instance is combined with a standard loss
    such as logistic loss or softmax loss.
    """

    @abc.abstractmethod
    def pair_weights(self, labels, ranks):
        """
        Compute the weight adjustment for pairs of examples.

        :param labels: A dense `Tensor` of labels with shape
            [batch_size, list_size].
        :param ranks: A dense `Tensor` of ranks with the same shape as `labels`
            that are sorted by logits.
        :return: A `Tensor` that can weight example pairs.
        """
        raise NotImplementedError('Calling an abstract method.')

    # noinspection PyMethodMayBeStatic
    def individual_weights(self, labels, ranks):
        """
        Compute the weight for individual examples.

        The default implementation ignores ``ranks`` and hands the labels back
        unchanged.

        :param labels: A dense `Tensor` of labels with shape
            [batch_size, list_size].
        :param ranks: A dense `Tensor` of ranks with the same shape as `labels`
            that are sorted by logits. Unused here.
        :return: ``labels`` itself, to be used as per-example weights.
        """
        del ranks  # Deliberately unused by the default implementation.
        return labels
class DCGLambdaWeight(LambdaWeight):
    """LambdaWeight for Discounted Cumulative Gain metric."""

    def __init__(
            self,
            k: Optional[int] = None,
            gain_fn: Callable = lambda label: label,
            discount_fn: Callable = lambda rank: 1.0 / rank,
            normalized: bool = False,
            smooth_fraction: float = 0.0
    ):
        r"""
        Constructor.

        Ranks are 1-based, not 0-based. Given rank i and j, there are two types of
        pair weights:

        | u = \|rank_discount_fn(\|i-j\|) - rank_discount_fn(\|i-j\| + 1)\|
        | v = \|rank_discount_fn(i) - rank_discount_fn(j)\|
        | where u is the newly introduced one in LambdaLoss paper
        | (https://ai.google/research/pubs/pub47258) and v is the original one in the
        | LambdaMART paper "From RankNet to LambdaRank to LambdaMART: An Overview".
        | The final pair weight contribution of ranks is
        | (1-smooth_fraction) * u + smooth_fraction * v.

        :param k: The top k for the DCG metric. ``None`` means the whole list.
        :param gain_fn: Transforms labels.
        :param discount_fn: The rank discount function.
        :param normalized: If truthy, normalize weight by the max DCG.
        :param smooth_fraction: parameter to control the contribution from
            LambdaMART.
        :raises ValueError: If ``smooth_fraction`` is outside ``[0, 1]``.
        """
        if smooth_fraction < 0 or smooth_fraction > 1:
            raise ValueError(f"smooth_fraction should be in range [0, 1]. ({smooth_fraction})")

        self.k = k
        self.gain_fn = gain_fn
        self.discount_fn = discount_fn
        self.normalized = normalized
        self.smooth_fraction = smooth_fraction

    def pair_weights(self, labels, ranks):
        """
        Compute the DCG based weight of every pair of examples in a list.

        :param labels: A dense `Tensor` of labels with shape
            [batch_size, list_size].
        :param ranks: A dense `Tensor` of 1-based ranks with the same shape as
            `labels`.
        :return: ``|gain_i - gain_j|`` multiplied by the blended rank discount;
            pairs that involve an invalid label get weight zero. When ``k`` is
            set, pairs where neither member is ranked within the top ``k`` are
            zeroed as well.
        """
        check_tensor_shapes([labels, ranks])
        valid_pairs, labels = get_valid_pairs_and_clean_labels(labels)
        gain = self.gain_fn(labels)

        # Truthiness test (not ``is True``) so this agrees with
        # ``individual_weights`` for flags such as ``1`` or ``np.bool_``.
        if self.normalized:
            gain *= rutils.inverse_max_dcg(labels, self.gain_fn, self.discount_fn, self.k)

        pair_gain = apply_pairwise_op(tf.subtract, gain)
        pair_gain *= tf.cast(valid_pairs, tf.float32)

        list_size = tf.shape(labels)[1]
        k = list_size if self.k is None else self.k

        u = self.discount_for_relative_rank_diff(ranks, k)
        v = self.discount_for_absolute_rank(ranks, k)
        pair_discount = (1.0 - self.smooth_fraction) * u + self.smooth_fraction * v
        pair_weight = tf.abs(pair_gain) * pair_discount

        if self.k is None:
            return pair_weight

        # Keep a pair when at least one of its members is inside the top k.
        pair_mask = apply_pairwise_op(tf.logical_or, tf.less_equal(ranks, k))
        return pair_weight * tf.cast(pair_mask, tf.float32)

    def individual_weights(self, labels, ranks):
        """
        Compute the DCG contribution of every individual example.

        :param labels: A dense `Tensor` of labels with shape
            [batch_size, list_size].
        :param ranks: A dense `Tensor` of 1-based ranks with the same shape as
            `labels`.
        :return: ``gain_fn(labels) * discount_fn(ranks)``, with invalid labels
            treated as zero and the gain optionally scaled by the inverse max
            DCG.
        """
        check_tensor_shapes([labels, ranks])
        labels = tf.where(rutils.is_label_valid(labels), labels, tf.zeros_like(labels))
        gain = self.gain_fn(labels)

        if self.normalized:
            gain *= rutils.inverse_max_dcg(labels, self.gain_fn, self.discount_fn, self.k)

        rank_discount = self.discount_fn(tf.cast(ranks, tf.float32))
        return gain * rank_discount

    # region Utility Methods
    def discount_for_relative_rank_diff(self, ranks, k):
        """
        Rank-based discount in the LambdaLoss paper.

        :param ranks: A dense `Tensor` of 1-based ranks.
        :param k: The cut-off; ranks beyond it are capped to ``k + 1``.
        :return: ``|discount_fn(d) - discount_fn(d + 1)|`` for every pair with
            capped rank difference ``d > 0``, and zero where ``d == 0``.
        """
        # The LambdaLoss is not well defined when topn is active and topn <
        # list_size. We cap the rank of examples to topn + 1 so that the rank
        # difference is capped to topn. This is just a convenient upper bound
        # when topn is active. We need to revisit this.
        capped_rank = tf.where(
            tf.greater(ranks, k),
            tf.ones_like(ranks) * (k + 1),
            ranks
        )
        rank_diff = tf.cast(tf.abs(apply_pairwise_op(tf.subtract, capped_rank)), tf.float32)
        pair_discount = tf.where(
            tf.greater(rank_diff, 0),
            tf.abs(self.discount_fn(rank_diff) - self.discount_fn(rank_diff + 1)),
            tf.zeros_like(rank_diff)
        )

        return pair_discount

    def discount_for_absolute_rank(self, ranks, k):
        """
        Standard discount in the LambdaMART paper.

        :param ranks: A dense `Tensor` of 1-based ranks.
        :param k: The cut-off; examples ranked beyond it get a zero discount.
        :return: ``|discount_i - discount_j|`` for every pair of examples.
        """
        # When the rank discount is (1 / rank) for example, the discount is
        # |1 / r_i - 1 / r_j|. When i or j > k, the discount becomes 0.
        rank_discount = tf.where(
            tf.greater(ranks, k),
            tf.zeros_like(tf.cast(ranks, tf.float32)),
            self.discount_fn(tf.cast(ranks, tf.float32))
        )
        pair_discount = tf.abs(apply_pairwise_op(tf.subtract, rank_discount))

        return pair_discount
    # endregion Utility Methods
class NetworkLoss(tf.keras.losses.Loss, abc.ABC):
    """
    Base class for loss functions.

    Calling an instance validates the inputs and then delegates to
    :meth:`compute`, which every concrete loss must implement.
    """

    def __call__(self, y_true: tf.Tensor, y_pred: tf.Tensor, **kwargs):
        """
        Validate the inputs and evaluate the loss.

        :param y_true: The ground truth labels.
        :param y_pred: The model predictions.
        :param kwargs: Extra keyword arguments, forwarded unchanged to
            :meth:`compute`.
        :return: Whatever :meth:`compute` returns.
        """
        y_true, y_pred = self.validate_inputs(y_true, y_pred)

        return self.compute(y_true, y_pred, **kwargs)

    # noinspection PyMethodMayBeStatic
    def validate_inputs(self, y_true, y_pred):
        """
        Default input normalization used by :meth:`__call__`.

        Converts the predictions to a tensor and casts the labels to the
        prediction dtype. Subclasses may override this to add stricter checks.

        :param y_true: The ground truth labels.
        :param y_pred: The model predictions.
        :return: The tuple ``(y_true, y_pred)`` as tensors of the same dtype.
        """
        y_pred = tf.convert_to_tensor(y_pred)
        y_true = tf.cast(y_true, y_pred.dtype)

        return y_true, y_pred

    @abc.abstractmethod
    @tf.function(experimental_relax_shapes=True)
    def compute(self, y_true: tf.Tensor, y_pred: tf.Tensor, **kwargs):
        """
        Compute the loss. Must be implemented by subclasses.

        NOTE(review): the ``tf.function`` decorator only wraps this abstract
        stub; overriding methods are not wrapped automatically.

        :param y_true: The ground truth labels.
        :param y_pred: The model predictions.
        :param kwargs: Extra keyword arguments passed through ``__call__``.
        :return: The loss value.
        """
        pass

    # noinspection DuplicatedCode
    # noinspection PyMethodMayBeStatic
    def normalize_weights(self, labels, weights):
        """
        Return the weights to apply to the loss.

        :param labels: Unused by the base implementation.
        :param weights: Optional weights.
        :return: ``weights`` unchanged, or a float32 constant ``1.0`` when
            ``weights`` is None.
        """
        del labels

        return tf.constant(1.0, tf.float32) if weights is None else weights
class ListwiseLoss(NetworkLoss, abc.ABC):
    """Base class for losses that are computed over whole ranked lists."""

    def __init__(
            self,
            name: str,
            lambda_weight: Optional[LambdaWeight] = None,
            params: Optional[Dict] = None
    ):
        """
        :param name: The name of the loss, exposed through :attr:`name`.
        :param lambda_weight: Optional lambda weight model stored on the instance.
        :param params: Optional dictionary of extra parameters; a falsy value
            is replaced by an empty dict.
        """
        # The Keras base initializer assigns ``self.name``; the property below
        # therefore needs a setter, and the name is passed along so that
        # assignment does not overwrite it with None.
        super().__init__(name=name)
        self._name = name
        self._lambda_weight = lambda_weight
        self._params = params or {}

    @property
    def name(self):
        """The name of this loss."""
        return self._name

    @name.setter
    def name(self, value):
        # Needed because the base initializer performs ``self.name = ...``;
        # a read-only property would make that assignment raise.
        self._name = value

    def normalize_weights(self, labels, weights):
        """
        Reduce per-item weights to one label-weighted average per list.

        :param labels: The labels, summed along axis 1.
        :param weights: Optional per-item weights with the same shape as
            ``labels``.
        :return: ``1.0`` when ``weights`` is None, otherwise
            ``sum(weights * labels) / sum(labels)`` per row (keeping the
            reduced axis), with zero where the label sum is zero.
        """
        if weights is None:
            return 1.0

        return tf.math.divide_no_nan(
            tf.reduce_sum(weights * labels, axis=1, keepdims=True),
            tf.reduce_sum(labels, axis=1, keepdims=True)
        )
class MultiLoss(NetworkLoss):
    """A weighted sum of several base losses evaluated on the same inputs."""

    def __init__(
            self,
            base_losses: Iterable[NetworkLoss],
            weights: Optional[Union[float, List[float], np.ndarray]] = None,
            reduce: bool = True
    ):
        """
        :param base_losses: The losses to combine.
        :param weights: Either a single weight shared by all losses, or one
            weight per base loss. Defaults to ``1.0`` for every loss.
        :param reduce: If True the combined loss is averaged to a scalar.
        :raises ValueError: If ``weights`` has more than one dimension, or its
            length is neither 1 nor the number of base losses.
        """
        super().__init__()
        self.reduce = reduce
        self.base_losses = list(base_losses)

        # Validate with numpy so the checks (and the error message) do not
        # depend on tensor attributes, and force float32 so that integer or
        # float64 weights can be multiplied with float32 losses.
        weight_values = np.asarray([1.0] if weights is None else weights, dtype=np.float32)

        if weight_values.ndim > 1:
            raise ValueError(
                "The weights must be a scalar or a 1d array with the same "
                "number of values as the base losses."
            )

        if weight_values.size != 1 and weight_values.size != len(self.base_losses):
            raise ValueError(
                f"There must be the same number of weights as losses: "
                f"{weight_values.size} weights and {len(self.base_losses)} losses"
            )

        self.weights = tf.constant(weight_values.reshape(-1, 1))

    def compute(self, y_true: tf.Tensor, y_pred: tf.Tensor, **kwargs):
        """
        Evaluate every base loss and combine them with the configured weights.

        :param y_true: The ground truth labels.
        :param y_pred: The model predictions.
        :param kwargs: Unused.
        :return: The weighted sum over the base losses, averaged to a scalar
            when ``reduce`` is True.
        """
        losses = tf.stack([loss(y_true, y_pred) for loss in self.base_losses])

        # Give the weights shape (n, 1, ..., 1) matching the rank of the
        # stacked losses, so each weight scales exactly one base loss whether
        # the base losses are scalars or per-example tensors.
        trailing_ones = tf.ones([tf.rank(losses) - 1], tf.int32)
        weights = tf.reshape(self.weights, tf.concat([[-1], trailing_ones], axis=0))
        loss = tf.reduce_sum(weights * losses, axis=0)

        return tf.reduce_mean(loss) if self.reduce else loss
class RankingSoftmax(ListwiseLoss):
    """
    Ported from tensorflow_ranking. I assume this is the loss from
    Ai et al. (2019), which I thought could be implemented as
    :class:`.RankingCrossEntropy`.
    """

    def __init__(
            self,
            reduce: bool = True,
            lambda_weights: LambdaWeight = None,
            pad_value=MIN_FLOAT_32,
            epsilon=EPSILON
    ):
        """
        :param reduce: If True, average the weighted losses to a scalar.
        :param lambda_weights: Optional lambda weight model, stored by the
            base class.
        :param pad_value: The label value that marks a padded entry.
        :param epsilon: Fill value used for the labels of lists whose label
            sum is not positive.
        """
        super().__init__('ranking_softmax', lambda_weights)
        self.reduce = tf.constant(reduce)
        self.pad_value = tf.constant(pad_value, dtype=tf.float32)
        self.epsilon = tf.constant(epsilon, dtype=tf.float32)

    def compute(self, y_true: tf.Tensor, y_pred: tf.Tensor, **kwargs):
        """
        Compute the label-sum weighted softmax cross entropy.

        :param y_true: The labels, with padded entries equal to ``pad_value``.
        :param y_pred: The logits.
        :param kwargs: Unused.
        :return: The mean weighted loss when ``reduce`` is set, otherwise one
            weighted loss per list.
        """
        clean_labels, masked_logits = self.precompute(y_true, y_pred)
        per_list_loss, per_list_weight = self.compute_unreduced(clean_labels, masked_logits)
        weighted_loss = per_list_weight * per_list_loss

        return tf.reduce_mean(weighted_loss) if self.reduce else weighted_loss

    def precompute(self, labels: tf.Tensor, logits: tf.Tensor):
        """
        Neutralize padded entries before the loss is computed.

        :param labels: The labels, with padded entries equal to ``pad_value``.
        :param logits: The logits.
        :return: ``(labels, logits)`` where padded labels are zero and padded
            logits are set to ``MIN_FLOAT_32``.
        """
        not_padded = tf.not_equal(labels, self.pad_value)
        # ranks = rutils.compute_ranks(logits, is_valid)
        # TODO handle weights
        lowest_logits = MIN_FLOAT_32 * tf.ones_like(logits)
        clean_labels = tf.where(not_padded, labels, tf.zeros_like(labels))
        masked_logits = tf.where(not_padded, logits, lowest_logits)

        return clean_labels, masked_logits

    def compute_unreduced(self, labels: tf.Tensor, logits: tf.Tensor):
        """
        Compute the per-list softmax cross entropy and the per-list weights.

        :param labels: The cleaned labels.
        :param logits: The masked logits.
        :return: ``(losses, weights)`` where ``weights`` is the flattened sum
            of the original labels of each list.
        """
        label_totals = tf.reduce_sum(labels, axis=1, keepdims=True)

        # Lists without any positive label mass get a constant epsilon label
        # for every entry, so the renormalization below is well defined.
        has_label_mass = tf.reshape(tf.greater(label_totals, 0.0), (-1, 1))
        epsilon_labels = self.epsilon * tf.ones_like(labels)
        smoothed_labels = tf.where(has_label_mass, labels, epsilon_labels)

        # Renormalize the labels so that each list sums to one.
        target_distribution = smoothed_labels / tf.reduce_sum(smoothed_labels, axis=1, keepdims=True)

        # Note, this appears to be basically the same thing that I've done in
        # RankingCrossentropy, except I seem to have misinterpreted how the
        # weighting is done.
        per_list_loss = tf.nn.softmax_cross_entropy_with_logits(target_distribution, logits)

        return per_list_loss, tf.reshape(label_totals, [-1])
class ApproximateNormalizedDiscountedCumulativeGain(ListwiseLoss):
    """A loss based on a differentiable approximation of NDCG."""

    def __init__(
            self,
            reduce: bool = True,
            lambda_weights=None,
            alpha: float = 10.0,
            pad_value: float = MIN_FLOAT_32
    ):
        """
        :param reduce: If True, sum the weighted losses to a scalar.
        :param lambda_weights: Optional lambda weight model, stored by the
            base class (not used by this loss).
        :param alpha: Passed to ``rutils.approximate_ranks``.
        :param pad_value: The label value that marks a padded entry.
        """
        super().__init__('approximate_ndcg', lambda_weights)
        self.reduce = tf.constant(reduce, tf.bool)
        self.alpha = tf.constant(alpha, tf.float32)
        self.pad_value = tf.constant(pad_value, tf.float32)
        self.epsilon = tf.constant(EPSILON, tf.float32)

    def __name__(self):
        return "approx_ndcg"

    @property
    def name(self):
        """
        The short name of this loss (``"approx_ndcg"``).

        Declared as a property, like the base class, and returns the string
        itself rather than the bound ``__name__`` method.
        """
        return self.__name__()

    @name.setter
    def name(self, value):
        # Base initializers may assign ``self.name``; accept the assignment so
        # construction succeeds. The getter keeps reporting the short name.
        self._name = value

    def compute_unreduced(self, labels: tf.Tensor, logits: tf.Tensor):
        """
        Compute the negated approximate NDCG of every list.

        :param labels: The labels, with padded entries equal to ``pad_value``.
        :param logits: The logits.
        :return: ``(cost, weights)`` where ``cost`` is ``-dcg * inverse_max_dcg``
            per list and ``weights`` is a ``[-1, 1]`` float mask that is 1 for
            lists with a positive label sum.
        """
        alpha = self.alpha
        is_valid = tf.not_equal(labels, self.pad_value)
        labels = tf.where(is_valid, labels, tf.zeros_like(labels))
        logits = tf.where(is_valid, logits, MIN_FLOAT_32 * tf.ones_like(logits))

        label_sum = tf.reduce_sum(labels, axis=1, keepdims=True)
        nonzero_mask = tf.greater(tf.reshape(label_sum, [-1]), 0.0)
        # The v1 ``where`` selects whole rows from a rank 1 condition: lists
        # without positive label mass get epsilon labels everywhere.
        labels = tf.compat.v1.where(nonzero_mask, labels, self.epsilon * tf.ones_like(labels))

        gains = tf.pow(2., tf.cast(labels, dtype=tf.float32)) - 1.
        ranks = rutils.approximate_ranks(logits, alpha=alpha)
        discounts = tf.math.log(2.0) / tf.math.log1p(ranks)
        dcg = tf.reduce_sum(gains * discounts, axis=-1, keepdims=True)
        cost = -dcg * rutils.inverse_max_dcg(labels)

        # TODO use lambda weights? Note they are not used in tensorflow_ranking.
        return cost, tf.reshape(tf.cast(nonzero_mask, tf.float32), [-1, 1])

    def compute(self, labels: tf.Tensor, logits: tf.Tensor, **kwargs):
        """
        Compute the weighted approximate NDCG loss.

        :param labels: The labels, with padded entries equal to ``pad_value``.
        :param logits: The logits.
        :param kwargs: Unused.
        :return: The sum of the weighted losses when ``reduce`` is set,
            otherwise the weighted loss of every list.
        """
        losses, loss_weights = self.compute_unreduced(labels, logits)
        weights = self.normalize_weights(labels, None) * loss_weights
        weighted_loss = weights * losses

        return tf.reduce_sum(weighted_loss) if self.reduce else weighted_loss
class ApproximateBiDiNormalizedDiscountedCumulativeGain(ListwiseLoss):
    """
    A bidirectional variant of the approximate NDCG loss.

    The cost rewards the approximate NDCG computed from the labels and
    penalizes the approximate NDCG computed from the inverted labels
    (``max_label - label``), both using the same approximate ranks.
    """

    def __init__(
            self,
            reduce: bool = True,
            max_label: float = 4.0,
            beta: float = 1.0,
            alpha: float = 10.0,
            pad_value: float = MIN_FLOAT_32
    ):
        """
        :param reduce: If True, sum the weighted losses to a scalar.
        :param max_label: The value the labels are subtracted from to build
            the inverted ("backward") labels.
        :param beta: The multiplier of the forward NDCG term in the cost.
        :param alpha: Passed to ``rutils.approximate_ranks``.
        :param pad_value: The label value that marks a padded entry.
        """
        super().__init__('approximate_bidi_ndcg', None)
        self.reduce = tf.constant(reduce, tf.bool)
        self.max_label = max_label
        self.beta = beta
        self.alpha = tf.constant(alpha, tf.float32)
        self.pad_value = tf.constant(pad_value, tf.float32)
        self.epsilon = tf.constant(EPSILON, tf.float32)

    def compute_unreduced(self, labels: tf.Tensor, logits: tf.Tensor):
        """
        Compute the bidirectional cost of every list.

        :param labels: The labels, with padded entries equal to ``pad_value``.
        :param logits: The logits.
        :return: ``(cost, weights)`` where ``cost`` is
            ``backward_ndcg - beta * forward_ndcg`` per list and ``weights``
            is a ``[-1, 1]`` float mask that is 1 for lists whose forward
            label sum is positive.
        """
        not_padded = tf.not_equal(labels, self.pad_value)

        # It's not clear why to me, but breaking this up into decomposable functions
        # I.e., using the `dcg` function below causes a terrible performance
        # regression.

        # Forward direction: the labels as given.
        forward_labels = tf.where(not_padded, labels, tf.zeros_like(labels))
        logits = tf.where(not_padded, logits, MIN_FLOAT_32 * tf.ones_like(logits))
        forward_total = tf.reduce_sum(forward_labels, axis=1, keepdims=True)
        forward_has_mass = tf.greater(tf.reshape(forward_total, [-1]), 0.0)
        # Row-wise selection: lists without label mass get epsilon labels.
        forward_labels = tf.compat.v1.where(
            forward_has_mass,
            forward_labels,
            self.epsilon * tf.ones_like(forward_labels)
        )
        forward_gains = tf.pow(2., tf.cast(forward_labels, dtype=tf.float32)) - 1.
        approx_ranks = rutils.approximate_ranks(logits, alpha=self.alpha)
        rank_discounts = tf.math.log(2.0) / tf.math.log1p(approx_ranks)
        forward_dcg = tf.reduce_sum(forward_gains * rank_discounts, axis=-1, keepdims=True)
        forward_ndcg = forward_dcg * rutils.inverse_max_dcg(forward_labels)

        # Backward direction: the inverted labels, same rank discounts.
        backward_labels = -labels + self.max_label
        backward_labels = tf.where(not_padded, backward_labels, tf.zeros_like(labels))
        backward_total = tf.reduce_sum(backward_labels, axis=1, keepdims=True)
        backward_has_mass = tf.greater(tf.reshape(backward_total, [-1]), 0.0)
        backward_labels = tf.compat.v1.where(
            backward_has_mass,
            backward_labels,
            self.epsilon * tf.ones_like(backward_labels)
        )
        backward_gains = tf.pow(2., tf.cast(backward_labels, dtype=tf.float32)) - 1.
        backward_dcg = tf.reduce_sum(backward_gains * rank_discounts, axis=-1, keepdims=True)
        backward_ndcg = backward_dcg * rutils.inverse_max_dcg(backward_labels)

        cost = backward_ndcg - (self.beta * forward_ndcg)

        return cost, tf.reshape(tf.cast(forward_has_mass, tf.float32), [-1, 1])

    def compute(self, labels: tf.Tensor, logits: tf.Tensor, **kwargs):
        """
        Compute the weighted bidirectional approximate NDCG loss.

        :param labels: The labels, with padded entries equal to ``pad_value``.
        :param logits: The logits.
        :param kwargs: Unused.
        :return: The sum of the weighted losses when ``reduce`` is set,
            otherwise the weighted loss of every list.
        """
        per_list_cost, list_mask = self.compute_unreduced(labels, logits)
        scale = self.normalize_weights(labels, None) * list_mask
        weighted_loss = scale * per_list_cost

        return tf.reduce_sum(weighted_loss) if self.reduce else weighted_loss

    def dcg(self, labels: tf.Tensor, logits: tf.Tensor):
        """
        Compute the approximate (unnormalized) DCG of every list.

        :param labels: The labels, with padded entries equal to ``pad_value``.
        :param logits: The logits.
        :return: ``(dcg, weights)`` where ``weights`` is a ``[-1, 1]`` float
            mask that is 1 for lists with a positive label sum.
        """
        not_padded = tf.not_equal(labels, self.pad_value)
        labels = tf.where(not_padded, labels, tf.zeros_like(labels))
        logits = tf.where(not_padded, logits, MIN_FLOAT_32 * tf.ones_like(logits))

        label_total = tf.reduce_sum(labels, axis=1, keepdims=True)
        has_mass = tf.greater(tf.reshape(label_total, [-1]), 0.0)
        labels = tf.compat.v1.where(has_mass, labels, self.epsilon * tf.ones_like(labels))

        item_gains = tf.pow(2., tf.cast(labels, dtype=tf.float32)) - 1.
        approx_ranks = rutils.approximate_ranks(logits, alpha=self.alpha)
        rank_discounts = tf.math.log(2.0) / tf.math.log1p(approx_ranks)
        list_dcg = tf.reduce_sum(item_gains * rank_discounts, axis=-1, keepdims=True)

        return list_dcg, tf.reshape(tf.cast(has_mass, tf.float32), [-1, 1])
class RankingCrossEntropy(ListwiseLoss):
    """Cross entropy between the labels and a masked softmax over the scores."""

    def __init__(
            self, normalize: bool = True,
            reduce: bool = True,
            lambda_weights: LambdaWeight = None,
            pad_value=MIN_FLOAT_32
    ):
        """
        :param normalize: If True, divide the cross entropy of each list by
            the sum of the absolute values of its non-padded labels.
        :param reduce: If True, average the loss to a single number;
            otherwise return one value per list.
        :param lambda_weights: Optional lambda weight model, stored by the
            base class.
        :param pad_value: The label value that marks a padded entry.
        """
        super().__init__('ranking_cross_entropy', lambda_weights)
        self.normalize = normalize
        # If true reduce the loss to a single number, otherwise reduce along
        # the last axis (for compatibility with keras).
        self.reduce = reduce
        self.pad_value = tf.constant(pad_value, dtype=tf.float32)

    def compute(self, y_true: tf.Tensor, y_pred: tf.Tensor, **kwargs):
        """
        Compute the (optionally normalized) listwise cross entropy.

        :param y_true: The labels, with padded entries equal to ``pad_value``.
        :param y_pred: The scores.
        :param kwargs: Unused.
        :return: The mean cross entropy when ``reduce`` is set, otherwise the
            cross entropy of every list.
        """
        # Most numerically stable
        # log_p = tf.nn.log_softmax(y_pred)

        # Not as numerically stable, but easier to apply mask
        not_padded = tf.not_equal(y_true, self.pad_value)
        mask = tf.cast(not_padded, tf.float32)
        padded_only = tf.cast(tf.math.logical_not(not_padded), tf.float32)

        probabilities = masked_softmax(y_pred, mask)
        # Adding 1 at the padded positions makes their log exactly zero
        # instead of log(0); the mask then zeroes them again.
        log_p = mask * tf.math.log(probabilities + padded_only)

        if self.normalize:
            denom = tf.reduce_sum(mask * tf.math.abs(y_true), axis=1)
        else:
            denom = 1
        xe = -tf.math.divide_no_nan(tf.reduce_sum(y_true * log_p, axis=1), denom)

        return tf.reduce_mean(xe) if self.reduce else xe
class MeanSquaredError(NetworkLoss):
    """Mean squared error over the non-padded entries of every list."""

    def __init__(self, reduce: bool = True, weight_by_labels: bool = True, pad_value=MIN_FLOAT_32):
        """
        :param reduce: If True, average the per-list errors to a scalar.
        :param weight_by_labels: If True, multiply the error of each list by
            the sum of its non-padded labels.
        :param pad_value: The label value that marks a padded entry.
        """
        # Initialize the Keras base class so its own attributes exist on
        # the instance.
        super().__init__()
        self.reduce = tf.constant(reduce)
        self.weight_by_labels = tf.constant(weight_by_labels)
        self.pad_value = tf.constant(pad_value, dtype=tf.float32)

    def compute(self, y_true: tf.Tensor, y_pred: tf.Tensor, **kwargs):
        """
        Compute the masked mean squared error.

        :param y_true: The labels, with padded entries equal to ``pad_value``.
        :param y_pred: The predictions.
        :param kwargs: Unused.
        :return: The mean over lists when ``reduce`` is set, otherwise the
            error of every list.
        """
        mask = tf.cast(tf.not_equal(y_true, self.pad_value), tf.float32)
        sum_sqr = tf.reduce_sum(tf.square(mask * (y_true - y_pred)), axis=-1)
        # Average over the number of non-padded entries; lists that are
        # entirely padded yield zero instead of NaN.
        n = tf.reduce_sum(mask, axis=1)
        err = tf.math.divide_no_nan(sum_sqr, n)

        if self.weight_by_labels:
            err = err * tf.math.reduce_sum(mask * y_true, axis=1)

        return tf.reduce_mean(err) if self.reduce else err