"""Test file for numpy compilation functions""" import itertools import random from copy import deepcopy from functools import partial import numpy import pytest from concrete.common.compilation import CompilationConfiguration from concrete.common.data_types.integers import Integer, UnsignedInteger from concrete.common.debugging import draw_graph, format_operation_graph from concrete.common.extensions.multi_table import MultiLookupTable from concrete.common.extensions.table import LookupTable from concrete.common.values import ClearTensor, EncryptedScalar, EncryptedTensor from concrete.numpy import tracing from concrete.numpy.compile import ( compile_numpy_function, compile_numpy_function_into_op_graph_and_measure_bounds, ) # pylint: disable=too-many-lines def data_gen(args): """Helper to create an inputset""" for prod in itertools.product(*args): yield prod def numpy_array_data_gen(args, tensor_shapes): """Helper to create an inputset containing numpy arrays filled with the same value and of a particular shape""" for prod in itertools.product(*args): yield tuple( numpy.full(tensor_shape, val, numpy.int64) for val, tensor_shape in zip(prod, tensor_shapes) ) def no_fuse_unhandled(x, y): """No fuse unhandled""" x_intermediate = x + 2.8 y_intermediate = y + 9.3 intermediate = x_intermediate - y_intermediate return (intermediate * 1.5).astype(numpy.int32) def identity_lut_generator(n): """Test lookup table""" return lambda x: LookupTable(list(range(2 ** n)))[x] def random_lut_1b(x): """1-bit random table lookup""" # fmt: off table = LookupTable([10, 12]) # fmt: on return table[x] def random_lut_2b(x): """2-bit random table lookup""" # fmt: off table = LookupTable([3, 8, 22, 127]) # fmt: on return table[x] def random_lut_3b(x): """3-bit random table lookup""" # fmt: off table = LookupTable([30, 52, 125, 23, 17, 12, 90, 4]) # fmt: on return table[x] def random_lut_4b(x): """4-bit random table lookup""" # fmt: off table = LookupTable([30, 52, 125, 23, 17, 12, 90, 4, 21, 51, 22, 15, 53, 100, 75, 90]) # fmt: on return table[x] def random_lut_5b(x): """5-bit random table lookup""" # fmt: off table = LookupTable( [ 1, 5, 2, 3, 10, 2, 4, 8, 1, 12, 15, 12, 10, 1, 0, 2, 4, 3, 8, 7, 10, 11, 6, 13, 9, 0, 2, 1, 15, 11, 12, 5 ] ) # fmt: on return table[x] def random_lut_6b(x): """6-bit random table lookup""" # fmt: off table = LookupTable( [ 95, 74, 11, 83, 24, 116, 28, 75, 26, 85, 114, 121, 91, 123, 78, 69, 72, 115, 67, 5, 39, 11, 120, 88, 56, 43, 74, 16, 72, 85, 103, 92, 44, 115, 50, 56, 107, 77, 25, 71, 52, 45, 80, 35, 69, 8, 40, 87, 26, 85, 84, 53, 73, 95, 86, 22, 16, 45, 59, 112, 53, 113, 98, 116 ] ) # fmt: on return table[x] def random_lut_7b(x): """7-bit random table lookup""" # fmt: off table = LookupTable( [ 13, 58, 38, 58, 15, 15, 77, 86, 80, 94, 108, 27, 126, 60, 65, 95, 50, 79, 22, 97, 38, 60, 25, 48, 73, 112, 27, 45, 88, 20, 67, 17, 16, 6, 71, 60, 77, 43, 93, 40, 41, 31, 99, 122, 120, 40, 94, 13, 111, 44, 96, 62, 108, 91, 34, 90, 103, 58, 3, 103, 19, 69, 55, 108, 0, 111, 113, 0, 0, 73, 22, 52, 81, 2, 88, 76, 36, 121, 97, 121, 123, 79, 82, 120, 12, 65, 54, 101, 90, 52, 84, 106, 23, 15, 110, 79, 85, 101, 30, 61, 104, 35, 81, 30, 98, 44, 111, 32, 68, 18, 45, 123, 84, 80, 68, 27, 31, 38, 126, 61, 51, 7, 49, 37, 63, 114, 22, 18, ] ) # fmt: on return table[x] def multi_lut(x): """2-bit multi table lookup""" table = MultiLookupTable( [ [LookupTable([1, 2, 1, 0]), LookupTable([2, 2, 1, 3])], [LookupTable([1, 0, 1, 0]), LookupTable([0, 2, 3, 3])], [LookupTable([0, 2, 3, 0]), LookupTable([2, 1, 2, 0])], ] ) return table[x] def small_fused_table(x): """Test with a small fused table""" return (10 * (numpy.cos(x + 1) + 1)).astype(numpy.uint32) def complicated_topology(x): """Mix x in an intricated way.""" intermediate = x x_p_1 = intermediate + 1 x_p_2 = intermediate + 2 x_p_3 = x_p_1 + x_p_2 return ( x_p_3.astype(numpy.int32), x_p_2.astype(numpy.int32), (x_p_2 + 3).astype(numpy.int32), x_p_3.astype(numpy.int32) + 67, ) # TODO: https://github.com/zama-ai/concretefhe-internal/issues/908 # sotc means Scalar Or Tensor Constructor, this is a workaround for tests as the compiler does not # support computation between scalars and tensors def scalar_or_tensor_constructor(scalar_value, return_scalar): """Constructor for scalars if return_scalar is true""" if return_scalar: return scalar_value return numpy.array([scalar_value]) def mix_x_and_y_and_call_f(func, sotc, x, y): """Create an upper function to test `func`""" z = numpy.abs(sotc(10) * func(x)) z = z / sotc(2) z = z.astype(numpy.int32) + y return z def mix_x_and_y_and_call_f_with_float_inputs(func, sotc, x, y): """Create an upper function to test `func`, with inputs which are forced to be floats""" z = numpy.abs(sotc(10) * func(x + sotc(0.1))) z = z.astype(numpy.int32) + y return z def mix_x_and_y_and_call_f_with_integer_inputs(func, sotc, x, y): """Create an upper function to test `func`, with inputs which are forced to be integers but in a way which is fusable into a TLU""" x = x // sotc(2) a = x + sotc(0.1) a = numpy.rint(a).astype(numpy.int32) z = numpy.abs(sotc(10) * func(a)) z = z.astype(numpy.int32) + y return z def mix_x_and_y_and_call_f_which_expects_small_inputs(func, sotc, x, y): """Create an upper function to test `func`, which expects small values to not use too much precision""" a = numpy.abs(sotc(0.77) * numpy.sin(x)) z = numpy.abs(sotc(3) * func(a)) z = z.astype(numpy.int32) + y return z def mix_x_and_y_and_call_f_which_has_large_outputs(func, sotc, x, y): """Create an upper function to test `func`, which outputs large values""" a = numpy.abs(sotc(2) * numpy.sin(x)) z = numpy.abs(func(a) * sotc(0.131)) z = z.astype(numpy.int32) + y return z def mix_x_and_y_and_call_f_avoid_0_input(func, sotc, x, y): """Create an upper function to test `func`, which makes that inputs are not 0""" a = numpy.abs(sotc(7) * numpy.sin(x)) + sotc(1) c = sotc(100) // a b = sotc(100) / a a = a + b + c z = numpy.abs(sotc(5) * func(a)) z = z.astype(numpy.int32) + y return z def mix_x_and_y_and_call_binary_f_one(func, sotc, c, x, y): """Create an upper function to test `func`""" z = numpy.abs(func(x, c) + sotc(1)) z = z.astype(numpy.uint32) + y return z def mix_x_and_y_and_call_binary_f_two(func, sotc, c, x, y): """Create an upper function to test `func`""" z = numpy.abs(func(c, x) + sotc(1)) z = z.astype(numpy.uint32) + y return z def check_is_good_execution(compiler_engine, function, args, verbose=True): """Run several times the check compiler_engine.run(*args) == function(*args). If always wrong, return an error. One can set the expected probability of success of one execution and the number of tests, to finetune the probability of bad luck, ie that we run several times the check and always have a wrong result.""" expected_probability_of_success = 0.95 nb_tries = 5 expected_bad_luck = (1 - expected_probability_of_success) ** nb_tries for i in range(1, nb_tries + 1): if numpy.array_equal(compiler_engine.run(*args), function(*args)): # Good computation after i tries if verbose: print(f"Good computation after {i} tries") return # Bad computation after nb_tries raise AssertionError( f"bad computation after {nb_tries} tries, which was supposed to happen with a " f"probability of {expected_bad_luck}" ) def subtest_compile_and_run_unary_ufunc_correctness( ufunc, upper_function, input_ranges, tensor_shape, default_compilation_configuration, ): """Test correctness of results when running a compiled function""" def get_function(ufunc, upper_function): return lambda x, y: upper_function(ufunc, x, y) function = get_function(ufunc, upper_function) function_parameters = { arg_name: EncryptedTensor(Integer(64, True), shape=tensor_shape) for arg_name in ["x", "y"] } compiler_engine = compile_numpy_function( function, function_parameters, numpy_array_data_gen( tuple(range(x[0], x[1] + 1) for x in input_ranges), [tensor_shape] * len(function_parameters), ), default_compilation_configuration, ) # TODO: https://github.com/zama-ai/concretefhe-internal/issues/910 args = [ numpy.random.randint(low, high, size=tensor_shape, dtype=numpy.uint8) if tensor_shape != () else random.randint(low, high) for (low, high) in input_ranges ] check_is_good_execution(compiler_engine, function, args) def subtest_compile_and_run_binary_ufunc_correctness( ufunc, upper_function, c, input_ranges, tensor_shape, default_compilation_configuration, ): """Test correctness of results when running a compiled function""" def get_function(ufunc, upper_function): return lambda x, y: upper_function(ufunc, c, x, y) function = get_function(ufunc, upper_function) function_parameters = { arg_name: EncryptedTensor(Integer(64, True), shape=tensor_shape) for arg_name in ["x", "y"] } compiler_engine = compile_numpy_function( function, function_parameters, numpy_array_data_gen( tuple(range(x[0], x[1] + 1) for x in input_ranges), [tensor_shape] * len(function_parameters), ), default_compilation_configuration, ) # TODO: https://github.com/zama-ai/concretefhe-internal/issues/910 args = [ numpy.random.randint(low, high, size=tensor_shape, dtype=numpy.uint8) if tensor_shape != () else random.randint(low, high) for (low, high) in input_ranges ] check_is_good_execution(compiler_engine, function, args) @pytest.mark.parametrize( "ufunc", [f for f in tracing.NPTracer.LIST_OF_SUPPORTED_UFUNC if f.nin == 2], ) @pytest.mark.parametrize( "tensor_shape", [pytest.param((), id="scalar"), pytest.param((3, 1, 2), id="tensor")] ) def test_binary_ufunc_operations(ufunc, default_compilation_configuration, tensor_shape): """Test biary functions which are in tracing.NPTracer.LIST_OF_SUPPORTED_UFUNC.""" sotc = partial(scalar_or_tensor_constructor, return_scalar=tensor_shape == ()) run_multi_tlu_test = False if tensor_shape != (): run_multi_tlu_test = True tensor_for_multi_tlu = numpy.arange(numpy.prod(tensor_shape)).reshape(tensor_shape) tensor_for_multi_tlu_small_values = tensor_for_multi_tlu // 2 if ufunc in [numpy.power, numpy.float_power]: # Need small constants to keep results really small subtest_compile_and_run_binary_ufunc_correctness( ufunc, lambda func, c, x, y: mix_x_and_y_and_call_binary_f_one(func, sotc, c, x, y), sotc(3), ((0, 4), (0, 5)), tensor_shape, default_compilation_configuration, ) subtest_compile_and_run_binary_ufunc_correctness( ufunc, lambda func, c, x, y: mix_x_and_y_and_call_binary_f_two(func, sotc, c, x, y), sotc(2), ((0, 4), (0, 5)), tensor_shape, default_compilation_configuration, ) if run_multi_tlu_test: subtest_compile_and_run_binary_ufunc_correctness( ufunc, lambda func, c, x, y: mix_x_and_y_and_call_binary_f_one(func, sotc, c, x, y), tensor_for_multi_tlu_small_values, ((0, 4), (0, 5)), tensor_shape, default_compilation_configuration, ) subtest_compile_and_run_binary_ufunc_correctness( ufunc, lambda func, c, x, y: mix_x_and_y_and_call_binary_f_two(func, sotc, c, x, y), tensor_for_multi_tlu_small_values, ((0, 4), (0, 5)), tensor_shape, default_compilation_configuration, ) elif ufunc in [numpy.floor_divide, numpy.fmod, numpy.remainder, numpy.true_divide]: subtest_compile_and_run_binary_ufunc_correctness( ufunc, lambda func, c, x, y: mix_x_and_y_and_call_binary_f_two(func, sotc, c, x, y), sotc(31), ((1, 5), (1, 5)), tensor_shape, default_compilation_configuration, ) if run_multi_tlu_test: subtest_compile_and_run_binary_ufunc_correctness( ufunc, lambda func, c, x, y: mix_x_and_y_and_call_binary_f_two(func, sotc, c, x, y), tensor_for_multi_tlu, ((1, 5), (1, 5)), tensor_shape, default_compilation_configuration, ) elif ufunc in [numpy.lcm, numpy.left_shift]: # Need small constants to keep results sufficiently small subtest_compile_and_run_binary_ufunc_correctness( ufunc, lambda func, c, x, y: mix_x_and_y_and_call_binary_f_one(func, sotc, c, x, y), sotc(3), ((0, 5), (0, 5)), tensor_shape, default_compilation_configuration, ) subtest_compile_and_run_binary_ufunc_correctness( ufunc, lambda func, c, x, y: mix_x_and_y_and_call_binary_f_two(func, sotc, c, x, y), sotc(2), ((0, 5), (0, 5)), tensor_shape, default_compilation_configuration, ) if run_multi_tlu_test: subtest_compile_and_run_binary_ufunc_correctness( ufunc, lambda func, c, x, y: mix_x_and_y_and_call_binary_f_one(func, sotc, c, x, y), tensor_for_multi_tlu if ufunc != numpy.left_shift else tensor_for_multi_tlu_small_values, ((0, 5), (0, 5)), tensor_shape, default_compilation_configuration, ) subtest_compile_and_run_binary_ufunc_correctness( ufunc, lambda func, c, x, y: mix_x_and_y_and_call_binary_f_two(func, sotc, c, x, y), tensor_for_multi_tlu if ufunc != numpy.left_shift else tensor_for_multi_tlu_small_values, ((0, 5), (0, 5)), tensor_shape, default_compilation_configuration, ) elif ufunc in [numpy.ldexp]: # Need small constants to keep results sufficiently small subtest_compile_and_run_binary_ufunc_correctness( ufunc, lambda func, c, x, y: mix_x_and_y_and_call_binary_f_two(func, sotc, c, x, y), sotc(2), ((0, 5), (0, 5)), tensor_shape, default_compilation_configuration, ) if run_multi_tlu_test: subtest_compile_and_run_binary_ufunc_correctness( ufunc, lambda func, c, x, y: mix_x_and_y_and_call_binary_f_two(func, sotc, c, x, y), tensor_for_multi_tlu // 2, ((0, 5), (0, 5)), tensor_shape, default_compilation_configuration, ) else: # General case subtest_compile_and_run_binary_ufunc_correctness( ufunc, lambda func, c, x, y: mix_x_and_y_and_call_binary_f_one(func, sotc, c, x, y), sotc(41), ((0, 5), (0, 5)), tensor_shape, default_compilation_configuration, ) subtest_compile_and_run_binary_ufunc_correctness( ufunc, lambda func, c, x, y: mix_x_and_y_and_call_binary_f_two(func, sotc, c, x, y), sotc(42), ((0, 5), (0, 5)), tensor_shape, default_compilation_configuration, ) if run_multi_tlu_test: subtest_compile_and_run_binary_ufunc_correctness( ufunc, lambda func, c, x, y: mix_x_and_y_and_call_binary_f_one(func, sotc, c, x, y), tensor_for_multi_tlu, ((0, 5), (0, 5)), tensor_shape, default_compilation_configuration, ) subtest_compile_and_run_binary_ufunc_correctness( ufunc, lambda func, c, x, y: mix_x_and_y_and_call_binary_f_two(func, sotc, c, x, y), tensor_for_multi_tlu, ((0, 5), (0, 5)), tensor_shape, default_compilation_configuration, ) @pytest.mark.parametrize( "ufunc", [f for f in tracing.NPTracer.LIST_OF_SUPPORTED_UFUNC if f.nin == 1] ) @pytest.mark.parametrize( "tensor_shape", [pytest.param((), id="scalar"), pytest.param((3, 1, 2), id="tensor")] ) def test_unary_ufunc_operations(ufunc, default_compilation_configuration, tensor_shape): """Test unary functions which are in tracing.NPTracer.LIST_OF_SUPPORTED_UFUNC.""" sotc = partial(scalar_or_tensor_constructor, return_scalar=tensor_shape == ()) if ufunc in [ numpy.degrees, numpy.rad2deg, ]: # Need to reduce the output value, to avoid to need too much precision subtest_compile_and_run_unary_ufunc_correctness( ufunc, lambda func, x, y: mix_x_and_y_and_call_f_which_has_large_outputs(func, sotc, x, y), ((0, 5), (0, 5)), tensor_shape, default_compilation_configuration, ) elif ufunc in [ numpy.negative, ]: # Need to turn the input into a float subtest_compile_and_run_unary_ufunc_correctness( ufunc, lambda func, x, y: mix_x_and_y_and_call_f_with_float_inputs(func, sotc, x, y), ((0, 5), (0, 5)), tensor_shape, default_compilation_configuration, ) elif ufunc in [ numpy.arccosh, numpy.log, numpy.log2, numpy.log10, numpy.reciprocal, ]: # No 0 in the domain of definition subtest_compile_and_run_unary_ufunc_correctness( ufunc, lambda func, x, y: mix_x_and_y_and_call_f_avoid_0_input(func, sotc, x, y), ((1, 5), (1, 5)), tensor_shape, default_compilation_configuration, ) elif ufunc in [ numpy.cosh, numpy.exp, numpy.exp2, numpy.expm1, numpy.square, numpy.arccos, numpy.arcsin, numpy.arctanh, numpy.sinh, ]: # Need a small range of inputs, to avoid to need too much precision subtest_compile_and_run_unary_ufunc_correctness( ufunc, lambda func, x, y: mix_x_and_y_and_call_f_which_expects_small_inputs(func, sotc, x, y), ((0, 5), (0, 5)), tensor_shape, default_compilation_configuration, ) else: # Regular case for univariate functions subtest_compile_and_run_unary_ufunc_correctness( ufunc, lambda func, x, y: mix_x_and_y_and_call_f(func, sotc, x, y), ((0, 5), (0, 5)), tensor_shape, default_compilation_configuration, ) @pytest.mark.parametrize( "function,input_ranges,list_of_arg_names", [ pytest.param(lambda x: x + 42, ((-5, 5),), ["x"]), pytest.param(lambda x, y: x + y + 8, ((2, 10), (4, 8)), ["x", "y"]), pytest.param(lambda x, y: (x + 1, y + 10), ((-1, 1), (3, 8)), ["x", "y"]), pytest.param( lambda x, y, z: (x + y + 1 - z, x * y + 42, z, z + 99), ((4, 8), (3, 4), (0, 4)), ["x", "y", "z"], ), pytest.param(complicated_topology, ((0, 10),), ["x"]), ], ) def test_compile_function_multiple_outputs( function, input_ranges, list_of_arg_names, default_compilation_configuration ): """Test function compile_numpy_function_into_op_graph for a program with multiple outputs""" def data_gen_local(args): for prod in itertools.product(*args): yield tuple(numpy.array(val) for val in prod) function_parameters = { arg_name: EncryptedScalar(Integer(64, True)) for arg_name in list_of_arg_names } op_graph = compile_numpy_function_into_op_graph_and_measure_bounds( function, function_parameters, data_gen_local(tuple(range(x[0], x[1] + 1) for x in input_ranges)), default_compilation_configuration, ) # TODO: For the moment, we don't have really checks, but some printfs. Later, # when we have the converter, we can check the MLIR draw_graph(op_graph, show=False) str_of_the_graph = format_operation_graph(op_graph) print(f"\n{str_of_the_graph}\n") @pytest.mark.parametrize( "function,input_ranges,list_of_arg_names", [ pytest.param(lambda x: (-27) + 4 * (x + 8), ((0, 10),), ["x"]), pytest.param(lambda x: x + (-33), ((40, 60),), ["x"]), pytest.param(lambda x: 17 - (0 - x), ((0, 10),), ["x"]), pytest.param(lambda x: 42 + x * (-3), ((0, 10),), ["x"]), pytest.param(lambda x: 43 + (-4) * x, ((0, 10),), ["x"]), pytest.param(lambda x: 3 - (-5) * x, ((0, 10),), ["x"]), pytest.param(lambda x: (-2) * (-5) * x, ((0, 10),), ["x"]), pytest.param(lambda x: (-2) * x * (-5), ((0, 10),), ["x"]), pytest.param(lambda x, y: 40 - (-3 * x) + (-2 * y), ((0, 20), (0, 20)), ["x", "y"]), pytest.param(lambda x: x + numpy.int32(42), ((0, 10),), ["x"]), pytest.param(lambda x: x + 64, ((0, 10),), ["x"]), pytest.param(lambda x: x * 3, ((0, 40),), ["x"]), pytest.param(lambda x: 120 - x, ((40, 80),), ["x"]), pytest.param(lambda x, y: x + y + 64, ((0, 20), (0, 20)), ["x", "y"]), pytest.param(lambda x, y: 100 - y + x, ((0, 20), (0, 20)), ["x", "y"]), pytest.param(lambda x, y: 50 - y * 2 + x, ((0, 20), (0, 20)), ["x", "y"]), pytest.param(lambda x: -x + 50, ((0, 20),), ["x"]), pytest.param(lambda x: numpy.dot(x, 2), ((0, 20),), ["x"]), pytest.param(lambda x: numpy.dot(2, x), ((0, 20),), ["x"]), ], ) def test_compile_and_run_correctness( function, input_ranges, list_of_arg_names, default_compilation_configuration ): """Test correctness of results when running a compiled function""" function_parameters = { arg_name: EncryptedScalar(Integer(64, False)) for arg_name in list_of_arg_names } compiler_engine = compile_numpy_function( function, function_parameters, data_gen(tuple(range(x[0], x[1] + 1) for x in input_ranges)), default_compilation_configuration, ) args = [random.randint(low, high) for (low, high) in input_ranges] assert compiler_engine.run(*args) == function(*args) @pytest.mark.parametrize( "function,input_ranges,list_of_arg_names", [ pytest.param(lambda x: x ** 2, ((0, 10),), ["x"]), pytest.param(lambda x: 2 ** (x % 5), ((0, 20),), ["x"]), # FIXME: Fails, #949 pytest.param(lambda x: abs(~x), ((0, 13),), ["x"]), pytest.param(lambda x: x << 1, ((0, 13),), ["x"]), # FIXME: Fails, #949 pytest.param(lambda x: 2 << (x % 6), ((0, 13),), ["x"]), pytest.param(lambda x: x >> 2, ((30, 100),), ["x"]), pytest.param(lambda x: 115 >> (x % 3), ((0, 17),), ["x"]), pytest.param(lambda x: x % 7, ((0, 100),), ["x"]), pytest.param(lambda x: x > 7, ((0, 20),), ["x"]), pytest.param(lambda x: x < 11, ((0, 20),), ["x"]), pytest.param(lambda x: x >= 8, ((0, 20),), ["x"]), pytest.param(lambda x: x <= 10, ((0, 20),), ["x"]), pytest.param(lambda x: x == 15, ((0, 20),), ["x"]), pytest.param(lambda x: x & 14, ((0, 20),), ["x"]), pytest.param(lambda x: x | 18, ((0, 20),), ["x"]), pytest.param(lambda x: x ^ 23, ((0, 20),), ["x"]), pytest.param(lambda x: x % 3, ((0, 20),), ["x"]), pytest.param(lambda x: 17 & x, ((0, 20),), ["x"]), pytest.param(lambda x: 19 | x, ((0, 20),), ["x"]), pytest.param(lambda x: 45 ^ x, ((0, 20),), ["x"]), pytest.param(lambda x: 19 % (x + 1), ((0, 20),), ["x"]), ], ) def test_compile_and_run_correctness__for_prog_with_tlu( function, input_ranges, list_of_arg_names, default_compilation_configuration ): """Test correctness of results when running a compiled function which uses a TLU""" function_parameters = { arg_name: EncryptedScalar(Integer(64, False)) for arg_name in list_of_arg_names } compiler_engine = compile_numpy_function( function, function_parameters, data_gen(tuple(range(x[0], x[1] + 1) for x in input_ranges)), default_compilation_configuration, ) for _ in range(16): args = [random.randint(low, high) for (low, high) in input_ranges] check_is_good_execution(compiler_engine, function, args, verbose=False) @pytest.mark.parametrize( "function,parameters,inputset,test_input,expected_output", [ pytest.param( lambda x: x + 1, { "x": EncryptedTensor(UnsignedInteger(3), shape=(3, 2)), }, [(numpy.random.randint(0, 2 ** 3, size=(3, 2)),) for _ in range(10)], ( [ [0, 7], [6, 1], [2, 5], ], ), [ [1, 8], [7, 2], [3, 6], ], ), pytest.param( lambda x: x + numpy.array([[1, 0], [2, 0], [3, 1]], dtype=numpy.uint32), { "x": EncryptedTensor(UnsignedInteger(3), shape=(3, 2)), }, [(numpy.random.randint(0, 2 ** 3, size=(3, 2)),) for _ in range(10)], ( [ [0, 7], [6, 1], [2, 5], ], ), [ [1, 7], [8, 1], [5, 6], ], ), pytest.param( lambda x, y: x + y, { "x": EncryptedTensor(UnsignedInteger(3), shape=(3, 2)), "y": EncryptedScalar(UnsignedInteger(3)), }, [ ( numpy.random.randint(0, 2 ** 3, size=(3, 2)), random.randint(0, (2 ** 3) - 1), ) for _ in range(10) ], ( [ [0, 7], [6, 1], [2, 5], ], 2, ), [ [2, 9], [8, 3], [4, 7], ], ), pytest.param( lambda x, y: x + y, { "x": EncryptedTensor(UnsignedInteger(3), shape=(3, 2)), "y": EncryptedTensor(UnsignedInteger(3), shape=(3, 2)), }, [ ( numpy.random.randint(0, 2 ** 3, size=(3, 2)), numpy.random.randint(0, 2 ** 3, size=(3, 2)), ) for _ in range(10) ], ( [ [0, 7], [6, 1], [2, 5], ], [ [1, 6], [2, 5], [3, 4], ], ), [ [1, 13], [8, 6], [5, 9], ], ), pytest.param( lambda x: 100 - x, { "x": EncryptedTensor(UnsignedInteger(3), shape=(3, 2)), }, [(numpy.random.randint(0, 2 ** 3, size=(3, 2)),) for _ in range(10)], ( [ [0, 7], [6, 1], [2, 5], ], ), [ [100, 93], [94, 99], [98, 95], ], ), pytest.param( lambda x: numpy.array([[10, 15], [20, 15], [10, 30]], dtype=numpy.uint32) - x, { "x": EncryptedTensor(UnsignedInteger(3), shape=(3, 2)), }, [(numpy.random.randint(0, 2 ** 3, size=(3, 2)),) for _ in range(10)], ( [ [0, 7], [6, 1], [2, 5], ], ), [ [10, 8], [14, 14], [8, 25], ], ), pytest.param( lambda x: x * 2, { "x": EncryptedTensor(UnsignedInteger(3), shape=(3, 2)), }, [(numpy.random.randint(0, 2 ** 3, size=(3, 2)),) for _ in range(10)], ( [ [0, 7], [6, 1], [2, 5], ], ), [ [0, 14], [12, 2], [4, 10], ], ), pytest.param( lambda x: x * numpy.array([[1, 2], [2, 1], [3, 1]], dtype=numpy.uint32), { "x": EncryptedTensor(UnsignedInteger(3), shape=(3, 2)), }, [(numpy.random.randint(0, 2 ** 3, size=(3, 2)),) for _ in range(10)], ( [ [4, 7], [6, 1], [2, 5], ], ), [ [4, 14], [12, 1], [6, 5], ], ), pytest.param( lambda x: LookupTable([2, 1, 3, 0])[x], { "x": EncryptedTensor(UnsignedInteger(2), shape=(3, 2)), }, [(numpy.random.randint(0, 2 ** 2, size=(3, 2)),) for _ in range(10)], ( [ [0, 1], [2, 1], [3, 0], ], ), [ [2, 1], [3, 1], [0, 2], ], ), pytest.param( lambda x: numpy.dot(x, 2), { "x": EncryptedTensor(UnsignedInteger(3), shape=(3,)), }, [(numpy.random.randint(0, 2 ** 3, size=(3,)),) for _ in range(10)], ([2, 7, 1],), [4, 14, 2], ), pytest.param( lambda x: numpy.dot(2, x), { "x": EncryptedTensor(UnsignedInteger(3), shape=(3,)), }, [(numpy.random.randint(0, 2 ** 3, size=(3,)),) for _ in range(10)], ([2, 7, 1],), [4, 14, 2], ), ], ) def test_compile_and_run_tensor_correctness( function, parameters, inputset, test_input, expected_output, default_compilation_configuration ): """Test correctness of results when running a compiled function with tensor operators""" circuit = compile_numpy_function( function, parameters, inputset, default_compilation_configuration, ) numpy_test_input = ( item if isinstance(item, int) else numpy.array(item, dtype=numpy.uint8) for item in test_input ) assert numpy.array_equal( circuit.run(*numpy_test_input), numpy.array(expected_output, dtype=numpy.uint8), ) @pytest.mark.parametrize( "size, input_range", [ pytest.param( 1, (0, 8), ), pytest.param( 4, (0, 5), ), pytest.param( 6, (0, 4), ), pytest.param( 10, (0, 3), ), ], ) def test_compile_and_run_dot_correctness(size, input_range, default_compilation_configuration): """Test correctness of results when running a compiled function""" low, high = input_range shape = (size,) inputset = [ (numpy.zeros(shape, dtype=numpy.uint32), numpy.zeros(shape, dtype=numpy.uint32)), ( numpy.ones(shape, dtype=numpy.uint32) * high, numpy.ones(shape, dtype=numpy.uint32) * high, ), ] for _ in range(8): inputset.append((numpy.random.randint(low, high + 1), numpy.random.randint(low, high + 1))) function_parameters = { "x": EncryptedTensor(Integer(64, False), shape), "y": ClearTensor(Integer(64, False), shape), } def function(x, y): return numpy.dot(x, y) compiler_engine = compile_numpy_function( function, function_parameters, inputset, default_compilation_configuration, ) args = [numpy.random.randint(low, high + 1, size=(size,), dtype=numpy.uint8) for __ in range(2)] assert compiler_engine.run(*args) == function(*args) @pytest.mark.parametrize( "size, input_range_x, input_range_y,modulus", [ pytest.param(6, (0, 3), (-3, 3), 32), pytest.param(3, (0, 3), (-7, 7), 64), ], ) def test_compile_and_run_dot_correctness_with_signed_cst( size, input_range_x, input_range_y, default_compilation_configuration, modulus ): """Test correctness of dot with signed constant tensor. Remark that for now, the results are only correct modulo modulus""" low_x, high_x = input_range_x low_y, high_y = input_range_y shape = (size,) # Check that never, the dot goes too high # For this, we simplify our check knowing that low_x >= 0. Under this condition, the maximal # value is for the dot is size * max(abs(high_x * low_y), abs(high_x * high_y)). And we want # is to be less than 64, to have a signed value on strictly less than 8b assert low_x >= 0 assert size * max(abs(high_x * low_y), abs(high_x * high_y)) < 64 function_parameters = { "x": EncryptedTensor(Integer(64, False), shape), } constant1 = numpy.random.randint(low_y, high_y + 1, size=(size,)) constant2 = numpy.random.randint(low_y, high_y + 1, size=(size,)) worst_x_1_1 = numpy.where(constant1 < 0, 0, high_x) worst_x_1_2 = numpy.where(constant1 > 0, 0, high_x) worst_x_2_1 = numpy.where(constant2 < 0, 0, high_x) worst_x_2_2 = numpy.where(constant2 > 0, 0, high_x) for i in range(2): inputset = [ (numpy.zeros(shape, dtype=numpy.uint32),), (numpy.ones(shape, dtype=numpy.uint32) * low_x,), (numpy.ones(shape, dtype=numpy.uint32) * high_x,), ] for _ in range(128): inputset.append((numpy.random.randint(low_x, high_x + 1),)) if i == 0: def function(x): return numpy.dot(x, constant1) inputset.extend([(worst_x_1_1,), (worst_x_1_2,)]) else: def function(x): return numpy.dot(constant2, x) inputset.extend([(worst_x_2_1,), (worst_x_2_2,)]) compiler_engine = compile_numpy_function( function, function_parameters, inputset, default_compilation_configuration ) for _ in range(5): args = [ numpy.random.randint(low_x, high_x + 1, size=(size,), dtype=numpy.uint8), ] assert check_equality_modulo(compiler_engine.run(*args), function(*args), modulus) @pytest.mark.parametrize( "size,input_range", [ pytest.param( 1, (0, 8), ), pytest.param( 4, (0, 5), ), pytest.param( 6, (0, 4), ), pytest.param( 10, (0, 3), ), ], ) def test_compile_and_run_constant_dot_correctness( size, input_range, default_compilation_configuration ): """Test correctness of results when running a compiled function""" low, high = input_range shape = (size,) inputset = [ (numpy.zeros(shape, dtype=numpy.uint32),), (numpy.ones(shape, dtype=numpy.uint32) * high,), ] for _ in range(8): inputset.append((numpy.random.randint(low, high + 1),)) constant = numpy.random.randint(low, high + 1, size=shape) def left(x): return numpy.dot(x, constant) def right(x): return numpy.dot(constant, x) left_circuit = compile_numpy_function( left, {"x": EncryptedTensor(Integer(64, False), shape)}, inputset, default_compilation_configuration, ) right_circuit = compile_numpy_function( right, {"x": EncryptedTensor(Integer(64, False), shape)}, inputset, default_compilation_configuration, ) args = (numpy.random.randint(low, high + 1, size=shape, dtype=numpy.uint8),) assert left_circuit.run(*args) == left(*args) assert right_circuit.run(*args) == right(*args) @pytest.mark.parametrize( "lhs_shape,rhs_shape,input_range", [ pytest.param( (3, 2), (2, 3), (0, 4), ), pytest.param( (1, 2), (2, 1), (0, 4), ), pytest.param( (3, 3), (3, 3), (0, 4), ), pytest.param( (2, 1), (1, 2), (0, 8), ), ], ) def test_compile_and_run_matmul_correctness( lhs_shape, rhs_shape, input_range, default_compilation_configuration ): """Test correctness of results when running a compiled function""" low, high = input_range inputset = [ (numpy.zeros(lhs_shape, dtype=numpy.uint32),), (numpy.ones(lhs_shape, dtype=numpy.uint32) * high,), ] for _ in range(8): inputset.append((numpy.random.randint(low, high + 1, size=lhs_shape),)) constant = numpy.random.randint(low, high + 1, size=rhs_shape) def using_operator(x): return x @ constant def using_function(x): return numpy.matmul(x, constant) operator_circuit = compile_numpy_function( using_operator, {"x": EncryptedTensor(UnsignedInteger(3), lhs_shape)}, inputset, default_compilation_configuration, ) function_circuit = compile_numpy_function( using_function, {"x": EncryptedTensor(UnsignedInteger(3), lhs_shape)}, inputset, default_compilation_configuration, ) args = (numpy.random.randint(low, high + 1, size=lhs_shape, dtype=numpy.uint8),) assert numpy.array_equal(operator_circuit.run(*args), using_operator(*args)) assert numpy.array_equal(function_circuit.run(*args), using_function(*args)) @pytest.mark.parametrize( "function,input_bits,list_of_arg_names", [ pytest.param(identity_lut_generator(1), (1,), ["x"], id="identity function (1-bit)"), pytest.param(identity_lut_generator(2), (2,), ["x"], id="identity function (2-bit)"), pytest.param(identity_lut_generator(3), (3,), ["x"], id="identity function (3-bit)"), pytest.param(identity_lut_generator(4), (4,), ["x"], id="identity function (4-bit)"), pytest.param(identity_lut_generator(5), (5,), ["x"], id="identity function (5-bit)"), pytest.param(identity_lut_generator(6), (6,), ["x"], id="identity function (6-bit)"), pytest.param(identity_lut_generator(7), (7,), ["x"], id="identity function (7-bit)"), pytest.param(random_lut_1b, (1,), ["x"], id="random function (1-bit)"), pytest.param(random_lut_2b, (2,), ["x"], id="random function (2-bit)"), pytest.param(random_lut_3b, (3,), ["x"], id="random function (3-bit)"), pytest.param(random_lut_4b, (4,), ["x"], id="random function (4-bit)"), pytest.param(random_lut_5b, (5,), ["x"], id="random function (5-bit)"), pytest.param(random_lut_6b, (6,), ["x"], id="random function (6-bit)"), pytest.param(random_lut_7b, (7,), ["x"], id="random function (7-bit)"), pytest.param(small_fused_table, (5,), ["x"], id="small fused table (5-bits)"), ], ) def test_compile_and_run_lut_correctness( function, input_bits, list_of_arg_names, default_compilation_configuration, ): """Test correctness of results when running a compiled function with LUT""" input_ranges = tuple((0, 2 ** input_bit - 1) for input_bit in input_bits) function_parameters = { arg_name: EncryptedScalar(Integer(input_bit, False)) for input_bit, arg_name in zip(input_bits, list_of_arg_names) } compiler_engine = compile_numpy_function( function, function_parameters, data_gen(tuple(range(x[0], x[1] + 1) for x in input_ranges)), default_compilation_configuration, ) # testing random values for _ in range(10): args = [random.randint(low, high) for (low, high) in input_ranges] check_is_good_execution(compiler_engine, function, args) # testing low values args = [low for (low, _) in input_ranges] check_is_good_execution(compiler_engine, function, args) # testing high values args = [high for (_, high) in input_ranges] check_is_good_execution(compiler_engine, function, args) def test_compile_and_run_multi_lut_correctness(default_compilation_configuration): """Test correctness of results when running a compiled function with Multi LUT""" def function_to_compile(x): table = MultiLookupTable( [ [LookupTable([1, 2, 1, 0]), LookupTable([2, 2, 1, 3])], [LookupTable([1, 0, 1, 0]), LookupTable([0, 2, 3, 3])], [LookupTable([0, 2, 3, 0]), LookupTable([2, 1, 2, 0])], ] ) return table[x] compiler_engine = compile_numpy_function( function_to_compile, { "x": EncryptedTensor(UnsignedInteger(2), shape=(3, 2)), }, [(numpy.random.randint(0, 2 ** 2, size=(3, 2)),) for _ in range(10)], default_compilation_configuration, ) # testing random values for _ in range(10): args = [numpy.random.randint(0, 2 ** 2, size=(3, 2), dtype=numpy.uint8)] check_is_good_execution(compiler_engine, function_to_compile, args) def test_compile_function_with_direct_tlu(default_compilation_configuration): """Test compile_numpy_function_into_op_graph for a program with direct table lookup""" table = LookupTable([9, 2, 4, 11]) def function(x): return x + table[x] op_graph = compile_numpy_function_into_op_graph_and_measure_bounds( function, {"x": EncryptedScalar(Integer(2, is_signed=False))}, [(0,), (1,), (2,), (3,)], default_compilation_configuration, ) str_of_the_graph = format_operation_graph(op_graph) print(f"\n{str_of_the_graph}\n") def test_compile_function_with_direct_tlu_overflow(default_compilation_configuration): """Test compile_numpy_function_into_op_graph for a program with direct table lookup overflow""" table = LookupTable([9, 2, 4, 11]) def function(x): return table[x] with pytest.raises(ValueError): compile_numpy_function_into_op_graph_and_measure_bounds( function, {"x": EncryptedScalar(Integer(3, is_signed=False))}, [(0,), (1,), (2,), (3,), (4,), (5,), (6,), (7,)], default_compilation_configuration, ) # pylint: disable=line-too-long @pytest.mark.parametrize( "function,parameters,inputset,match", [ pytest.param( lambda x: numpy.dot(x, numpy.array([-1.5])), { "x": EncryptedTensor(Integer(2, is_signed=False), shape=(1,)), }, [ (numpy.array([1]),), (numpy.array([1]),), (numpy.array([0]),), (numpy.array([0]),), (numpy.array([1]),), (numpy.array([1]),), (numpy.array([0]),), (numpy.array([0]),), (numpy.array([2]),), (numpy.array([2]),), ], ( """ function you are trying to compile isn't supported for MLIR lowering %0 = x # EncryptedTensor %1 = [-1.5] # ClearTensor ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ only integer constants are supported %2 = dot(%0, %1) # EncryptedScalar ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ only integer dot product is supported return %2 """.strip() # noqa: E501 ), ), pytest.param( lambda x: x[0], {"x": EncryptedTensor(Integer(3, is_signed=True), shape=(2, 2))}, [(numpy.random.randint(-4, 2 ** 2, size=(2, 2)),) for i in range(10)], ( """ function you are trying to compile isn't supported for MLIR lowering %0 = x # EncryptedTensor ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ only unsigned integer inputs are supported %1 = %0[0] # EncryptedTensor ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ indexing is not supported for the time being return %1 """.strip() # noqa: E501 ), ), pytest.param( no_fuse_unhandled, {"x": EncryptedScalar(Integer(2, False)), "y": EncryptedScalar(Integer(2, False))}, [(numpy.array(i), numpy.array(i)) for i in range(10)], ( """ function you are trying to compile isn't supported for MLIR lowering %0 = 1.5 # ClearScalar ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ only integer constants are supported %1 = x # EncryptedScalar %2 = 2.8 # ClearScalar ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ only integer constants are supported %3 = y # EncryptedScalar %4 = 9.3 # ClearScalar ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ only integer constants are supported %5 = add(%1, %2) # EncryptedScalar ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ only integer addition is supported %6 = add(%3, %4) # EncryptedScalar ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ only integer addition is supported %7 = sub(%5, %6) # EncryptedScalar ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ only integer subtraction is supported %8 = mul(%7, %0) # EncryptedScalar ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ only integer multiplication is supported %9 = astype(%8, dtype=int32) # EncryptedScalar ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ astype with floating-point inputs is required to be fused to be supported return %9 """.strip() # noqa: E501 ), ), pytest.param( lambda x: x @ -numpy.ones(shape=(2, 3), dtype=numpy.int32), {"x": EncryptedTensor(UnsignedInteger(3), shape=(3, 2))}, [(numpy.random.randint(0, 2 ** 3, size=(3, 2)),) for i in range(10)], ( """ function you are trying to compile isn't supported for MLIR lowering %0 = x # EncryptedTensor %1 = [[-1 -1 -1] [-1 -1 -1]] # ClearTensor %2 = matmul(%0, %1) # EncryptedTensor ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ only unsigned integer matrix multiplication is supported return %2 """.strip() # noqa: E501 ), ), pytest.param( lambda x: numpy.transpose(x), {"x": EncryptedTensor(Integer(3, is_signed=False), shape=(3, 2))}, [(numpy.random.randint(0, 2 ** 3, size=(3, 2)),) for i in range(10)], ( """ function you are trying to compile isn't supported for MLIR lowering %0 = x # EncryptedTensor %1 = transpose(%0) # EncryptedTensor ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ transpose is not supported for the time being return %1 """.strip() # noqa: E501 ), ), pytest.param( lambda x: numpy.ravel(x), {"x": EncryptedTensor(Integer(3, is_signed=False), shape=(3, 2))}, [(numpy.random.randint(0, 2 ** 3, size=(3, 2)),) for i in range(10)], ( """ function you are trying to compile isn't supported for MLIR lowering %0 = x # EncryptedTensor %1 = ravel(%0) # EncryptedTensor ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ ravel is not supported for the time being return %1 """.strip() # noqa: E501 ), ), pytest.param( lambda x: numpy.reshape(x, (2, 6)), {"x": EncryptedTensor(Integer(3, is_signed=False), shape=(3, 4))}, [(numpy.random.randint(0, 2 ** 3, size=(3, 4)),) for i in range(10)], ( """ function you are trying to compile isn't supported for MLIR lowering %0 = x # EncryptedTensor %1 = reshape(%0, newshape=(2, 6)) # EncryptedTensor ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ reshape is not supported for the time being return %1 """.strip() # noqa: E501 ), ), ], ) # pylint: enable=line-too-long def test_fail_compile(function, parameters, inputset, match, default_compilation_configuration): """Test function compile_numpy_function_into_op_graph for a program with signed values""" with pytest.raises(RuntimeError) as excinfo: compile_numpy_function( function, parameters, inputset, default_compilation_configuration, ) assert str(excinfo.value) == match, str(excinfo.value) def test_fail_with_intermediate_signed_values(default_compilation_configuration): """Test function with failing compilation due to intermediate signed integers.""" def function(x, y): z = numpy.abs(10 * numpy.negative(x)) z = z.astype(numpy.int32) + y return z with pytest.raises(RuntimeError): try: compile_numpy_function( function, { "x": EncryptedScalar(Integer(2, is_signed=False)), "y": EncryptedScalar(Integer(2, is_signed=False)), }, [(i, j) for i in range(2 ** 2) for j in range(2 ** 2)], default_compilation_configuration, show_mlir=True, ) except RuntimeError as error: match = """ function you are trying to compile isn't supported for MLIR lowering %0 = y # EncryptedScalar %1 = 10 # ClearScalar %2 = x # EncryptedScalar %3 = negative(%2) # EncryptedScalar %4 = mul(%3, %1) # EncryptedScalar %5 = absolute(%4) # EncryptedScalar ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ absolute is not supported for the time being %6 = astype(%5, dtype=int32) # EncryptedScalar %7 = add(%6, %0) # EncryptedScalar return %7 """.strip() # noqa: E501 # pylint: disable=line-too-long assert str(error) == match raise def test_small_inputset_no_fail(): """Test function compile_numpy_function_into_op_graph with an unacceptably small inputset""" compile_numpy_function_into_op_graph_and_measure_bounds( lambda x: x + 42, {"x": EncryptedScalar(Integer(5, is_signed=False))}, [(0,), (3,)], CompilationConfiguration(dump_artifacts_on_unexpected_failures=False), ) def test_small_inputset_treat_warnings_as_errors(): """Test function compile_numpy_function_into_op_graph with an unacceptably small inputset""" with pytest.raises(ValueError, match=".* inputset contains too few inputs .*"): compile_numpy_function_into_op_graph_and_measure_bounds( lambda x: x + 42, {"x": EncryptedScalar(Integer(5, is_signed=False))}, [(0,), (3,)], CompilationConfiguration( dump_artifacts_on_unexpected_failures=False, treat_warnings_as_errors=True, ), ) @pytest.mark.parametrize( "function,params,shape,ref_graph_str", [ ( lambda x, y: numpy.dot(x, y), { "x": EncryptedTensor(Integer(2, is_signed=False), shape=(4,)), "y": EncryptedTensor(Integer(2, is_signed=False), shape=(4,)), }, (4,), # Remark that, when you do the dot of tensors of 4 values between 0 and 3, # you can get a maximal value of 4*3*3 = 36, ie something on 6 bits """ %0 = x # EncryptedTensor %1 = y # EncryptedTensor %2 = dot(%0, %1) # EncryptedScalar return %2 """.strip(), ), ], ) def test_compile_function_with_dot( function, params, shape, ref_graph_str, default_compilation_configuration ): """Test compile_numpy_function_into_op_graph for a program with np.dot""" # This is the exhaust, but if ever we have too long inputs (ie, large 'repeat'), # we'll have to take random values, not all values one by one def data_gen_local(max_for_ij, repeat): iter_i = itertools.product(range(0, max_for_ij + 1), repeat=repeat) iter_j = itertools.product(range(0, max_for_ij + 1), repeat=repeat) for prod_i, prod_j in itertools.product(iter_i, iter_j): yield numpy.array(prod_i), numpy.array(prod_j) max_for_ij = 3 assert len(shape) == 1 repeat = shape[0] op_graph = compile_numpy_function_into_op_graph_and_measure_bounds( function, params, data_gen_local(max_for_ij, repeat), default_compilation_configuration, ) str_of_the_graph = format_operation_graph(op_graph) assert str_of_the_graph == ref_graph_str, ( f"\n==================\nGot \n{str_of_the_graph}" f"==================\nExpected \n{ref_graph_str}" f"==================\n" ) @pytest.mark.parametrize( "function,input_ranges,list_of_arg_names", [ pytest.param(lambda x: x + 64, ((0, 10),), ["x"]), pytest.param(lambda x: x * 3, ((0, 40),), ["x"]), pytest.param(lambda x: 120 - x, ((40, 80),), ["x"]), pytest.param(lambda x, y: x + y + 64, ((0, 20), (0, 20)), ["x", "y"]), pytest.param(lambda x, y: 100 - y + x, ((0, 20), (0, 20)), ["x", "y"]), pytest.param(lambda x, y: 50 - y * 2 + x, ((0, 20), (0, 20)), ["x", "y"]), ], ) def test_compile_with_show_mlir( function, input_ranges, list_of_arg_names, default_compilation_configuration ): """Test show_mlir option""" function_parameters = { arg_name: EncryptedScalar(Integer(64, False)) for arg_name in list_of_arg_names } compile_numpy_function( function, function_parameters, data_gen(tuple(range(x[0], x[1] + 1) for x in input_ranges)), default_compilation_configuration, show_mlir=True, ) def test_compile_too_high_bitwidth(default_compilation_configuration): """Check that the check of maximal bitwidth of intermediate data works fine.""" def function(x, y): return x + y function_parameters = { "x": EncryptedScalar(Integer(64, False)), "y": EncryptedScalar(Integer(64, False)), } # A bit too much input_ranges = [(0, 100), (0, 28)] with pytest.raises(RuntimeError) as excinfo: compile_numpy_function( function, function_parameters, data_gen(tuple(range(x[0], x[1] + 1) for x in input_ranges)), default_compilation_configuration, ) assert ( str(excinfo.value) == """ max_bit_width of some nodes is too high for the current version of the compiler (maximum must be 7) which is not compatible with: %0 = x # EncryptedScalar %1 = y # EncryptedScalar %2 = add(%0, %1) # EncryptedScalar ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 8 bits is not supported for the time being return %2 """.strip() # noqa: E501 # pylint: disable=line-too-long ) # Just ok input_ranges = [(0, 99), (0, 28)] compile_numpy_function( function, function_parameters, data_gen(tuple(range(x[0], x[1] + 1) for x in input_ranges)), default_compilation_configuration, ) def test_compile_with_random_inputset(default_compilation_configuration): """Test function for compile with random input set""" configuration_to_use = deepcopy(default_compilation_configuration) configuration_to_use.enable_unsafe_features = True compile_numpy_function_into_op_graph_and_measure_bounds( lambda x: x + 1, {"x": EncryptedScalar(UnsignedInteger(6))}, inputset="random", compilation_configuration=configuration_to_use, ) compile_numpy_function( lambda x: x + 32, {"x": EncryptedScalar(UnsignedInteger(6))}, inputset="random", compilation_configuration=configuration_to_use, ) def test_fail_compile_with_random_inputset(default_compilation_configuration): """Test function for failed compile with random input set""" with pytest.raises(ValueError): try: compile_numpy_function_into_op_graph_and_measure_bounds( lambda x: x + 1, {"x": EncryptedScalar(UnsignedInteger(3))}, inputset="unsupported", compilation_configuration=default_compilation_configuration, ) except Exception as error: expected = ( "inputset can only be an iterable of tuples or the string 'random' " "but you specified 'unsupported' for it" ) assert str(error) == expected raise with pytest.raises(RuntimeError): try: compile_numpy_function( lambda x: x + 1, {"x": EncryptedScalar(UnsignedInteger(3))}, inputset="random", compilation_configuration=default_compilation_configuration, ) except Exception as error: expected = ( "Random inputset generation is an unsafe feature " "and should not be used if you don't know what you are doing" ) assert str(error) == expected raise def test_wrong_inputs(default_compilation_configuration): """Test compilation with faulty inputs""" # x should have been something like EncryptedScalar(UnsignedInteger(3)) x = [1, 2, 3] input_ranges = ((0, 10),) inputset = data_gen(tuple(range(x[0], x[1] + 1) for x in input_ranges)) dict_for_inputs = {"x": x} with pytest.raises(AssertionError) as excinfo: compile_numpy_function( lambda x: 2 * x, dict_for_inputs, inputset, default_compilation_configuration ) list_of_possible_basevalue = [ "ClearTensor", "EncryptedTensor", "ClearScalar", "EncryptedScalar", ] assert ( str(excinfo.value) == f"wrong type for inputs {dict_for_inputs}, " f"needs to be one of {list_of_possible_basevalue}" ) @pytest.mark.parametrize( "function,input_ranges,list_of_arg_names", [ pytest.param(lambda x: (x + (-27)) + 32, ((0, 10),), ["x"]), pytest.param(lambda x: ((-3) * x) + (100 - (x + 1)), ((0, 10),), ["x"]), pytest.param( lambda x: (20 + 10 * numpy.tanh(50 * (numpy.cos(x + 33.0)))).astype(numpy.uint32), ((0, 31),), ["x"], ), pytest.param( lambda x: (20 * (numpy.cos(x + 33.0)) + 30).astype(numpy.uint32), ((0, 31),), ["x"], ), pytest.param( lambda x, y: (-1) * x + (-2) * y + 40, ( (0, 10), (0, 10), ), ["x", "y"], ), ], ) def test_compile_and_run_correctness_with_negative_values( function, input_ranges, list_of_arg_names, default_compilation_configuration ): """Test correctness of results when running a compiled function, which has some negative intermediate values.""" function_parameters = { arg_name: EncryptedScalar(Integer(64, False)) for arg_name in list_of_arg_names } compiler_engine = compile_numpy_function( function, function_parameters, data_gen(tuple(range(x[0], x[1] + 1) for x in input_ranges)), default_compilation_configuration, ) args = [random.randint(low, high) for (low, high) in input_ranges] assert compiler_engine.run(*args) == function(*args) def check_equality_modulo(a, b, modulus): """Check that (a mod modulus) == (b mod modulus)""" return (a % modulus) == (b % modulus) @pytest.mark.parametrize( "function,input_ranges,list_of_arg_names,modulus", [ pytest.param(lambda x: x + (-20), ((0, 10),), ["x"], 128), pytest.param(lambda x: 10 + x * (-3), ((0, 20),), ["x"], 128), ], ) def test_compile_and_run_correctness_with_negative_results( function, input_ranges, list_of_arg_names, modulus, default_compilation_configuration ): """Test correctness of computations when the result is possibly negative: until #845 is fixed, results are currently only correct modulo a power of 2 (given by `modulus` parameter). Eg, instead of returning -3, the execution may return -3 mod 128 = 125.""" function_parameters = { arg_name: EncryptedScalar(Integer(64, False)) for arg_name in list_of_arg_names } compiler_engine = compile_numpy_function( function, function_parameters, data_gen(tuple(range(x[0], x[1] + 1) for x in input_ranges)), default_compilation_configuration, ) args = [random.randint(low, high) for (low, high) in input_ranges] assert check_equality_modulo(compiler_engine.run(*args), function(*args), modulus)