Source code for deletor.metrics

# Copyright 2020 The TensorFlow Ranking Authors.
# Porting and additional code Copyright 2020 Reid Swanson
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Python Modules
import abc
import logging

from typing import Callable, Optional

# 3rd Party Modules
import tensorflow as tf

# Project Modules
from deletor.constants import MIN_FLOAT_32

log = logging.getLogger(__name__)


[docs]class Metric(abc.ABC): """ Base class for metric function classes. The compute method of implementing classes should return a metric value for each instance in a batch. Use an :class:`.AggregateMetric` to reduce the individual scores to a single number. """ def __call__(self, y_true: tf.Tensor, y_pred: tf.Tensor, weights: tf.Tensor = None): return self.compute(y_true, y_pred, weights) @property def name(self): return type(self).__name__
[docs] @abc.abstractmethod @tf.function(experimental_relax_shapes=True) def compute(self, y_true: tf.Tensor, y_pred: tf.Tensor, weights: tf.Tensor = None, **kwargs): pass
# noinspection DuplicatedCode
[docs] @staticmethod def validate_inputs(y_true: tf.Tensor, y_pred: tf.Tensor): """ Make sure the inputs have rank 2 and are the same shape. :param y_true: :param y_pred: :return: """ true_shape, pred_shape = tf.shape(y_true), tf.shape(y_pred) if tf.rank(y_true) != 2: log.error("The input tensors must have exactly 2 dimensions.") raise ValueError("The input tensors must have exactly 2 dimensions") if tf.math.not_equal(true_shape, pred_shape): log.error( f"y_true and y_pred must have the same shape: " f"{true_shape} != {pred_shape}" ) raise ValueError("y_true and y_pred must have the same shape")
[docs]class AggregateMetric(abc.ABC): def __init__(self, metric: Metric): self.metric = metric def __call__(self, y_true: tf.Tensor, y_pred: tf.Tensor, **kwargs): return self.aggregate(y_true, y_pred) @property def name(self): return f"{type(self).__name__}_{self.metric.name}"
[docs] @abc.abstractmethod def aggregate(self, y_true: tf.Tensor, y_pred: tf.Tensor): pass
[docs]class Mean(AggregateMetric): def __init__(self, metric: Metric): super().__init__(metric)
[docs] def aggregate(self, y_true: tf.Tensor, y_pred: tf.Tensor): return tf.reduce_mean(self.metric(y_true, y_pred))
[docs]class Sum(AggregateMetric): def __init__(self, metric: Metric): super().__init__(metric)
[docs] def aggregate(self, y_true: tf.Tensor, y_pred: tf.Tensor): return tf.reduce_sum(self.metric(y_true, y_pred))
[docs]class DiscountedCumulativeGain(Metric): # Modeled after sklearn.metrics.dcg_score and tensorflow_ranking def __init__( self, k: Optional[int] = None, gain_fn: Callable = None, discount_fn: Callable = None, pad_value=MIN_FLOAT_32 ): k = k or 0 if k < 0: raise ValueError(f"k should be an integer >= 0: {k}") self.k = k self.k_const = tf.constant(k, dtype=tf.int32) self.gain_fn = gain_fn self.discount_fn = discount_fn self.pad_value = tf.constant(pad_value, dtype=tf.float32) if gain_fn is None: self.gain_fn = self.default_gain if discount_fn is None: self.discount_fn = self.default_discount @property def name(self): return f"dcg_at_{self.k:03d}"
[docs] @tf.function(experimental_relax_shapes=True) def compute(self, y_true: tf.Tensor, y_pred: tf.Tensor, weights: tf.Tensor = None, **kwargs): # Make sure the tensors are float32 y_true = tf.cast(y_true, tf.float32) y_pred = tf.cast(y_pred, tf.float32) # Convert any ragged tensors to dense ones and pad with the appropriate # values depending on if they are labels or scores. y_true = y_true.to_tensor(self.pad_value) if isinstance(y_true, tf.RaggedTensor) else y_true y_pred = y_pred.to_tensor(self.pad_value) if isinstance(y_pred, tf.RaggedTensor) else y_pred # Create a mask to filter out the padded values pad_mask = tf.cast(tf.math.not_equal(y_true, self.pad_value), tf.float32) # Replace the inf values so we don't have numerical issues # y_true = tf.where(tf.math.is_inf(y_true), tf.zeros_like(y_true), y_true) y_true = pad_mask * y_true # Make uniform weights if they are not already provided weights = tf.ones_like(y_true, dtype=tf.float32) if weights is None else weights # Get some useful values for simplifying the computation shape = tf.shape(y_true) n_examples, n_labels = shape[0], shape[1] k = tf.minimum(self.k_const, n_labels) if self.k_const > 0 else n_labels # Mask out anything not in the top k k_mask = tf.cast(tf.range(n_labels) < k, tf.float32) # The full mask is a combination of the pad mask and k mask mask = pad_mask * k_mask # Get the true labels sorted by the predicted scores ranking = self.rank_by_scores(y_true, y_pred) # Make sure the document weights are sorted the same way weights = self.rank_by_scores(weights, y_pred) # Calculate the DCG gain = self.gain_fn(ranking) discount = self.discount_fn(n_labels+1) return tf.reduce_sum(mask * weights * gain / discount, axis=-1)
[docs] @staticmethod def default_gain(relevance): return tf.math.pow(2.0, relevance) - 1.0
[docs] @staticmethod def default_discount(p): return tf.math.log1p(tf.cast(tf.range(1, p), tf.float32)) / tf.math.log(2.0)
# region Utility Methods
[docs] @classmethod @tf.function(experimental_relax_shapes=True) def rank_by_scores(cls, y_true: tf.Tensor, y_pred: tf.Tensor): rank_idx = tf.argsort(y_pred, direction='DESCENDING') gather_indices = cls.make_rank_indices(rank_idx) return tf.reshape(tf.gather_nd(y_true, gather_indices), tf.shape(y_true))
[docs] @classmethod @tf.function(experimental_relax_shapes=True) def make_rank_indices(cls, ranking: tf.Tensor): n_examples, n_labels = tf.shape(ranking)[0], tf.shape(ranking)[1] rows = tf.repeat(tf.range(n_examples), n_labels) cols = tf.reshape(ranking, [-1]) return tf.stack([rows, cols], -1)
# endregion Utility Methods
[docs]class NormalizedDiscountedCumulativeGain(Metric): def __init__( self, k: Optional[int] = None, gain_fn: Callable = None, discount_fn: Callable = None, ): self.k = k self.dcg = DiscountedCumulativeGain(k, gain_fn, discount_fn) @property def name(self): return f"ndcg_at_{self.k:03d}"
[docs] def compute(self, y_true: tf.Tensor, y_pred: tf.Tensor, weights: tf.Tensor = None, **kwargs): base_gain = self.dcg(y_true, y_pred) opti_gain = self.dcg(y_true, y_true) mask = tf.cast(tf.not_equal(opti_gain, 0), tf.float32) ndcg = tf.math.divide_no_nan(mask * base_gain, opti_gain) return ndcg