diff --git a/outer_wrapper.py b/outer_wrapper.py index bf704b6..84a43db 100755 --- a/outer_wrapper.py +++ b/outer_wrapper.py @@ -180,7 +180,7 @@ class OuterWrapper(ABC): self.input_schemas = None self.output_schemas = None - self.validated_schemas = {} + self.validated_messages = {"this_incstep": {}, "next_incstep": {}} self.generic_output_schema = ( "{" ' "type": "object",' @@ -389,18 +389,20 @@ class OuterWrapper(ABC): # validate against input schemas if ( incstep > 1 - and len(self.validated_schemas) != self.num_expected_inputs + and len(self.validated_messages["this_incstep"]) + != self.num_expected_inputs ): logging.critical( - f"number of validated schemas {len(self.validated_schemas)} != num_expected_inputs {self.num_expected_inputs}" + f"number of validated messages {len(self.validated_messages['this_incstep'])} != num_expected_inputs {self.num_expected_inputs}" ) event.set() raise RuntimeError # call the inner wrapper - schemas = self.validated_schemas.copy() - self.validated_schemas.clear() - results = self.increment(**schemas) + payloads = {} + for schema, message in self.validated_messages["this_incstep"].items(): + payloads[schema] = message["payload"] + results = self.increment(**payloads) # validate against output schemas for schema_name, data_msg in results.items(): @@ -462,9 +464,21 @@ class OuterWrapper(ABC): self.status = "ready" elif ( - len(self.validated_schemas) == self.num_expected_inputs + len(self.validated_messages["next_incstep"]) + == self.num_expected_inputs ): - # all input messages have been received and all input schemas have been validated + # ready for an increment + self.validated_messages[ + "this_incstep" + ] = self.validated_messages["next_incstep"].copy() + self.validated_messages["next_incstep"].clear() + self.status = "ready" + + elif ( + len(self.validated_messages["this_incstep"]) + == self.num_expected_inputs + ): + # ready for an increment self.status = "ready" else: @@ -634,10 +648,11 @@ class OuterWrapper(ABC): logging.info( f"schema {name} validated incoming message from {message['source']}" ) - if name in self.validated_schemas: + if name in self.validated_messages["next_incstep"]: logging.error( - f"schema {name} already validated a message: {message}" + f"schema {name} already validated a message: {self.validated_messages['next_incstep'][name]}" ) + logging.error(f"new message: {message}") return False else: matched.append(schema) @@ -681,7 +696,8 @@ class OuterWrapper(ABC): ][item]["properties"]["data"].get("unit", "") message["payload"][item]["granularity"] = dest_gran - self.validated_schemas[name] = message["payload"] + self.validated_messages["next_incstep"][name] = message + except ValidationError: logging.debug("validation error") except json.JSONDecodeError: