mirror of
https://github.com/zama-ai/concrete.git
synced 2026-02-09 03:55:04 -05:00
feat: Update regression benchmarks and examples (#1163)
Use our quantization API, improve datasets, metrics and quantization precisions Closes #972
This commit is contained in:
@@ -4,226 +4,176 @@
|
||||
# flake8: noqa: E501
|
||||
# pylint: disable=C0301
|
||||
|
||||
import numpy as np
|
||||
from common import BENCHMARK_CONFIGURATION
|
||||
from copy import deepcopy
|
||||
from typing import Any, Dict
|
||||
|
||||
import concrete.numpy as hnp
|
||||
import numpy as np
|
||||
from sklearn.datasets import make_regression
|
||||
from sklearn.linear_model import LinearRegression
|
||||
from sklearn.metrics import r2_score
|
||||
from sklearn.model_selection import train_test_split
|
||||
from tqdm import tqdm
|
||||
|
||||
from concrete.quantization import QuantizedArray, QuantizedLinear, QuantizedModule
|
||||
|
||||
|
||||
class QuantizedLinearRegression(QuantizedModule):
|
||||
"""
|
||||
Quantized Generalized Linear Model
|
||||
Building on top of QuantizedModule, implement a quantized linear transformation (w.x + b)
|
||||
"""
|
||||
|
||||
@staticmethod
|
||||
def from_sklearn(sklearn_model, calibration_data):
|
||||
"""Create a Quantized Linear Regression initialized from a sklearn trained model"""
|
||||
weights = np.expand_dims(sklearn_model.coef_, 1)
|
||||
bias = sklearn_model.intercept_
|
||||
# Quantize with 6 bits for input data, 1 for weights, 1 for the bias and 6 for the output
|
||||
return QuantizedLinearRegression(6, 1, 1, 6, weights, bias, calibration_data)
|
||||
|
||||
def __init__(self, q_bits, w_bits, b_bits, out_bits, weights, bias, calibration_data) -> None:
|
||||
"""
|
||||
Create the linear regression with different quantization bit precisions:
|
||||
|
||||
Quantization Parameters - Number of bits:
|
||||
q_bits (int): bits for input data, insuring that the number of bits of
|
||||
the w . x + b operation does not exceed 7 for the calibration data
|
||||
w_bits (int): bits for weights: in the case of a univariate regression this
|
||||
can be 1
|
||||
b_bits (int): bits for bias (this is a single value so a single bit is enough)
|
||||
out_bits (int): bits for the result of the linear transformation (w.x + b).
|
||||
In our case since the result of the linear transformation is
|
||||
directly decrypted we can use the maximum of 7 bits
|
||||
|
||||
Other parameters:
|
||||
weights: a numpy nd-array of weights (Nxd) where d is the data dimensionality
|
||||
bias: a numpy scalar
|
||||
calibration_data: a numpy nd-array of data (Nxd)
|
||||
"""
|
||||
self.n_bits = out_bits
|
||||
|
||||
# We need to calibrate to a sufficiently low number of bits
|
||||
# so that the output of the Linear layer (w . x + b)
|
||||
# does not exceed 7 bits
|
||||
self.q_calibration_data = QuantizedArray(q_bits, calibration_data)
|
||||
|
||||
# Quantize the weights and create the quantized linear layer
|
||||
q_weights = QuantizedArray(w_bits, weights)
|
||||
q_bias = QuantizedArray(b_bits, bias)
|
||||
q_layer = QuantizedLinear(out_bits, q_weights, q_bias)
|
||||
|
||||
# Store quantized layers
|
||||
quant_layers_dict: Dict[str, Any] = {}
|
||||
|
||||
# Calibrate the linear layer and obtain calibration_data for the next layers
|
||||
calibration_data = self._calibrate_and_store_layers_activation(
|
||||
"linear", q_layer, calibration_data, quant_layers_dict
|
||||
)
|
||||
|
||||
# Finally construct our Module using the quantized layers
|
||||
super().__init__(quant_layers_dict)
|
||||
|
||||
def _calibrate_and_store_layers_activation(
|
||||
self, name, q_function, calibration_data, quant_layers_dict
|
||||
):
|
||||
"""
|
||||
This function calibrates a layer of a quantized module (e.g. linear, inverse-link,
|
||||
activation, etc) by looking at the input data, then computes the output of the quantized
|
||||
version of the layer to be used as input to the following layers
|
||||
"""
|
||||
|
||||
# Calibrate the output of the layer
|
||||
q_function.calibrate(calibration_data)
|
||||
# Store the learned quantized layer
|
||||
quant_layers_dict[name] = q_function
|
||||
# Create new calibration data (output of the previous layer)
|
||||
q_calibration_data = QuantizedArray(self.n_bits, calibration_data)
|
||||
# Dequantize to have the value in clear and ready for next calibration
|
||||
return q_function(q_calibration_data).dequant()
|
||||
|
||||
def quantize_input(self, x):
|
||||
"""Quantize an input set with the quantization parameters determined from calibration"""
|
||||
q_input_arr = deepcopy(self.q_calibration_data)
|
||||
q_input_arr.update_values(x)
|
||||
return q_input_arr
|
||||
|
||||
|
||||
def main():
|
||||
x = np.array(
|
||||
[[69], [130], [110], [100], [145], [160], [185], [200], [80], [50]], dtype=np.float32
|
||||
)
|
||||
y = np.array([181, 325, 295, 268, 400, 420, 500, 520, 220, 120], dtype=np.float32)
|
||||
"""
|
||||
Our linear regression benchmark. Use some synthetic data to train a regression model,
|
||||
then fit a model with sklearn. We quantize the sklearn model and compile it to FHE.
|
||||
We compute the training loss for the quantized and FHE models and compare them. We also
|
||||
predict on a test set and compare FHE results to predictions from the quantized model
|
||||
"""
|
||||
|
||||
class Model:
|
||||
w = None
|
||||
b = None
|
||||
|
||||
def fit(self, x, y):
|
||||
a = np.ones((x.shape[0], x.shape[1] + 1), dtype=np.float32)
|
||||
a[:, 1:] = x
|
||||
|
||||
regularization_contribution = np.identity(x.shape[1] + 1, dtype=np.float32)
|
||||
regularization_contribution[0][0] = 0
|
||||
|
||||
parameters = np.linalg.pinv(a.T @ a + regularization_contribution) @ a.T @ y
|
||||
|
||||
self.b = parameters[0]
|
||||
self.w = parameters[1:].reshape(-1, 1)
|
||||
|
||||
return self
|
||||
|
||||
def evaluate(self, x):
|
||||
return x @ self.w + self.b
|
||||
|
||||
model = Model().fit(x, y)
|
||||
|
||||
class QuantizationParameters:
|
||||
def __init__(self, q, zp, n):
|
||||
self.q = q
|
||||
self.zp = zp
|
||||
self.n = n
|
||||
|
||||
class QuantizedArray:
|
||||
def __init__(self, values, parameters):
|
||||
self.values = np.array(values)
|
||||
self.parameters = parameters
|
||||
|
||||
@staticmethod
|
||||
def of(x, n):
|
||||
if not isinstance(x, np.ndarray):
|
||||
x = np.array(x)
|
||||
|
||||
min_x = x.min()
|
||||
max_x = x.max()
|
||||
|
||||
if min_x == max_x:
|
||||
|
||||
if min_x == 0.0:
|
||||
q_x = 1
|
||||
zp_x = 0
|
||||
x_q = np.zeros(x.shape, dtype=np.uint)
|
||||
|
||||
elif min_x < 0.0:
|
||||
q_x = abs(1 / min_x)
|
||||
zp_x = -1
|
||||
x_q = np.zeros(x.shape, dtype=np.uint)
|
||||
|
||||
else:
|
||||
q_x = 1 / min_x
|
||||
zp_x = 0
|
||||
x_q = np.ones(x.shape, dtype=np.uint)
|
||||
|
||||
else:
|
||||
q_x = (2 ** n - 1) / (max_x - min_x)
|
||||
zp_x = int(round(min_x * q_x))
|
||||
x_q = ((q_x * x) - zp_x).round().astype(np.uint)
|
||||
|
||||
return QuantizedArray(x_q, QuantizationParameters(q_x, zp_x, n))
|
||||
|
||||
def dequantize(self):
|
||||
return (self.values.astype(np.float32) + float(self.parameters.zp)) / self.parameters.q
|
||||
|
||||
def affine(self, w, b, min_y, max_y, n_y):
|
||||
x_q = self.values
|
||||
w_q = w.values
|
||||
b_q = b.values
|
||||
|
||||
q_x = self.parameters.q
|
||||
q_w = w.parameters.q
|
||||
q_b = b.parameters.q
|
||||
|
||||
zp_x = self.parameters.zp
|
||||
zp_w = w.parameters.zp
|
||||
zp_b = b.parameters.zp
|
||||
|
||||
q_y = (2 ** n_y - 1) / (max_y - min_y)
|
||||
zp_y = int(round(min_y * q_y))
|
||||
|
||||
y_q = (q_y / (q_x * q_w)) * (
|
||||
(x_q + zp_x) @ (w_q + zp_w) + (q_x * q_w / q_b) * (b_q + zp_b)
|
||||
)
|
||||
y_q -= min_y * q_y
|
||||
y_q = y_q.round().clip(0, 2 ** n_y - 1).astype(np.uint)
|
||||
|
||||
return QuantizedArray(y_q, QuantizationParameters(q_y, zp_y, n_y))
|
||||
|
||||
class QuantizedFunction:
|
||||
def __init__(self, table):
|
||||
self.table = table
|
||||
|
||||
@staticmethod
|
||||
def of(f, input_bits, output_bits):
|
||||
domain = np.array(range(2 ** input_bits), dtype=np.uint)
|
||||
table = f(domain).round().clip(0, 2 ** output_bits - 1).astype(np.uint)
|
||||
return QuantizedFunction(table)
|
||||
|
||||
parameter_bits = 1
|
||||
|
||||
w_q = QuantizedArray.of(model.w, parameter_bits)
|
||||
b_q = QuantizedArray.of(model.b, parameter_bits)
|
||||
|
||||
input_bits = 6
|
||||
|
||||
x_q = QuantizedArray.of(x, input_bits)
|
||||
|
||||
output_bits = 7
|
||||
|
||||
min_y = y.min()
|
||||
max_y = y.max()
|
||||
|
||||
n_y = output_bits
|
||||
q_y = (2 ** n_y - 1) / (max_y - min_y)
|
||||
zp_y = int(round(min_y * q_y))
|
||||
y_parameters = QuantizationParameters(q_y, zp_y, n_y)
|
||||
|
||||
q_x = x_q.parameters.q
|
||||
q_w = w_q.parameters.q
|
||||
q_b = b_q.parameters.q
|
||||
|
||||
zp_x = x_q.parameters.zp
|
||||
zp_w = w_q.parameters.zp
|
||||
zp_b = b_q.parameters.zp
|
||||
|
||||
x_q = x_q.values
|
||||
w_q = w_q.values
|
||||
b_q = b_q.values
|
||||
|
||||
c1 = q_y / (q_x * q_w)
|
||||
c2 = w_q + zp_w
|
||||
c3 = (q_x * q_w / q_b) * (b_q + zp_b)
|
||||
c4 = min_y * q_y
|
||||
|
||||
f_q = QuantizedFunction.of(
|
||||
lambda intermediate: (c1 * (intermediate + c3)) - c4,
|
||||
input_bits + parameter_bits,
|
||||
output_bits,
|
||||
X, y, _ = make_regression(
|
||||
n_samples=200, n_features=1, n_targets=1, bias=5.0, noise=30.0, random_state=42, coef=True
|
||||
)
|
||||
|
||||
table = hnp.LookupTable([int(entry) for entry in f_q.table])
|
||||
# Split it into train/test and sort the sets for nicer visualization
|
||||
x_train, x_test, y_train, y_test = train_test_split(X, y, test_size=0.4, random_state=42)
|
||||
|
||||
w_0 = int(c2.flatten()[0])
|
||||
sidx = np.argsort(np.squeeze(x_train))
|
||||
x_train = x_train[sidx, :]
|
||||
y_train = y_train[sidx]
|
||||
|
||||
def function_to_compile(x_0):
|
||||
return table[(x_0 + zp_x) * w_0]
|
||||
sidx = np.argsort(np.squeeze(x_test))
|
||||
x_test = x_test[sidx, :]
|
||||
y_test = y_test[sidx]
|
||||
|
||||
inputset = [int(x_i[0]) for x_i in x_q]
|
||||
# Train a linear regression with sklearn and predict on the test data
|
||||
linreg = LinearRegression()
|
||||
linreg.fit(x_train, y_train)
|
||||
|
||||
# Calibrate the model for quantization using both training and test data
|
||||
calib_data = X # np.vstack((x_train, x_test))
|
||||
q_linreg = QuantizedLinearRegression.from_sklearn(linreg, calib_data)
|
||||
|
||||
# Compile the quantized model to FHE
|
||||
# bench: Measure: Compilation Time (ms)
|
||||
engine = hnp.compile_numpy_function(
|
||||
function_to_compile,
|
||||
{"x_0": hnp.EncryptedScalar(hnp.UnsignedInteger(input_bits))},
|
||||
inputset,
|
||||
compilation_configuration=BENCHMARK_CONFIGURATION,
|
||||
)
|
||||
engine = q_linreg.compile(q_linreg.quantize_input(calib_data))
|
||||
# bench: Measure: End
|
||||
|
||||
non_homomorphic_loss = 0
|
||||
homomorphic_loss = 0
|
||||
# Measure test error using the clear-sklearn, the clear-quantized and the FHE quantized model
|
||||
# as R^2 coefficient for the test data
|
||||
|
||||
for i, (x_i, y_i) in enumerate(zip(x_q, y)):
|
||||
x_i = [int(value) for value in x_i]
|
||||
# First, predict using the sklearn classifier
|
||||
y_pred = linreg.predict(x_test)
|
||||
|
||||
non_homomorphic_prediction = (
|
||||
QuantizedArray(x_i, QuantizationParameters(q_x, zp_x, input_bits))
|
||||
.affine(
|
||||
QuantizedArray.of(model.w, parameter_bits),
|
||||
QuantizedArray.of(model.b, parameter_bits),
|
||||
min_y,
|
||||
max_y,
|
||||
output_bits,
|
||||
)
|
||||
.dequantize()[0]
|
||||
)
|
||||
# Now that the model is quantized, predict on the test set
|
||||
x_test_q = q_linreg.quantize_input(x_test)
|
||||
q_y_pred = q_linreg.forward_and_dequant(x_test_q)
|
||||
|
||||
# Now predict using the FHE quantized model on the testing set
|
||||
y_test_pred_fhe = np.zeros_like(x_test)
|
||||
|
||||
for i, x_i in enumerate(tqdm(x_test_q.qvalues)):
|
||||
q_sample = np.expand_dims(x_i, 1).transpose([1, 0]).astype(np.uint8)
|
||||
# bench: Measure: Evaluation Time (ms)
|
||||
homomorphic_prediction = QuantizedArray(engine.run(*x_i), y_parameters).dequantize()
|
||||
q_pred_fhe = engine.run(q_sample)
|
||||
# bench: Measure: End
|
||||
y_test_pred_fhe[i] = q_linreg.dequantize_output(q_pred_fhe)
|
||||
|
||||
non_homomorphic_loss += (non_homomorphic_prediction - y_i) ** 2
|
||||
homomorphic_loss += (homomorphic_prediction - y_i) ** 2
|
||||
# Measure the error for the three versions of the classifier
|
||||
sklearn_r2 = r2_score(y_pred, y_test)
|
||||
non_homomorphic_test_error = r2_score(q_y_pred, y_test)
|
||||
homomorphic_test_error = r2_score(y_test_pred_fhe, y_test)
|
||||
|
||||
print()
|
||||
# Measure the error of the FHE quantized model w.r.t the clear quantized model
|
||||
difference = (
|
||||
abs(homomorphic_test_error - non_homomorphic_test_error) * 100 / non_homomorphic_test_error
|
||||
)
|
||||
|
||||
print(f"input = {x[i][0]}")
|
||||
print(f"output = {y_i:.4f}")
|
||||
|
||||
print(f"non homomorphic prediction = {non_homomorphic_prediction:.4f}")
|
||||
print(f"homomorphic prediction = {homomorphic_prediction:.4f}")
|
||||
|
||||
non_homomorphic_loss /= len(y)
|
||||
homomorphic_loss /= len(y)
|
||||
difference = abs(homomorphic_loss - non_homomorphic_loss) * 100 / non_homomorphic_loss
|
||||
|
||||
print()
|
||||
print(f"Non Homomorphic Loss: {non_homomorphic_loss:.4f}")
|
||||
print(f"Homomorphic Loss: {homomorphic_loss:.4f}")
|
||||
print(f"Sklearn R^2: {sklearn_r2:.4f}")
|
||||
print(f"Non Homomorphic R^2: {non_homomorphic_test_error:.4f}")
|
||||
print(f"Homomorphic R^2: {homomorphic_test_error:.4f}")
|
||||
print(f"Relative Difference Percentage: {difference:.2f}%")
|
||||
|
||||
# bench: Measure: Non Homomorphic Loss = non_homomorphic_loss
|
||||
# bench: Measure: Homomorphic Loss = homomorphic_loss
|
||||
# bench: Measure: Relative Loss Difference Between Homomorphic and Non Homomorphic Implementation (%) = difference
|
||||
# bench: Alert: Relative Loss Difference Between Homomorphic and Non Homomorphic Implementation (%) > 7.5
|
||||
# bench: Measure: Relative Loss Difference (%) = difference
|
||||
# bench: Measure: Homomorphic Test Error = homomorphic_test_error
|
||||
# bench: Alert: Relative Loss Difference (%) > 7.5
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@@ -4,295 +4,208 @@
|
||||
# flake8: noqa: E501
|
||||
# pylint: disable=C0301
|
||||
|
||||
import numpy as np
|
||||
import torch
|
||||
from common import BENCHMARK_CONFIGURATION
|
||||
from copy import deepcopy
|
||||
from typing import Any, Dict
|
||||
|
||||
import concrete.numpy as hnp
|
||||
import numpy as np
|
||||
from numpy.random import RandomState
|
||||
from sklearn.datasets import make_classification
|
||||
from sklearn.linear_model import LogisticRegression
|
||||
from sklearn.model_selection import train_test_split
|
||||
from tqdm import tqdm
|
||||
|
||||
from concrete.quantization import QuantizedArray, QuantizedLinear, QuantizedModule, QuantizedSigmoid
|
||||
|
||||
|
||||
class QuantizedLogisticRegression(QuantizedModule):
|
||||
"""
|
||||
Quantized Logistic Regression
|
||||
Building on top of QuantizedModule, this class will chain together a linear transformation
|
||||
and an inverse-link function, in this case the logistic function
|
||||
"""
|
||||
|
||||
@staticmethod
|
||||
def from_sklearn(sklearn_model, calibration_data):
|
||||
"""Create a Quantized Logistic Regression initialized from a sklearn trained model"""
|
||||
if sklearn_model.coef_.ndim == 1:
|
||||
weights = np.expand_dims(sklearn_model.coef_, 1)
|
||||
else:
|
||||
weights = sklearn_model.coef_.transpose()
|
||||
|
||||
bias = sklearn_model.intercept_
|
||||
|
||||
# In our case we have two data dimensions, we the weights precision needs to be 2 bits, as
|
||||
# for now we need the quantized values to be greater than zero for weights
|
||||
# Thus, to insure a maximum of 7 bits in the output of the linear transformation, we choose
|
||||
# 4 bits for the data and the minimum of 1 for the bias
|
||||
return QuantizedLogisticRegression(4, 2, 1, 6, weights, bias, calibration_data)
|
||||
|
||||
def __init__(self, q_bits, w_bits, b_bits, out_bits, weights, bias, calibration_data) -> None:
|
||||
"""
|
||||
Create the Logistic regression with different quantization bit precisions:
|
||||
|
||||
Quantization Parameters - Number of bits:
|
||||
q_bits (int): bits for input data, insuring that the number of bits of
|
||||
the w . x + b operation does not exceed 7 for the calibration data
|
||||
w_bits (int): bits for weights: in the case of a univariate regression this
|
||||
can be 1
|
||||
b_bits (int): bits for bias (this is a single value so a single bit is enough)
|
||||
out_bits (int): bits for the result of the linear transformation (w.x + b).
|
||||
In the case of Logistic Regression the result of the linear
|
||||
transformation is input to a univariate inverse-link function, so
|
||||
this value can be 7
|
||||
|
||||
Other parameters:
|
||||
weights: a numpy nd-array of weights (Nxd) where d is the data dimensionality
|
||||
bias: a numpy scalar
|
||||
calibration_data: a numpy nd-array of data (Nxd)
|
||||
"""
|
||||
self.n_bits = out_bits
|
||||
|
||||
# We need to calibrate to a sufficiently low number of bits
|
||||
# so that the output of the Linear layer (w . x + b)
|
||||
# does not exceed 7 bits
|
||||
self.q_calibration_data = QuantizedArray(q_bits, calibration_data)
|
||||
|
||||
# Quantize the weights and create the quantized linear layer
|
||||
q_weights = QuantizedArray(w_bits, weights)
|
||||
q_bias = QuantizedArray(b_bits, bias)
|
||||
q_layer = QuantizedLinear(out_bits, q_weights, q_bias)
|
||||
|
||||
# Store quantized layers
|
||||
quant_layers_dict: Dict[str, Any] = {}
|
||||
|
||||
# Calibrate the linear layer and obtain calibration_data for the next layers
|
||||
calibration_data = self._calibrate_and_store_layers_activation(
|
||||
"linear", q_layer, calibration_data, quant_layers_dict
|
||||
)
|
||||
|
||||
# Add the inverse-link for inference.
|
||||
# This needs to be quantized since it's computed in FHE,
|
||||
# but we can use 7 bits of output since, in this case,
|
||||
# the result of the inverse-link is not processed by any further layers
|
||||
# Seven bits is the maximum precision but this could be lowered to improve speed
|
||||
# at the possible expense of higher deviance of the regressor
|
||||
q_logit = QuantizedSigmoid(n_bits=7)
|
||||
|
||||
# Now calibrate the inverse-link function with the linear layer's output data
|
||||
calibration_data = self._calibrate_and_store_layers_activation(
|
||||
"invlink", q_logit, calibration_data, quant_layers_dict
|
||||
)
|
||||
|
||||
# Finally construct our Module using the quantized layers
|
||||
super().__init__(quant_layers_dict)
|
||||
|
||||
def _calibrate_and_store_layers_activation(
|
||||
self, name, q_function, calibration_data, quant_layers_dict
|
||||
):
|
||||
"""
|
||||
This function calibrates a layer of a quantized module (e.g. linear, inverse-link,
|
||||
activation, etc) by looking at the input data, then computes the output of the quantized
|
||||
version of the layer to be used as input to the following layers
|
||||
"""
|
||||
|
||||
# Calibrate the output of the layer
|
||||
q_function.calibrate(calibration_data)
|
||||
# Store the learned quantized layer
|
||||
quant_layers_dict[name] = q_function
|
||||
# Create new calibration data (output of the previous layer)
|
||||
q_calibration_data = QuantizedArray(self.n_bits, calibration_data)
|
||||
# Dequantize to have the value in clear and ready for next calibration
|
||||
return q_function(q_calibration_data).dequant()
|
||||
|
||||
def quantize_input(self, x):
|
||||
q_input_arr = deepcopy(self.q_calibration_data)
|
||||
q_input_arr.update_values(x)
|
||||
return q_input_arr
|
||||
|
||||
|
||||
def main():
|
||||
x = torch.tensor(
|
||||
[
|
||||
[1, 1],
|
||||
[1, 1.5],
|
||||
[1.5, 1.2],
|
||||
[1, 2],
|
||||
[2, 1],
|
||||
[4, 1],
|
||||
[4, 1.5],
|
||||
[3.5, 1.8],
|
||||
[3, 2],
|
||||
[4, 2],
|
||||
]
|
||||
).float()
|
||||
y = torch.tensor(
|
||||
[
|
||||
[0],
|
||||
[0],
|
||||
[0],
|
||||
[0],
|
||||
[0],
|
||||
[1],
|
||||
[1],
|
||||
[1],
|
||||
[1],
|
||||
[1],
|
||||
]
|
||||
).float()
|
||||
"""Main benchmark function: generate some synthetic data for two class classification,
|
||||
split train-test, train a sklearn classifier, calibrate and quantize it on the whole dataset
|
||||
then compile it to FHE. Test the three versions of the classifier on the test set and
|
||||
report accuracy"""
|
||||
|
||||
class Model(torch.nn.Module):
|
||||
def __init__(self, n):
|
||||
super().__init__()
|
||||
self.fc = torch.nn.Linear(n, 1)
|
||||
|
||||
def forward(self, x):
|
||||
output = torch.sigmoid(self.fc(x))
|
||||
return output
|
||||
|
||||
model = Model(x.shape[1])
|
||||
|
||||
optimizer = torch.optim.SGD(model.parameters(), lr=1)
|
||||
criterion = torch.nn.BCELoss()
|
||||
|
||||
epochs = 1501
|
||||
for e in range(1, epochs + 1):
|
||||
optimizer.zero_grad()
|
||||
|
||||
out = model(x)
|
||||
loss = criterion(out, y)
|
||||
|
||||
loss.backward()
|
||||
optimizer.step()
|
||||
|
||||
if e % 100 == 1 or e == epochs:
|
||||
print("Epoch:", e, "|", "Loss:", loss.item())
|
||||
|
||||
w = np.array(model.fc.weight.flatten().tolist()).reshape((-1, 1))
|
||||
b = model.fc.bias.flatten().tolist()[0]
|
||||
|
||||
x = x.detach().numpy()
|
||||
y = y.detach().numpy().flatten()
|
||||
|
||||
class QuantizationParameters:
|
||||
def __init__(self, q, zp, n):
|
||||
self.q = q
|
||||
self.zp = zp
|
||||
self.n = n
|
||||
|
||||
def __eq__(self, other):
|
||||
return self.q == other.q and self.zp == other.zp and self.n == other.n
|
||||
|
||||
class QuantizedArray:
|
||||
def __init__(self, values, parameters):
|
||||
self.values = np.array(values)
|
||||
self.parameters = parameters
|
||||
|
||||
@staticmethod
|
||||
def of(x, n):
|
||||
if not isinstance(x, np.ndarray):
|
||||
x = np.array(x)
|
||||
|
||||
min_x = x.min()
|
||||
max_x = x.max()
|
||||
|
||||
if min_x == max_x:
|
||||
|
||||
if min_x == 0.0:
|
||||
q_x = 1
|
||||
zp_x = 0
|
||||
x_q = np.zeros(x.shape, dtype=np.uint)
|
||||
|
||||
elif min_x < 0.0:
|
||||
q_x = abs(1 / min_x)
|
||||
zp_x = -1
|
||||
x_q = np.zeros(x.shape, dtype=np.uint)
|
||||
|
||||
else:
|
||||
q_x = 1 / min_x
|
||||
zp_x = 0
|
||||
x_q = np.ones(x.shape, dtype=np.uint)
|
||||
|
||||
else:
|
||||
q_x = (2 ** n - 1) / (max_x - min_x)
|
||||
zp_x = int(round(min_x * q_x))
|
||||
x_q = ((q_x * x) - zp_x).round().astype(np.uint)
|
||||
|
||||
return QuantizedArray(x_q, QuantizationParameters(q_x, zp_x, n))
|
||||
|
||||
def dequantize(self):
|
||||
return (self.values.astype(np.float32) + float(self.parameters.zp)) / self.parameters.q
|
||||
|
||||
def affine(self, w, b, min_y, max_y, n_y):
|
||||
x_q = self.values
|
||||
w_q = w.values
|
||||
b_q = b.values
|
||||
|
||||
q_x = self.parameters.q
|
||||
q_w = w.parameters.q
|
||||
q_b = b.parameters.q
|
||||
|
||||
zp_x = self.parameters.zp
|
||||
zp_w = w.parameters.zp
|
||||
zp_b = b.parameters.zp
|
||||
|
||||
q_y = (2 ** n_y - 1) / (max_y - min_y)
|
||||
zp_y = int(round(min_y * q_y))
|
||||
|
||||
y_q = (q_y / (q_x * q_w)) * (
|
||||
(x_q + zp_x) @ (w_q + zp_w) + (q_x * q_w / q_b) * (b_q + zp_b)
|
||||
)
|
||||
y_q -= min_y * q_y
|
||||
y_q = y_q.round().clip(0, 2 ** n_y - 1).astype(np.uint)
|
||||
|
||||
return QuantizedArray(y_q, QuantizationParameters(q_y, zp_y, n_y))
|
||||
|
||||
class QuantizedFunction:
|
||||
def __init__(self, table, input_parameters=None, output_parameters=None):
|
||||
self.table = table
|
||||
self.input_parameters = input_parameters
|
||||
self.output_parameters = output_parameters
|
||||
|
||||
@staticmethod
|
||||
def of(f, input_bits, output_bits):
|
||||
domain = np.array(range(2 ** input_bits), dtype=np.uint)
|
||||
table = f(domain).round().clip(0, 2 ** output_bits - 1).astype(np.uint)
|
||||
return QuantizedFunction(table)
|
||||
|
||||
@staticmethod
|
||||
def plain(f, input_parameters, output_bits):
|
||||
n = input_parameters.n
|
||||
|
||||
domain = np.array(range(2 ** n), dtype=np.uint)
|
||||
inputs = QuantizedArray(domain, input_parameters).dequantize()
|
||||
|
||||
outputs = f(inputs)
|
||||
quantized_outputs = QuantizedArray.of(outputs, output_bits)
|
||||
|
||||
table = quantized_outputs.values
|
||||
output_parameters = quantized_outputs.parameters
|
||||
|
||||
return QuantizedFunction(table, input_parameters, output_parameters)
|
||||
|
||||
def apply(self, x):
|
||||
assert x.parameters == self.input_parameters
|
||||
return QuantizedArray(self.table[x.values], self.output_parameters)
|
||||
|
||||
parameter_bits = 1
|
||||
|
||||
w_q = QuantizedArray.of(w, parameter_bits)
|
||||
b_q = QuantizedArray.of(b, parameter_bits)
|
||||
|
||||
input_bits = 5
|
||||
|
||||
x_q = QuantizedArray.of(x, input_bits)
|
||||
|
||||
output_bits = 7
|
||||
|
||||
intermediate = x @ w + b
|
||||
intermediate_q = x_q.affine(w_q, b_q, intermediate.min(), intermediate.max(), output_bits)
|
||||
|
||||
sigmoid = QuantizedFunction.plain(
|
||||
lambda x: 1 / (1 + np.exp(-x)), intermediate_q.parameters, output_bits
|
||||
# Generate some data with a fixed seed
|
||||
X, y = make_classification(
|
||||
n_features=2,
|
||||
n_redundant=0,
|
||||
n_informative=2,
|
||||
random_state=2,
|
||||
n_clusters_per_class=1,
|
||||
n_samples=100,
|
||||
)
|
||||
|
||||
y_q = sigmoid.apply(intermediate_q)
|
||||
y_parameters = y_q.parameters
|
||||
# Scale the data randomly, fixing seeds for reproductibility
|
||||
rng = RandomState(2)
|
||||
X += 2 * rng.uniform(size=X.shape)
|
||||
|
||||
q_x = x_q.parameters.q
|
||||
q_w = w_q.parameters.q
|
||||
q_b = b_q.parameters.q
|
||||
q_intermediate = intermediate_q.parameters.q
|
||||
# Split it into train/test
|
||||
x_train, x_test, y_train, y_test = train_test_split(X, y, test_size=0.4, random_state=42)
|
||||
|
||||
zp_x = x_q.parameters.zp
|
||||
zp_w = w_q.parameters.zp
|
||||
zp_b = b_q.parameters.zp
|
||||
# Train a logistic regression with sklearn on the training set
|
||||
logreg = LogisticRegression()
|
||||
logreg.fit(x_train, y_train)
|
||||
|
||||
x_q = x_q.values
|
||||
w_q = w_q.values
|
||||
b_q = b_q.values
|
||||
# Calibrate the model for quantization using both training and test data
|
||||
calib_data = X
|
||||
q_logreg = QuantizedLogisticRegression.from_sklearn(logreg, calib_data)
|
||||
|
||||
c1 = q_intermediate / (q_x * q_w)
|
||||
c2 = w_q + zp_w
|
||||
c3 = (q_x * q_w / q_b) * (b_q + zp_b)
|
||||
c4 = intermediate.min() * q_intermediate
|
||||
|
||||
def f(x):
|
||||
values = ((c1 * (x + c3)) - c4).round().clip(0, 2 ** output_bits - 1).astype(np.uint)
|
||||
after_affine_q = QuantizedArray(values, intermediate_q.parameters)
|
||||
|
||||
sigmoid = QuantizedFunction.plain(
|
||||
lambda x: 1 / (1 + np.exp(-x)),
|
||||
after_affine_q.parameters,
|
||||
output_bits,
|
||||
)
|
||||
y_q = sigmoid.apply(after_affine_q)
|
||||
|
||||
return y_q.values
|
||||
|
||||
f_q = QuantizedFunction.of(f, output_bits, output_bits)
|
||||
|
||||
table = hnp.LookupTable([int(entry) for entry in f_q.table])
|
||||
|
||||
w_0 = int(c2.flatten()[0])
|
||||
w_1 = int(c2.flatten()[1])
|
||||
|
||||
def function_to_compile(x_0, x_1):
|
||||
return table[((x_0 + zp_x) * w_0) + ((x_1 + zp_x) * w_1)]
|
||||
|
||||
inputset = [(int(x_i[0]), int(x_i[1])) for x_i in x_q]
|
||||
# Now, we can compile our model to FHE, taking as possible input set all of our dataset
|
||||
X_q = q_logreg.quantize_input(X)
|
||||
|
||||
# bench: Measure: Compilation Time (ms)
|
||||
engine = hnp.compile_numpy_function(
|
||||
function_to_compile,
|
||||
{
|
||||
"x_0": hnp.EncryptedScalar(hnp.UnsignedInteger(input_bits)),
|
||||
"x_1": hnp.EncryptedScalar(hnp.UnsignedInteger(input_bits)),
|
||||
},
|
||||
inputset,
|
||||
compilation_configuration=BENCHMARK_CONFIGURATION,
|
||||
)
|
||||
engine = q_logreg.compile(X_q)
|
||||
# bench: Measure: End
|
||||
|
||||
# Start classifier evaluation
|
||||
|
||||
# Test the original classifier
|
||||
y_pred_test = np.asarray(logreg.predict(x_test))
|
||||
|
||||
# Now that the model is quantized, predict on the test set
|
||||
x_test_q = q_logreg.quantize_input(x_test)
|
||||
q_y_score_test = q_logreg.forward_and_dequant(x_test_q)
|
||||
q_y_pred_test = (q_y_score_test > 0.5).astype(np.int32)
|
||||
|
||||
non_homomorphic_correct = 0
|
||||
homomorphic_correct = 0
|
||||
|
||||
for i, (x_i, y_i) in enumerate(zip(x_q, y)):
|
||||
x_i = [int(value) for value in x_i]
|
||||
# Track the samples that are wrongly classified due to quantization issues
|
||||
q_wrong_predictions = np.zeros((0, 2), dtype=X.dtype)
|
||||
|
||||
# Predict the FHE quantized classifier probabilities on the test set.
|
||||
# Compute FHE quantized accuracy, clear-quantized accuracy and
|
||||
# keep track of samples wrongly classified due to quantization
|
||||
for i, x_i in enumerate(tqdm(x_test_q.qvalues)):
|
||||
y_i = y_test[i]
|
||||
|
||||
fhe_in_sample = np.expand_dims(x_i, 1).transpose([1, 0]).astype(np.uint8)
|
||||
|
||||
non_homomorphic_prediction = round(
|
||||
sigmoid.apply(
|
||||
QuantizedArray(x_i, QuantizationParameters(q_x, zp_x, input_bits)).affine(
|
||||
QuantizedArray.of(w, parameter_bits),
|
||||
QuantizedArray.of(b, parameter_bits),
|
||||
intermediate.min(),
|
||||
intermediate.max(),
|
||||
output_bits,
|
||||
)
|
||||
).dequantize()[0]
|
||||
)
|
||||
# bench: Measure: Evaluation Time (ms)
|
||||
homomorphic_prediction = round(QuantizedArray(engine.run(*x_i), y_parameters).dequantize())
|
||||
q_pred_fhe = engine.run(fhe_in_sample)
|
||||
# bench: Measure: End
|
||||
y_score_fhe = q_logreg.dequantize_output(q_pred_fhe)
|
||||
homomorphic_prediction = (y_score_fhe > 0.5).astype(np.int32)
|
||||
|
||||
non_homomorphic_prediction = q_y_pred_test[i]
|
||||
if non_homomorphic_prediction == y_i:
|
||||
non_homomorphic_correct += 1
|
||||
elif y_pred_test[i] == y_i:
|
||||
# If this was a correct prediction with the clear-sklearn classifier
|
||||
q_wrong_predictions = np.vstack((q_wrong_predictions, x_test[i, :]))
|
||||
|
||||
if homomorphic_prediction == y_i:
|
||||
homomorphic_correct += 1
|
||||
|
||||
print()
|
||||
|
||||
print(f"input = {x[i][0]}, {x[i][1]}")
|
||||
print(f"output = {y_i:.4f}")
|
||||
|
||||
print(f"non homomorphic prediction = {non_homomorphic_prediction:.4f}")
|
||||
print(f"homomorphic prediction = {homomorphic_prediction:.4f}")
|
||||
|
||||
non_homomorphic_accuracy = (non_homomorphic_correct / len(y)) * 100
|
||||
homomorphic_accuracy = (homomorphic_correct / len(y)) * 100
|
||||
# Aggregate accuracies for all the versions of the classifier
|
||||
sklearn_acc = np.sum(y_pred_test == y_test) / len(y_test) * 100
|
||||
non_homomorphic_accuracy = (non_homomorphic_correct / len(y_test)) * 100
|
||||
homomorphic_accuracy = (homomorphic_correct / len(y_test)) * 100
|
||||
difference = abs(homomorphic_accuracy - non_homomorphic_accuracy)
|
||||
|
||||
print()
|
||||
print(f"Sklearn accuracy: {sklearn_acc:.4f}")
|
||||
print(f"Non Homomorphic Accuracy: {non_homomorphic_accuracy:.4f}")
|
||||
print(f"Homomorphic Accuracy: {homomorphic_accuracy:.4f}")
|
||||
print(f"Difference Percentage: {difference:.2f}%")
|
||||
|
||||
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
Reference in New Issue
Block a user