"""Likelihood testing scenario from https://arxiv.org/pdf/2112.03570.pdf.
See p.6 (top of second column) for details.
With mode "offline", we measure the probability of observing a
confidence as high as the target model's under the null-hypothesis that
the target point is a non-member. That is we, use the norm CDF.
With mode "offline-carlini", we measure the probability that a target point
did not come from the non-member distribution. That is, we use Carlini's
implementation with a single norm (log) PDF.
With mode "online-carlini", we use Carlini's implementation of the standard
likelihood ratio test, measuring the ratio of probabilities the sample came
from the two distributions. That is, the (log) PDF of pr_in minus pr_out.
"""
from __future__ import annotations
import logging
import numpy as np
from fpdf import FPDF
from scipy.stats import norm
from sacroml import metrics
from sacroml.attacks import report, utils
from sacroml.attacks.attack import Attack
from sacroml.attacks.model import Model
from sacroml.attacks.target import Target
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
EPS: float = 1e-16 # Used to avoid numerical issues
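# Illustrative sketch (not executed): given a target logit and the mean/std of
# the shadow "out" (and, for the online mode, "in") logit distributions, the
# three modes score a record roughly as follows; the numbers are made up.
#
#   logit, out_mean, out_std, in_mean, in_std = 2.0, 0.0, 1.0, 3.0, 1.0
#   # offline: normal CDF under the non-member (out) distribution
#   pr_out = norm.cdf(logit, loc=out_mean, scale=out_std + EPS)  # ~0.977
#   pr_in = 1 - pr_out
#   # offline-carlini: negative log density under the non-member distribution
#   pr_out = -norm.logpdf(logit, out_mean, out_std + EPS)
#   pr_in = -pr_out
#   # online-carlini: log-likelihood ratio; the final pr_out equals
#   # logpdf(logit | in) - logpdf(logit | out)
#   pr_in = norm.logpdf(logit, out_mean, out_std + EPS) - norm.logpdf(
#       logit, in_mean, in_std + EPS
#   )
#   pr_out = -pr_in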
class LIRAAttack(Attack):
"""The main LiRA Attack class."""
def __init__(
self,
output_dir: str = "outputs",
write_report: bool = True,
n_shadow_models: int = 100,
p_thresh: float = 0.05,
mode: str = "offline",
fix_variance: bool = False,
report_individual: bool = False,
) -> None:
"""Construct an object to execute a LiRA attack.
Parameters
----------
output_dir : str
Name of the directory where outputs are stored.
write_report : bool
Whether to generate a JSON and PDF report.
n_shadow_models : int
Number of shadow models to be trained.
        p_thresh : float
            Threshold used to determine statistical significance, for
            instance of auc_p_value and pdif_vals.
mode : str
Attack mode: {"offline", "offline-carlini", "online-carlini"}
        fix_variance : bool
            Whether to use a global standard deviation rather than one
            computed per record.
report_individual : bool
Whether to report metrics for each individual record.
"""
super().__init__(output_dir=output_dir, write_report=write_report)
self.n_shadow_models: int = n_shadow_models
self.p_thresh: float = p_thresh
self.mode: str = mode
self.fix_variance: bool = fix_variance
self.report_individual: bool = report_individual
self.result: dict = {} # individual record results
if self.report_individual:
self.result["score"] = []
self.result["label"] = []
self.result["target_logit"] = []
self.result["out_p_norm"] = []
self.result["out_prob"] = []
self.result["out_mean"] = []
self.result["out_std"] = []
if self.mode == "online-carlini":
self.result["in_prob"] = []
self.result["in_mean"] = []
self.result["in_std"] = []
def __str__(self):
"""Return the name of the attack."""
return "LiRA Attack"
@classmethod
def attackable(cls, target: Target) -> bool: # pragma: no cover
"""Return whether a target can be assessed with LIRAAttack."""
required_methods = [
"clone",
"predict_proba",
"predict",
"get_classes",
"set_params",
]
if (
target.has_model()
and target.has_data()
and all(hasattr(target.model, method) for method in required_methods)
):
return True
logger.info("WARNING: LiRA requires a loadable model.")
return False
def _attack(self, target: Target) -> dict:
"""Run a LiRA attack from a Target object and a target model.
Parameters
----------
        target : attacks.target.Target
            The target being attacked, as an instance of the Target class.
Returns
-------
dict
Attack report.
"""
# prepare
shadow_clf = target.model.clone()
target = utils.check_and_update_dataset(target)
# execute attack
self._run(
shadow_clf,
target.X_train,
target.y_train,
target.model.predict_proba(target.X_train),
target.X_test,
target.y_test,
target.model.predict_proba(target.X_test),
)
# create the report
output = self._make_report(target)
# write the report
self._write_report(output)
# return the report
return output
def _run(
self,
shadow_clf: Model,
X_train: np.ndarray,
y_train: np.ndarray,
proba_train: np.ndarray,
X_test: np.ndarray,
y_test: np.ndarray,
proba_test: np.ndarray,
) -> None:
"""Run the likelihood test.
Parameters
----------
shadow_clf : Model
A classifier that will be trained to form the shadow models.
X_train : np.ndarray
Data that was used to train the target model.
y_train : np.ndarray
Labels that were used to train the target model.
proba_train : np.ndarray
Array of predictions produced by the target model on the training data.
        X_test : np.ndarray
            Data that was not used to train the target model (combined with
            X_train to train the shadow models).
        y_test : np.ndarray
            Labels corresponding to X_test.
        proba_test : np.ndarray
            Array of predictions produced by the target model on the test data.
"""
logger.info("Running %s LiRA, fix_variance=%s", self.mode, self.fix_variance)
n_train_rows: int = X_train.shape[0]
n_shadow_rows: int = X_test.shape[0]
combined_data: dict[str, np.ndarray] = {
"features": np.vstack((X_train, X_test)),
"labels": np.hstack((y_train, y_test)),
"predictions": np.vstack((proba_train, proba_test)),
}
utils.train_shadow_models(
shadow_clf=shadow_clf,
combined_x_train=combined_data["features"],
combined_y_train=combined_data["labels"],
n_train_rows=n_train_rows,
n_shadow_models=self.n_shadow_models,
shadow_path=self.shadow_path,
)
out_conf, in_conf = self._get_shadow_signals(
combined_data["features"],
combined_data["labels"],
)
mia_scores, n_normal = self._compute_scores(
combined_data["labels"], combined_data["predictions"], out_conf, in_conf
)
self._save_attack_metrics(mia_scores, n_train_rows, n_shadow_rows, n_normal)
logger.info("Finished scenario")
def _get_shadow_signals(
self,
combined_x_train: np.ndarray,
combined_y_train: np.ndarray,
) -> tuple[dict[int, list[float]], dict[int, list[float]]]:
"""Return confidence scores from saved shadow models.
Parameters
----------
combined_x_train : np.ndarray
Array of combined train and test features.
combined_y_train : np.ndarray
Array of combined train and test labels.
Returns
-------
tuple[dict, dict]
Dictionary of confidences when not in the training set.
Dictionary of confidences when in the training set.
"""
n_combined: int = combined_x_train.shape[0]
out_conf: dict[int, list[float]] = {i: [] for i in range(n_combined)}
in_conf: dict[int, list[float]] = {i: [] for i in range(n_combined)}
logger.info("Getting signals from %d shadow models", self.n_shadow_models)
for model_idx in range(self.n_shadow_models):
# load shadow model
shadow_clf, indices_train, _ = utils.get_shadow_model(
self.shadow_path, model_idx
)
# map a class to a column
class_map = {c: i for i, c in enumerate(shadow_clf.get_classes())}
# generate shadow confidences
shadow_confidences = shadow_clf.predict_proba(combined_x_train)
indices_train = set(indices_train)
for i, conf in enumerate(shadow_confidences):
# logit of the correct class
label = class_map.get(combined_y_train[i], -1)
# Occasionally, the random data split will result in classes being
# absent from the training set. In these cases label will be -1 and
# we include logit(0) instead of discarding
logit = utils.logit(0) if label < 0 else utils.logit(conf[label])
if i not in indices_train:
out_conf[i].append(logit)
else:
in_conf[i].append(logit)
return out_conf, in_conf
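    # Shape sketch (illustrative): with N combined records, both returned
    # dictionaries map each record index to the logits it received from the
    # shadow models that excluded (out_conf) or included (in_conf) it, e.g.
    #
    #   out_conf = {0: [1.2, 0.7, ...], 1: [...], ..., N - 1: [...]}
    #   in_conf  = {0: [2.4, 3.1, ...], 1: [...], ..., N - 1: [...]}
    #
    # List lengths vary per record because the shadow training splits are random.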
def _compute_scores(
self,
combined_y_train: np.ndarray,
combined_target_preds: np.ndarray,
out_conf: dict[int, list[float]],
in_conf: dict[int, list[float]],
) -> tuple[list[list[float]], int]:
"""Compute LiRA scores for each record."""
logger.info("Computing scores")
mia_scores: list[list[float]] = []
n_normal: int = 0
global_in_std: float = self._get_global_std(in_conf)
global_out_std: float = self._get_global_std(out_conf)
for i, label in enumerate(combined_y_train):
logit: float = utils.logit(combined_target_preds[i, label])
out_mean, out_std = self._get_mean_std(out_conf[i], global_out_std)
in_mean, in_std = self._get_mean_std(in_conf[i], global_in_std)
pr_in, pr_out = self._get_probabilities(
logit=logit,
out_mean=out_mean,
out_std=out_std,
in_mean=in_mean,
in_std=in_std,
mode=self.mode,
)
mia_scores.append([pr_in, pr_out])
            out_p_norm: float = utils.get_p_normal(np.array(out_conf[i]))
            if out_p_norm <= 0.05:
                n_normal += 1
            if self.report_individual:
self.result["label"].append(label)
self.result["target_logit"].append(logit)
self.result["out_p_norm"].append(out_p_norm)
self.result["out_prob"].append(pr_out)
self.result["out_mean"].append(out_mean)
self.result["out_std"].append(out_std + EPS)
if self.mode == "online-carlini":
self.result["in_prob"].append(pr_in)
self.result["in_mean"].append(in_mean)
self.result["in_std"].append(in_std + EPS)
return mia_scores, n_normal
def _get_probabilities(
self,
logit: float,
out_mean: float,
out_std: float,
in_mean: float,
in_std: float,
mode: str,
) -> tuple[float, float]:
"""Calculate probabilities based on the selected mode."""
if mode == "offline":
pr_out = norm.cdf(logit, loc=out_mean, scale=out_std + EPS)
pr_in = 1 - pr_out
elif mode == "online-carlini":
pr_out = -norm.logpdf(logit, out_mean, out_std + EPS)
pr_in = -norm.logpdf(logit, in_mean, in_std + EPS)
pr_in = pr_in - pr_out
pr_out = -pr_in
elif mode == "offline-carlini":
pr_out = -norm.logpdf(logit, out_mean, out_std + EPS)
pr_in = -pr_out
else:
raise ValueError(f"Unsupported LiRA mode: {mode}")
return float(pr_in), float(pr_out)
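    # Worked example (offline mode, made-up numbers): a target logit of 2.0
    # against a non-member distribution with mean 0.0 and std 1.0 gives
    #
    #   pr_out = norm.cdf(2.0, loc=0.0, scale=1.0 + EPS)  # ~0.977
    #   pr_in = 1 - pr_out                                # ~0.023
    #
    # pr_out is the per-record score kept in mia_scores[i][1], so a value this
    # close to 1 marks the record as member-like.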
def _save_attack_metrics(
self,
mia_scores: list[list[float]],
n_train_rows: int,
n_shadow_rows: int,
n_normal: int,
) -> None:
"""Save attack metrics and individual results."""
mia_clf = self._DummyClassifier()
mia_scores_array = np.array(mia_scores)
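        # the first n_train_rows records are members (label 1); the remaining
        # n_shadow_rows records are non-members (label 0)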
mia_labels = np.array([1] * n_train_rows + [0] * n_shadow_rows)
y_pred_proba = mia_clf.predict_proba(mia_scores_array)
self.attack_metrics = [metrics.get_metrics(y_pred_proba, mia_labels)]
self.attack_metrics[-1]["n_normal"] = n_normal / (n_train_rows + n_shadow_rows)
if self.report_individual:
self.result["score"] = [score[1] for score in mia_scores]
self.result["member"] = mia_labels
self.attack_metrics[-1]["individual"] = self.result
def _get_mean_std(
self, confidences: list[float], global_std: float
) -> tuple[float, float]:
"""Return the mean and standard deviation of a list of confidences."""
scores: np.ndarray = np.array(confidences)
mean: float = 0
std: float = 0
if not np.isnan(scores).all():
mean = float(np.nanmean(scores))
std = float(np.nanstd(scores))
if self.fix_variance:
std = global_std
return mean, std
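    # Example (illustrative): confidences of [2.0, np.nan, 4.0] give a mean of
    # 3.0 and a std of 1.0 via np.nanmean/np.nanstd; an all-NaN or empty list
    # keeps the defaults of 0, and fix_variance replaces the std with global_std.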
def _get_global_std(self, confidences: dict[int, list[float]]) -> float:
"""Return the global standard deviation."""
global_std: float = 0
if self.fix_variance:
            # confidences is a dict of per-record lists with differing lengths,
            # so flatten them before computing the overall standard deviation
arrays = list(confidences.values())
combined = np.concatenate(arrays)
if not np.isnan(combined).all():
global_std = float(np.nanstd(combined))
return global_std
def _construct_metadata(self) -> None:
"""Construct the metadata object."""
super()._construct_metadata()
pdif = np.exp(-self.attack_metrics[0]["PDIF01"])
self.metadata["global_metrics"]["PDIF_sig"] = (
f"Significant at p={self.p_thresh}"
if pdif <= self.p_thresh
else f"Not significant at p={self.p_thresh}"
)
auc_p, auc_std = metrics.auc_p_val(
self.attack_metrics[0]["AUC"],
self.attack_metrics[0]["n_pos_test_examples"],
self.attack_metrics[0]["n_neg_test_examples"],
)
self.metadata["global_metrics"]["AUC_sig"] = (
f"Significant at p={self.p_thresh}"
if auc_p <= self.p_thresh
else f"Not significant at p={self.p_thresh}"
)
self.metadata["global_metrics"]["null_auc_3sd_range"] = (
f"{0.5 - 3 * auc_std} -> {0.5 + 3 * auc_std}"
)
def _make_pdf(self, output: dict) -> FPDF:
"""Create PDF report."""
return report.create_lr_report(output)
def _get_attack_metrics_instances(self) -> dict:
"""Construct the metadata object after attacks."""
attack_metrics_experiment = {}
attack_metrics_instances = {}
for rep, _ in enumerate(self.attack_metrics):
attack_metrics_instances["instance_" + str(rep)] = self.attack_metrics[rep]
attack_metrics_experiment["attack_instance_logger"] = attack_metrics_instances
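        # Shape sketch (single attack instance):
        #   {"attack_instance_logger": {"instance_0": {...metrics...}}}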
return attack_metrics_experiment
class _DummyClassifier:
"""A Dummy Classifier to allow this code to work with get_metrics."""
def predict(self, X_test):
"""Return an array of 1/0 depending on value in second column."""
return 1 * (X_test[:, 1] > 0.5)
def predict_proba(self, X_test):
"""Simply return the X_test."""
return X_test
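# Example (illustrative sketch): the dummy classifier passes the [pr_in, pr_out]
# score pairs straight through so metrics.get_metrics can treat them as class
# probabilities.
#
#   clf = _DummyClassifier()
#   scores = np.array([[0.1, 0.9], [0.8, 0.2]])
#   clf.predict_proba(scores)  # returns the array unchanged
#   clf.predict(scores)        # array([1, 0]): second column thresholded at 0.5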