
"""Differentially private SVC."""

from __future__ import annotations

import logging

import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC

local_logger = logging.getLogger(__name__)
local_logger.setLevel(logging.WARNING)

SMALL_NUMBER = 1e-16  # used to replace a zero gamma value to avoid divide by zero

# pylint: disable=too-many-instance-attributes


class DPSVC:
    """Differentially private SVM.

    Implemented according to: https://arxiv.org/pdf/0911.5708.pdf.

    Essentially approximates an infinite-dimensional latent space (and
    corresponding kernel) with a finite dimensional latent space, and adds
    noise to the normal to the separating hyperplane in this latent space.

    Currently implemented only for the radial basis function (RBF) kernel,
    but could be extended.

    More specifically, the method:
    - draws a set of dhat random vectors from a probability measure induced by
      the Fourier transform of the kernel function;
    - approximates the kernel with a 2*dhat dimensional latent space (the
      feature map is written out below);
    - computes the separating hyperplane in this latent space, with normal w;
    - adds Laplace noise to w and returns it along with the map to the
      latent space.
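
    Writing rho_1, ..., rho_dhat for the sampled vectors, the approximate
    feature map implemented in phi_hat below is

        phi_hat(x) = dhat^(-1/2) * (cos(rho_1 . x), sin(rho_1 . x), ...,
                                    cos(rho_dhat . x), sin(rho_dhat . x))

    so that the kernel k(x, y) is approximated by the inner product
    phi_hat(x) . phi_hat(y).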

    The sklearn SVM (see
    https://scikit-learn.org/stable/modules/svm.html#mathematical-formulation)
    minimises the function

    (1/2) ||w||_2^2 + C sum_i zeta_i

    subject to y_i (w . phi(x_i) + b) >= 1 - zeta_i and zeta_i >= 0, where phi
    maps x to the latent space.

    This is equivalent to minimising

    (1/2) ||w||_2^2 + (C/n) sum_i l(y_i, f_w(x_i))

    where l(x, y) = n * max(0, 1 - x * y), which is n-Lipschitz continuous in
    y (given that x is in {-1, 1}).
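
    Example (an illustrative sketch on synthetic binary data):

        >>> import numpy as np
        >>> rng = np.random.default_rng(0)
        >>> x = rng.normal(size=(100, 4))
        >>> y = (x[:, 0] > 0).astype(int)
        >>> model = DPSVC(C=1.0, gamma="scale", dhat=200, eps=10)
        >>> model.fit(x, y)
        >>> predictions = model.predict(x)
        >>> probabilities = model.predict_proba(x)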
    """

    def __init__(
        self,
        C: float = 1.0,
        gamma: str | float = "scale",
        dhat: int = 1000,
        eps: float = 10,
        **kwargs: dict,
    ) -> None:
        """Construct a DPSVC with the given hyperparameters."""
        self.svc = None  # fitted sklearn SVC (precomputed kernel)
        self.gamma = gamma  # RBF kernel coefficient, as in sklearn
        self.dpsvc_gamma = None  # alternative parameterisation, 1/sqrt(2*gamma)
        self.dhat = dhat  # number of random Fourier vectors
        self.eps = eps  # differential privacy budget
        self.C = C  # SVM regularisation parameter
        self.lambdaval = None  # Laplace noise scale
        self.rho = None  # sampled random Fourier vectors
        self.support = None
        self.platt_transform = LogisticRegression()  # for Platt scaling
        self.b = None
        self.classes_ = [0, 1]
        self.intercept = None  # intercept of the separating hyperplane
        self.noisy_weights = None  # hyperplane normal with Laplace noise added
        self.set_params(**kwargs)

    def phi_hat(self, input_vector: np.ndarray) -> np.ndarray:
        """Project a single feature."""
        vt1 = (self.rho * input_vector).sum(axis=1)
        vt = (self.dhat ** (-0.5)) * np.column_stack((np.cos(vt1), np.sin(vt1)))
        return vt.reshape(2 * self.dhat)

    def phi_hat_multi(self, input_features: np.ndarray) -> np.ndarray:
        """Compute feature space for a matrix of inputs."""
        n_data, _ = input_features.shape
        phi_hat = np.zeros((n_data, 2 * self.dhat), float)
        for i in range(n_data):
            phi_hat[i, :] = self.phi_hat(input_features[i, :])
        return phi_hat

    def k_hat_svm(self, x: np.ndarray, y: np.ndarray | None = None) -> np.ndarray:
        """Define the version which is sent to sklearn.svm."""
        phi_hat_x = self.phi_hat_multi(x)
        phi_hat_y = phi_hat_x if y is None else self.phi_hat_multi(y)
        return np.dot(phi_hat_x, phi_hat_y.T)

    def fit(self, train_features: np.ndarray, train_labels: np.ndarray) -> None:
        """Fit the model."""
        # Check that the data passed is np.ndarray
        if not isinstance(train_features, np.ndarray) or not isinstance(
            train_labels, np.ndarray
        ):
            raise NotImplementedError("DPSCV needs np.ndarray inputs")

        n_data, n_features = train_features.shape

        # Check that the labels are binary with values 0 and 1
        unique_labels = np.unique(train_labels)
        local_logger.info("Unique labels: %s", unique_labels)
        for label in unique_labels:
            if label not in [0, 1]:
                raise NotImplementedError(
                    "DP SVC can only handle binary classification with "
                    "labels = 0 and 1"
                )

        # Laplace noise scale for the weight perturbation, following the
        # sensitivity analysis in the paper cited above; a non-positive eps is
        # treated as "add no noise" (and so gives no privacy guarantee)
        if self.eps > 0:
            self.lambdaval = (2**2.5) * self.C * np.sqrt(self.dhat) / self.eps
        else:
            self.lambdaval = 0

        # Mimic sklearn's "scale" and "auto" gamma options
        if self.gamma == "scale":
            self.gamma = 1.0 / (n_features * train_features.var())
        elif self.gamma == "auto":
            self.gamma = 1.0 / n_features

        if self.gamma == 0.0:
            self.gamma = SMALL_NUMBER
            local_logger.warning(
                "gamma value passed in was zero, set to %g", SMALL_NUMBER
            )
        # alternative parameterisation
        self.dpsvc_gamma = 1.0 / np.sqrt(2.0 * self.gamma)

        local_logger.info(
            "Gamma = %f (dp parameterisation = %f)", self.gamma, self.dpsvc_gamma
        )

        # Draw dhat random vectors rho from the Fourier transform of the RBF
        # kernel: a Gaussian with standard deviation 1/dpsvc_gamma = sqrt(2*gamma)
        self.rho = np.random.normal(0, 1.0 / self.dpsvc_gamma, (self.dhat, n_features))
        local_logger.info("Sampled rho")

        # Fit support vector machine
        # Create the gram matrix to pass to SVC
        gram_matrix = self.k_hat_svm(train_features)
        local_logger.info("Fitting base SVM")
        self.svc = SVC(kernel="precomputed", C=self.C)
        self.svc.fit(gram_matrix, train_labels)

        # Get separating hyperplane and intercept
        # alpha from solved dual, multiplied by labels (-1,1)
        alpha = self.svc.dual_coef_
        xi = train_features[self.svc.support_, :]  # support vectors x_i
        weights = np.zeros(2 * self.dhat)
        for i in range(alpha.shape[1]):
            weights = weights + alpha[0, i] * self.phi_hat(xi[i, :])

        self.intercept = self.svc.intercept_

        # Add Laplace noise to the hyperplane normal
        self.noisy_weights = weights + np.random.laplace(
            0, self.lambdaval, len(weights)
        )

        # Platt scaling for predict_proba: compute (noisy, hence DP) decision
        # values for the training data and fit a logistic regression to them
        ypredn = np.zeros(n_data)
        for i in range(n_data):
            ypredn[i] = (
                np.dot(self.phi_hat(train_features[i, :]), self.noisy_weights)
                + self.intercept
            )

        local_logger.info("Fitting Platt scaling")
        self.platt_transform.fit(ypredn.reshape(-1, 1), train_labels)

    def set_params(self, **kwargs: dict) -> None:
        """Set params."""
        for key, value in kwargs.items():
            if key == "gamma":
                self.gamma = value
            elif key == "eps":
                self.eps = value
            elif key == "dhat":
                self.dhat = value
            else:
                local_logger.warning("Unsupported parameter: %s", key)

    def _raw_outputs(self, test_features: np.ndarray) -> np.ndarray:
        """Get the raw output, used by predict and predict_proba."""
        projected_features = self.phi_hat_multi(test_features)
        return np.dot(projected_features, self.noisy_weights) + self.intercept

    def predict(self, test_features: np.ndarray) -> np.ndarray:
        """Return the predictions."""
        out = self._raw_outputs(test_features)
        return (out > 0).astype(int)

    def predict_proba(self, test_features: np.ndarray) -> np.ndarray:
        """Return the predictive probabilities."""
        out = self._raw_outputs(test_features)
        return self.platt_transform.predict_proba(out.reshape(-1, 1))
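

if __name__ == "__main__":
    # Illustrative check (a sketch, not part of the library API): the
    # random-feature kernel k_hat_svm should approximate the exact RBF kernel
    # exp(-gamma * ||x - y||^2) increasingly well as dhat grows. Here rho and
    # dpsvc_gamma are set by hand, exactly as fit() would set them.
    rng = np.random.default_rng(0)
    x = rng.normal(size=(5, 3))

    model = DPSVC(gamma=0.5, dhat=20000)
    model.dpsvc_gamma = 1.0 / np.sqrt(2.0 * model.gamma)
    model.rho = np.random.normal(
        0, 1.0 / model.dpsvc_gamma, (model.dhat, x.shape[1])
    )

    approx = model.k_hat_svm(x)
    sq_dists = ((x[:, None, :] - x[None, :, :]) ** 2).sum(axis=2)
    exact = np.exp(-model.gamma * sq_dists)
    print("max abs kernel error:", np.abs(approx - exact).max())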