race condition bug, if one model finishes its increments and publishes outputs before a recipient model receives the increment and clears its validated messages

Former-commit-id: c9467009df61f2986294d730abd2adc89e1fb028
This commit is contained in:
Michael T. Kelbaugh
2020-03-30 13:52:57 -04:00
parent f6de1fd6f8
commit f879bd1c4b

View File

@@ -180,7 +180,7 @@ class OuterWrapper(ABC):
self.input_schemas = None
self.output_schemas = None
self.validated_schemas = {}
self.validated_messages = {"this_incstep": {}, "next_incstep": {}}
self.generic_output_schema = (
"{"
' "type": "object",'
@@ -389,18 +389,20 @@ class OuterWrapper(ABC):
# validate against input schemas
if (
incstep > 1
and len(self.validated_schemas) != self.num_expected_inputs
and len(self.validated_messages["this_incstep"])
!= self.num_expected_inputs
):
logging.critical(
f"number of validated schemas {len(self.validated_schemas)} != num_expected_inputs {self.num_expected_inputs}"
f"number of validated messages {len(self.validated_messages['this_incstep'])} != num_expected_inputs {self.num_expected_inputs}"
)
event.set()
raise RuntimeError
# call the inner wrapper
schemas = self.validated_schemas.copy()
self.validated_schemas.clear()
results = self.increment(**schemas)
payloads = {}
for schema, message in self.validated_messages["this_incstep"].items():
payloads[schema] = message["payload"]
results = self.increment(**payloads)
# validate against output schemas
for schema_name, data_msg in results.items():
@@ -462,9 +464,21 @@ class OuterWrapper(ABC):
self.status = "ready"
elif (
len(self.validated_schemas) == self.num_expected_inputs
len(self.validated_messages["next_incstep"])
== self.num_expected_inputs
):
# all input messages have been received and all input schemas have been validated
# ready for an increment
self.validated_messages[
"this_incstep"
] = self.validated_messages["next_incstep"].copy()
self.validated_messages["next_incstep"].clear()
self.status = "ready"
elif (
len(self.validated_messages["this_incstep"])
== self.num_expected_inputs
):
# ready for an increment
self.status = "ready"
else:
@@ -634,10 +648,11 @@ class OuterWrapper(ABC):
logging.info(
f"schema {name} validated incoming message from {message['source']}"
)
if name in self.validated_schemas:
if name in self.validated_messages["next_incstep"]:
logging.error(
f"schema {name} already validated a message: {message}"
f"schema {name} already validated a message: {self.validated_messages['next_incstep'][name]}"
)
logging.error(f"new message: {message}")
return False
else:
matched.append(schema)
@@ -681,7 +696,8 @@ class OuterWrapper(ABC):
][item]["properties"]["data"].get("unit", "")
message["payload"][item]["granularity"] = dest_gran
self.validated_schemas[name] = message["payload"]
self.validated_messages["next_incstep"][name] = message
except ValidationError:
logging.debug("validation error")
except json.JSONDecodeError: