From f879bd1c4b78b1e707585396b0b23588b489600b Mon Sep 17 00:00:00 2001 From: "Michael T. Kelbaugh" Date: Mon, 30 Mar 2020 13:52:57 -0400 Subject: [PATCH] race condition bug, if one model finishes its increments and publishes outputs before a recipient model receives the increment and clears its validated messages Former-commit-id: c9467009df61f2986294d730abd2adc89e1fb028 --- outer_wrapper.py | 38 +++++++++++++++++++++++++++----------- 1 file changed, 27 insertions(+), 11 deletions(-) diff --git a/outer_wrapper.py b/outer_wrapper.py index bf704b6..84a43db 100755 --- a/outer_wrapper.py +++ b/outer_wrapper.py @@ -180,7 +180,7 @@ class OuterWrapper(ABC): self.input_schemas = None self.output_schemas = None - self.validated_schemas = {} + self.validated_messages = {"this_incstep": {}, "next_incstep": {}} self.generic_output_schema = ( "{" ' "type": "object",' @@ -389,18 +389,20 @@ class OuterWrapper(ABC): # validate against input schemas if ( incstep > 1 - and len(self.validated_schemas) != self.num_expected_inputs + and len(self.validated_messages["this_incstep"]) + != self.num_expected_inputs ): logging.critical( - f"number of validated schemas {len(self.validated_schemas)} != num_expected_inputs {self.num_expected_inputs}" + f"number of validated messages {len(self.validated_messages['this_incstep'])} != num_expected_inputs {self.num_expected_inputs}" ) event.set() raise RuntimeError # call the inner wrapper - schemas = self.validated_schemas.copy() - self.validated_schemas.clear() - results = self.increment(**schemas) + payloads = {} + for schema, message in self.validated_messages["this_incstep"].items(): + payloads[schema] = message["payload"] + results = self.increment(**payloads) # validate against output schemas for schema_name, data_msg in results.items(): @@ -462,9 +464,21 @@ class OuterWrapper(ABC): self.status = "ready" elif ( - len(self.validated_schemas) == self.num_expected_inputs + len(self.validated_messages["next_incstep"]) + == self.num_expected_inputs ): - # all input messages have been received and all input schemas have been validated + # ready for an increment + self.validated_messages[ + "this_incstep" + ] = self.validated_messages["next_incstep"].copy() + self.validated_messages["next_incstep"].clear() + self.status = "ready" + + elif ( + len(self.validated_messages["this_incstep"]) + == self.num_expected_inputs + ): + # ready for an increment self.status = "ready" else: @@ -634,10 +648,11 @@ class OuterWrapper(ABC): logging.info( f"schema {name} validated incoming message from {message['source']}" ) - if name in self.validated_schemas: + if name in self.validated_messages["next_incstep"]: logging.error( - f"schema {name} already validated a message: {message}" + f"schema {name} already validated a message: {self.validated_messages['next_incstep'][name]}" ) + logging.error(f"new message: {message}") return False else: matched.append(schema) @@ -681,7 +696,8 @@ class OuterWrapper(ABC): ][item]["properties"]["data"].get("unit", "") message["payload"][item]["granularity"] = dest_gran - self.validated_schemas[name] = message["payload"] + self.validated_messages["next_incstep"][name] = message + except ValidationError: logging.debug("validation error") except json.JSONDecodeError: