mirror of
https://github.com/zama-ai/concrete.git
synced 2026-02-09 03:55:04 -05:00
513 lines
18 KiB
Python
513 lines
18 KiB
Python
"""Test file for numpy tracing"""
|
|
|
|
from copy import deepcopy
|
|
|
|
import networkx as nx
|
|
import numpy
|
|
import pytest
|
|
|
|
from concrete.common.data_types.floats import Float
|
|
from concrete.common.data_types.integers import Integer
|
|
from concrete.common.debugging import get_printable_graph
|
|
from concrete.common.representation import intermediate as ir
|
|
from concrete.common.values import ClearScalar, ClearTensor, EncryptedScalar, EncryptedTensor
|
|
from concrete.numpy import tracing
|
|
|
|
# IR operation classes used to parametrize test_numpy_tracing_binary_op
OPERATIONS_TO_TEST = [ir.Add, ir.Sub, ir.Mul]
|
|
|
|
# Functions from tracing.NPTracer.LIST_OF_SUPPORTED_UFUNC, whose output
# is a float64, whatever the input type.
# Written as a set literal: membership tests are the only use in this file,
# and the literal avoids building a throwaway list first.
LIST_OF_UFUNC_WHOSE_OUTPUT_IS_FLOAT64 = {
    numpy.arccos,
    numpy.arccosh,
    numpy.arcsin,
    numpy.arcsinh,
    numpy.arctan,
    numpy.arctanh,
    numpy.cbrt,
    numpy.ceil,
    numpy.cos,
    numpy.cosh,
    numpy.deg2rad,
    numpy.degrees,
    numpy.exp,
    numpy.exp2,
    numpy.expm1,
    numpy.fabs,
    numpy.floor,
    numpy.log,
    numpy.log10,
    numpy.log1p,
    numpy.log2,
    numpy.rad2deg,
    numpy.radians,
    numpy.rint,
    numpy.sin,
    numpy.sinh,
    numpy.spacing,
    numpy.sqrt,
    numpy.tan,
    numpy.tanh,
    numpy.trunc,
}
|
|
|
|
# Functions from tracing.NPTracer.LIST_OF_SUPPORTED_UFUNC, whose output
# is a boolean, whatever the input type.
# Written as a set literal: membership tests are the only use in this file,
# and the literal avoids building a throwaway list first.
LIST_OF_UFUNC_WHOSE_OUTPUT_IS_BOOL = {
    numpy.isfinite,
    numpy.isinf,
    numpy.isnan,
    numpy.signbit,
}
|
|
|
|
|
|
@pytest.mark.parametrize("operation", OPERATIONS_TO_TEST)
@pytest.mark.parametrize(
    "x",
    [
        pytest.param(EncryptedScalar(Integer(64, is_signed=False)), id="x: Encrypted uint"),
        pytest.param(EncryptedScalar(Integer(64, is_signed=True)), id="x: Encrypted int"),
        pytest.param(ClearScalar(Integer(64, is_signed=False)), id="x: Clear uint"),
        pytest.param(ClearScalar(Integer(64, is_signed=True)), id="x: Clear int"),
    ],
)
@pytest.mark.parametrize(
    "y",
    [
        pytest.param(EncryptedScalar(Integer(64, is_signed=False)), id="y: Encrypted uint"),
        pytest.param(EncryptedScalar(Integer(64, is_signed=True)), id="y: Encrypted int"),
        pytest.param(ClearScalar(Integer(64, is_signed=False)), id="y: Clear uint"),
        pytest.param(ClearScalar(Integer(64, is_signed=True)), id="y: Clear int"),
    ],
)
def test_numpy_tracing_binary_op(operation, x, y, test_helpers):
    """Trace `(x + x) <op> y` and compare it with a hand-built reference graph.

    `operation` is one of the IR classes in OPERATIONS_TO_TEST; `x` and `y` are
    the value descriptions given to the tracer for the two inputs.
    """

    # Every traced function has the same shape, (x + x) op y, which keeps the
    # reference graph below easy to write by hand.
    def simple_add_function(x, y):
        doubled = x + x
        return doubled + y

    def simple_sub_function(x, y):
        doubled = x + x
        return doubled - y

    def simple_mul_function(x, y):
        doubled = x + x
        return doubled * y

    function_for_operation = {
        ir.Add: simple_add_function,
        ir.Sub: simple_sub_function,
        ir.Mul: simple_mul_function,
    }

    assert operation in OPERATIONS_TO_TEST, f"unknown operation {operation}"
    function_to_compile = function_for_operation[operation]

    op_graph = tracing.trace_numpy_function(function_to_compile, {"x": x, "y": y})

    # Reference graph: two inputs, the x + x node, then the operation under test.
    input_x = ir.Input(x, input_name="x", program_input_idx=0)
    input_y = ir.Input(y, input_name="y", program_input_idx=1)
    add_node_z = ir.Add((input_x.outputs[0], input_x.outputs[0]))
    returned_final_node = operation((add_node_z.outputs[0], input_y.outputs[0]))

    ref_graph = nx.MultiDiGraph()
    for node in (input_x, input_y, add_node_z, returned_final_node):
        ref_graph.add_node(node)

    # (source, destination, index of the destination input fed by this edge)
    reference_edges = (
        (input_x, add_node_z, 0),
        (input_x, add_node_z, 1),
        (add_node_z, returned_final_node, 0),
        (input_y, returned_final_node, 1),
    )
    for source, destination, input_idx in reference_edges:
        ref_graph.add_edge(source, destination, input_idx=input_idx)

    assert test_helpers.digraphs_are_equivalent(ref_graph, op_graph.graph)
|
|
|
|
|
|
def test_numpy_tracing_tensors():
    """Trace +, - and * between a tensor input and constant arrays.

    Each operator is applied twice, once with the constant array on the right
    and once on the left, and the printed graph is compared with a fixed listing.
    """

    def all_operations(x):
        result = x + numpy.array([[1, 2], [3, 4]])
        result = numpy.array([[5, 6], [7, 8]]) + result

        result = numpy.array([[100, 200], [300, 400]]) - result
        result = result - numpy.array([[10, 20], [30, 40]])

        result = result * numpy.array([[1, 2], [2, 1]])
        result = numpy.array([[2, 1], [1, 2]]) * result

        return result

    traced_inputs = {"x": EncryptedTensor(Integer(32, True), shape=(2, 2))}
    op_graph = tracing.trace_numpy_function(all_operations, traced_inputs)

    # NOTE(review): this listing is compared byte-for-byte with the output of
    # get_printable_graph; confirm that the spacing before each '#' matches
    # what the printer actually emits.
    expected = """
%0 = Constant([[2 1] [1 2]]) # ClearTensor<Integer<unsigned, 2 bits>, shape=(2, 2)>
%1 = Constant([[1 2] [2 1]]) # ClearTensor<Integer<unsigned, 2 bits>, shape=(2, 2)>
%2 = Constant([[10 20] [30 40]]) # ClearTensor<Integer<unsigned, 6 bits>, shape=(2, 2)>
%3 = Constant([[100 200] [300 400]]) # ClearTensor<Integer<unsigned, 9 bits>, shape=(2, 2)>
%4 = Constant([[5 6] [7 8]]) # ClearTensor<Integer<unsigned, 4 bits>, shape=(2, 2)>
%5 = x # EncryptedTensor<Integer<signed, 32 bits>, shape=(2, 2)>
%6 = Constant([[1 2] [3 4]]) # ClearTensor<Integer<unsigned, 3 bits>, shape=(2, 2)>
%7 = Add(5, 6) # EncryptedTensor<Integer<signed, 32 bits>, shape=(2, 2)>
%8 = Add(7, 4) # EncryptedTensor<Integer<signed, 32 bits>, shape=(2, 2)>
%9 = Sub(3, 8) # EncryptedTensor<Integer<signed, 32 bits>, shape=(2, 2)>
%10 = Sub(9, 2) # EncryptedTensor<Integer<signed, 32 bits>, shape=(2, 2)>
%11 = Mul(10, 1) # EncryptedTensor<Integer<signed, 32 bits>, shape=(2, 2)>
%12 = Mul(11, 0) # EncryptedTensor<Integer<signed, 32 bits>, shape=(2, 2)>
return(%12)
""".lstrip()

    printed_graph = get_printable_graph(op_graph, show_data_types=True)
    assert printed_graph == expected
|
|
|
|
|
|
def test_numpy_explicit_tracing_tensors():
    """Trace numpy.add, numpy.subtract and numpy.multiply on a tensor input.

    Each function is called twice, once with the constant array as second
    argument and once as first argument, and the printed graph is compared
    with a fixed listing.
    """

    def all_explicit_operations(x):
        result = numpy.add(x, numpy.array([[1, 2], [3, 4]]))
        result = numpy.add(numpy.array([[5, 6], [7, 8]]), result)

        result = numpy.subtract(numpy.array([[100, 200], [300, 400]]), result)
        result = numpy.subtract(result, numpy.array([[10, 20], [30, 40]]))

        result = numpy.multiply(result, numpy.array([[1, 2], [2, 1]]))
        result = numpy.multiply(numpy.array([[2, 1], [1, 2]]), result)

        return result

    traced_inputs = {"x": EncryptedTensor(Integer(32, True), shape=(2, 2))}
    op_graph = tracing.trace_numpy_function(all_explicit_operations, traced_inputs)

    # NOTE(review): this listing is compared byte-for-byte with the output of
    # get_printable_graph; confirm that the spacing before each '#' matches
    # what the printer actually emits.
    expected = """
%0 = Constant([[2 1] [1 2]]) # ClearTensor<Integer<unsigned, 2 bits>, shape=(2, 2)>
%1 = Constant([[1 2] [2 1]]) # ClearTensor<Integer<unsigned, 2 bits>, shape=(2, 2)>
%2 = Constant([[10 20] [30 40]]) # ClearTensor<Integer<unsigned, 6 bits>, shape=(2, 2)>
%3 = Constant([[100 200] [300 400]]) # ClearTensor<Integer<unsigned, 9 bits>, shape=(2, 2)>
%4 = Constant([[5 6] [7 8]]) # ClearTensor<Integer<unsigned, 4 bits>, shape=(2, 2)>
%5 = x # EncryptedTensor<Integer<signed, 32 bits>, shape=(2, 2)>
%6 = Constant([[1 2] [3 4]]) # ClearTensor<Integer<unsigned, 3 bits>, shape=(2, 2)>
%7 = Add(5, 6) # EncryptedTensor<Integer<signed, 32 bits>, shape=(2, 2)>
%8 = Add(7, 4) # EncryptedTensor<Integer<signed, 32 bits>, shape=(2, 2)>
%9 = Sub(3, 8) # EncryptedTensor<Integer<signed, 32 bits>, shape=(2, 2)>
%10 = Sub(9, 2) # EncryptedTensor<Integer<signed, 32 bits>, shape=(2, 2)>
%11 = Mul(10, 1) # EncryptedTensor<Integer<signed, 32 bits>, shape=(2, 2)>
%12 = Mul(11, 0) # EncryptedTensor<Integer<signed, 32 bits>, shape=(2, 2)>
return(%12)
""".lstrip()

    printed_graph = get_printable_graph(op_graph, show_data_types=True)
    assert printed_graph == expected
|
|
|
|
|
|
@pytest.mark.parametrize(
    "function_to_trace,op_graph_expected_output_type,input_and_expected_output_tuples",
    [
        (
            lambda x: x.astype(numpy.int32),
            Integer(32, is_signed=True),
            [
                (14, numpy.int32(14)),
                (1.5, numpy.int32(1)),
                (2.0, numpy.int32(2)),
                (-1.5, numpy.int32(-1)),
                (2 ** 31 - 1, numpy.int32(2 ** 31 - 1)),
                (-(2 ** 31), numpy.int32(-(2 ** 31))),
            ],
        ),
        (
            lambda x: x.astype(numpy.uint32),
            Integer(32, is_signed=False),
            [
                (14, numpy.uint32(14)),
                (1.5, numpy.uint32(1)),
                (2.0, numpy.uint32(2)),
                (2 ** 32 - 1, numpy.uint32(2 ** 32 - 1)),
            ],
        ),
        (
            lambda x: x.astype(numpy.int64),
            Integer(64, is_signed=True),
            [
                (14, numpy.int64(14)),
                (1.5, numpy.int64(1)),
                (2.0, numpy.int64(2)),
                (-1.5, numpy.int64(-1)),
                (2 ** 63 - 1, numpy.int64(2 ** 63 - 1)),
                (-(2 ** 63), numpy.int64(-(2 ** 63))),
            ],
        ),
        (
            lambda x: x.astype(numpy.uint64),
            Integer(64, is_signed=False),
            [
                (14, numpy.uint64(14)),
                (1.5, numpy.uint64(1)),
                (2.0, numpy.uint64(2)),
                (2 ** 64 - 1, numpy.uint64(2 ** 64 - 1)),
            ],
        ),
        (
            lambda x: x.astype(numpy.float64),
            Float(64),
            [
                (14, numpy.float64(14.0)),
                (1.5, numpy.float64(1.5)),
                (2.0, numpy.float64(2.0)),
                (-1.5, numpy.float64(-1.5)),
            ],
        ),
        (
            lambda x: x.astype(numpy.float32),
            Float(32),
            [
                (14, numpy.float32(14.0)),
                (1.5, numpy.float32(1.5)),
                (2.0, numpy.float32(2.0)),
                (-1.5, numpy.float32(-1.5)),
            ],
        ),
    ],
)
def test_tracing_astype(
    function_to_trace, op_graph_expected_output_type, input_and_expected_output_tuples
):
    """Check the traced dtype and the evaluated result of an `astype` call.

    For every (input, expected output) pair the function is traced again, with a
    signed 64-bit integer input description for Python ints and a 64-bit float
    description otherwise, then the graph is evaluated on the input.
    """
    for raw_input, expected_output in input_and_expected_output_tuples:
        if isinstance(raw_input, int):
            traced_input = EncryptedScalar(Integer(64, is_signed=True))
        else:
            traced_input = EncryptedScalar(Float(64))

        op_graph = tracing.trace_numpy_function(function_to_trace, {"x": traced_input})
        output_node = op_graph.output_nodes[0]
        traced_dtype = output_node.outputs[0].dtype
        assert op_graph_expected_output_type == traced_dtype

        # Program input 0 is the only input of the traced lambda.
        results_by_node = op_graph.evaluate({0: numpy.array(raw_input)})
        evaluated_output = results_by_node[output_node]
        expected_type = type(expected_output)
        assert isinstance(evaluated_output, expected_type)
        assert expected_output == evaluated_output
|
|
|
|
|
|
@pytest.mark.parametrize(
    "inputs,expected_output_node",
    [
        pytest.param(
            {"x": EncryptedScalar(Integer(7, is_signed=False))},
            ir.ArbitraryFunction,
        ),
        pytest.param(
            {"x": EncryptedScalar(Integer(32, is_signed=True))},
            ir.ArbitraryFunction,
        ),
        pytest.param(
            {"x": EncryptedScalar(Integer(64, is_signed=True))},
            ir.ArbitraryFunction,
        ),
        pytest.param(
            {"x": EncryptedScalar(Integer(128, is_signed=True))},
            ir.ArbitraryFunction,
            marks=pytest.mark.xfail(strict=True, raises=NotImplementedError),
        ),
        pytest.param(
            {"x": EncryptedScalar(Float(64))},
            ir.ArbitraryFunction,
        ),
    ],
)
def test_trace_numpy_supported_ufuncs(inputs, expected_output_node):
    """Trace every ufunc in NPTracer.LIST_OF_SUPPORTED_UFUNC on a scalar input.

    Checks that the graph has a single output node of the expected class with a
    single output, and that this output has the expected value description.
    """

    for function_to_trace_def in tracing.NPTracer.LIST_OF_SUPPORTED_UFUNC:

        # We really need a lambda (because numpy functions are not playing
        # nice with inspect.signature), but pylint and flake8 are not happy
        # with it
        # pylint: disable=unnecessary-lambda,cell-var-from-loop
        function_to_trace = lambda x: function_to_trace_def(x)  # noqa: E731
        # pylint: enable=unnecessary-lambda,cell-var-from-loop

        op_graph = tracing.trace_numpy_function(function_to_trace, inputs)

        assert len(op_graph.output_nodes) == 1
        output_node = op_graph.output_nodes[0]
        assert isinstance(output_node, expected_output_node)
        assert len(output_node.outputs) == 1
        traced_output = output_node.outputs[0]

        if function_to_trace_def in LIST_OF_UFUNC_WHOSE_OUTPUT_IS_FLOAT64:
            expected_output = EncryptedScalar(Float(64))
        elif function_to_trace_def in LIST_OF_UFUNC_WHOSE_OUTPUT_IS_BOOL:
            # Boolean function
            expected_output = EncryptedScalar(Integer(8, is_signed=False))
        else:
            # Function keeping more or less input type: same description as the
            # input, with the bit width raised to at least 32.
            expected_output = deepcopy(inputs["x"])
            expected_output.dtype.bit_width = max(expected_output.dtype.bit_width, 32)

        assert traced_output == expected_output
|
|
|
|
|
|
def test_trace_numpy_ufuncs_not_supported():
    """Tracing a ufunc method other than __call__ must raise NotImplementedError."""
    traced_inputs = {"x": EncryptedScalar(Integer(128, is_signed=True))}

    # We really need a lambda (because numpy functions are not playing
    # nice with inspect.signature), but pylint and flake8 are not happy
    # with it
    # pylint: disable=unnecessary-lambda
    reduce_with_add = lambda x: numpy.add.reduce(x)  # noqa: E731
    # pylint: enable=unnecessary-lambda

    # The pattern has no regex metacharacters, so `match` is a plain substring search.
    with pytest.raises(
        NotImplementedError, match="Only __call__ method is supported currently"
    ):
        tracing.trace_numpy_function(reduce_with_add, traced_inputs)
|
|
|
|
|
|
@pytest.mark.parametrize(
    "function_to_trace,inputs,expected_output_node,expected_output_value",
    [
        # pylint: disable=unnecessary-lambda
        pytest.param(
            lambda x, y: numpy.dot(x, y),
            {
                "x": EncryptedTensor(Integer(7, is_signed=False), shape=(10,)),
                "y": EncryptedTensor(Integer(7, is_signed=False), shape=(10,)),
            },
            ir.Dot,
            EncryptedScalar(Integer(32, False)),
        ),
        pytest.param(
            lambda x, y: numpy.dot(x, y),
            {
                "x": EncryptedTensor(Float(64), shape=(42,)),
                "y": EncryptedTensor(Float(64), shape=(10,)),
            },
            ir.Dot,
            EncryptedScalar(Float(64)),
        ),
        pytest.param(
            lambda x, y: numpy.dot(x, y),
            {
                "x": ClearTensor(Integer(64, is_signed=True), shape=(6,)),
                "y": ClearTensor(Integer(64, is_signed=True), shape=(6,)),
            },
            ir.Dot,
            ClearScalar(Integer(64, is_signed=True)),
        ),
        pytest.param(
            lambda x: numpy.dot(x, numpy.array([1, 2, 3, 4, 5], dtype=numpy.int64)),
            {
                "x": EncryptedTensor(Integer(64, is_signed=True), shape=(5,)),
            },
            ir.Dot,
            EncryptedScalar(Integer(64, True)),
        ),
        # pylint: enable=unnecessary-lambda
    ],
)
def test_trace_numpy_dot(function_to_trace, inputs, expected_output_node, expected_output_value):
    """Trace numpy.dot and check the class and value description of the output node."""

    op_graph = tracing.trace_numpy_function(function_to_trace, inputs)

    assert len(op_graph.output_nodes) == 1
    output_node = op_graph.output_nodes[0]
    assert isinstance(output_node, expected_output_node)
    assert len(output_node.outputs) == 1
    traced_output = output_node.outputs[0]
    assert traced_output == expected_output_value
|
|
|
|
|
|
def test_nptracer_get_tracing_func_for_np_functions():
    """Each supported ufunc must resolve to its entry in NPTracer.UFUNC_ROUTING."""
    tracer_class = tracing.NPTracer

    for np_function in tracer_class.LIST_OF_SUPPORTED_UFUNC:
        expected_tracing_func = tracer_class.UFUNC_ROUTING[np_function]
        resolved_tracing_func = tracer_class.get_tracing_func_for_np_function(np_function)

        assert resolved_tracing_func == expected_tracing_func
|
|
|
|
|
|
def test_nptracer_get_tracing_func_for_np_functions_not_implemented():
    """Looking up numpy.conjugate must raise NotImplementedError naming the function."""
    # The pattern has no regex metacharacters, so `match` is a plain substring search.
    with pytest.raises(
        NotImplementedError,
        match="NPTracer does not yet manage the following func: conjugate",
    ):
        tracing.NPTracer.get_tracing_func_for_np_function(numpy.conjugate)
|
|
|
|
|
|
@pytest.mark.parametrize(
    "tracer",
    [
        tracing.NPTracer([], ir.Input(ClearScalar(Integer(32, True)), "x", 0), 0),
    ],
)
@pytest.mark.parametrize(
    "operation",
    [
        lambda x: x + "fail",
        lambda x: "fail" + x,
        lambda x: x - "fail",
        lambda x: "fail" - x,
        lambda x: x * "fail",
        lambda x: "fail" * x,
    ],
)
def test_nptracer_unsupported_operands(operation, tracer):
    """Combining an NPTracer with a str through +, - or * must raise TypeError.

    Both operand orders are covered, i.e. the regular and the reflected operator.
    """
    with pytest.raises(TypeError):
        # Only the raised exception matters; the result is deliberately discarded.
        operation(tracer)
|