Source code for sacroml.safemodel.safemodel

"""Prototypes of privacy safe model wrappers."""

from __future__ import annotations

import copy
import datetime
import getpass
import json
import logging
import pathlib
import pickle
from pickle import PicklingError
from typing import Any

import joblib
from dictdiffer import diff

from sacroml.attacks.factory import attack
from sacroml.attacks.target import Target

# pylint: disable=too-many-branches
from .reporting import get_reporting_string

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)


def check_min(key: str, val: Any, cur_val: Any) -> tuple[str, bool]:
    """Check minimum value constraint.

    Parameters
    ----------
    key : string
        The dictionary key to examine.
    val : Any Type
        The expected value of the key.
    cur_val : Any Type
        The current value of the key.

    Returns
    -------
    msg : string
        A message string.
    disclosive : bool
        A boolean value indicating whether the model is potentially disclosive.
    """
    if isinstance(cur_val, (int, float)):
        if cur_val < val:
            disclosive = True
            msg = get_reporting_string(
                name="less_than_min_value", key=key, cur_val=cur_val, val=val
            )
        else:
            disclosive = False
            msg = ""
        return msg, disclosive
    disclosive = True
    msg = get_reporting_string(
        name="different_than_recommended_type", key=key, cur_val=cur_val, val=val
    )
    return msg, disclosive

def check_max(key: str, val: Any, cur_val: Any) -> tuple[str, bool]:
    """Check maximum value constraint.

    Parameters
    ----------
    key : string
        The dictionary key to examine.
    val : Any Type
        The expected value of the key.
    cur_val : Any Type
        The current value of the key.

    Returns
    -------
    msg : string
        A message string.
    disclosive : bool
        A boolean value indicating whether the model is potentially disclosive.
    """
    if isinstance(cur_val, (int, float)):
        if cur_val > val:
            disclosive = True
            msg = get_reporting_string(
                name="greater_than_max_value", key=key, cur_val=cur_val, val=val
            )
        else:
            disclosive = False
            msg = ""
        return msg, disclosive
    disclosive = True
    msg = get_reporting_string(
        name="different_than_recommended_type", key=key, cur_val=cur_val, val=val
    )
    return msg, disclosive

def check_equal(key: str, val: Any, cur_val: Any) -> tuple[str, bool]:
    """Check equality value constraint.

    Parameters
    ----------
    key : string
        The dictionary key to examine.
    val : Any Type
        The expected value of the key.
    cur_val : Any Type
        The current value of the key.

    Returns
    -------
    msg : string
        A message string.
    disclosive : bool
        A boolean value indicating whether the model is potentially disclosive.
    """
    if cur_val != val:
        disclosive = True
        msg = get_reporting_string(
            name="different_than_fixed_value", key=key, cur_val=cur_val, val=val
        )
    else:
        disclosive = False
        msg = ""
    return msg, disclosive

def check_type(key: str, val: Any, cur_val: Any) -> tuple[str, bool]:
    """Check the type of a value.

    Parameters
    ----------
    key : string
        The dictionary key to examine.
    val : Any Type
        The expected value of the key.
    cur_val : Any Type
        The current value of the key.

    Returns
    -------
    msg : string
        A message string.
    disclosive : bool
        A boolean value indicating whether the model is potentially disclosive.
    """
    if type(cur_val).__name__ != val:
        disclosive = True
        msg = get_reporting_string(
            name="different_than_recommended_type", key=key, cur_val=cur_val, val=val
        )
    else:
        disclosive = False
        msg = ""
    return msg, disclosive

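# Illustrative sketch of the helpers above: each returns a (message, disclosive)
# pair. For a hypothetical rule requiring min_samples_leaf >= 5:
#
#   msg, disclosive = check_min("min_samples_leaf", 5, 1)
#   # disclosive is True; msg reports a value below the recommended minimum
#
#   msg, disclosive = check_min("min_samples_leaf", 5, "one")
#   # non-numeric current value: disclosive is True; msg reports a type mismatch
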
class SafeModel:  # pylint: disable = too-many-instance-attributes
    """Privacy protected model base class.

    Attributes
    ----------
    model_type : string
        A string describing the type of model. Default is "None".
    model:
        The Machine Learning Model.
    saved_model:
        A saved copy of the Machine Learning Model used for comparison.
    ignore_items : list
        A list of items to ignore when comparing the model with the
        saved_model.
    examine_seperately_items : list
        A list of items to examine separately. These items are more complex
        data structures that cannot be compared directly.
    filename : string
        A filename to save the model.
    researcher : string
        The researcher user-id used for logging.

    Examples
    --------
    >>> safeRFModel = SafeRandomForestClassifier()
    >>> safeRFModel.fit(X, y)
    >>> safeRFModel.save(name="safe.pkl")
    >>> safeRFModel.preliminary_check()
    >>> safeRFModel.request_release(
    ...     path="safe",
    ...     ext="pkl",
    ...     target=target,
    ... )
    WARNING: model parameters may present a disclosure risk:
    - parameter min_samples_leaf = 1 identified as less than the recommended
      min value of 5. Changed parameter min_samples_leaf = 5.
    Model parameters are within recommended ranges.
    """

    def __init__(self) -> None:
        """Superclass constructor; gets the researcher name."""
        self.model_type: str = "None"
        self.model = None
        self.saved_model = None
        self.model_load_file: str = "None"
        self.model_save_file: str = "None"
        self.ignore_items: list[str] = []
        self.examine_seperately_items: list[str] = []
        self.basemodel_paramnames = []
        self.filename: str = "None"
        self.researcher: str = "None"
        self.timestamp: str = "None"
        try:
            self.researcher = getpass.getuser()
        except (ImportError, KeyError, OSError):  # pragma: no cover
            self.researcher = "unknown"

    def get_params(self, deep: bool = True) -> dict:
        """Get a dictionary of parameter values restricted to those expected."""
        the_params = {}
        for key, val in self.__dict__.items():
            if key in self.basemodel_paramnames:
                the_params[key] = val
        if deep:
            pass  # not implemented yet
        return the_params

    def save(self, name: str = "undefined") -> None:
        """Write model to file in appropriate format.

        Note this is overloaded in `SafeKerasClassifier` to deal with
        TensorFlow specifics.

        Parameters
        ----------
        name : string
            The name of the file to save.

        Notes
        -----
        The optimizer is deliberately excluded to prevent a possible restart of
        training and thus a possible back door for attacks.
        """
        self.model_save_file = name
        if self.model_save_file == "undefined":
            print("You must input a name with extension to save the model.")
        else:
            thename = self.model_save_file.split(".")
            if len(thename) == 1:
                print("file name must indicate type as a suffix")
            else:
                suffix = self.model_save_file.split(".")[-1]
                # save to pickle
                if suffix == "pkl" and self.model_type != "KerasModel":
                    with open(self.model_save_file, "wb") as file:
                        try:
                            pickle.dump(self, file)
                        except (TypeError, AttributeError, PicklingError) as type_err:
                            print(
                                "saving a .pkl file is unsupported for model type: "
                                f"{self.model_type}. "
                                f"Error message was {type_err}"
                            )
                # save to joblib
                elif suffix == "sav" and self.model_type != "KerasModel":
                    try:
                        joblib.dump(self, self.model_save_file)
                    except (TypeError, AttributeError, PicklingError) as type_err:
                        print(
                            "saving as a .sav (joblib) file is not supported "
                            f"for models of type {self.model_type}. "
                            f"Error message was {type_err}"
                        )
                else:
                    print(
                        f"{suffix} file suffix currently not supported "
                        f"for models of type {self.model_type}.\n"
                    )

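    # Example (illustrative): saving a fitted safe model; the file suffix
    # selects the serialisation backend (`safe_model` is a hypothetical
    # fitted instance):
    #
    #   safe_model.save("safe_model.pkl")   # serialised with pickle
    #   safe_model.save("safe_model.sav")   # serialised with joblib
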
    def __get_constraints(self) -> dict:
        """Get constraints relevant to the model type from a read-only file."""
        rules: dict = {}
        rule_path = pathlib.Path(__file__).with_name("rules.json")
        with open(rule_path, encoding="utf-8") as json_file:
            parsed = json.load(json_file)
            rules = parsed[self.model_type]
        return rules["rules"]

    def __apply_constraints(
        self, operator: str, key: str, val: Any, cur_val: Any
    ) -> str:
        """Apply a safe rule for a given parameter."""
        if operator == "is_type":
            if (val == "int") and (type(cur_val).__name__ == "float"):
                self.__dict__[key] = int(self.__dict__[key])
                msg = get_reporting_string(name="change_param_type", key=key, val=val)
            elif (val == "float") and (type(cur_val).__name__ == "int"):
                self.__dict__[key] = float(self.__dict__[key])
                msg = get_reporting_string(name="change_param_type", key=key, val=val)
            else:
                msg = get_reporting_string(
                    name="not_implemented_for_change", key=key, cur_val=cur_val, val=val
                )
        else:
            setattr(self, key, val)
            msg = get_reporting_string(name="changed_param_equal", key=key, val=val)
        return msg

    def __check_model_param(
        self, rule: dict, apply_constraints: bool
    ) -> tuple[str, bool]:
        """Check whether a current model parameter violates a safe rule.

        Optionally fixes violations.
        """
        disclosive: bool = False
        msg: str = ""
        operator: str = rule["operator"]
        key: str = rule["keyword"]
        val: Any = rule["value"]
        cur_val: Any = getattr(self, key)
        if operator == "min":
            msg, disclosive = check_min(key, val, cur_val)
        elif operator == "max":
            msg, disclosive = check_max(key, val, cur_val)
        elif operator == "equals":
            msg, disclosive = check_equal(key, val, cur_val)
        elif operator == "is_type":
            msg, disclosive = check_type(key, val, cur_val)
        else:
            msg = get_reporting_string(
                name="unknown_operator", key=key, val=val, cur_val=cur_val
            )
        if apply_constraints and disclosive:
            msg += self.__apply_constraints(operator, key, val, cur_val)
        return msg, disclosive

    def __check_model_param_and(
        self, rule: dict, apply_constraints: bool
    ) -> tuple[str, bool]:
        """Check whether current model parameters violate a logical AND rule.

        Optionally fixes violations.
        """
        disclosive: bool = False
        msg: str = ""
        for arg in rule["subexpr"]:
            temp_msg, temp_disc = self.__check_model_param(arg, apply_constraints)
            msg += temp_msg
            if temp_disc:
                disclosive = True
        return msg, disclosive

    def __check_model_param_or(self, rule: dict) -> tuple[str, bool]:
        """Check whether current model parameters violate a logical OR rule."""
        disclosive: bool = True
        msg: str = ""
        for arg in rule["subexpr"]:
            temp_msg, temp_disc = self.__check_model_param(arg, False)
            msg += temp_msg
            if not temp_disc:
                disclosive = False
        return msg, disclosive

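    # Hypothetical sketch of the rules.json structure consumed by the private
    # helpers above (the top-level key is the model_type; each rule carries
    # "operator", "keyword" and "value", or "operator": "and"/"or" with a
    # "subexpr" list). Values shown are illustrative, not the shipped file:
    #
    #   {
    #       "DecisionTreeClassifier": {
    #           "rules": [
    #               {"operator": "min", "keyword": "min_samples_leaf", "value": 5},
    #               {"operator": "and", "subexpr": [
    #                   {"operator": "is_type", "keyword": "max_depth", "value": "int"},
    #                   {"operator": "max", "keyword": "max_depth", "value": 10}
    #               ]}
    #           ]
    #       }
    #   }
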
    def preliminary_check(
        self, verbose: bool = True, apply_constraints: bool = False
    ) -> tuple[str, bool]:
        """Check whether current model parameters violate the safe rules.

        Optionally fixes violations.

        Parameters
        ----------
        verbose : bool
            A boolean value to determine increased output level.
        apply_constraints : bool
            A boolean to determine whether identified constraints are
            to be upheld and applied.

        Returns
        -------
        msg : string
            A message string.
        disclosive : bool
            A boolean value indicating whether the model is potentially
            disclosive.
        """
        disclosive: bool = False
        msg: str = ""
        notok_start = get_reporting_string(name="warn_possible_disclosure_risk")
        ok_start = get_reporting_string(name="within_recommended_ranges")
        rules: dict = self.__get_constraints()
        for rule in rules:
            operator = rule["operator"]
            if operator == "and":
                temp_msg, temp_disc = self.__check_model_param_and(
                    rule, apply_constraints
                )
            elif operator == "or":
                temp_msg, temp_disc = self.__check_model_param_or(rule)
            else:
                temp_msg, temp_disc = self.__check_model_param(rule, apply_constraints)
            msg += temp_msg
            if temp_disc:
                disclosive = True
        msg = notok_start + msg if disclosive else ok_start + msg
        if verbose:
            print("Preliminary checks: " + msg)
        return msg, disclosive

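    # Example (illustrative): a pre-release check that also applies the
    # recommended fixes, assuming a fitted safe model instance named
    # `safe_model` (hypothetical name):
    #
    #   msg, disclosive = safe_model.preliminary_check(
    #       verbose=True, apply_constraints=True
    #   )
    #   # disclosive is True if any rule in rules.json was violated; with
    #   # apply_constraints=True the offending parameters are reset and the
    #   # changes are described in msg.
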
    def get_current_and_saved_models(self) -> tuple[dict, dict]:
        """Copy self.__dict__ and split into dicts for current and saved versions."""
        current_model = {}
        attribute_names_as_list = copy.copy(list(self.__dict__.keys()))
        for key in attribute_names_as_list:
            if key not in self.ignore_items:
                try:
                    value = self.__dict__[key]
                    current_model[key] = copy.deepcopy(value)
                except (copy.Error, TypeError) as key_type:
                    logger.warning("%s cannot be copied", key)
                    logger.warning(
                        "...%s error; %s", str(type(key_type)), str(key_type)
                    )
        saved_model = current_model.pop("saved_model", "Absent")
        # return empty dict if necessary
        if (
            saved_model == "Absent"
            or saved_model is None
            or not isinstance(saved_model, dict)
        ):
            saved_model = {}
        else:
            # final check in case fit has been called twice
            _ = saved_model.pop("saved_model", "Absent")
        return current_model, saved_model

    def examine_seperate_items(
        self, curr_vals: dict, saved_vals: dict
    ) -> tuple[str, bool]:
        """Check model-specific items exist in both current and saved copies."""
        msg = ""
        disclosive = False
        for item in self.examine_seperately_items:
            if curr_vals[item] == "Absent" and saved_vals[item] == "Absent":
                disclosive = True
                msg += get_reporting_string(name="both_item_removed", item=item)
            if curr_vals[item] == "Absent" and saved_vals[item] != "Absent":
                msg += get_reporting_string(name="current_item_removed", item=item)
                disclosive = True
            if saved_vals[item] == "Absent" and curr_vals[item] != "Absent":
                disclosive = True
                msg += get_reporting_string(name="saved_item_removed", item=item)
        if not disclosive:
            # ok, so can call model-specific extra checks
            msg, disclosive = self.additional_checks(curr_vals, saved_vals)
        return msg, disclosive

    def posthoc_check(self) -> tuple[str, bool]:
        """Check whether model has been interfered with since fit() was last run."""
        disclosive = False
        msg = ""
        current_model, saved_model = self.get_current_and_saved_models()
        if len(saved_model) == 0:
            msg = get_reporting_string(name="error_not_called_fit")
            msg += get_reporting_string(name="recommend_do_not_release")
            disclosive = True
        else:
            # remove things we don't care about
            for item in self.ignore_items:
                _ = current_model.pop(item, "Absent")
                _ = saved_model.pop(item, "Absent")
            # break out things that need to be handled/examined in more depth
            curr_separate = {}
            saved_separate = {}
            for item in self.examine_seperately_items:
                curr_separate[item] = current_model.pop(item, "Absent")
                saved_separate[item] = saved_model.pop(item, "Absent")
            # comparison on list of "simple" parameters
            match = list(diff(current_model, saved_model, expand=True))
            num_differences = len(match)
            if num_differences > 0:
                disclosive = True
                msg += get_reporting_string(
                    name="basic_params_differ", length=num_differences
                )
                for this_match in match:
                    if this_match[0] == "change":
                        msg += get_reporting_string(
                            name="param_changed_from_to",
                            key=this_match[1],
                            val=this_match[2][1],
                            cur_val=this_match[2][0],
                        )
                    else:
                        msg += f"{this_match}"
            # comparison on model-specific attributes
            extra_msg, extra_disclosive = self.examine_seperate_items(
                curr_separate, saved_separate
            )
            msg += extra_msg
            if extra_disclosive:
                disclosive = True
        return msg, disclosive

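    # Example (illustrative): detecting post-fit interference, assuming a
    # subclass whose fit() stored a snapshot in self.saved_model and a fitted
    # instance named `safe_model` (hypothetical names):
    #
    #   safe_model.min_samples_leaf = 1        # edit made after fit()
    #   msg, disclosive = safe_model.posthoc_check()
    #   # disclosive is True and msg reports the changed parameter
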
    def additional_checks(
        self, curr_separate: dict, saved_separate: dict
    ) -> tuple[str, bool]:
        """Perform additional posthoc checks.

        Placeholder function for additional posthoc checks, e.g. for Keras.
        This version just checks that any lists have the same contents.

        Parameters
        ----------
        curr_separate : dict
        saved_separate : dict

        Returns
        -------
        msg : string
            A message string.
        disclosive : bool
            A boolean value to indicate whether the model is potentially
            disclosive.

        Notes
        -----
        Posthoc checking makes sure that the two dicts have the same set of
        keys as defined in the list self.examine_seperately_items.
        """
        msg = ""
        disclosive = False
        for item in self.examine_seperately_items:
            if isinstance(curr_separate[item], list):
                if len(curr_separate[item]) != len(saved_separate[item]):
                    msg += (
                        f"Warning: different counts of values for parameter {item}.\n"
                    )
                    disclosive = True
                else:
                    for i in range(len(saved_separate[item])):
                        difference = list(
                            diff(curr_separate[item][i], saved_separate[item][i])
                        )
                        if len(difference) > 0:
                            msg += (
                                f"Warning: at least one non-matching value "
                                f"for parameter list {item}.\n"
                            )
                            disclosive = True
                            break
        return msg, disclosive

    def request_release(self, path: str, ext: str, target: Target = None) -> None:
        """Save model and create a report for the TRE output checkers.

        Parameters
        ----------
        path : string
            Path to save the outputs.
        ext : str
            File extension defining the model saved format, e.g., "pkl" or "sav".
        target : attacks.target.Target
            Contains model and dataset information.

        Notes
        -----
        If target is not None, then worst-case MIA, LiRA, and attribute
        inference attacks are run via run_attack.
        """
        # perform checks
        msg_prel, disclosive_prel = self.preliminary_check(verbose=False)
        msg_post, disclosive_post = self.posthoc_check()
        # prepare results
        output: dict = {
            "researcher": self.researcher,
            "model_type": self.model_type,
            "details": msg_prel,
        }
        if hasattr(self, "k_anonymity"):
            output["k_anonymity"] = str(self.k_anonymity)
        if not disclosive_prel and not disclosive_post:
            output["recommendation"] = "Proceed to next step of checking"
        else:
            output["recommendation"] = "Do not allow release"
            output["reason"] = msg_prel + msg_post
        # Run attacks programmatically if possible
        if target is not None:
            for attack_name in ["worstcase", "lira", "attribute"]:
                output[f"{attack_name}_results"] = self.run_attack(
                    target, attack_name, path
                )
        # add timestamp
        now = datetime.datetime.now()
        self.timestamp = str(now.strftime("%Y-%m-%d %H:%M:%S"))
        output["timestamp"] = self.timestamp
        data = [output]
        # save output
        if target is None:
            target = Target(model=self)
        target.add_safemodel_results(data)
        target.save(path, ext)

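    # Example (illustrative): requesting release of a fitted safe model,
    # assuming a populated Target object named `target` (both variable names
    # are hypothetical):
    #
    #   safe_model.request_release(path="release_outputs", ext="pkl", target=target)
    #
    # This runs the preliminary and posthoc checks plus the attacks, then
    # writes the model and the checking report under the given path via
    # Target.save().
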
    def run_attack(
        self,
        target: Target,
        attack_name: str,
        output_dir: str = "outputs_safemodel",
    ) -> dict:
        """Run a specified attack on the trained model and save report to file.

        Parameters
        ----------
        target : Target
            The target in the form of a Target object.
        attack_name : str
            Name of the attack to run.
        output_dir : str
            Name of the directory to store JSON and PDF reports.

        Returns
        -------
        dict
            Metadata results.
        """
        try:
            params = {"output_dir": output_dir}
            output = attack(target=target, attack_name=attack_name, **params)
            metadata = output["metadata"]
        except ValueError:
            metadata = {}
            metadata["outcome"] = "unrecognised attack type requested"
        logger.info("attack %s, metadata %s", attack_name, metadata)
        return metadata

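    # Example (illustrative): running a single attack directly, assuming a
    # populated Target object named `target` and a fitted instance named
    # `safe_model` (hypothetical names):
    #
    #   metadata = safe_model.run_attack(target, "worstcase", output_dir="outputs_safemodel")
    #
    # If the attack factory raises ValueError (e.g. for an unrecognised attack
    # name), the returned metadata is {"outcome": "unrecognised attack type requested"}.
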
    def __str__(self) -> str:  # pragma: no cover
        """Return string with model description."""
        return self.model_type + " with parameters: " + str(self.__dict__)