automatic granularity translation

Former-commit-id: c93920e8c7a1c62d42b0c63062bd21e4d5f190c6
This commit is contained in:
Michael T. Kelbaugh
2020-01-28 14:31:45 -05:00
parent 2eab6e0325
commit 340422c546

View File

@@ -279,15 +279,11 @@ class OuterWrapper(ABC):
schemas = self.validated_schemas.copy()
self.validated_schemas.clear()
results, htmls, images = self.increment(**schemas)
# translate results to the desired granularity?
# validate against output schemas
for schema_name, data_msg in results.items():
try:
validate(data_msg, json.loads(self.generic_output_schema))
print(schema_name)
print(data_msg)
print(self.output_schemas)
validate(data_msg, self.output_schemas[schema_name])
except Exception as e:
logging.critical("message {} failed to validate schema {}".format(data_msg, schema_name))
@@ -419,17 +415,26 @@ class OuterWrapper(ABC):
validate(message['payload'], schema)
logging.info("validated outgoing message: {}".format(message))
matched.append(schema)
for item in message['payload']:
src_gran = message['payload'][item]['granularity']
dest_gran = schema['properties'][item]['properties']['granularity'].get('value', src_gran)
agg = schema['properties'][item]['properties'].get('agg', {}).get('value')
dagg = schema['properties'][item]['properties'].get('dagg', {}).get('value')
logging.info(f"validating output: message from {name}, translating variable {item}, {src_gran} -> {dest_gran}")
data = self.translate(message['payload'][item]['data'], src_gran, dest_gran, item, agg_name=agg, disagg_name=dagg)
message['payload'][item]['data'] = data
message['payload'][item]['granularity'] = dest_gran
except ValidationError:
logging.info("validation error")
except json.JSONDecodeError:
logging.warning("json decode error")
if len(matched) == 0:
logging.info("message didn't match any output schemas: {}".format(message))
logging.info("message didn't match any output schemas: {}".format(message['source']))
elif len(matched) == 1:
logging.info("message matched an output schema: {}".format(message))
logging.info("message matched an output schema: {}".format(message['source']))
sock.send_json(message)
else:
logging.critical("more than one output schema was matched: {}".format(message))
logging.critical("more than one output schema was matched: {}".format(message['source']))
event.set()
sock.close()
@@ -463,7 +468,7 @@ class OuterWrapper(ABC):
if signal == 'status' and message.get('source') == 'broker':
self.broker_queue.put(message)
elif signal == 'data':
if not self.insert_data_message(message['payload']):
if not self.insert_data_message(message):
event.set()
else:
self.action_queue.put(message)
@@ -474,7 +479,7 @@ class OuterWrapper(ABC):
def insert_data_message(self, message):
"""
validates a data message against the input schemas
:param message: the payload of the data message to insert into the queue
:param message: the data message to insert into the queue
:return: False if message insertion throws an error, otherwise True.
returns True if the message is validated by 0 or 1 schemas
returns False if the data message is a duplicate
@@ -484,21 +489,30 @@ class OuterWrapper(ABC):
matched = []
for name, schema in self.input_schemas.items():
try:
validate(message, schema)
validate(message['payload'], schema)
logging.info("schema {} validated incoming message: {}".format(name, message))
if name in self.validated_schemas:
logging.error("schema {} already validated a message: {}".format(name, message))
return False
else:
matched.append(schema)
self.validated_schemas[name] = message
for item in message['payload']:
src_gran = message['payload'][item]['granularity']
dest_gran = schema['properties'][item]['properties']['granularity'].get('value', src_gran)
agg = schema['properties'][item]['properties'].get('agg', {}).get('value')
dagg = schema['properties'][item]['properties'].get('dagg', {}).get('value')
logging.info(f"validating input: message from {name}, translating variable {item}, {src_gran} -> {dest_gran}")
data = self.translate(message['payload'][item]['data'], src_gran, dest_gran, item, agg_name=agg, disagg_name=dagg)
message['payload'][item]['data'] = data
message['payload'][item]['granularity'] = dest_gran
self.validated_schemas[name] = message['payload']
except ValidationError:
logging.info("validation error")
except json.JSONDecodeError:
logging.warning("json decode error")
if len(matched) == 0:
logging.info("message didn't match any input schemas: {}".format(message))
logging.info("message didn't match any input schemas: {}".format(message['source']))
return True
def action_worker(self, event):