Source code for sacroml.attacks.structural_attack

"""Structural attacks.

Runs a number of 'static' structural attacks based on:
(i) the target model's properties;
(ii) the TRE's risk appetite as applied to tables and standard regressions.

This module provides the `StructuralAttack` class, which assesses a trained
machine learning model for several common structural vulnerabilities.

These include:
- Degrees of freedom risk
- k-anonymity violations
- Class disclosure
- 'Unnecessary risk' caused by hyperparameters likely to lead to undue model complexity

The methodology is aligned with SACRO-ML's privacy risk framework.
"""

from __future__ import annotations

import logging
from dataclasses import asdict, dataclass

import numpy as np
from acro import ACRO
from fpdf import FPDF
from sklearn.base import BaseEstimator
from sklearn.ensemble import AdaBoostClassifier, RandomForestClassifier
from sklearn.neural_network import MLPClassifier
from sklearn.tree import DecisionTreeClassifier
from xgboost.sklearn import XGBClassifier

from sacroml.attacks import report
from sacroml.attacks.attack import Attack
from sacroml.attacks.target import Target

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


# --- Data Structure for Attack Results ---


@dataclass
class StructuralAttackResults:
    """Dataclass to store the results of a structural attack.

    Attributes
    ----------
    dof_risk : bool
        Risk based on degrees of freedom.
    k_anonymity_risk : bool
        Risk based on k-anonymity violations.
    class_disclosure_risk : bool
        Risk of class label disclosure.
    lowvals_cd_risk : bool
        Risk from low-frequency class values.
    unnecessary_risk : bool
        Risk due to unnecessarily complex model structure.
    details : dict | None
        Optional additional metadata, such as model-specific notes or
        thresholds used.
    """

    dof_risk: bool
    k_anonymity_risk: bool
    class_disclosure_risk: bool
    lowvals_cd_risk: bool
    unnecessary_risk: bool
    details: dict | None = None


# --- Standalone Helper Functions for Risk Assessment ---


def get_unnecessary_risk(model: BaseEstimator) -> bool:
    """Check whether model hyperparameters are in the top 20% most risky.

    This check is based on a classifier trained on results from a large
    scale study described in: https://doi.org/10.48550/arXiv.2502.09396

    Parameters
    ----------
    model : BaseEstimator
        The trained model to check for risk.

    Returns
    -------
    bool
        True if the model's hyperparameters are considered high risk,
        otherwise False.
    """
    if isinstance(model, DecisionTreeClassifier):
        return _get_unnecessary_risk_dt(model)
    if isinstance(model, RandomForestClassifier):
        return _get_unnecessary_risk_rf(model)
    if isinstance(model, XGBClassifier):
        return _get_unnecessary_risk_xgb(model)
    return False
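

# Illustrative sketch (not part of the sacroml API): the risk rules below
# read only hyperparameters, so unfitted models are enough to probe them.
# The configurations shown are examples, not recommendations.
def _example_unnecessary_risk_checks() -> None:
    """Probe the hyperparameter risk rules with example configurations."""
    # A fully grown tree (default: no depth limit, leaves of size 1).
    print(get_unnecessary_risk(DecisionTreeClassifier()))  # True
    # A shallow, regularised tree is not flagged.
    print(get_unnecessary_risk(DecisionTreeClassifier(max_depth=3)))  # False
    # A deep forest with many estimators is flagged.
    print(get_unnecessary_risk(RandomForestClassifier(max_depth=10, n_estimators=100)))  # True
    # Model types without extracted rules default to False.
    print(get_unnecessary_risk(MLPClassifier()))  # False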


def _get_unnecessary_risk_dt(model: DecisionTreeClassifier) -> bool:
    """Return whether DecisionTreeClassifier parameters are high risk.

    This function applies decision rules extracted from a trained decision
    tree classifier on hyperparameter configurations ranked by MIA AUC.
    """
    max_depth = float(model.max_depth) if model.max_depth else 500
    max_features = model.max_features
    min_samples_leaf = model.min_samples_leaf
    min_samples_split = model.min_samples_split
    splitter = model.splitter
    return (
        (max_depth > 7.5 and min_samples_leaf <= 7.5 and min_samples_split <= 15)
        or (
            splitter == "best"
            and max_depth > 7.5
            and min_samples_leaf <= 7.5
            and min_samples_split > 15
        )
        or (
            splitter == "best"
            and max_depth > 7.5
            and 7.5 < min_samples_leaf <= 15
            and max_features is None
        )
        or (
            splitter == "best"
            and 3.5 < max_depth <= 7.5
            and max_features is None
            and min_samples_leaf <= 7.5
        )
        or (
            splitter == "random"
            and max_depth > 7.5
            and min_samples_leaf <= 7.5
            and max_features is None
        )
    )


def _get_unnecessary_risk_rf(model: RandomForestClassifier) -> bool:
    """Return whether RandomForestClassifier parameters are high risk.

    This function applies decision rules extracted from a trained decision
    tree classifier on hyperparameter configurations ranked by MIA AUC.
    """
    max_depth = float(model.max_depth) if model.max_depth else 500
    n_estimators = model.n_estimators
    max_features = model.max_features
    min_samples_leaf = model.min_samples_leaf
    min_samples_split = model.min_samples_split
    return (
        (max_depth > 3.5 and n_estimators > 35 and max_features is not None)
        or (
            max_depth > 3.5
            and n_estimators > 35
            and min_samples_split <= 15
            and max_features is None
            and model.bootstrap
        )
        or (
            max_depth > 7.5
            and 15 < n_estimators <= 35
            and min_samples_leaf <= 15
            and not model.bootstrap
        )
    )


def _get_unnecessary_risk_xgb(model: XGBClassifier) -> bool:
    """Return whether XGBClassifier parameters are high risk.

    This function applies decision rules extracted from a trained decision
    tree classifier on hyperparameter configurations ranked by MIA AUC.

    If parameters have not been specified, it takes the XGBoost defaults from
    https://github.com/dmlc/xgboost/blob/master/python-package/xgboost/sklearn.py
    and https://xgboost.readthedocs.io/en/stable/parameter.html
    """
    n_estimators = int(model.n_estimators) if model.n_estimators else 100
    max_depth = float(model.max_depth) if model.max_depth else 6
    min_child_weight = float(model.min_child_weight) if model.min_child_weight else 1.0
    return (
        (max_depth > 3.5 and 3.5 < n_estimators <= 12.5 and min_child_weight <= 1.5)
        or (max_depth > 3.5 and n_estimators > 12.5 and min_child_weight <= 3)
        or (max_depth > 3.5 and n_estimators > 62.5 and 3 < min_child_weight <= 6)
    )


# --- Standalone Helper Functions for Parameter Counting ---


def get_model_param_count(model: BaseEstimator) -> int:
    """Return the number of trained parameters in a model.

    This includes learned weights, thresholds, and decision rules depending
    on model type. Supports DecisionTree, RandomForest, AdaBoost, XGBoost,
    and MLP classifiers.

    Parameters
    ----------
    model : BaseEstimator
        A trained scikit-learn or XGBoost model.

    Returns
    -------
    int
        Estimated number of learned parameters.
    """
    if isinstance(model, DecisionTreeClassifier):
        return _get_model_param_count_dt(model)
    if isinstance(model, RandomForestClassifier):
        return _get_model_param_count_rf(model)
    if isinstance(model, AdaBoostClassifier):
        return _get_model_param_count_ada(model)
    if isinstance(model, XGBClassifier):
        return _get_model_param_count_xgb(model)
    if isinstance(model, MLPClassifier):
        return _get_model_param_count_mlp(model)
    logger.warning(
        "Parameter counting not implemented for model type %s", type(model).__name__
    )
    return 0
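

# Worked check (illustrative, not part of the sacroml API): for a dense MLP
# the parameter count is fan_in * fan_out weights plus one bias per unit in
# each layer. With 4 inputs, one hidden layer of 8 units, and 3 classes that
# is 4*8 + 8 + 8*3 + 3 = 67.
def _example_mlp_param_count() -> None:
    """Verify the MLP parameter count against the dense-layer formula."""
    from sklearn.datasets import make_classification

    X, y = make_classification(
        n_samples=60,
        n_features=4,
        n_informative=3,
        n_redundant=0,
        n_classes=3,
        random_state=0,
    )
    # A short fit is enough: only the learned weight shapes matter here
    # (a ConvergenceWarning is harmless).
    model = MLPClassifier(hidden_layer_sizes=(8,), max_iter=50, random_state=0)
    model.fit(X, y)
    assert get_model_param_count(model) == 4 * 8 + 8 + 8 * 3 + 3  # 67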


def _get_tree_parameter_count(dtree: DecisionTreeClassifier) -> int:
    """Read the tree structure and return the number of learned parameters."""
    n_nodes = dtree.tree_.node_count
    is_leaf = dtree.tree_.children_left == dtree.tree_.children_right
    n_leaves = np.sum(is_leaf)
    n_internal_nodes = n_nodes - n_leaves
    # 2 params (feature, threshold) per internal node;
    # (n_classes - 1) params per leaf node for the probability distribution
    return 2 * n_internal_nodes + n_leaves * (dtree.n_classes_ - 1)


def _get_model_param_count_dt(model: DecisionTreeClassifier) -> int:
    """Return the number of trained DecisionTreeClassifier parameters."""
    return _get_tree_parameter_count(model)


def _get_model_param_count_rf(model: RandomForestClassifier) -> int:
    """Return the number of trained RandomForestClassifier parameters."""
    return sum(_get_tree_parameter_count(member) for member in model.estimators_)


def _get_model_param_count_ada(model: AdaBoostClassifier) -> int:
    """Return the number of trained AdaBoostClassifier parameters."""
    try:  # sklearn v1.2+
        base = model.estimator
    except AttributeError:  # pragma: no cover (sklearn version < 1.2)
        base = model.base_estimator
    if isinstance(base, DecisionTreeClassifier):
        return sum(_get_tree_parameter_count(member) for member in model.estimators_)
    return 0


def _get_model_param_count_xgb(model: XGBClassifier) -> int:
    """Return the number of trained XGBClassifier parameters."""
    df = model.get_booster().trees_to_dataframe()
    if df.empty:
        return 0
    n_trees = df["Tree"].max() + 1
    n_leaves = len(df[df.Feature == "Leaf"])
    n_internal_nodes = len(df) - n_leaves
    # 2 params per internal node, (n_classes - 1) per leaf, one weight per tree
    return 2 * n_internal_nodes + (model.n_classes_ - 1) * n_leaves + n_trees


def _get_model_param_count_mlp(model: MLPClassifier) -> int:
    """Return the number of trained MLPClassifier parameters."""
    weights = model.coefs_
    biases = model.intercepts_
    return sum(w.size for w in weights) + sum(b.size for b in biases)


# --- Main Attack Class ---


class StructuralAttack(Attack):
    """Structural attacks based on the static structure of a model.

    Performs structural privacy risk assessments on trained ML models.
    This class implements static structural attacks based on model
    architecture and hyperparameters, aligned with TRE risk appetite
    configurations.

    The attack pipeline includes:
    - Equivalence class analysis
    - Degrees of freedom check
    - k-anonymity check
    - Class disclosure risk
    - Complexity risk
    """

    def __init__(
        self,
        output_dir: str = "outputs",
        write_report: bool = True,
        risk_appetite_config: str = "default",
    ) -> None:
        """Construct an object to execute a structural attack.

        Parameters
        ----------
        output_dir : str
            Name of a directory to write outputs.
        write_report : bool
            Whether to generate a JSON and PDF report.
        risk_appetite_config : str
            Path to yaml file specifying TRE risk appetite.
        """
        super().__init__(output_dir=output_dir, write_report=write_report)
        self.target: Target | None = None
        self.results: StructuralAttackResults | None = None

        # Load risk appetite from ACRO config
        myacro = ACRO(risk_appetite_config)
        self.risk_appetite_config = risk_appetite_config
        self.THRESHOLD = myacro.config["safe_threshold"]
        self.DOF_THRESHOLD = myacro.config["safe_dof_threshold"]
        logger.info(
            "Thresholds for count %i and DoF %i", self.THRESHOLD, self.DOF_THRESHOLD
        )
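
    # Example construction (illustrative): "default" is the risk-appetite
    # config name used above; a TRE can point this at its own yaml file.
    #
    #     attack = StructuralAttack(
    #         output_dir="outputs",
    #         write_report=True,
    #         risk_appetite_config="default",
    #     )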

    def __str__(self) -> str:
        """Return the name of the attack."""
        return "Structural Attack"

    @classmethod
    def attackable(cls, target: Target) -> bool:
        """Return whether a target can be assessed with StructuralAttack."""
        if (
            target.has_model()
            and isinstance(target.model.model, BaseEstimator)
            and target.has_data()
        ):
            return True
        logger.warning("StructuralAttack requires a loadable model and data.")
        return False

    def _attack(self, target: Target) -> dict:
        """Run all structural risk assessments and return a report dictionary.

        This is the main orchestration method, called by the base class
        `run()` method. It calls helper methods to perform the individual
        risk checks and collates the results for reporting:

        - Degrees of freedom risk
        - k-anonymity risk
        - Class disclosure risk
        - Unnecessary complexity risk

        Parameters
        ----------
        target : Target
            The target object containing the model and data.

        Returns
        -------
        dict
            A dictionary containing the results and metadata of the attack.

        Note: this method assumes the target model has been trained and
        validated.
        """
        self.target = target
        model = target.model.model

        # Calculate equivalence classes, which are needed for several checks
        equiv_classes, equiv_counts, _ = self._calculate_equivalence_classes(model)

        # Run individual risk assessments
        dof_risk = self._assess_dof_risk(model)
        k_anonymity_risk = self._assess_k_anonymity_risk(equiv_counts)
        unnecessary_risk = get_unnecessary_risk(model)
        class_disclosure_risk, lowvals_cd_risk = self._assess_class_disclosure_risk(
            equiv_classes, equiv_counts
        )

        # Collate results into the structured dataclass
        self.results = StructuralAttackResults(
            dof_risk=dof_risk,
            k_anonymity_risk=k_anonymity_risk,
            unnecessary_risk=unnecessary_risk,
            class_disclosure_risk=class_disclosure_risk,
            lowvals_cd_risk=lowvals_cd_risk,
        )

        # Let the base class generate the report dictionary.
        # It will internally call our overridden _construct_metadata method.
        output = self._make_report(target)

        # If requested, write the JSON report file.
        # The PDF is generated by the main runner script from all JSON files.
        if self.write_report:
            self._write_report(output)
        return output

    def _assess_dof_risk(self, model: BaseEstimator) -> bool:
        """Assess risk based on residual degrees of freedom.

        Returns
        -------
        bool
            True if the model's residual degrees of freedom are below the
            safe threshold.
        """
        n_samples, n_features = self.target.X_train.shape
        n_params = get_model_param_count(model)
        if n_params < n_features:
            logger.info(
                "Model has fewer parameters (%d) than features (%d).",
                n_params,
                n_features,
            )
        residual_dof = n_samples - n_params
        logger.info(
            "Samples=%d, Parameters=%d, DoF=%d", n_samples, n_params, residual_dof
        )
        return residual_dof < self.DOF_THRESHOLD

    def _assess_k_anonymity_risk(self, equiv_counts: np.ndarray) -> bool:
        """Assess k-anonymity risk from equivalence class sizes.

        Returns
        -------
        bool
            True if the smallest equivalence class size is below the safe
            threshold.
        """
        min_k = np.min(equiv_counts)
        logger.info("Smallest equivalence class size (k-anonymity) is %d", min_k)
        return min_k < self.THRESHOLD

    def _assess_class_disclosure_risk(
        self, equiv_classes: np.ndarray, equiv_counts: np.ndarray
    ) -> tuple[bool, bool]:
        """Assess risk of disclosing class frequencies.

        Returns
        -------
        tuple[bool, bool]
            - class_disclosure_risk: True if any class frequency is below
              the threshold.
            - lowvals_cd_risk: True if any non-zero class frequency is below
              the threshold.
        """
        freqs = equiv_classes * equiv_counts[:, np.newaxis]
        class_disclosure_risk = np.any(freqs < self.THRESHOLD)
        lowvals_cd_risk = np.any((freqs > 0) & (freqs < self.THRESHOLD))
        return class_disclosure_risk, lowvals_cd_risk

    def _calculate_equivalence_classes(self, model: BaseEstimator) -> tuple:
        """Calculate equivalence classes based on model type and predictions."""
        if isinstance(model, DecisionTreeClassifier):
            return self._dt_get_equivalence_classes(model)
        return self._get_equivalence_classes_from_probas(model)

    def _dt_get_equivalence_classes(self, model: DecisionTreeClassifier) -> tuple:
        """Get equivalence classes for a decision tree via its leaf nodes."""
        destinations = model.apply(self.target.X_train)
        leaves, counts = np.unique(destinations, return_counts=True)
        members = [np.where(destinations == leaf)[0] for leaf in leaves]
        sample_indices = [mem[0] for mem in members if len(mem) > 0]
        equiv_classes = model.predict_proba(self.target.X_train[sample_indices])
        return equiv_classes, counts, members

    def _get_equivalence_classes_from_probas(self, model: BaseEstimator) -> tuple:
        """Get equivalence classes based on predicted probabilities."""
        y_probs = model.predict_proba(self.target.X_train)
        equiv_classes, inverse_indices, equiv_counts = np.unique(
            y_probs, axis=0, return_inverse=True, return_counts=True
        )
        members = [np.where(inverse_indices == i)[0] for i in range(len(equiv_classes))]
        return equiv_classes, equiv_counts, members

    def _construct_metadata(self) -> None:
        """Construct the metadata dictionary for reporting.

        Used internally to populate metadata for the attack report,
        including thresholds and results.
        """
        super()._construct_metadata()
        self.metadata["attack_specific_output"] = {
            "attack_name": str(self),
            "risk_appetite_config": self.risk_appetite_config,
            "safe_threshold": self.THRESHOLD,
            "safe_dof_threshold": self.DOF_THRESHOLD,
        }
        if self.results:
            self.metadata["global_metrics"] = asdict(self.results)

    def _get_attack_metrics_instances(self) -> dict:
        """Return attack metrics; required by the Attack base class.

        The metrics now live in the `StructuralAttackResults` dataclass and
        are surfaced by `_construct_metadata`, so this simply serializes the
        results object when one is available.
        """
        if self.results:
            return asdict(self.results)
        return {}

    def _make_pdf(self, output: dict) -> FPDF:
        """Create a PDF report using the external report module.

        Returns
        -------
        FPDF
            A PDF object containing the formatted structural attack report.
        """
        return report.create_structural_report(output)