Source code for sacroml.attacks.structural_attack

"""Structural attacks.

Runs a number of 'static' structural attacks based on:
(i) the target model's properties;
(ii) the TRE's risk appetite as applied to tables and standard regressions.

Tree-based model types are currently supported.
"""

from __future__ import annotations

import logging

import numpy as np
from acro import ACRO
from sklearn.base import BaseEstimator
from sklearn.ensemble import AdaBoostClassifier, RandomForestClassifier
from sklearn.neural_network import MLPClassifier
from sklearn.tree import DecisionTreeClassifier
from xgboost.sklearn import XGBClassifier

from sacroml.attacks.attack import Attack
from sacroml.attacks.target import Target

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# pylint: disable=chained-comparison


def get_unnecessary_risk(model: BaseEstimator) -> bool:
    """Check whether model hyperparameters are in the top 20% most risky.

    This check is designed to assess whether a model is likely to be
    **unnecessarily** risky, i.e., whether it is highly likely that a
    different combination of hyper-parameters would have led to a model with
    similar or better accuracy on the task but with lower membership
    inference risk.

    The rules applied derive from an experimental study using a grid search
    in which:
    - max_features was one-hot encoded from the set [None, log2, sqrt]
    - splitter was encoded using 0=best, 1=random

    The target models created were then subject to membership inference
    attacks (MIA) and the hyper-parameter combinations rank-ordered according
    to MIA AUC. A decision tree was then trained to recognise whether
    hyper-parameter combinations were in the 20% most risky. The rules below
    were extracted from that tree for the 'least risky' nodes.

    Parameters
    ----------
    model : BaseEstimator
        Model to check for risk.

    Returns
    -------
    bool
        True if high risk, otherwise False.
    """
    unnecessary_risk: bool = False
    if isinstance(model, DecisionTreeClassifier):
        unnecessary_risk = _get_unnecessary_risk_dt(model)
    elif isinstance(model, RandomForestClassifier):
        unnecessary_risk = _get_unnecessary_risk_rf(model)
    elif isinstance(model, XGBClassifier):
        unnecessary_risk = _get_unnecessary_risk_xgb(model)
    return unnecessary_risk
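
# Illustrative sketch (not part of the module): a hypothetical helper showing
# how get_unnecessary_risk() might be exercised. The rules read only
# hyper-parameters, so the models need not be fitted; the parameter choices
# here are assumptions for demonstration.
def _demo_unnecessary_risk() -> None:
    # max_depth=None is treated as 500 (> 7.5), and the default small leaves
    # (min_samples_leaf=1, min_samples_split=2) satisfy the first rule.
    risky = DecisionTreeClassifier()
    # A shallow, heavily constrained tree falls outside all five rules.
    safer = DecisionTreeClassifier(max_depth=3, min_samples_leaf=20)
    print(get_unnecessary_risk(risky))  # expected: True
    print(get_unnecessary_risk(safer))  # expected: False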
def _get_unnecessary_risk_dt(model: DecisionTreeClassifier) -> bool:
    """Return whether DecisionTreeClassifier parameters are high risk."""
    max_depth = float(model.max_depth) if model.max_depth else 500
    max_features = model.max_features
    min_samples_leaf = model.min_samples_leaf
    min_samples_split = model.min_samples_split
    splitter = model.splitter
    return (
        (max_depth > 7.5 and min_samples_leaf <= 7.5 and min_samples_split <= 15)
        or (
            splitter == "best"
            and max_depth > 7.5
            and min_samples_leaf <= 7.5
            and min_samples_split > 15
        )
        or (
            splitter == "best"
            and max_depth > 7.5
            and 7.5 < min_samples_leaf <= 15
            and max_features is None
        )
        or (
            splitter == "best"
            and 3.5 < max_depth <= 7.5
            and max_features is None
            and min_samples_leaf <= 7.5
        )
        or (
            splitter == "random"
            and max_depth > 7.5
            and min_samples_leaf <= 7.5
            and max_features is None
        )
    )


def _get_unnecessary_risk_rf(model: RandomForestClassifier) -> bool:
    """Return whether RandomForestClassifier parameters are high risk."""
    max_depth = float(model.max_depth) if model.max_depth else 500
    n_estimators = model.n_estimators
    max_features = model.max_features
    min_samples_leaf = model.min_samples_leaf
    min_samples_split = model.min_samples_split
    return (
        (max_depth > 3.5 and n_estimators > 35 and max_features is not None)
        or (
            max_depth > 3.5
            and n_estimators > 35
            and min_samples_split <= 15
            and max_features is None
            and model.bootstrap
        )
        or (
            max_depth > 7.5
            and 15 < n_estimators <= 35
            and min_samples_leaf <= 15
            and not model.bootstrap
        )
    )


def _get_unnecessary_risk_xgb(model: XGBClassifier) -> bool:
    """Return whether XGBClassifier parameters are high risk.

    Check whether params exist, using xgboost defaults if not. Defaults are
    taken from
    https://github.com/dmlc/xgboost/blob/master/python-package/xgboost/sklearn.py
    and https://xgboost.readthedocs.io/en/stable/parameter.html
    """
    n_estimators = int(model.n_estimators) if model.n_estimators else 100
    max_depth = float(model.max_depth) if model.max_depth else 6
    min_child_weight = float(model.min_child_weight) if model.min_child_weight else 1.0
    return (
        (max_depth > 3.5 and 3.5 < n_estimators <= 12.5 and min_child_weight <= 1.5)
        or (max_depth > 3.5 and n_estimators > 12.5 and min_child_weight <= 3)
        or (max_depth > 3.5 and n_estimators > 62.5 and 3 < min_child_weight <= 6)
    )
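
# Illustrative sketch (not part of the module): the XGBoost rules above fall
# back to library defaults when attributes are unset. This hypothetical check
# shows the defaults being applied to unfitted models; the values used are
# assumptions for demonstration.
def _demo_xgb_defaults() -> None:
    # With no explicit hyper-parameters: max_depth -> 6 (> 3.5),
    # n_estimators -> 100 (> 12.5), min_child_weight -> 1.0 (<= 3),
    # so the second rule fires.
    print(_get_unnecessary_risk_xgb(XGBClassifier()))  # expected: True
    # Raising min_child_weight above 6 moves outside all three rules.
    print(_get_unnecessary_risk_xgb(XGBClassifier(min_child_weight=10.0)))  # expected: False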
def get_tree_parameter_count(dtree: DecisionTreeClassifier) -> int:
    """Read the tree structure and return the number of learned parameters."""
    n_nodes = dtree.tree_.node_count
    left = dtree.tree_.children_left
    right = dtree.tree_.children_right
    is_leaf = np.zeros(n_nodes, dtype=int)
    for node_id in range(n_nodes):
        if left[node_id] == right[node_id]:
            is_leaf[node_id] = 1
    n_leaves = is_leaf.sum()

    # degrees of freedom
    n_internal_nodes = n_nodes - n_leaves
    n_params = 2 * n_internal_nodes  # feature id and threshold
    n_params += n_leaves * (dtree.n_classes_ - 1)  # probability distribution
    return n_params
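
# Illustrative sketch (not part of the module): a hypothetical hand-check of
# get_tree_parameter_count(). For a fitted tree with I internal nodes and L
# leaves the count is 2*I (feature id and threshold per split) plus
# L*(n_classes - 1) leaf probabilities. Dataset choices are assumptions.
def _demo_tree_param_count() -> None:
    from sklearn.datasets import make_classification

    X, y = make_classification(n_samples=200, n_features=5, random_state=1)
    tree = DecisionTreeClassifier(max_depth=2, random_state=1).fit(X, y)
    n_leaves = tree.get_n_leaves()
    n_internal = tree.tree_.node_count - n_leaves
    expected = 2 * n_internal + n_leaves * (tree.n_classes_ - 1)
    assert get_tree_parameter_count(tree) == expected
    print(expected)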
def get_model_param_count(model: BaseEstimator) -> int:
    """Return the number of trained parameters in a model."""
    n_params: int = 0
    if isinstance(model, DecisionTreeClassifier):
        n_params = _get_model_param_count_dt(model)
    elif isinstance(model, RandomForestClassifier):
        n_params = _get_model_param_count_rf(model)
    elif isinstance(model, AdaBoostClassifier):
        n_params = _get_model_param_count_ada(model)
    elif isinstance(model, XGBClassifier):
        n_params = _get_model_param_count_xgb(model)
    elif isinstance(model, MLPClassifier):
        n_params = _get_model_param_count_mlp(model)
    return n_params
def _get_model_param_count_dt(model: DecisionTreeClassifier) -> int:
    """Return the number of trained DecisionTreeClassifier parameters."""
    return get_tree_parameter_count(model)


def _get_model_param_count_rf(model: RandomForestClassifier) -> int:
    """Return the number of trained RandomForestClassifier parameters."""
    n_params: int = 0
    for member in model.estimators_:
        n_params += get_tree_parameter_count(member)
    return n_params


def _get_model_param_count_ada(model: AdaBoostClassifier) -> int:
    """Return the number of trained AdaBoostClassifier parameters."""
    n_params: int = 0
    try:  # sklearn v1.2+
        base = model.estimator
    except AttributeError:  # sklearn version < 1.2
        base = model.base_estimator
    if isinstance(base, DecisionTreeClassifier):
        for member in model.estimators_:
            n_params += get_tree_parameter_count(member)
    return n_params


def _get_model_param_count_xgb(model: XGBClassifier) -> int:
    """Return the number of trained XGBClassifier parameters."""
    df = model.get_booster().trees_to_dataframe()
    n_trees = df["Tree"].max()
    total = len(df)
    n_leaves = len(df[df.Feature == "Leaf"])
    # 2 per internal node, one per class in leaves, one weight per tree
    return 2 * (total - n_leaves) + (model.n_classes_ - 1) * n_leaves + n_trees


def _get_model_param_count_mlp(model: MLPClassifier) -> int:
    """Return the number of trained MLPClassifier parameters."""
    weights = model.coefs_  # list of numpy.ndarray
    biases = model.intercepts_  # list of numpy.ndarray
    return sum(a.size for a in weights) + sum(a.size for a in biases)
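
# Illustrative sketch (not part of the module): for an MLPClassifier the
# parameter count is every weight and bias. A hypothetical cross-check with
# assumed layer sizes: a 5 -> 4 -> 3 network has 5*4 + 4*3 = 32 weights and
# 4 + 3 = 7 biases, i.e. 39 parameters. Dataset choices are assumptions.
def _demo_mlp_param_count() -> None:
    from sklearn.datasets import make_classification

    X, y = make_classification(
        n_samples=100, n_features=5, n_informative=3, n_classes=3, random_state=0
    )
    mlp = MLPClassifier(
        hidden_layer_sizes=(4,), max_iter=1000, random_state=0
    ).fit(X, y)
    print(_get_model_param_count_mlp(mlp))  # expected: 39 for this architecture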
class StructuralAttack(Attack):
    """Structural attacks based on the static structure of a model."""

    # pylint: disable=too-many-instance-attributes
    def __init__(
        self,
        output_dir: str = "outputs",
        write_report: bool = True,
        risk_appetite_config: str = "default",
    ) -> None:
        """Construct an object to execute a structural attack.

        Parameters
        ----------
        output_dir : str
            Name of a directory to write outputs.
        write_report : bool
            Whether to generate a JSON and PDF report.
        risk_appetite_config : str
            Path to YAML file specifying TRE risk appetite.
        """
        super().__init__(output_dir=output_dir, write_report=write_report)
        self.target: Target = None

        # disclosure risk
        self.k_anonymity_risk: bool = False
        self.dof_risk: bool = False
        self.unnecessary_risk: bool = False
        self.class_disclosure_risk: bool = False
        self.lowvals_cd_risk: bool = False

        # make a dummy ACRO object and use it to extract the risk appetite
        myacro = ACRO(risk_appetite_config)
        self.risk_appetite_config = risk_appetite_config
        self.THRESHOLD = myacro.config["safe_threshold"]
        self.DOF_THRESHOLD = myacro.config["safe_dof_threshold"]
        logger.info(
            "Thresholds for count %i and Dof %i", self.THRESHOLD, self.DOF_THRESHOLD
        )
        del myacro

        # metrics
        self.attack_metrics = [
            "dof_risk",
            "k_anonymity_risk",
            "class_disclosure_risk",
            "lowvals_cd_risk",
            "unnecessary_risk",
        ]
        self.yprobs = []
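    # Illustrative usage (not part of the module; shown as a comment because we
    # are inside the class body): constructing the attack reads the count and
    # degrees-of-freedom thresholds from an ACRO risk-appetite config, e.g.
    #
    #   attack = StructuralAttack(output_dir="outputs",
    #                             risk_appetite_config="default")
    #   print(attack.THRESHOLD, attack.DOF_THRESHOLD)
    #
    # "default" uses ACRO's built-in risk appetite; a TRE can instead supply a
    # path to its own YAML config.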
    def __str__(self) -> str:
        """Return the name of the attack."""
        return "Structural attack"
    def attack(self, target: Target) -> dict:
        """Run structural attack.

        To be used when code has access to the Target class and the trained
        target model.

        Parameters
        ----------
        target : attacks.target.Target
            target as a Target class object

        Returns
        -------
        dict
            Attack report.
        """
        self.target = target
        if target.model is None:
            errstr = (
                "cannot currently call StructuralAttack.attack() "
                "unless the target contains a trained model"
            )
            raise NotImplementedError(errstr)

        # get proba values for training data
        x = self.target.X_train
        y = self.target.y_train
        assert x.shape[0] == len(y), "length mismatch between trainx and trainy"
        self.yprobs = self.target.model.predict_proba(x)

        # compute equivalence classes and membership only once as potentially slow
        if isinstance(target.model, DecisionTreeClassifier):
            equiv = self.dt_get_equivalence_classes()
        else:
            equiv = self.get_equivalence_classes()
        equiv_classes = equiv[0]
        equiv_counts = equiv[1]
        equiv_members = equiv[2]
        errstr = "len mismatch between equiv classes and "
        assert len(equiv_classes) == len(equiv_counts), errstr + "counts"
        assert len(equiv_classes) == len(equiv_members), errstr + "membership"

        # now assess the risk

        # degrees of freedom
        n_params = get_model_param_count(target.model)
        residual_dof = self.target.X_train.shape[0] - n_params
        self.dof_risk = residual_dof < self.DOF_THRESHOLD

        # k-anonymity
        mink = np.min(np.array(equiv_counts))
        self.k_anonymity_risk = mink < self.THRESHOLD

        # unnecessary risk arising from poor hyper-parameter combination
        self.unnecessary_risk = get_unnecessary_risk(self.target.model)

        # class disclosure
        freqs = np.zeros(equiv_classes.shape)
        for group in range(freqs.shape[0]):
            freqs[group] = equiv_classes[group] * equiv_counts[group]
        self.class_disclosure_risk = np.any(freqs < self.THRESHOLD)
        freqs[freqs == 0] = 100
        self.lowvals_cd_risk = np.any(freqs < self.THRESHOLD)

        # create the report
        output = self._make_report(target)
        # write the report
        self._write_report(output)
        # return the report
        return output
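    # Illustrative usage (not part of the module; shown as a comment because we
    # are inside the class body): a minimal end-to-end sketch, assuming Target
    # accepts a fitted model and training data as keyword arguments (check the
    # Target docs for the exact signature).
    #
    #   model = DecisionTreeClassifier(max_depth=5).fit(X_train, y_train)
    #   target = Target(model=model, X_train=X_train, y_train=y_train)
    #   attack = StructuralAttack()
    #   report = attack.attack(target)
    #   # individual flags are also set on the attack object:
    #   print(attack.dof_risk, attack.k_anonymity_risk, attack.unnecessary_risk)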
    def dt_get_equivalence_classes(self) -> tuple:
        """Get details of equivalence classes based on white box inspection."""
        destinations = self.target.model.apply(self.target.X_train)
        ret_tuple = np.unique(destinations, return_counts=True)
        leaves = ret_tuple[0]
        counts = ret_tuple[1]
        members = []
        for leaf in leaves:
            ingroup = np.asarray(destinations == leaf).nonzero()[0]
            members.append(ingroup)

        equiv_classes = np.zeros((len(leaves), self.target.model.n_classes_))
        for group in range(len(leaves)):
            sample_id = members[group][0]
            sample = self.target.X_train[sample_id]
            proba = self.target.model.predict_proba(sample.reshape(1, -1))
            equiv_classes[group] = proba
        return equiv_classes, counts, members
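    # Illustrative sketch (comment only, as we are inside the class body): the
    # white-box grouping above relies on DecisionTreeClassifier.apply(), which
    # maps each training row to a leaf id; rows sharing a leaf share a predicted
    # probability vector, e.g.
    #
    #   leaves = model.apply(X_train)              # leaf id per row
    #   ids, counts = np.unique(leaves, return_counts=True)
    #   # counts[i] is the size of equivalence class i -> used for k-anonymity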
    def get_equivalence_classes(self) -> tuple:
        """Get details of equivalence classes based on predicted probabilities."""
        uniques = np.unique(self.yprobs, axis=0, return_counts=True)
        equiv_classes = uniques[0]
        equiv_counts = uniques[1]
        members = []
        for prob_vals in equiv_classes:
            ingroup = np.unique(np.asarray(self.yprobs == prob_vals).nonzero()[0])
            members.append(ingroup)
        return equiv_classes, equiv_counts, members
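    # Illustrative sketch (comment only, as we are inside the class body): for
    # black-box models the grouping key is the predicted probability row itself;
    # np.unique with axis=0 de-duplicates whole rows, e.g.
    #
    #   probs = np.array([[0.9, 0.1], [0.9, 0.1], [0.2, 0.8]])
    #   rows, counts = np.unique(probs, axis=0, return_counts=True)
    #   # rows -> [[0.2, 0.8], [0.9, 0.1]], counts -> [1, 2]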
    def _get_global_metrics(self, attack_metrics: list) -> dict:
        """Get dictionary summarising metrics from a metric list.

        Parameters
        ----------
        attack_metrics : list
            List of attack metrics to be reported.

        Returns
        -------
        global_metrics : dict
            Dictionary of summary metrics.
        """
        global_metrics = {}
        if attack_metrics is not None and len(attack_metrics) != 0:
            global_metrics["dof_risk"] = self.dof_risk
            global_metrics["k_anonymity_risk"] = self.k_anonymity_risk
            global_metrics["class_disclosure_risk"] = self.class_disclosure_risk
            global_metrics["unnecessary_risk"] = self.unnecessary_risk
            global_metrics["lowvals_cd_risk"] = self.lowvals_cd_risk
        return global_metrics

    def _construct_metadata(self) -> None:
        """Construct the metadata object, after attacks."""
        super()._construct_metadata()
        self.metadata["global_metrics"] = self._get_global_metrics(self.attack_metrics)

    def _get_attack_metrics_instances(self) -> dict:
        """Return a dictionary of metrics for this attack run."""
        attack_metrics_experiment = {}
        attack_metrics_instances = {}
        attack_metrics_experiment["attack_instance_logger"] = attack_metrics_instances
        attack_metrics_experiment["dof_risk"] = self.dof_risk
        attack_metrics_experiment["k_anonymity_risk"] = self.k_anonymity_risk
        attack_metrics_experiment["class_disclosure_risk"] = self.class_disclosure_risk
        attack_metrics_experiment["unnecessary_risk"] = self.unnecessary_risk
        attack_metrics_experiment["lowvals_cd_risk"] = self.lowvals_cd_risk
        return attack_metrics_experiment

    def _make_pdf(self, output: dict) -> None:
        """Create PDF report (not yet implemented for this attack)."""
        attack_name: str = output["metadata"]["attack_name"]
        logger.info("PDF report not yet implemented for %s", attack_name)