Source code for sacroml.attacks.target

"""Store information about the target model and data."""

from __future__ import annotations

import logging
import os
import pickle
import shutil
from typing import Any

import numpy as np
import pandas as pd
import sklearn
import torch
import yaml

from sacroml.attacks.model_pytorch import PytorchModel
from sacroml.attacks.model_sklearn import SklearnModel

registry: dict = {
    "PytorchModel": PytorchModel,
    "SklearnModel": SklearnModel,
}

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class Target:  # pylint: disable=too-many-instance-attributes
    """Store information about the target model and data."""

    def __init__(  # pylint: disable=too-many-arguments, too-many-locals
        self,
        model: Any = None,
        model_path: str = "",
        model_module_path: str = "",
        model_name: str = "",
        model_params: dict | None = None,
        train_module_path: str = "",
        train_params: dict | None = None,
        dataset_name: str = "",
        dataset_module_path: str = "",
        features: dict | None = None,
        X_train: np.ndarray | None = None,
        y_train: np.ndarray | None = None,
        X_test: np.ndarray | None = None,
        y_test: np.ndarray | None = None,
        X_orig: np.ndarray | None = None,
        y_orig: np.ndarray | None = None,
        X_train_orig: np.ndarray | None = None,
        y_train_orig: np.ndarray | None = None,
        X_test_orig: np.ndarray | None = None,
        y_test_orig: np.ndarray | None = None,
        proba_train: np.ndarray | None = None,
        proba_test: np.ndarray | None = None,
    ) -> None:
        """Store information about a target model and associated data.

        Parameters
        ----------
        model : Any
            Trained target model.
        model_path : str
            Path to a saved model.
        model_module_path : str
            Path to module containing the model class.
        model_name : str
            Class name of the model.
        model_params : dict | None
            Hyperparameters for instantiating the model.
        train_module_path : str
            Path to module containing the training function.
        train_params : dict | None
            Hyperparameters for training the model.
        dataset_name : str
            The name of the dataset.
        dataset_module_path : str
            Path to module containing the dataset loading function.
        features : dict | None
            Dictionary describing the dataset features.
        X_train : np.ndarray | None
            The (processed) training inputs.
        y_train : np.ndarray | None
            The (processed) training outputs.
        X_test : np.ndarray | None
            The (processed) testing inputs.
        y_test : np.ndarray | None
            The (processed) testing outputs.
        X_orig : np.ndarray | None
            The original (unprocessed) dataset inputs.
        y_orig : np.ndarray | None
            The original (unprocessed) dataset outputs.
        X_train_orig : np.ndarray | None
            The original (unprocessed) training inputs.
        y_train_orig : np.ndarray | None
            The original (unprocessed) training outputs.
        X_test_orig : np.ndarray | None
            The original (unprocessed) testing inputs.
        y_test_orig : np.ndarray | None
            The original (unprocessed) testing outputs.
        proba_train : np.ndarray | None
            The model predicted training probabilities.
        proba_test : np.ndarray | None
            The model predicted testing probabilities.
        """
        # Model - details
        if isinstance(model, sklearn.base.BaseEstimator):
            self.model = SklearnModel(
                model=model,
                model_path=model_path,
                model_module_path=model_module_path,
                model_name=model_name,
                model_params=model_params,
                train_module_path=train_module_path,
                train_params=train_params,
            )
        elif isinstance(model, torch.nn.Module):
            self.model = PytorchModel(
                model=model,
                model_path=model_path,
                model_module_path=model_module_path,
                model_name=model_name,
                model_params=model_params,
                train_module_path=train_module_path,
                train_params=train_params,
            )
        elif isinstance(model, (SklearnModel, PytorchModel)):
            self.model = model
        elif model is not None:  # pragma: no cover
            raise ValueError(f"Unsupported model type: {type(model)}")
        else:  # for subsequent model loading
            self.model = None

        # Model - code
        self.model_module_path = model_module_path

        # Data - model predicted probabilities
        self.proba_train: np.ndarray | None = proba_train
        self.proba_test: np.ndarray | None = proba_test

        # Dataset - details
        self.dataset_name: str = dataset_name

        # Dataset - code
        self.dataset_module_path = dataset_module_path

        # Dataset - processed
        self.X_train: np.ndarray | None = X_train
        self.y_train: np.ndarray | None = y_train
        self.X_test: np.ndarray | None = X_test
        self.y_test: np.ndarray | None = y_test
        self.n_samples: int = 0
        if X_train is not None and X_test is not None:
            self.n_samples = len(X_train) + len(X_test)

        # Dataset - unprocessed
        self.X_orig: np.ndarray | None = X_orig
        self.y_orig: np.ndarray | None = y_orig
        self.X_train_orig: np.ndarray | None = X_train_orig
        self.y_train_orig: np.ndarray | None = y_train_orig
        self.X_test_orig: np.ndarray | None = X_test_orig
        self.y_test_orig: np.ndarray | None = y_test_orig
        self.n_samples_orig: int = 0
        if X_train_orig is not None and X_test_orig is not None:
            self.n_samples_orig = len(X_train_orig) + len(X_test_orig)

        self.features: dict = features if features is not None else {}
        self.n_features: int = len(self.features)

        # Safemodel report
        self.safemodel: list = []
    def add_processed_data(
        self,
        X_train: np.ndarray,
        y_train: np.ndarray,
        X_test: np.ndarray,
        y_test: np.ndarray,
    ) -> None:
        """Add a processed and split dataset."""
        self.X_train = X_train
        self.y_train = np.array(y_train, int)
        self.X_test = X_test
        self.y_test = np.array(y_test, int)
        self.n_samples = len(X_train) + len(X_test)
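    # Illustrative usage sketch (not part of the module): a fitted
    # scikit-learn estimator is wrapped in a SklearnModel automatically by
    # __init__, and the processed train/test split can be attached afterwards.
    # The estimator, dataset name, and arrays below are placeholders.
    #
    #     from sklearn.ensemble import RandomForestClassifier
    #     clf = RandomForestClassifier().fit(X_train, y_train)
    #     target = Target(model=clf, dataset_name="example-dataset")
    #     target.add_processed_data(X_train, y_train, X_test, y_test)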
    def add_feature(self, name: str, indices: list[int], encoding: str) -> None:
        """Add a feature description to the data dictionary."""
        index: int = len(self.features)
        self.features[index] = {
            "name": name,
            "indices": indices,
            "encoding": encoding,
        }
        self.n_features = len(self.features)
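    # Illustrative sketch with a hypothetical feature layout: describing a
    # one-hot encoded column group followed by a single numeric column, where
    # indices refer to column positions in the processed arrays.  The feature
    # names and encoding strings are placeholders, not values required by
    # this class.
    #
    #     target.add_feature(name="colour", indices=[0, 1, 2], encoding="onehot")
    #     target.add_feature(name="age", indices=[3], encoding="int")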
    def add_raw_data(  # pylint: disable=too-many-arguments
        self,
        X_orig: np.ndarray,
        y_orig: np.ndarray,
        X_train_orig: np.ndarray,
        y_train_orig: np.ndarray,
        X_test_orig: np.ndarray,
        y_test_orig: np.ndarray,
    ) -> None:
        """Add original unprocessed dataset."""
        self.X_orig = X_orig
        self.y_orig = y_orig
        self.X_train_orig = X_train_orig
        self.y_train_orig = y_train_orig
        self.X_test_orig = X_test_orig
        self.y_test_orig = y_test_orig
        self.n_samples_orig = len(X_orig)
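    # Illustrative sketch: the raw (pre-processing) arrays can be attached
    # alongside the processed split so that attacks can report on both views
    # of the data.  The variable names below are placeholders.
    #
    #     target.add_raw_data(
    #         X_orig, y_orig,
    #         X_train_orig, y_train_orig,
    #         X_test_orig, y_test_orig,
    #     )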
    def _save_model(self, path: str, ext: str, target: dict) -> None:
        """Save the target model.

        Parameters
        ----------
        path : str
            Path to write the model.
        ext : str
            File extension defining the model saved format, e.g., "pkl" or "sav".
        target : dict
            Target class as a dictionary for writing yaml.
        """
        if self.model is not None:
            target["model_type"] = self.model.model_type
            target["model_name"] = self.model.model_name
            target["model_params"] = self.model.get_params()
            if self.model_module_path != "":
                filename = os.path.normpath(f"{path}/model.py")
                shutil.copy2(self.model.model_module_path, filename)
                target["model_module_path"] = "model.py"
            if self.model.train_module_path != "":
                filename = os.path.normpath(f"{path}/train.py")
                shutil.copy2(self.model.train_module_path, filename)
                target["train_module_path"] = "train.py"
                target["train_params"] = self.model.train_params
        if self.model is not None:
            filename = os.path.normpath(f"{path}/model.{ext}")
            target["model_path"] = f"model.{ext}"
            self.model.save(filename)

    def _load_model(self, path: str, target: dict) -> None:
        """Load the target model.

        Parameters
        ----------
        path : str
            Path to a target directory.
        target : dict
            Target class as a dictionary for loading yaml.
        """
        # Load attributes
        model_type: str = target.get("model_type", "")
        model_name: str = target.get("model_name", "")
        model_params: dict = target.get("model_params", {})
        model_path: str = target.get("model_path", "")
        model_module_path: str = target.get("model_module_path", "")
        train_module_path: str = target.get("train_module_path", "")
        train_params: dict = target.get("train_params", {})

        # Normalise paths
        model_path = os.path.normpath(f"{path}/{model_path}")
        model_module_path = os.path.normpath(f"{path}/{model_module_path}")
        train_module_path = os.path.normpath(f"{path}/{train_module_path}")

        # Load model
        if model_type in registry:
            model_class = registry[model_type]
            self.model = model_class.load(
                model_path=model_path,
                model_module_path=model_module_path,
                model_name=model_name,
                model_params=model_params,
                train_module_path=train_module_path,
                train_params=train_params,
            )
            logger.info("Loaded: %s : %s", model_type, model_name)
        else:  # pragma: no cover
            self.model = None
            logger.info("Can't load model: %s : %s", model_type, model_name)

    def _save_numpy(self, path: str, target: dict, name: str) -> None:
        """Save a numpy array variable as pickle.

        Parameters
        ----------
        path : str
            Path to save the data.
        target : dict
            Target class as a dictionary for writing yaml.
        name : str
            Name of the numpy array to save.
        """
        if getattr(self, name) is not None:
            np_path: str = os.path.normpath(f"{path}/{name}.pkl")
            target[f"{name}_path"] = f"{name}.pkl"
            with open(np_path, "wb") as fp:
                pickle.dump(getattr(self, name), fp, protocol=pickle.HIGHEST_PROTOCOL)
        else:
            target[f"{name}_path"] = ""
    def load_array(self, arr_path: str, name: str) -> None:
        """Load a data array variable from file.

        Handles both .pkl and .csv files.

        Parameters
        ----------
        arr_path : str
            Filename of a data array.
        name : str
            Name of the data array to load.
        """
        path = os.path.normpath(arr_path)
        _, ext = os.path.splitext(path)
        if ext == ".pkl":
            arr = get_array_pkl(path, name)
        elif ext == ".csv":  # pragma: no cover
            arr = get_array_csv(path, name)
        else:  # pragma: no cover
            raise ValueError(f"Target cannot load {ext} files.") from None
        setattr(self, name, arr)
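    # Illustrative sketch: individual arrays can be (re)loaded from disk by
    # attribute name, with the file type inferred from the extension.  The
    # path below is a placeholder.
    #
    #     target.load_array("some_dir/X_train.pkl", "X_train")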
    def _load_array(self, arr_path: str, target: dict, name: str) -> None:
        """Load a data array variable contained in a yaml config.

        Parameters
        ----------
        arr_path : str
            Filename of a data array.
        target : dict
            Target class as a dictionary read from yaml.
        name : str
            Name of the data array to load.
        """
        key = f"{name}_path"
        if key in target and target[key] != "":
            path = f"{arr_path}/{target[key]}"
            self.load_array(path, name)

    def _save_data(self, path: str, target: dict) -> None:
        """Save the target model data.

        Parameters
        ----------
        path : str
            Path to save the data.
        target : dict
            Target class as a dictionary for writing yaml.
        """
        if self.dataset_module_path != "":  # pragma: no cover
            filename = os.path.normpath(f"{path}/dataset.py")
            shutil.copy2(self.dataset_module_path, filename)
            target["dataset_module_path"] = "dataset.py"
        self._save_numpy(path, target, "X_train")
        self._save_numpy(path, target, "y_train")
        self._save_numpy(path, target, "X_test")
        self._save_numpy(path, target, "y_test")
        self._save_numpy(path, target, "X_orig")
        self._save_numpy(path, target, "y_orig")
        self._save_numpy(path, target, "X_train_orig")
        self._save_numpy(path, target, "y_train_orig")
        self._save_numpy(path, target, "X_test_orig")
        self._save_numpy(path, target, "y_test_orig")
        self._save_numpy(path, target, "proba_train")
        self._save_numpy(path, target, "proba_test")

    def _load_data(self, path: str, target: dict) -> None:
        """Load the target model data.

        Parameters
        ----------
        path : str
            Path to load the data.
        target : dict
            Target class as a dictionary read from yaml.
        """
        self._load_array(path, target, "X_train")
        self._load_array(path, target, "y_train")
        self._load_array(path, target, "X_test")
        self._load_array(path, target, "y_test")
        self._load_array(path, target, "X_orig")
        self._load_array(path, target, "y_orig")
        self._load_array(path, target, "X_train_orig")
        self._load_array(path, target, "y_train_orig")
        self._load_array(path, target, "X_test_orig")
        self._load_array(path, target, "y_test_orig")
        self._load_array(path, target, "proba_train")
        self._load_array(path, target, "proba_test")

    def _ge(self) -> float:
        """Return the model generalisation error.

        Returns
        -------
        float
            Generalisation error.
        """
        if (
            self.model is not None
            and self.X_train is not None
            and self.y_train is not None
            and self.X_test is not None
            and self.y_test is not None
        ):
            return self.model.get_generalisation_error(
                self.X_train, self.y_train, self.X_test, self.y_test
            )
        return np.nan  # pragma: no cover
    def save(self, path: str = "target", ext: str = "pkl") -> None:
        """Save the target class to persistent storage.

        Parameters
        ----------
        path : str
            Name of the output folder to save target information.
        ext : str
            File extension defining the model saved format, e.g., "pkl" or "sav".
        """
        norm_path: str = os.path.normpath(path)
        filename: str = os.path.normpath(f"{norm_path}/target.yaml")
        os.makedirs(os.path.dirname(filename), exist_ok=True)

        # convert Target to dict
        target: dict = {
            "dataset_name": self.dataset_name,
            "dataset_module_path": self.dataset_module_path,
            "n_samples": self.n_samples,
            "features": self.features,
            "n_features": self.n_features,
            "n_samples_orig": self.n_samples_orig,
            "generalisation_error": self._ge(),
            "safemodel": self.safemodel,
        }

        # write model details
        self._save_model(norm_path, ext, target)

        # write data arrays and add paths
        self._save_data(norm_path, target)

        # write yaml
        with open(filename, "w", encoding="utf-8") as fp:
            yaml.dump(target, fp, default_flow_style=False, sort_keys=False)
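    # Illustrative sketch of what save() writes under the output folder; the
    # exact contents depend on which model, code modules, and arrays are
    # present on the instance.
    #
    #     target/
    #         target.yaml      # metadata plus relative paths to the files below
    #         model.pkl        # serialised model, if a model is attached
    #         X_train.pkl      # one pickle per non-None data array
    #         y_train.pkl
    #         ...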
    def load(self, path: str = "target") -> None:
        """Load the target class from persistent storage.

        Parameters
        ----------
        path : str
            Name of the output folder containing a target yaml file.
        """
        target: dict = {}

        # load yaml
        filename: str = os.path.normpath(f"{path}/target.yaml")
        with open(filename, encoding="utf-8") as fp:
            target = yaml.safe_load(fp)

        # load modules
        if "dataset_module_path" in target:
            self.dataset_module_path = os.path.normpath(
                f"{path}/{target['dataset_module_path']}"
            )

        # load parameters
        if "dataset_name" in target:
            self.dataset_name = target["dataset_name"]
            logger.info("dataset_name: %s", self.dataset_name)
        if "n_samples" in target:
            self.n_samples = target["n_samples"]
        if "features" in target:
            features: dict = target["features"]
            # convert str keys to int
            self.features = {int(key): value for key, value in features.items()}
        if "n_features" in target:
            self.n_features = target["n_features"]
            logger.info("n_features: %d", self.n_features)
        if "n_samples_orig" in target:
            self.n_samples_orig = target["n_samples_orig"]
        if "safemodel" in target:
            self.safemodel = target["safemodel"]

        # load model
        self._load_model(path, target)

        # load data
        self._load_data(path, target)
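    # Illustrative sketch: a saved target can be restored into a fresh
    # instance, e.g. at the start of an attack script.
    #
    #     target = Target()
    #     target.load("target")   # reads target/target.yaml and linked files
    #     if target.has_model() and target.has_data():
    #         ...                  # run attacks against the restored target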
    def add_safemodel_results(self, data: list) -> None:
        """Add the results of safemodel disclosure checking.

        Parameters
        ----------
        data : list
            The results of safemodel disclosure checking.
        """
        self.safemodel = data
    def has_model(self) -> bool:
        """Return whether the target has a loaded model."""
        return self.model is not None and self.model.model is not None
    def has_data(self) -> bool:
        """Return whether the target has all processed data."""
        return (
            self.X_train is not None
            and self.y_train is not None
            and self.X_test is not None
            and self.y_test is not None
        )
    def has_raw_data(self) -> bool:
        """Return whether the target has all raw data."""
        return (
            self.X_orig is not None
            and self.y_orig is not None
            and self.X_train_orig is not None
            and self.y_train_orig is not None
            and self.X_test_orig is not None
            and self.y_test_orig is not None
        )
    def has_probas(self) -> bool:
        """Return whether the target has all probability data."""
        return self.proba_train is not None and self.proba_test is not None
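# Illustrative sketch (not part of the source): when only model output
# probabilities are available, a Target can be built without a model and
# checked with has_probas().  The arrays below are placeholders.
#
#     target = Target(
#         proba_train=proba_train, proba_test=proba_test,
#         y_train=y_train, y_test=y_test,
#     )
#     assert target.has_probas()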
def get_array_pkl(path: str, name: str):  # pragma: no cover
    """Load a data array from pickle."""
    try:
        with open(path, "rb") as fp:
            arr = pickle.load(fp)
        try:
            logger.info("%s shape: %s", name, arr.shape)
        except AttributeError:
            logger.info("%s is a scalar value.", name)
    except FileNotFoundError as e:
        raise FileNotFoundError(f"Pickle file not found: {path}") from e
    except Exception as e:
        raise ValueError(f"Error loading pickle file {path}: {e}") from None
    return arr
def get_array_csv(path: str, name: str):  # pragma: no cover
    """Load a data array from csv."""
    try:
        arr = pd.read_csv(path, header=None).values
        logger.info("%s shape: %s", name, arr.shape)
    except FileNotFoundError as e:
        raise FileNotFoundError(f"CSV file not found: {path}") from e
    except pd.errors.EmptyDataError as e:
        raise ValueError(f"CSV file is empty: {path}") from e
    except pd.errors.ParserError as e:
        raise ValueError(f"Error parsing CSV file {path}: {e}") from e
    except Exception as e:
        raise ValueError(f"Error reading CSV file {path}: {e}") from None
    return arr
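# Illustrative note: get_array_csv() reads with header=None, so CSV inputs are
# expected to contain data rows only (no header line).  A minimal sketch with
# a placeholder path:
#
#     arr = get_array_csv("some_dir/X_train.csv", "X_train")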