publish data with metadata for current year

Former-commit-id: 855354f8639d4c9a4f8a5d7ff602e5281460b57f
This commit is contained in:
Michael T. Kelbaugh
2019-08-13 12:55:23 -04:00
parent cffb6493db
commit 3e75217e7e
2 changed files with 22 additions and 11 deletions

View File

@@ -22,8 +22,9 @@ class Broker:
models = json.load(models_file)
self.models = {model: {} for model in models['models']}
self.model_tracker = set()
self.incstep = 0
self.max_incstep = 4
self.incstep = 1
self.max_incstep = 50
self.initial_year = 2016
self.boot_timer = 60 # units: seconds
self.watchdog_timer = 10 # units: seconds
self.client = None
@@ -59,10 +60,10 @@ class Broker:
payload = message[1]
if payload['signal'] == 'file_string':
data = payload['payload'].encode()
fs.put(data, filename=payload['name'], incstep=payload['incstep'], source=payload['source'])
fs.put(data, filename=payload['name'], incstep=payload['incstep'], year=payload['incstep'] + self.initial_year, source=payload['source'])
elif payload['signal'] == 'file_bytes':
data = payload['payload'].decode('base64')
fs.put(data, filename=payload['name'], incstep=payload['incstep'], source=payload['source'])
fs.put(data, filename=payload['name'], incstep=payload['incstep'], year=payload['incstep'] + self.initial_year, source=payload['source'])
else:
messages_col = metadata_db[collection]
payload = message[1]
@@ -85,6 +86,8 @@ class Broker:
message['signal'] = 'status'
message['status'] = self.status
message['incstep'] = self.incstep
message['initial_year'] = self.initial_year
message['current_year'] = self.incstep + self.initial_year
self.pub_queue.put(message)
def pub(self, event):
@@ -225,6 +228,7 @@ class Broker:
message['signal'] = 'increment'
message['status'] = self.status
message['incstep'] = self.incstep
message['year'] = self.incstep + self.initial_year
self.pub_queue.put(message)
self.incstep += 1

View File

@@ -84,7 +84,8 @@ class OuterWrapper(ABC):
self.model_id = model_id
self.num_expected_inputs = num_expected_inputs
self.status = 'booting'
self.incstep = 0
self.incstep = 1
self.initial_year = -1
self.increment_flag = False
self.connected_to_broker = False
@@ -220,7 +221,7 @@ class OuterWrapper(ABC):
raise NotImplementedError("increment() has to be implemented in the {} inner wrapper".format(self.model_id))
def increment_handler(self, event, incstep=0):
def increment_handler(self, event, incstep):
"""
Calls increment() after validating inputs, then validates the results of the increment
:param event: the shutdown event for managing threads
@@ -229,10 +230,11 @@ class OuterWrapper(ABC):
"""
self.increment_flag = True
logging.info("about to increment, incstep {}".format(incstep))
logging.info("about to increment, incstep {}, year {}".format(incstep, self.initial_year + incstep))
self.incstep = incstep
# validate against input schemas
if incstep > 0 and len(self.validated_schemas) != self.num_expected_inputs:
if incstep > 1 and len(self.validated_schemas) != self.num_expected_inputs:
logging.critical("number of validated schemas {} != num_expected_inputs {}".format(len(self.validated_schemas), self.num_expected_inputs))
event.set()
raise RuntimeError
@@ -258,7 +260,6 @@ class OuterWrapper(ABC):
raise RuntimeError
self.increment_flag = False
self.incstep += 1
for schema, data in results.items():
data_msg = {}
data_msg['schema'] = schema
@@ -266,6 +267,7 @@ class OuterWrapper(ABC):
data_msg['signal'] = 'data'
data_msg['source'] = self.model_id
data_msg['incstep'] = self.incstep
data_msg['year'] = self.incstep + self.initial_year
self.pub_queue.put(data_msg)
for filename, html in htmls.items():
file_msg = {}
@@ -274,6 +276,7 @@ class OuterWrapper(ABC):
file_msg['signal'] = 'file_string'
file_msg['source'] = self.model_id
file_msg['incstep'] = self.incstep
file_msg['year'] = self.incstep + self.initial_year
self.pub_queue.put(file_msg)
for filename, image in images.items():
file_msg = {}
@@ -282,8 +285,10 @@ class OuterWrapper(ABC):
file_msg['signal'] = 'file_bytes'
file_msg['source'] = self.model_id
file_msg['incstep'] = self.incstep
file_msg['year'] = self.incstep + self.initial_year
self.pub_queue.put(file_msg)
logging.info("finished the increment")
logging.info("finished increment {}, year {}".format(self.incstep, self.incstep + self.initial_year))
self.incstep += 1
def send_status(self, event):
"""
@@ -308,7 +313,7 @@ class OuterWrapper(ABC):
# waiting for the increment to finish
self.status = 'incrementing'
else:
if self.incstep == 0:
if self.incstep == 1:
# kickstart the model for the first increment
self.status = 'ready'
@@ -329,6 +334,7 @@ class OuterWrapper(ABC):
message['date'] = time.ctime()
message['signal'] = 'status'
message['incstep'] = self.incstep
message['year'] = self.incstep + self.initial_year
message['status'] = self.status
self.pub_queue.put(message)
@@ -491,6 +497,7 @@ class OuterWrapper(ABC):
message = self.broker_queue.get(timeout=10)
if message.get('status') == 'booted':
self.connected_to_broker = True
self.initial_year = message.get('initial_year')
except Empty:
logging.critical("Timed out waiting for broker message")
event.set()