Source code for grewtse.evaluators.evaluator

import ast
from transformers import AutoModelForMaskedLM, AutoModelForCausalLM, AutoTokenizer
from transformers import PreTrainedModel, PreTrainedTokenizerBase
from pathlib import Path
from typing import Tuple, NamedTuple, List, Any, Literal, Union, Optional
from contextlib import contextmanager
import torch.nn.functional as F
from tqdm import tqdm
import pandas as pd
import itertools
import logging
import torch
import math

from grewtse.utils.validation import load_and_validate_mp_dataset
from grewtse.evaluators.metrics import (
    compute_normalised_surprisal_difference,
    compute_average_surprisal_difference,
    compute_entropy_based_certainty,
    compute_accuracy,
    compute_surprisal,
    compute_mean,
)

EVAL_TEMPLATE = {}


# --- Helper function ---
def _init_row_results(row):
    row_results = EVAL_TEMPLATE.copy()
    row_results.update(row._asdict())
    return row_results


class TooManyMasksException(Exception):
    def __init__(self, message: str):
        self.message = message
        super().__init__(f"TMM Exception: {message}")


class Prediction(NamedTuple):
    token: str
    prob: float
    surprisal: float


[docs] class GrewTSEvaluator: """ An evaluation class designed specifically for rapid syntactic evaluation of models available on the Hugging Face platform. """ Task = Literal["mlm", "ntp"] TSEvaluationType = Literal["token-level","sentence-level"] def __init__(self): self.evaluator = Evaluator() self.evaluation_dataset = None
[docs] def evaluate_model( self, mp_dataset: pd.DataFrame, model_repo: str, task_type: Task, # can be 'mlm' or 'ntp' evaluation_type: TSEvaluationType = "token-level", # can be "token-level" or "sentence-level" evaluation_cols: list[str] = ["form_grammatical", "form_ungrammatical"], entropy_topk: int = 100, save_to: Optional[str] = None, device: str = "cpu", row_limit: Optional[int] = None, ) -> pd.DataFrame: """ Function for carrying out Targeted Syntactic Evaluation for either encoder or decoder models. :param mp_dataset: A DataFrame containing the Minimal-Pair Dataset generated by Grew-TSE. :param model_repo: the Hugging Face model repository link. :param task_type: choose either 'mlm' (masked language modelling) or 'ntp' (next-token prediction). :param evaluation_type: choose how to calculate accuracy, 'token-level' or 'sentence-level' :param evaluation_cols: a list of strings indicating the columns containing the target words / tokens. Defaults to `form_grammatical` and `form_ungrammatical`, as used by GrewTSEPipe. :param entropy_topk: how many probabilities are taken into account for calculating the model uncertainty. :param device: the device that you want to use e.g. cpu, cuda. Defaults to `cpu`. :param save_to: a path to save the resulting CSV file to. :param row_limit: place a limit on the number of samples / rows evaluated in the Minimal-Pair Dataset. :return: A DataFrame containing the evaluation results for each sample. """ # --- Prepare dataset --- mp_dataset_iter = mp_dataset.itertuples() if row_limit: mp_dataset_iter = itertools.islice(mp_dataset_iter, row_limit) n = len(mp_dataset) if not row_limit else row_limit # --- Load model & tokenizer --- is_mlm = task_type == "mlm" results = [] with self.evaluator.load_model(model_repo, is_mlm, device): # --- Evaluate each row --- for row in tqdm(mp_dataset_iter, total=n, desc="Evaluating"): row_results = _init_row_results(row) try: if is_mlm: self._evaluate_row_mlm(row, row_results, evaluation_type, evaluation_cols) else: self._evaluate_row_ntp(row, row_results, evaluation_type, evaluation_cols) except TooManyMasksException: logging.error(f"Too many masks in {getattr(row, 'sentence_id')}") continue except Exception as e: raise RuntimeError(f"Model/tokeniser issue: {e}") from e if evaluation_type == "token-level": # --- Entropy --- certainty_score = self.evaluator.get_entropy_based_certainty( k=entropy_topk, normalise=True ) row_results["certainty"] = certainty_score results.append(row_results) # evaluation_columns = pd.Series(EVAL_TEMPLATE.keys()) results_df = pd.DataFrame(results) self.evaluation_dataset = results_df if save_to is not None: results_df.to_csv(save_to, index=False) return results_df
[docs] def evaluate_from_filepath( self, mp_dataset_filepath: str, model_repo: str, task_type: Literal[Task], evaluation_type: TSEvaluationType = "token-level", # can be "token-level" or "sentence-level" evaluation_cols: list[str] = ["form_grammatical", "form_ungrammatical"], entropy_topk: int = 100, save_to: Optional[str] = None, device: str = "cpu", row_limit: Optional[int] = None, ) -> pd.DataFrame: """ Carries out model evaluation using a Minimal-Pair Dataset generated by Grew-TSE that is provided as a filepath. :param mp_dataset_filepath: the filepath pointing to the Minimal-Pair Dataset. Should be a .csv format. :param model_repo: the Hugging Face model repository link. :param task_type: choose either 'mlm' (masked language modelling) or 'ntp' (next-token prediction). :param entropy_topk: how many probabilities are taken into account for calculating the model uncertainty. :param save_to: a path to save the resulting CSV file to. :param device: the device that you want to use e.g. cpu, gpu. :param row_limit: place a limit on the number of samples / rows evaluated in the Minimal-Pair Dataset. :return: A DataFrame containing the evaluation results for each sample. """ mp_dataset = load_and_validate_mp_dataset(mp_dataset_filepath) return self.evaluate_model( mp_dataset, model_repo, task_type, evaluation_type, evaluation_cols, entropy_topk, save_to, device, row_limit, )
[docs] def load_evaluation_results(self, evaluation_dataset_filepath: str) -> None: """ Load in a set of evaluation results for additional metrics e.g. overall accuracy, average surprisal. :param evaluation_dataset_filepath: the filepath pointing to a set of previously-generated evaluation results. """ path = Path(evaluation_dataset_filepath) if not path.is_file(): raise FileNotFoundError(f"Invalid evaluation file: {path}") try: df = pd.read_csv(path) except Exception as e: raise ValueError(f"Could not read evaluation dataset: {path}") from e if df.empty: raise ValueError("Evaluation dataset is empty") self.evaluation_dataset = df
def _evaluate_row_mlm( self, row, row_results, evaluation_type: TSEvaluationType = "token-level", # can be "token-level" or "sentence-level" evaluation_cols: list[str] = ["form_grammatical", "form_ungrammatical"], ): try: targets = [getattr(row, c) for c in evaluation_cols] if evaluation_type == "token-level": result_probs = self.evaluator.run_mlm_token_level( row.masked_text, targets ) else: result_probs = self.evaluator.run_mlm_sentence_level( row.masked_text, targets ) for c, p in zip(evaluation_cols, result_probs): # define column header for results # note that it is saved as p_[original column] no matter if it's # the original probability or if it's the log probability prob_col_name = f"p_{c}" surprisal_col_name = f"I_{c}" # save probability and surprisal results row_results[prob_col_name] = p is_log_scale = evaluation_type == "sentence-level" row_results[surprisal_col_name] = compute_surprisal(p, is_log=is_log_scale) """ if "ood_minimal_pairs" in row: self._evaluate_ood_pairs( row, row_results, lambda g, u: self.evaluator.run_masked_prediction( row.masked_text, [g, u] ), ) """ except Exception as e: raise RuntimeError( f"Failed to perform masked language modelling for row {getattr(row, 'sentence_id')}: {e}" ) def _evaluate_row_ntp( self, row, row_results, evaluation_type: TSEvaluationType = "token-level", # can be "token-level" or "sentence-level" evaluation_cols: list[str] = ["form_grammatical", "form_ungrammatical"], ): """ You can only run sentence-level NTP calculations currently if you provide the dataset as masked. If you want to run token-level NTP calculations, provide the context before the target word as `prompt_text`. If you want to run sentence-level NTP calculations, provide masked sentences in column `masked_sentence`. This is due to the substitution required where there is multiple evaluation columns. This is a bit hacky and could be improved. """ try: targets = [getattr(row, c) for c in evaluation_cols] if evaluation_type == "token-level": result_probs = self.evaluator.run_ntp_token_level( row.prompt_text, targets ) else: result_probs = self.evaluator.run_ntp_sentence_level_from_masked( row.masked_text, targets ) for c, p in zip(evaluation_cols, result_probs): # define column header for results # note that it is saved as p_[original column] no matter if it's # the original probability or if it's the log probability prob_col_name = f"p_{c}" surprisal_col_name = f"I_{c}" row_results[prob_col_name] = p is_log_scale = evaluation_type == "sentence-level" row_results[surprisal_col_name] = compute_surprisal(p, is_log=is_log_scale) """ if "ood_minimal_pairs" in row: self._evaluate_ood_pairs( row, row_results, lambda g, u: self.evaluator.run_next_word_prediction( row.prompt_text, [g, u] ), ) """ except Exception as e: raise RuntimeError( f"Failed to perform masked next-word prediction for row {getattr(row, 'sentence_id')}: {e}" ) def _evaluate_ood_pairs(self, row, row_results, evaluation_func): ood_pairs = ast.literal_eval(row.ood_pairs) all_ood_probs_gram = [] all_ood_probs_ungram = [] for pair in ood_pairs: prob_gram, prob_ungram = evaluation_func(pair[0], pair[1]) all_ood_probs_gram.append(prob_gram) all_ood_probs_ungram.append(prob_ungram) avg_ood_prob_gram = compute_mean(all_ood_probs_gram) avg_ood_prob_ungram = compute_mean(all_ood_probs_ungram) row_results.update( { "ood_avg_p_grammatical": avg_ood_prob_gram, "ood_avg_p_ungrammatical": avg_ood_prob_ungram, "ood_avg_I_grammatical": compute_surprisal(avg_ood_prob_gram), "ood_avg_I_ungrammatical": compute_surprisal(avg_ood_prob_ungram), } )
[docs] def get_avg_surprisal_difference( self, grammatical_column: str = "p_form_grammatical", ungrammatical_column: str = "p_form_ungrammatical", ) -> float: """ Get the normalised average surprisal difference (ASD). A higher score indicates that the model, on average, tends towards being more confident in the grammatical word over the ungrammatical one. However, this is not quite fully accurate and as with any average ASD scores may suffer from outliers skewing the result. :return: the value of the normalised ASD. """ if self.evaluation_dataset is None: raise KeyError("Please evaluate a model first or load evaluation results.") return compute_average_surprisal_difference( self.evaluation_dataset[grammatical_column], self.evaluation_dataset[ungrammatical_column], )
[docs] def get_norm_avg_surprisal_difference( self, grammatical_column: str = "p_form_grammatical", ungrammatical_column: str = "p_form_ungrammatical", ) -> float: """ Get the normalised average surprisal difference (ASD). A higher score indicates that the model, on average, tends towards being more confident in the grammatical word over the ungrammatical one. However, this is not quite fully accurate and as with any average ASD scores may suffer from outliers skewing the result. This normalised version simply calculates (Average Grammatical Surprisal - Average Ungrammatical Surprisal) / Average Grammatical Surprisal :param grammatical_column: a string representing the name of the grammatical column. :param ungrammatical_column: a string representing the name of the ungrammatical column. :return: the value of the normalised ASD. """ if self.evaluation_dataset is None: raise KeyError("Please evaluate a model first or load evaluation results.") return compute_normalised_surprisal_difference( self.evaluation_dataset[grammatical_column], self.evaluation_dataset[ungrammatical_column], )
[docs] def get_avg_certainty(self) -> float: """ Average certainty of the model computed over all probability distributions. """ if self.evaluation_dataset is None: raise KeyError("Please evaluate a model first or load evaluation results.") else: return round( compute_mean(self.evaluation_dataset["certainty"].to_list()), 2 )
[docs] def get_accuracy( self, grammatical_column: str = "p_form_grammatical", ungrammatical_column: Union[str,List[str]] = "p_form_ungrammatical", ) -> float: """ Get the proportion of the time that the model predicts the grammatical form over the ungrammatical form. A value of -1 indicates something went wrong. :param grammatical_column: a string representing the name of the grammatical column. :param ungrammatical_column: a string representing the name of the ungrammatical column. :return: a float between 0 and 1, where 1 is 100% accuracy. """ g_form_probs = self._get_grammatical_form_probs(grammatical_column) if isinstance(ungrammatical_column, str): ug_form_probs = self._get_ungrammatical_form_probs(ungrammatical_column) else: ug_form_probs = [self._get_grammatical_form_probs(ug_col) for ug_col in ungrammatical_column] return compute_accuracy( g_form_probs, ug_form_probs )
def _get_grammatical_form_probs( self, col: str = "p_form_grammatical" ) -> Optional[pd.Series]: if ( self.evaluation_dataset is not None and col in self.evaluation_dataset.columns ): g_form_probs = self.evaluation_dataset[col] if isinstance(g_form_probs, pd.Series): return g_form_probs else: raise KeyError("Please evaluate a model first.") def _get_ungrammatical_form_probs( self, col: str = "p_form_ungrammatical" ) -> Optional[pd.Series]: if ( self.evaluation_dataset is not None and col in self.evaluation_dataset.columns ): ug_form_probs = self.evaluation_dataset[col] if isinstance(ug_form_probs, pd.Series): return ug_form_probs else: raise KeyError("Please evaluate a model first.")
class Evaluator: def __init__(self): self.tokeniser: Optional[PreTrainedTokenizerBase] = None self.model: Optional[PreTrainedModel] = None self.mask_token_index: int = -1 self.mask_probs: Optional[torch.Tensor] = None self.logits: Optional[torch.Tensor] = None self.device: Optional[str] = None def setup_parameters( self, model_name: str, is_mlm: bool = True, device: str = "cpu" ) -> Tuple[PreTrainedModel, PreTrainedTokenizerBase]: self.tokeniser = AutoTokenizer.from_pretrained(model_name) if is_mlm: self.model = AutoModelForMaskedLM.from_pretrained(model_name) else: self.model = AutoModelForCausalLM.from_pretrained(model_name) # set to eval mode, disabling things like dropout self.device = device self.model.to(device) self.model.eval() return self.model, self.tokeniser @contextmanager def load_model(self, model_name: str, is_mlm: bool = True, device: str = "cpu"): """Context manager for proper model cleanup.""" try: self.setup_parameters(model_name, is_mlm, device) yield self finally: if self.model is not None: del self.model if torch.cuda.is_available(): torch.cuda.empty_cache() self.model = None self.tokeniser = None def run_mlm_token_level(self, masked_sentence: str, targets: List[str]) -> List[float]: """ This gets the token-level probability for an MLM model including the full context around the mask. """ if not self.model or not self.tokeniser: raise RuntimeError("Model and tokenizer must be loaded before prediction.") device = next(self.model.parameters()).device masked_sentence = masked_sentence.replace("[MASK]", self.tokeniser.mask_token) sentence_ids = self.tokeniser.encode(masked_sentence, add_special_tokens=False) result_probs = [] for t_i in targets: word_ids = self.tokeniser.encode(t_i, add_special_tokens=False) target_word_prob = self._compute_mlm_prob_token_level( sentence_ids, word_ids=word_ids, device=device, include_context_after_target=True, keep_in_log_space=False ) result_probs.append(target_word_prob) return result_probs def run_mlm_sentence_level(self, masked_sentence: str, targets: List[str]) -> List[float]: """ This gets the sentence-level probability for each target word applied to a masked sentence for an MLM model. """ if not self.model or not self.tokeniser: raise RuntimeError("Model and tokenizer must be loaded before prediction.") device = next(self.model.parameters()).device mask_token_id = self.tokeniser.mask_token_id result_probs = [] for t_i in targets: full_sentence = masked_sentence.replace("[MASK]", t_i) sentence_ids = self.tokeniser.encode(full_sentence, add_special_tokens=False) curr_tokens = [] total_log_prob = 0.0 for i,token_id in enumerate(sentence_ids): if i == 0: curr_tokens.append(token_id) continue # add a mask at the end of the sentence for the next # calculation next_masked_context = curr_tokens next_masked_context.append(mask_token_id) next_token_prob = self._compute_mlm_prob_token_level( next_masked_context, word_ids=[token_id], device=device, include_context_after_target=False, keep_in_log_space=True ) curr_tokens.append(token_id) total_log_prob += next_token_prob result_probs.append(total_log_prob) return result_probs def _compute_mlm_prob_token_level( self, input_ids: List[int], word_ids: List[int], device, include_context_after_target:bool=True, keep_in_log_space:bool=False ) -> float: """ Computes autoregressive approximation for a word (devided into tokens i.e. word_ids) E.g. p(eat | The boy [MASK] every day) * p(s | The boy eat[MASK] every day) If include_context_after_target is set to False, the above would be calculated as: p(eat | The boy [MASK]) * p(s | The boy eat[MASK]) """ mask_index = input_ids.index(self.tokeniser.mask_token_id) input_ids_tensor = torch.tensor([input_ids], device=device) if not include_context_after_target: input_ids = input_ids[:mask_index+1] log_prob = 0.0 for i, tid in enumerate(word_ids): with torch.no_grad(): logits = self.model(input_ids_tensor).logits probs = F.softmax(logits[:, mask_index, :], dim=-1) token_prob = probs[0, tid].item() log_prob += math.log(max(token_prob, 1e-12)) if i == 0: self.mask_probs = probs # Replace mask with predicted token input_ids_tensor[0, mask_index] = tid # Insert new mask if more tokens remain if i < len(word_ids) - 1: new_length = input_ids_tensor.size(1) + 1 new_tensor = torch.empty((1, new_length), dtype=torch.long, device=device) new_tensor[:, :mask_index + 1] = input_ids_tensor[:, :mask_index + 1] new_tensor[:, mask_index + 1] = self.tokeniser.mask_token_id new_tensor[:, mask_index + 2:] = input_ids_tensor[:, mask_index + 1:] input_ids_tensor = new_tensor mask_index += 1 if keep_in_log_space: return log_prob else: return math.exp(log_prob) def run_ntp_token_level(self, context: str, targets: list[str]) -> list[float]: """ This gets the token-level probability given a context. """ if not self.model or not self.tokeniser: raise RuntimeError("Model and tokenizer must be loaded before prediction.") context_ids = self.tokeniser.encode(context, add_special_tokens=False) device = next(self.model.parameters()).device probs = [] for t in targets: t_ids = self.tokeniser.encode(t, add_special_tokens=False) t_prob = self._compute_ntp_token_level( context_ids, t_ids, device ) probs.append(t_prob) return probs def run_ntp_sentence_level_from_masked(self, masked_sentence: str, targets: list[str], use_sentence_joint:bool=False): """ This gets the sentence=level autoregressive probability for NTP but from masked sentences. This is handy for re-using the same dataset when running evaluations for both MLM and NTP models. """ if not self.model or not self.tokeniser: raise RuntimeError("Model and tokenizer must be loaded before prediction.") probs = [] for t_i in targets: sentence_with_target_i = masked_sentence.replace("[MASK]", t_i) total_log_prob_t_i = self.run_ntp_sentence_level(sentence_with_target_i) probs.append(total_log_prob_t_i) return probs def run_ntp_sentence_level(self, full_sentence: str) -> float: """ This gets the sentence-level autoregressive probability for an NTP model. In this case, you provide the whole sentence with the word already in the string. This is an abstraction of the logic for individual token-level calculations, just applied over the whole sentence from beginning to end. E.g. log[p(t1)] + log[p(t2 | t1)] + log[p(t3 | t1,t2)] + log[p(t4 | t1,t2,t3)] """ if not self.model or not self.tokeniser: raise RuntimeError("Model and tokenizer must be loaded before prediction.") sentence_ids = self.tokeniser.encode(full_sentence, add_special_tokens=False) device = next(self.model.parameters()).device curr_tokens = [] total_log_prob = 0.0 for i,token_id in enumerate(sentence_ids): if i == 0: curr_tokens.append(token_id) continue next_token_prob = self._compute_ntp_token_level( curr_tokens, [token_id], device ) curr_tokens.append(token_id) total_log_prob += math.log(next_token_prob,2) return total_log_prob def _compute_ntp_token_level( self, input_ids: List[int], word_ids: List[int], device ) -> float: """ This gets the token-level autoregressive probability for NTP given a context (the tokens before the target) and a following set of target tokens (typically amounting to a word). The `input_ids` represent the token IDs for the context before the target word. The `word_ids` represent the the token IDs for the target word. """ input_ids_tensor = torch.tensor([input_ids], device=device) log_prob = 0.0 for i, tid in enumerate(word_ids): with torch.no_grad(): logits = self.model(input_ids_tensor).logits index = input_ids_tensor.shape[1] - 1 # last token position probs = F.softmax(logits[:, index, :], dim=-1) token_prob = probs[0, tid].item() log_prob += math.log(max(token_prob, 1e-12)) if i == 0: self.mask_probs = probs # Append predicted token to context input_ids_tensor = torch.cat( [input_ids_tensor, torch.tensor([[tid]], device=device)], dim=1 ) return math.exp(log_prob) def get_entropy_based_certainty( self, k: int = 100, normalise: bool = False ) -> float: """Compute an entropic certainty score over the prediction distribution. k: Number of top tokens to consider. normalise: Whether to normalise entropy. Returns: :returns: Certainty value based on entropy calculations over token probabiliity distribution. """ if self.mask_probs is None: raise ValueError("No output probabilities available. Run evaluation first.") return compute_entropy_based_certainty(self.mask_probs[0], k) def _get_mask_index(self, inputs: Any) -> int: if "input_ids" not in inputs: raise ValueError("Missing 'input_ids' in inputs.") if self.tokeniser.mask_token_id is None: raise ValueError("The tokeniser does not have a defined mask_token_id.") input_ids = inputs["input_ids"] mask_positions = torch.where(input_ids == self.tokeniser.mask_token_id) if len(mask_positions[0]) == 0: raise ValueError("No mask token found in input_ids.") elif len(mask_positions[0]) > 1: raise ValueError("Multiple mask tokens found; expected only one.") return ( mask_positions[1].item() if len(mask_positions) > 1 else mask_positions[0].item() ) def _get_mask_probabilities( self, mask_token_index: int, logits: Any ) -> torch.Tensor: mask_logits = logits[0, mask_token_index, :] probs = F.softmax(mask_logits, dim=-1) # shape: (vocab_size, ) return probs