# Copyright 2020 The TensorFlow Ranking Authors.
# Porting and additional code Copyright 2020 Reid Swanson
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Python Modules
import abc
import logging
from typing import Callable, Optional
# 3rd Party Modules
import tensorflow as tf
# Project Modules
from deletor.constants import MIN_FLOAT_32
log = logging.getLogger(__name__)
[docs]class Metric(abc.ABC):
"""
Base class for metric function classes. The compute method of implementing
classes should return a metric value for each instance in a batch. Use
an :class:`.AggregateMetric` to reduce the individual scores to a single
number.
"""
def __call__(self, y_true: tf.Tensor, y_pred: tf.Tensor, weights: tf.Tensor = None):
return self.compute(y_true, y_pred, weights)
@property
def name(self):
return type(self).__name__
[docs] @abc.abstractmethod
@tf.function(experimental_relax_shapes=True)
def compute(self, y_true: tf.Tensor, y_pred: tf.Tensor, weights: tf.Tensor = None, **kwargs):
pass
# noinspection DuplicatedCode
[docs]class AggregateMetric(abc.ABC):
def __init__(self, metric: Metric):
self.metric = metric
def __call__(self, y_true: tf.Tensor, y_pred: tf.Tensor, **kwargs):
return self.aggregate(y_true, y_pred)
@property
def name(self):
return f"{type(self).__name__}_{self.metric.name}"
[docs] @abc.abstractmethod
def aggregate(self, y_true: tf.Tensor, y_pred: tf.Tensor):
pass
[docs]class Mean(AggregateMetric):
def __init__(self, metric: Metric):
super().__init__(metric)
[docs] def aggregate(self, y_true: tf.Tensor, y_pred: tf.Tensor):
return tf.reduce_mean(self.metric(y_true, y_pred))
[docs]class Sum(AggregateMetric):
def __init__(self, metric: Metric):
super().__init__(metric)
[docs] def aggregate(self, y_true: tf.Tensor, y_pred: tf.Tensor):
return tf.reduce_sum(self.metric(y_true, y_pred))
[docs]class DiscountedCumulativeGain(Metric):
# Modeled after sklearn.metrics.dcg_score and tensorflow_ranking
def __init__(
self,
k: Optional[int] = None,
gain_fn: Callable = None,
discount_fn: Callable = None,
pad_value=MIN_FLOAT_32
):
k = k or 0
if k < 0:
raise ValueError(f"k should be an integer >= 0: {k}")
self.k = k
self.k_const = tf.constant(k, dtype=tf.int32)
self.gain_fn = gain_fn
self.discount_fn = discount_fn
self.pad_value = tf.constant(pad_value, dtype=tf.float32)
if gain_fn is None:
self.gain_fn = self.default_gain
if discount_fn is None:
self.discount_fn = self.default_discount
@property
def name(self):
return f"dcg_at_{self.k:03d}"
[docs] @tf.function(experimental_relax_shapes=True)
def compute(self, y_true: tf.Tensor, y_pred: tf.Tensor, weights: tf.Tensor = None, **kwargs):
# Make sure the tensors are float32
y_true = tf.cast(y_true, tf.float32)
y_pred = tf.cast(y_pred, tf.float32)
# Convert any ragged tensors to dense ones and pad with the appropriate
# values depending on if they are labels or scores.
y_true = y_true.to_tensor(self.pad_value) if isinstance(y_true, tf.RaggedTensor) else y_true
y_pred = y_pred.to_tensor(self.pad_value) if isinstance(y_pred, tf.RaggedTensor) else y_pred
# Create a mask to filter out the padded values
pad_mask = tf.cast(tf.math.not_equal(y_true, self.pad_value), tf.float32)
# Replace the inf values so we don't have numerical issues
# y_true = tf.where(tf.math.is_inf(y_true), tf.zeros_like(y_true), y_true)
y_true = pad_mask * y_true
# Make uniform weights if they are not already provided
weights = tf.ones_like(y_true, dtype=tf.float32) if weights is None else weights
# Get some useful values for simplifying the computation
shape = tf.shape(y_true)
n_examples, n_labels = shape[0], shape[1]
k = tf.minimum(self.k_const, n_labels) if self.k_const > 0 else n_labels
# Mask out anything not in the top k
k_mask = tf.cast(tf.range(n_labels) < k, tf.float32)
# The full mask is a combination of the pad mask and k mask
mask = pad_mask * k_mask
# Get the true labels sorted by the predicted scores
ranking = self.rank_by_scores(y_true, y_pred)
# Make sure the document weights are sorted the same way
weights = self.rank_by_scores(weights, y_pred)
# Calculate the DCG
gain = self.gain_fn(ranking)
discount = self.discount_fn(n_labels+1)
return tf.reduce_sum(mask * weights * gain / discount, axis=-1)
[docs] @staticmethod
def default_gain(relevance):
return tf.math.pow(2.0, relevance) - 1.0
[docs] @staticmethod
def default_discount(p):
return tf.math.log1p(tf.cast(tf.range(1, p), tf.float32)) / tf.math.log(2.0)
# region Utility Methods
[docs] @classmethod
@tf.function(experimental_relax_shapes=True)
def rank_by_scores(cls, y_true: tf.Tensor, y_pred: tf.Tensor):
rank_idx = tf.argsort(y_pred, direction='DESCENDING')
gather_indices = cls.make_rank_indices(rank_idx)
return tf.reshape(tf.gather_nd(y_true, gather_indices), tf.shape(y_true))
[docs] @classmethod
@tf.function(experimental_relax_shapes=True)
def make_rank_indices(cls, ranking: tf.Tensor):
n_examples, n_labels = tf.shape(ranking)[0], tf.shape(ranking)[1]
rows = tf.repeat(tf.range(n_examples), n_labels)
cols = tf.reshape(ranking, [-1])
return tf.stack([rows, cols], -1)
# endregion Utility Methods
[docs]class NormalizedDiscountedCumulativeGain(Metric):
def __init__(
self,
k: Optional[int] = None,
gain_fn: Callable = None,
discount_fn: Callable = None,
):
self.k = k
self.dcg = DiscountedCumulativeGain(k, gain_fn, discount_fn)
@property
def name(self):
return f"ndcg_at_{self.k:03d}"
[docs] def compute(self, y_true: tf.Tensor, y_pred: tf.Tensor, weights: tf.Tensor = None, **kwargs):
base_gain = self.dcg(y_true, y_pred)
opti_gain = self.dcg(y_true, y_true)
mask = tf.cast(tf.not_equal(opti_gain, 0), tf.float32)
ndcg = tf.math.divide_no_nan(mask * base_gain, opti_gain)
return ndcg