"""Likelihood testing scenario from https://arxiv.org/pdf/2112.03570.pdf."""
from __future__ import annotations
import contextlib
import logging
import numpy as np
import sklearn
from fpdf import FPDF
from scipy.stats import norm, shapiro
from sacroml import metrics
from sacroml.attacks import report
from sacroml.attacks.attack import Attack
from sacroml.attacks.target import Target
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
EPS = 1e-16 # Used to avoid numerical issues
class LIRAAttack(Attack):
"""The main LiRA Attack class."""
def __init__( # pylint: disable=too-many-arguments
self,
output_dir: str = "outputs",
write_report: bool = True,
n_shadow_models: int = 100,
p_thresh: float = 0.05,
mode: str = "offline",
fix_variance: bool = False,
report_individual: bool = False,
) -> None:
"""Construct an object to execute a LiRA attack.
Parameters
----------
output_dir : str
Name of the directory where outputs are stored.
write_report : bool
Whether to generate a JSON and PDF report.
n_shadow_models : int
Number of shadow models to be trained.
p_thresh : float
Threshold used to determine the statistical significance of reported
metrics such as auc_p_value and pdif_vals.
mode : str
Attack mode: {"offline", "offline-carlini", "online-carlini"}
fix_variance : bool
Whether to use a single global standard deviation rather than one
estimated per record.
report_individual : bool
Whether to report metrics for each individual record.
"""
super().__init__(output_dir=output_dir, write_report=write_report)
self.n_shadow_models: int = n_shadow_models
self.p_thresh: float = p_thresh
self.mode: str = mode
self.fix_variance: bool = fix_variance
self.report_individual: bool = report_individual
self.result: dict = {} # individual record results
if self.report_individual:
self.result["score"] = []
self.result["label"] = []
self.result["target_logit"] = []
self.result["out_p_norm"] = []
self.result["out_prob"] = []
self.result["out_mean"] = []
self.result["out_std"] = []
if self.mode == "online-carlini":
self.result["in_prob"] = []
self.result["in_mean"] = []
self.result["in_std"] = []
def __str__(self) -> str:
"""Return the name of the attack."""
return "LiRA Attack"
def attack(self, target: Target) -> dict:
"""Run a LiRA attack from a Target object and a target model.
Parameters
----------
target : attacks.target.Target
target as an instance of the Target class.
Returns
-------
dict
Attack report.
"""
# prepare
shadow_clf = sklearn.base.clone(target.model)
target = self._check_and_update_dataset(target)
# execute attack
self._run(
shadow_clf,
target.X_train,
target.y_train,
target.model.predict_proba(target.X_train),
target.X_test,
target.y_test,
target.model.predict_proba(target.X_test),
)
# create the report
output = self._make_report(target)
# write the report
self._write_report(output)
# return the report
return output
def _check_and_update_dataset(self, target: Target) -> Target:
"""Check that it is safe to use class variables to index prediction arrays.
This has two steps:
1. Replacing the values in y_train with their position in
target.model.classes (will normally result in no change)
2. Removing from the test set any rows corresponding to classes that
are not in the training set.
"""
y_train_new = []
classes = list(target.model.classes_)
for y in target.y_train:
y_train_new.append(classes.index(y))
target.y_train = np.array(y_train_new, int)
logger.info(
"new y_train has values and counts: %s",
np.unique(target.y_train, return_counts=True),
)
ok_pos = []
y_test_new = []
for i, y in enumerate(target.y_test):
if y in classes:
ok_pos.append(i)
y_test_new.append(classes.index(y))
if len(y_test_new) != len(target.X_test):
target.X_test = target.X_test[ok_pos, :]
target.y_test = np.array(y_test_new, int)
logger.info(
"new y_test has values and counts: %s",
np.unique(target.y_test, return_counts=True),
)
return target
def _run( # pylint: disable=too-many-arguments,too-many-locals
self,
shadow_clf: sklearn.base.BaseEstimator,
X_train: np.ndarray,
y_train: np.ndarray,
proba_train: np.ndarray,
X_test: np.ndarray,
y_test: np.ndarray,
proba_test: np.ndarray,
) -> None:
"""Run the likelihood test.
See p.6 (top of the second column) of the LiRA paper for details.
With mode "offline", we measure the probability of observing a
confidence as high as the target model's under the null hypothesis that
the target point is a non-member. That is, we use the normal CDF.
With mode "offline-carlini", we measure the probability that a target point
did not come from the non-member distribution. That is, we use Carlini's
implementation with a single norm (log) PDF.
With mode "online-carlini", we use Carlini's implementation of the standard
likelihood ratio test, measuring the ratio of probabilities the sample came
from the two distributions. That is, the (log) PDF of pr_in minus pr_out.
Parameters
----------
shadow_clf : sklearn.base.BaseEstimator
An sklearn classifier that will be trained to form the shadow models.
All hyperparameters should have been set.
X_train : np.ndarray
Data that was used to train the target model.
y_train : np.ndarray
Labels that were used to train the target model.
proba_train : np.ndarray
Array of predictions produced by the target model on the training data.
X_test : np.ndarray
Data that will be used to train the shadow models.
y_test : np.ndarray
Labels that will be used to train the shadow models.
proba_test : np.ndarray
Array of predictions produced by the target model on the shadow data.
"""
logger.info("Running %s LiRA, fix_variance=%s", self.mode, self.fix_variance)
# Combine target and shadow train, from which to sample datasets
n_train_rows, _ = X_train.shape
n_shadow_rows, _ = X_test.shape
combined_x_train = np.vstack((X_train, X_test))
combined_y_train = np.hstack((y_train, y_test))
combined_target_preds = np.vstack((proba_train, proba_test))
# Get the confidences of samples when in and not in the training set
out_conf, in_conf = self._train_shadow_models(
shadow_clf,
combined_x_train,
combined_y_train,
n_train_rows,
)
# Get the LiRA scores, and how many confidences were normally distributed
mia_scores, n_normal = self._compute_scores(
combined_y_train, combined_target_preds, out_conf, in_conf
)
# Save metrics
mia_clf = self._DummyClassifier()
mia_scores = np.array(mia_scores)
mia_labels = np.array([1] * n_train_rows + [0] * n_shadow_rows)
y_pred_proba = mia_clf.predict_proba(mia_scores)
self.attack_metrics = [metrics.get_metrics(y_pred_proba, mia_labels)]
self.attack_metrics[-1]["n_normal"] = n_normal / (n_train_rows + n_shadow_rows)
if self.report_individual:
self.result["score"] = [score[1] for score in mia_scores]
self.result["member"] = mia_labels
self.attack_metrics[-1]["individual"] = self.result
logger.info("Finished scenario")
def _compute_scores( # pylint: disable=too-many-locals
self,
combined_y_train: np.ndarray,
combined_target_preds: np.ndarray,
out_conf: dict[int, list[float]],
in_conf: dict[int, list[float]],
) -> tuple[list[list[float]], int]:
"""Compute LiRA scores for each record."""
logger.info("Computing scores")
mia_scores: list[list[float]] = []
n_normal: int = 0
global_in_std: float = self._get_global_std(in_conf)
global_out_std: float = self._get_global_std(out_conf)
# score each record in the member and non-member sets
for i, label in enumerate(combined_y_train):
# get the target model behaviour on the record
target_logit: float = _logit(combined_target_preds[i, label])
# get behaviour observed with the record as a non-member
out_mean, out_std = self._describe_conf(out_conf[i], global_out_std)
# get behaviour observed with the record as a member
in_mean, in_std = self._describe_conf(in_conf[i], global_in_std)
# compare behaviour
if self.mode == "offline":
pr_out = norm.cdf(target_logit, loc=out_mean, scale=out_std + EPS)
pr_in = 1 - pr_out
elif self.mode == "online-carlini":
pr_out = -norm.logpdf(target_logit, out_mean, out_std + EPS)
pr_in = -norm.logpdf(target_logit, in_mean, in_std + EPS)
# ratio
pr_in = pr_in - pr_out
pr_out = -pr_in
elif self.mode == "offline-carlini":
pr_out = -norm.logpdf(target_logit, out_mean, out_std + EPS)
pr_in = -pr_out
else:
raise ValueError(f"Unsupported LiRA mode: {self.mode}")
mia_scores.append([pr_in, pr_out])
# test the non-member samples for normality
out_p_norm = self._get_p_normal(np.array(out_conf[i]))
if out_p_norm <= 0.05:
n_normal += 1
# save individual record result
if self.report_individual:
self.result["label"].append(label)
self.result["target_logit"].append(target_logit)
self.result["out_p_norm"].append(out_p_norm)
self.result["out_prob"].append(pr_out)
self.result["out_mean"].append(out_mean)
self.result["out_std"].append(out_std + EPS)
if self.mode == "online-carlini":
self.result["in_prob"].append(pr_in)
self.result["in_mean"].append(in_mean)
self.result["in_std"].append(in_std + EPS)
return mia_scores, n_normal
def _describe_conf(
self, confidences: list[float], global_std: float
) -> tuple[float, float]:
"""Return the mean and standard deviation of a list of confidences."""
scores: np.ndarray = np.array(confidences)
mean: float = 0
std: float = 0
if not np.isnan(scores).all():
mean = np.nanmean(scores)
std = np.nanstd(scores)
if self.fix_variance:
std = global_std
return mean, std
def _get_global_std(self, confidences: dict[int, list[float]]) -> float:
"""Return the global standard deviation."""
global_std: float = 0
if self.fix_variance:
# flatten the dict of differently sized confidence lists into one array
arrays = list(confidences.values())
combined = np.concatenate(arrays)
if not np.isnan(combined).all():
global_std = np.nanstd(combined)
return global_std
def _get_p_normal(self, samples: np.ndarray) -> float:
"""Test whether a set of samples is normally distributed."""
p_normal: float = np.nan
if np.nanvar(samples) > EPS:
with contextlib.suppress(ValueError):
_, p_normal = shapiro(samples)
return p_normal
def _train_shadow_models( # pylint: disable=too-many-locals
self,
shadow_clf: sklearn.base.BaseEstimator,
combined_x_train: np.ndarray,
combined_y_train: np.ndarray,
n_train_rows: int,
) -> tuple[dict, dict]:
"""Train shadow models and return confidence scores.
Parameters
----------
shadow_clf : sklearn.base.BaseEstimator
An sklearn classifier that will be trained to form the shadow models.
combined_x_train : np.ndarray
Array of combined train and test features.
combined_y_train : np.ndarray
Array of combined train and test labels.
n_train_rows : int
Number of samples in the training set.
Returns
-------
tuple[dict, dict]
Dictionary of confidences when not in the training set.
Dictionary of confidences when in the training set.
"""
logger.info("Training shadow models")
n_combined, _ = combined_x_train.shape
out_conf: dict = {i: [] for i in range(n_combined)}
in_conf: dict = {i: [] for i in range(n_combined)}
indices: np.ndarray = np.arange(0, n_combined, 1)
for model_idx in range(self.n_shadow_models):
if model_idx % 10 == 0:
logger.info("Trained %d models", model_idx)
# Pick the indices to use for training this one
np.random.seed(model_idx) # Reproducibility
these_idx = np.random.choice(indices, n_train_rows, replace=False)
# Fit the shadow model
shadow_clf.set_params(random_state=model_idx)
shadow_clf.fit(
combined_x_train[these_idx, :],
combined_y_train[these_idx],
)
# map a class to a column
class_map = {c: i for i, c in enumerate(shadow_clf.classes_)}
# generate shadow confidences
shadow_confidences = shadow_clf.predict_proba(combined_x_train)
these_idx = set(these_idx)
for i, conf in enumerate(shadow_confidences):
# logit of the correct class
label = class_map.get(combined_y_train[i], -1)
# Occasionally, the random data split will result in classes being
# absent from the training set. In these cases label will be -1 and
# we include logit(0) instead of discarding
logit = _logit(0) if label < 0 else _logit(conf[label])
if i not in these_idx:
out_conf[i].append(logit)
else:
in_conf[i].append(logit)
return out_conf, in_conf
def _construct_metadata(self) -> None:
"""Construct the metadata object."""
super()._construct_metadata()
pdif = np.exp(-self.attack_metrics[0]["PDIF01"])
self.metadata["global_metrics"]["PDIF_sig"] = (
f"Significant at p={self.p_thresh}"
if pdif <= self.p_thresh
else f"Not significant at p={self.p_thresh}"
)
auc_p, auc_std = metrics.auc_p_val(
self.attack_metrics[0]["AUC"],
self.attack_metrics[0]["n_pos_test_examples"],
self.attack_metrics[0]["n_neg_test_examples"],
)
self.metadata["global_metrics"]["AUC_sig"] = (
f"Significant at p={self.p_thresh}"
if auc_p <= self.p_thresh
else f"Not significant at p={self.p_thresh}"
)
self.metadata["global_metrics"]["null_auc_3sd_range"] = (
f"{0.5 - 3 * auc_std} -> {0.5 + 3 * auc_std}"
)
def _make_pdf(self, output: dict) -> FPDF:
"""Create PDF report."""
return report.create_lr_report(output)
def _get_attack_metrics_instances(self) -> dict:
"""Construct the metadata object after attacks."""
attack_metrics_experiment = {}
attack_metrics_instances = {}
for rep, _ in enumerate(self.attack_metrics):
attack_metrics_instances["instance_" + str(rep)] = self.attack_metrics[rep]
attack_metrics_experiment["attack_instance_logger"] = attack_metrics_instances
return attack_metrics_experiment
class _DummyClassifier:
"""A Dummy Classifier to allow this code to work with get_metrics."""
def predict(self, X_test):
"""Return an array of 1/0 depending on value in second column."""
return 1 * (X_test[:, 1] > 0.5)
def predict_proba(self, X_test):
"""Simply return the X_test."""
return X_test
def _logit(p: float) -> float:
"""Return standard logit.
Parameters
----------
p : float
Value at which to evaluate the logit.
Returns
-------
float
logit(p)
Notes
-----
If `p` is close to 0 or 1, evaluating the log will result in numerical
instabilities. This code clips `p` to the range [`EPS`, `1 - EPS`], where
`EPS` defaults to 1e-16.
"""
if p > 1 - EPS:
p = 1 - EPS
p = max(p, EPS)
return np.log(p / (1 - p))