feat: end-to-end compilation of a torch model

This commit is contained in:
jfrery
2021-11-23 11:25:53 +01:00
committed by jfrery
parent 13b9ff96f0
commit 1625475897
8 changed files with 231 additions and 33 deletions

View File

@@ -39,7 +39,9 @@ class QuantizedActivation(ABC):
Returns:
numpy.ndarray: Return dequantized input in a numpy array
"""
return (q_input.qvalues - q_input.zero_point) * q_input.scale
# TODO remove this + (-x) when issue #721 is fixed
return (q_input.qvalues + (-q_input.zero_point)) * q_input.scale
def quant_output(self, qoutput_activation: numpy.ndarray) -> QuantizedArray:
"""Quantize the output of the activation function.
@@ -53,9 +55,7 @@ class QuantizedActivation(ABC):
assert self.q_out is not None
qoutput_activation = qoutput_activation / self.q_out.scale + self.q_out.zero_point
qoutput_activation = (
(qoutput_activation).round().clip(0, 2 ** self.q_out.n_bits - 1).astype(int)
)
qoutput_activation = (qoutput_activation).clip(0, 2 ** self.q_out.n_bits - 1).astype(int)
# TODO find a better way to do the following (see issue #832)
q_out = copy.copy(self.q_out)
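
For reference, the dequantization change above only rewrites the same affine formula (subtracting the zero point versus adding its negation); a minimal sketch with arbitrary toy values, not part of the commit:

```python
import numpy

# Arbitrary toy values for illustration only
qvalues = numpy.array([0, 3, 7])
scale, zero_point = 0.05, 2

direct = (qvalues - zero_point) * scale          # original form
workaround = (qvalues + (-zero_point)) * scale   # form used while issue #721 is open

assert numpy.array_equal(direct, workaround)
```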

View File

@@ -4,7 +4,7 @@ from typing import Optional
import numpy
STABILITY_CONST = 10 ** -12
STABILITY_CONST = 10 ** -6
class QuantizedArray:
@@ -28,6 +28,7 @@ class QuantizedArray:
self.n_bits = n_bits
self.is_signed = is_signed
self.scale, self.zero_point, self.qvalues = self.compute_quantization_parameters()
self.n_features = 1 if len(values.shape) <= 1 else values.shape[1]
def __call__(self) -> Optional[numpy.ndarray]:
return self.qvalues
@@ -35,17 +36,23 @@ class QuantizedArray:
def compute_quantization_parameters(self):
"""Compute the quantization parameters."""
# Small constant needed for stability
rmax = numpy.max(self.values) + STABILITY_CONST
rmax = numpy.max(self.values)
rmin = numpy.min(self.values)
scale = (
(rmax - rmin) / ((2 ** self.n_bits - 1 - self.offset) - (-self.offset))
if rmax != rmin
else 1.0
)
zero_point = numpy.round(
(rmax * (-self.offset) - (rmin * (2 ** self.n_bits - 1 - self.offset))) / (rmax - rmin)
)
if rmax - rmin < STABILITY_CONST:
scale = 1
zero_point = rmin
else:
scale = (
(rmax - rmin) / ((2 ** self.n_bits - 1 - self.offset) - (-self.offset))
if rmax != rmin
else 1.0
)
zero_point = numpy.round(
(rmax * (-self.offset) - (rmin * (2 ** self.n_bits - 1 - self.offset)))
/ (rmax - rmin)
).astype(int)
# Compute quantized values and store
qvalues = self.values / scale + zero_point
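
A small worked example of the new parameter computation, assuming the unsigned case (self.offset == 0) and arbitrary input values; the rounding and clipping of qvalues are added here for completeness:

```python
import numpy

STABILITY_CONST = 10 ** -6
n_bits = 8
values = numpy.array([-0.5, 0.0, 1.5])

rmax, rmin = numpy.max(values), numpy.min(values)
if rmax - rmin < STABILITY_CONST:
    # Near-constant input: fall back to scale 1 and use rmin as the zero point
    scale, zero_point = 1, rmin
else:
    scale = (rmax - rmin) / (2 ** n_bits - 1)
    zero_point = int(numpy.round(-rmin * (2 ** n_bits - 1) / (rmax - rmin)))

qvalues = numpy.round(values / scale + zero_point).clip(0, 2 ** n_bits - 1).astype(int)
# scale ~= 0.00784, zero_point == 64, qvalues == [0, 64, 255]
```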

View File

@@ -21,8 +21,8 @@ class QuantizedLinear:
Args:
n_bits (int): Maximum number of bits for the output.
q_weights (QuantizedArray): Quantized weights (n_examples, n_neurons, n_features).
q_bias (QuantizedArray, optional): Quantized bias (n_neurons). Defaults to None.
q_weights (QuantizedArray): Quantized weights (n_features, n_neurons).
q_bias (QuantizedArray, optional): Quantized bias (1, n_neurons). Defaults to None.
"""
self.q_weights = q_weights
self.q_bias = q_bias
@@ -71,7 +71,17 @@ class QuantizedLinear:
matmul = q_input.qvalues @ self.q_weights.qvalues
# Sum operation in full integers resulting in large integers (INTEGERS)
sum_input = self.q_weights.zero_point * numpy.sum(q_input.qvalues, axis=1, keepdims=True)
# [WORKAROUND #995] numpy.sum can't currently be done in our framework
# sum_input = self.q_weights.zero_point * numpy.sum(q_input.qvalues, axis=1, keepdims=True)
# Hack because we can't use numpy.sum(..., axis=..., keepdims=...) yet
const_ones = numpy.ones(shape=(q_input.n_features, 1), dtype=int)
sum_input = self.q_weights.zero_point * (q_input.qvalues @ const_ones)
# Last part that has to be done in FHE; the rest must go in a PBS.
# Forced fusing using .astype(numpy.float32)
numpy_q_out = (matmul + (numpy.negative(sum_input))).astype(numpy.float32)
# sum_weights is a constant
sum_weights = q_input.zero_point * numpy.sum(self.q_weights.qvalues, axis=0, keepdims=True)
# Quantization scales and zero points (FLOATS involved)
@@ -82,11 +92,11 @@ class QuantizedLinear:
)
final_term = p * q_input.zero_point * self.q_weights.zero_point
numpy_q_out = matmul - sum_input - sum_weights + final_term
numpy_q_out = numpy_q_out + final_term + (numpy.negative(sum_weights))
numpy_q_out = m_matmul * numpy_q_out
numpy_q_out = self.q_out.zero_point + bias_part + numpy_q_out
numpy_q_out = numpy_q_out.round().clip(0, 2 ** self.q_out.n_bits - 1).astype(int)
numpy_q_out = numpy_q_out.clip(0, 2 ** self.q_out.n_bits - 1).astype(int)
# TODO find a more intuitive way to do the following (see issue #832)
# We should be able to reuse q_out quantization parameters
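
The const_ones workaround above can be checked in isolation; a short sketch with arbitrary shapes showing that the matmul with a column of ones reproduces numpy.sum(..., axis=1, keepdims=True):

```python
import numpy

q_values = numpy.random.randint(0, 16, size=(10, 4))  # stands in for q_input.qvalues

const_ones = numpy.ones(shape=(q_values.shape[1], 1), dtype=int)
via_matmul = q_values @ const_ones                    # workaround form
via_sum = numpy.sum(q_values, axis=1, keepdims=True)  # unsupported form

assert numpy.array_equal(via_matmul, via_sum)
```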

View File

@@ -1,28 +1,127 @@
"""QuantizedModule API."""
import copy
from typing import Optional, Union
import numpy
from concrete.common.compilation.artifacts import CompilationArtifacts
from concrete.common.compilation.configuration import CompilationConfiguration
from concrete.common.fhe_circuit import FHECircuit
from ..numpy import EncryptedTensor, UnsignedInteger
from ..numpy.compile import compile_numpy_function
from .quantized_array import QuantizedArray
class QuantizedModule:
"""Inference for a quantized model."""
quant_layers_dict: dict
_mode: str
q_input: Optional[QuantizedArray]
forward_fhe: Union[None, FHECircuit]
def __init__(self, quant_layers_dict: dict):
self.quant_layers_dict = copy.deepcopy(quant_layers_dict)
self.compiled = False
self.forward_fhe = None
self.q_input = None
def __call__(self, x: QuantizedArray) -> QuantizedArray:
def __call__(self, x: QuantizedArray):
return self.forward(x)
def forward(self, q_x: QuantizedArray) -> QuantizedArray:
def forward(self, q_x: Union[numpy.ndarray, QuantizedArray]) -> numpy.ndarray:
"""Forward pass with numpy function only.
Args:
q_x (QuantizedArray): QuantizedArray containing the inputs.
q_x (Union[numpy.ndarray, QuantizedArray]): QuantizedArray containing the inputs,
or a numpy.ndarray containing the q_values. In the latter case, the stored
input quantization parameters (q_input.scale, q_input.zero_point) are used.
Returns:
(QuantizedArray): Prediction of the quantized model
(numpy.ndarray): Predictions of the quantized model
"""
# Following "if not" important for compilation as the tracer
# need to fall in it the statement (tracing).
# If the q_x is a numpy module then we reuse self.q_input parameters
# computed during calibration.
# Later we might want to only allow nympy.array input
if not isinstance(q_x, QuantizedArray):
assert self.q_input is not None
self.q_input.update_qvalues(q_x)
q_x = self.q_input
for _, layer in self.quant_layers_dict.items():
q_x = layer(q_x)
return q_x
# mypy compliance
assert isinstance(q_x, QuantizedArray)
return q_x.qvalues
def forward_and_dequant(self, q_x: Union[numpy.ndarray, QuantizedArray]) -> numpy.ndarray:
"""Forward pass with numpy function only plus dequantization.
Args:
q_x (Union[numpy.ndarray, QuantizedArray]): QuantizedArray containing the inputs,
or a numpy.ndarray containing the q_values. In the latter case, the stored
input quantization parameters (q_input.scale, q_input.zero_point) are used.
Returns:
(numpy.ndarray): Predictions of the quantized model
"""
q_out = self.forward(q_x)
return self.dequantize_output(q_out)
def dequantize_output(self, qvalues: numpy.ndarray) -> numpy.ndarray:
"""Take the last layer q_out and use its dequant function.
Args:
qvalues (numpy.ndarray): Quantized values of the last layer.
Returns:
numpy.ndarray: Dequantized values of the last layer.
"""
last_layer = list(self.quant_layers_dict.values())[-1]
real_values = last_layer.q_out.update_qvalues(qvalues)
return real_values
def compile(
self,
q_input: QuantizedArray,
compilation_configuration: Optional[CompilationConfiguration] = None,
compilation_artifacts: Optional[CompilationArtifacts] = None,
) -> FHECircuit:
"""Compile the forward function of the module.
Args:
q_input (QuantizedArray): Needed for tracing and building the boundaries.
compilation_configuration (Optional[CompilationConfiguration]): Configuration object
to use during compilation
compilation_artifacts (Optional[CompilationArtifacts]): Artifacts object to fill
during compilation
Returns:
FHECircuit: The compiled FHE circuit for the forward function.
"""
self.q_input = copy.deepcopy(q_input)
self.forward_fhe = compile_numpy_function(
self.forward,
{
"q_x": EncryptedTensor(
UnsignedInteger(self.q_input.n_bits), shape=(1, *self.q_input.qvalues.shape[1:])
)
},
[
(numpy.expand_dims(arr, 0),)
for arr in self.q_input.qvalues # Super weird formatting
],
compilation_configuration=compilation_configuration,
compilation_artifacts=compilation_artifacts,
)
return self.forward_fhe
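
To make the inputset built in compile() concrete, a stand-alone sketch (qvalues here is a stand-in for self.q_input.qvalues): each calibration example becomes its own (1, n_features) batch, matching the EncryptedTensor shape declared above.

```python
import numpy

qvalues = numpy.random.randint(0, 4, size=(10, 3))  # stand-in for self.q_input.qvalues

inputset = [(numpy.expand_dims(arr, 0),) for arr in qvalues]

assert len(inputset) == 10             # one tuple per calibration example
assert inputset[0][0].shape == (1, 3)  # matches shape=(1, *qvalues.shape[1:])
```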

View File

@@ -0,0 +1,84 @@
"""Test Neural Networks compilations"""
import numpy
import pytest
from torch import nn
from concrete.quantization import PostTrainingAffineQuantization, QuantizedArray
from concrete.torch import NumpyModule
# INPUT_OUTPUT_FEATURE is the number of inputs and outputs of each of the network layers
# (as well as the input size of the network itself)
INPUT_OUTPUT_FEATURE = [1, 2, 3]
class FC(nn.Module):
"""Torch model for the tests"""
def __init__(self, input_output):
super().__init__()
self.fc1 = nn.Linear(in_features=input_output, out_features=input_output)
self.sigmoid1 = nn.Sigmoid()
self.fc2 = nn.Linear(in_features=input_output, out_features=input_output)
def forward(self, x):
"""Forward pass."""
out = self.fc1(x)
out = self.sigmoid1(out)
out = self.fc2(out)
return out
@pytest.mark.parametrize(
"model",
[pytest.param(FC)],
)
@pytest.mark.parametrize(
"input_output_feature",
[pytest.param(input_output_feature) for input_output_feature in INPUT_OUTPUT_FEATURE],
)
def test_quantized_module_compilation(
input_output_feature, model, seed_torch, default_compilation_configuration
):
"""Test a neural network compilation for FHE inference."""
# Seed torch
seed_torch()
n_bits = 2
# Define an input shape (n_examples, n_features)
input_shape = (10, input_output_feature)
# Build a random Quantized Fully Connected Neural Network
# Define the torch model
torch_fc_model = model(input_output_feature)
# Create random input
numpy_input = numpy.random.uniform(-1, 1, size=input_shape)
# Create corresponding numpy model
numpy_fc_model = NumpyModule(torch_fc_model)
# Quantize with post-training static method
post_training_quant = PostTrainingAffineQuantization(n_bits, numpy_fc_model)
quantized_model = post_training_quant.quantize_module(numpy_input)
# Quantize input
q_input = QuantizedArray(n_bits, numpy_input)
quantized_model(q_input)
# Compile
quantized_model.compile(q_input, default_compilation_configuration)
dequant_predictions = quantized_model.forward_and_dequant(q_input)
# Compare predictions between FHE and QuantizedModule
homomorphic_predictions = []
for x_q in q_input.qvalues:
homomorphic_predictions.append(
quantized_model.forward_fhe.run(numpy.array([x_q]).astype(numpy.uint8))
)
homomorphic_predictions = quantized_model.dequantize_output(
numpy.array(homomorphic_predictions, dtype=numpy.float32)
)
homomorphic_predictions = homomorphic_predictions.reshape(dequant_predictions.shape)
# Make sure homomorphic_predictions are the same as dequant_predictions
assert numpy.isclose(homomorphic_predictions.ravel(), dequant_predictions.ravel()).all()

View File

@@ -10,7 +10,7 @@ N_BITS_ATOL_TUPLE_LIST = [
(20, 10 ** -2),
(16, 10 ** -1),
(8, 10 ** -0),
(4, 10 ** -0),
(5, 10 ** -0),
]

View File

@@ -16,7 +16,7 @@ N_BITS_LIST = [20, 16, 8]
@pytest.mark.parametrize(
"n_examples, n_features, n_neurons",
[
pytest.param(2, 3, 4),
pytest.param(50, 3, 4),
pytest.param(20, 500, 30),
pytest.param(200, 300, 50),
pytest.param(10000, 100, 1),
@@ -33,7 +33,7 @@ def test_quantized_linear(n_examples, n_features, n_neurons, n_bits, is_signed):
inputs = numpy.random.uniform(size=(n_examples, n_features))
q_inputs = QuantizedArray(n_bits, inputs)
# shape of weights: (n_neurons, n_features)
# shape of weights: (n_features, n_neurons)
weights = numpy.random.uniform(size=(n_features, n_neurons))
q_weights = QuantizedArray(n_bits, weights, is_signed)
@@ -49,7 +49,7 @@ def test_quantized_linear(n_examples, n_features, n_neurons, n_bits, is_signed):
expected_outputs = q_linear.q_out.values
actual_output = q_linear(q_inputs).dequant()
assert numpy.isclose(expected_outputs, actual_output, rtol=10 ** -1).all()
assert numpy.isclose(expected_outputs, actual_output, atol=10 ** -0).all()
# Same test without bias
q_linear = QuantizedLinear(n_bits, q_weights)
@@ -59,4 +59,4 @@ def test_quantized_linear(n_examples, n_features, n_neurons, n_bits, is_signed):
expected_outputs = q_linear.q_out.values
actual_output = q_linear(q_inputs).dequant()
assert numpy.isclose(expected_outputs, actual_output, rtol=10 ** -1).all()
assert numpy.isclose(expected_outputs, actual_output, atol=10 ** -0).all()

View File

@@ -101,9 +101,7 @@ def test_quantized_linear(model, input_shape, n_bits, atol, seed_torch):
quantized_model = post_training_quant.quantize_module(numpy_input)
# Quantize input
q_input = QuantizedArray(n_bits, numpy_input)
# Get quantized prediction
q_prediction = quantized_model(q_input)
# Dequantize to get back to real values
dequant_prediction = q_prediction.dequant()
# Forward and Dequantize to get back to real values
dequant_prediction = quantized_model.forward_and_dequant(q_input)
assert numpy.isclose(numpy_prediction, dequant_prediction, atol=atol).all()