mirror of https://github.com/JHUAPL/SIMoN.git, synced 2026-01-09 14:57:56 -05:00
my changes, to be overwritten:
Former-commit-id: 3a87289824110e1fa380289f564d8498cf63e021
LICENSE.md (new file, 21 lines)
@@ -0,0 +1,21 @@
MIT License

Copyright (c) 2020 The Johns Hopkins University Applied Physics Laboratory

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
README.md (66 lines)
@@ -1,16 +1,30 @@
## Setup
* clone the repository
* git clone https://github.com/circuitinstitute/simon.git

## SIMoN
System Integration with Multiscale Networks

Copyright 2020 The Johns Hopkins University Applied Physics Laboratory
Licensed under the MIT License

## Description
The SIMoN joint modeling framework integrates independently-designed predictive models into a cohesive system, in order to produce a unified model. While many useful models are limited to predicting only a single isolated component of a larger system, SIMoN is able to connect models together so that collectively they can provide a more complete representation of the global system and its dynamics. By using the SIMoN software, a modeler is able to join these disparate models together in various combinations and find new insights in their data.

In order to translate data from its models across different geographic granularities, SIMoN uses a network graph that represents all the granularities, their corresponding entities, and their relationships to each other. The individual models feed each other updated data inputs at synchronized time intervals, and traverse the network graph to translate their data from one granularity to another. A sample granularity graph is provided, but modelers can extend it or create a graph of their own, by modifying and using the graphs/build.py script.
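
As a rough illustration of the idea (not the actual graphs/build.py output or the wrapper's real API), the abstract granularity graph can be pictured as a small directed graph whose edges point from coarser granularities to finer ones; translating data then amounts to walking such a path and combining values, for example summing county values up to their states:

```python
import networkx as nx

# Toy abstract granularity graph (illustrative only), mirroring the sample
# graph's "UnitedStates -> state -> county" chain.
abstract = nx.DiGraph()
abstract.add_edges_from([("UnitedStates", "state"), ("state", "county")])

# Hypothetical county-level values keyed by FIPS-style ids, where dropping
# the last three digits leaves the state code (as the example models assume).
county_values = {"24003": 10.0, "24005": 4.0, "51059": 7.0}

# Check that an aggregation path exists, then roll counties up to states
# with a simple sum (one of the translation functions the wrapper exposes).
assert nx.has_path(abstract, "state", "county")
state_values = {}
for county, value in county_values.items():
    state = county[:-3]
    state_values[state] = state_values.get(state, 0.0) + value

print(state_values)  # {'24': 14.0, '51': 7.0}
```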

SIMoN is written in Python 3, and uses Docker to manage its models and their integration. Each model runs in its own separate, modular Docker container. An additional container runs the system’s centralized Broker, which receives each model’s data outputs using a PyZMQ publish-subscribe messaging pattern. The Broker then redirects the data to any models that request it. The models can then use this data as their inputs for the next incremental step in the system’s synchronized run.
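
The messaging layer itself is plain PyZMQ. A stripped-down sketch of the publish/subscribe hand-off the Broker relies on (the address, port, and message values below are placeholders, not SIMoN's actual configuration):

```python
import json
import time
import zmq

context = zmq.Context()

# Broker side: subscribe to everything the models publish.
sub = context.socket(zmq.SUB)
sub.bind("tcp://*:5555")                  # placeholder endpoint
sub.setsockopt_string(zmq.SUBSCRIBE, "")  # no topic filter: receive all messages

# Model side: publish status/data messages as JSON strings.
pub = context.socket(zmq.PUB)
pub.connect("tcp://localhost:5555")
time.sleep(0.5)                           # let the PUB/SUB pair finish connecting

pub.send_string(json.dumps({"source": "population", "signal": "status"}))
print(json.loads(sub.recv_string()))      # the Broker sees the model's message
```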

## Setup
SIMoN uses Docker and Compose to run its models in separate containers. To run SIMoN, clone the repo and install these tools.
Additionally, install make, so that the shell commands that operate SIMoN can be executed more easily using the Makefile.

* install Docker
    * https://docs.docker.com/install/
* install Docker Compose
    * https://docs.docker.com/compose/install/
* install Make

## Usage
1. choose the models that you want to run together in the SIMoN framework
2. add your chosen models to the "models" list in `broker/config.json`
3. add your chosen models to the "services" in `build/docker-compose.yml`
1. Choose the models that you want to run together in the SIMoN framework. Note their interdependencies, and make sure that each model has a source for all of its necessary data inputs.
2. Once you have a consistent set of models, add the unique name of each of the models to the "models" list in `broker/config.json`
3. Create an entry for each model in the "services" section in `build/docker-compose.yml`
`model_name_1:
build: ../models/examples/model_name_1/
volumes:
- ../models/examples/model_name_1:/opt:ro`
4. start SIMoN
    * `make all`
5. shut down SIMoN
@@ -18,19 +32,29 @@
    * `make clean` to stop all models and clear the database

## Add a new model
1. create a new directory with 3 required subdirectories, based on `models/template/`
* `src/` stores the model's unique source code
1. In the models/ directory, copy the template/ directory and rename it to the ID (unique name) of your new model.
1. Within this new directory are several required directories and files:
* `src/` stores the model's source code
* `inner_wrapper.py`
* required file
* must implement `configure()` and `increment()` methods
* must specify the model's ID (unique name)
* any additional code that the model needs
* `schemas/` stores JSON schemas that data messages must validate against
* `input/` schemas that incoming data messages must validate against
* `*.json`
* `output/` schemas that outgoing data messages must validate against
* `*.json`
* `config/` stores JSON objects with parameters needed for the model's initialization
* this file receives input data from other models, performs operations on it, and returns the output data that will be sent to other models.
* implement the `configure()` and `increment()` abstract methods (a minimal sketch is shown after these steps)
* replace the template name with the model's ID (its unique name)
* call the functions (e.g., my_function_1) defined in other custom scripts from this file
* `my_function_1.py`
* additional code that your model uses
* `my_function_2.py`
* additional code that your model uses
* `schemas/input/` stores JSON schemas that incoming data messages must validate against
* `*.json`
* granularity: specify the granularity of input data that this model needs. SIMoN will translate incoming data to this granularity before sending it to the model's inner wrapper.
* `schemas/output/` stores JSON schemas that outgoing data messages must validate against
* `*.json`
2. add the new model to the "models" list in `broker/config.json`
3. add the new model to the "services" in `build/docker-compose.yml`
* granularity: specify the granularity of data that this model will output. SIMoN will translate outgoing data to this granularity after receiving it from the model's inner wrapper.
* `config/` stores JSON objects with the initial data and parameters needed for the model's first time step
* `*.json`
2. add the name of the new model to the "models" list in `broker/config.json`
3. add the new model to the "services" in `build/docker-compose.yml` by specifying its path:
`new_model_name:
build: ../models/examples/new_model_name/
volumes:
- ../models/examples/new_model_name:/opt:ro`
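
Putting the pieces together, a new model's `src/inner_wrapper.py` ends up looking much like the provided template. The sketch below is only illustrative — the model ID, schema names, variable name, and granularity are placeholders you would replace with your own:

```python
import glob
import sys

sys.path.append('/')
from outer_wrapper import OuterWrapper


class InnerWrapper(OuterWrapper):
    def __init__(self):
        # one expected input per JSON schema in schemas/input/
        num_input_schemas = len(glob.glob("/opt/schemas/input/*.json"))
        super().__init__(
            model_id="my_new_model",  # placeholder: the model's unique name
            num_expected_inputs=num_input_schemas,
        )

    def configure(self, **kwargs):
        # kwargs holds the JSON objects from config/, keyed by schema name
        self.data = kwargs.get("my_initial_data")  # placeholder schema name

    def increment(self, **kwargs):
        # kwargs holds this step's validated inputs from other models;
        # return outputs keyed by output schema and variable name
        return {
            "my_new_model": {
                "my_variable": {"data": self.data or {}, "granularity": "county"}
            }
        }


def main():
    wrapper = InnerWrapper()
    wrapper.run()


if __name__ == "__main__":
    main()
```

As in the example models, `main()` just instantiates the wrapper and calls `run()`, which the outer wrapper provides.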
@@ -1 +1 @@
|
||||
{"directed": true, "multigraph": false, "graph": {}, "nodes": [{"id": "UnitedStates"}, {"id": "state"}, {"id": "county"}, {"id": "NERC"}, {"id": "HUC8"}, {"id": "climate"}, {"id": "NERC^state"}, {"id": "HUC8^state"}, {"id": "climate^state"}, {"id": "HUC8^NERC"}, {"id": "NERC^climate"}, {"id": "NERC^county"}, {"id": "HUC8^climate"}, {"id": "HUC8^county"}, {"id": "climate^county"}], "links": [{"source": "UnitedStates", "target": "state"}, {"source": "UnitedStates", "target": "NERC"}, {"source": "UnitedStates", "target": "HUC8"}, {"source": "UnitedStates", "target": "climate"}, {"source": "state", "target": "county"}, {"source": "state", "target": "NERC^state"}, {"source": "state", "target": "HUC8^state"}, {"source": "state", "target": "climate^state"}, {"source": "county", "target": "NERC^county"}, {"source": "county", "target": "HUC8^county"}, {"source": "county", "target": "climate^county"}, {"source": "NERC", "target": "NERC^state"}, {"source": "NERC", "target": "HUC8^NERC"}, {"source": "NERC", "target": "NERC^climate"}, {"source": "NERC", "target": "NERC^county"}, {"source": "HUC8", "target": "HUC8^state"}, {"source": "HUC8", "target": "HUC8^NERC"}, {"source": "HUC8", "target": "HUC8^climate"}, {"source": "HUC8", "target": "HUC8^county"}, {"source": "climate", "target": "climate^state"}, {"source": "climate", "target": "NERC^climate"}, {"source": "climate", "target": "HUC8^climate"}, {"source": "climate", "target": "climate^county"}, {"source": "NERC^state", "target": "NERC^county"}, {"source": "HUC8^state", "target": "HUC8^county"}, {"source": "climate^state", "target": "climate^county"}]}
|
||||
{"directed": true, "multigraph": false, "graph": {"id": "d26744a8-2dfd-4bae-8e14-221fb96fb92b", "projection": 3085, "granularities": ["UnitedStates", "state", "NERC", "HUC8", "latlon", "county"], "minimum_intersect_area": 1, "nodes": 35614, "links": 65720, "counts": {"UnitedStates": 1, "state": 49, "NERC": 22, "HUC8": 2118, "latlon": 209, "county": 3108, "UnitedStates^state": 0, "NERC^UnitedStates": 0, "HUC8^UnitedStates": 0, "UnitedStates^latlon": 0, "UnitedStates^county": 0, "NERC^state": 126, "HUC8^state": 2830, "latlon^state": 428, "county^state": 0, "HUC8^NERC": 2643, "NERC^latlon": 350, "NERC^county": 3219, "HUC8^latlon": 4016, "HUC8^county": 11700, "county^latlon": 4795}, "areas": {"UnitedStates": 7792222.205781722, "state": 7792129.761006697, "NERC": 7792222.20578146, "HUC8": 7790895.333544435, "latlon": 7792221.132959093, "county": 7792129.761006684, "UnitedStates^state": 0, "NERC^UnitedStates": 0, "HUC8^UnitedStates": 0, "UnitedStates^latlon": 0, "UnitedStates^county": 0, "NERC^state": 7792119.496465746, "HUC8^state": 7790787.976451337, "latlon^state": 7792125.899565454, "county^state": 0, "HUC8^NERC": 7790884.9456620645, "NERC^latlon": 7792219.4876443455, "NERC^county": 7791903.372250843, "HUC8^latlon": 7790880.130752959, "HUC8^county": 7790559.405443681, "county^latlon": 7792114.229985136}}, "nodes": [{"id": "UnitedStates"}, {"id": "state"}, {"id": "county"}, {"id": "NERC"}, {"id": "HUC8"}, {"id": "latlon"}, {"id": "NERC^state"}, {"id": "HUC8^state"}, {"id": "latlon^state"}, {"id": "HUC8^NERC"}, {"id": "NERC^latlon"}, {"id": "NERC^county"}, {"id": "HUC8^latlon"}, {"id": "HUC8^county"}, {"id": "county^latlon"}], "links": [{"source": "UnitedStates", "target": "state"}, {"source": "UnitedStates", "target": "NERC"}, {"source": "UnitedStates", "target": "HUC8"}, {"source": "UnitedStates", "target": "latlon"}, {"source": "state", "target": "county"}, {"source": "state", "target": "NERC^state"}, {"source": "state", "target": "HUC8^state"}, {"source": "state", "target": "latlon^state"}, {"source": "county", "target": "NERC^county"}, {"source": "county", "target": "HUC8^county"}, {"source": "county", "target": "county^latlon"}, {"source": "NERC", "target": "NERC^state"}, {"source": "NERC", "target": "HUC8^NERC"}, {"source": "NERC", "target": "NERC^latlon"}, {"source": "NERC", "target": "NERC^county"}, {"source": "HUC8", "target": "HUC8^state"}, {"source": "HUC8", "target": "HUC8^NERC"}, {"source": "HUC8", "target": "HUC8^latlon"}, {"source": "HUC8", "target": "HUC8^county"}, {"source": "latlon", "target": "latlon^state"}, {"source": "latlon", "target": "NERC^latlon"}, {"source": "latlon", "target": "HUC8^latlon"}, {"source": "latlon", "target": "county^latlon"}]}
|
||||
@@ -9,7 +9,6 @@ import logging
|
||||
|
||||
|
||||
class Broker:
|
||||
|
||||
def __init__(self):
|
||||
"""
|
||||
constructor for the broker
|
||||
@@ -24,14 +23,17 @@ class Broker:
|
||||
self.incstep = 1
|
||||
self.max_incstep = 50
|
||||
self.initial_year = 2016
|
||||
self.boot_timer = 60 # units: seconds
|
||||
self.watchdog_timer = 60 # units: seconds
|
||||
self.boot_timer = 60 # units: seconds
|
||||
self.watchdog_timer = 60 # units: seconds
|
||||
self.client = None
|
||||
self.mongo_queue = Queue()
|
||||
self.broker_id = 'broker'
|
||||
|
||||
logging.basicConfig(level=logging.DEBUG, stream=sys.stdout,
|
||||
format='%(asctime)s - %(levelname)s - %(filename)s:%(funcName)s:%(lineno)d - %(message)s')
|
||||
logging.basicConfig(
|
||||
level=logging.DEBUG,
|
||||
stream=sys.stdout,
|
||||
format='%(asctime)s - %(levelname)s - %(filename)s:%(funcName)s:%(lineno)d - %(message)s',
|
||||
)
|
||||
logging.info(self.models)
|
||||
|
||||
def insert_into_mongodb(self, event):
|
||||
@@ -119,7 +121,10 @@ class Broker:
|
||||
except zmq.ZMQError:
|
||||
continue
|
||||
logging.info(json.dumps(message))
|
||||
if message.get('source') in self.models and message.get('signal') == 'status':
|
||||
if (
|
||||
message.get('source') in self.models
|
||||
and message.get('signal') == 'status'
|
||||
):
|
||||
self.models[message.get('source')] = message
|
||||
self.model_tracker.add(message.get('source'))
|
||||
if message.get('signal') == 'data':
|
||||
@@ -173,7 +178,11 @@ class Broker:
|
||||
"""
|
||||
|
||||
while not event.is_set():
|
||||
for i in range(self.boot_timer if self.status == 'booting' else self.watchdog_timer):
|
||||
for i in range(
|
||||
self.boot_timer
|
||||
if self.status == 'booting'
|
||||
else self.watchdog_timer
|
||||
):
|
||||
time.sleep(1)
|
||||
if self.model_tracker == set(self.models.keys()):
|
||||
self.status = 'booted'
|
||||
@@ -181,8 +190,12 @@ class Broker:
|
||||
break
|
||||
else:
|
||||
missing_models = set(self.models.keys()) - self.model_tracker
|
||||
logging.critical(f"Timed out waiting for {missing_models}{' to initialize' if self.status == 'booting' else ''}")
|
||||
logging.critical(f"Broker will shut down now, current time: {time.ctime()}")
|
||||
logging.critical(
|
||||
f"Timed out waiting for {missing_models}{' to initialize' if self.status == 'booting' else ''}"
|
||||
)
|
||||
logging.critical(
|
||||
f"Broker will shut down now, current time: {time.ctime()}"
|
||||
)
|
||||
event.set()
|
||||
|
||||
def send_increment_pulse(self, event):
|
||||
@@ -198,12 +211,22 @@ class Broker:
|
||||
|
||||
# check to send an increment pulse
|
||||
for model, status in self.models.items():
|
||||
if status.get('status') != 'ready' or status.get('incstep') != self.incstep:
|
||||
if (
|
||||
status.get('status') != 'ready'
|
||||
or status.get('incstep') != self.incstep
|
||||
):
|
||||
break
|
||||
else:
|
||||
if self.incstep > self.max_incstep and self.mongo_queue.empty():
|
||||
logging.critical(f"successfully finished last increment {self.max_incstep}")
|
||||
logging.critical(f"Broker will shut down now, current time: {time.ctime()}")
|
||||
if (
|
||||
self.incstep > self.max_incstep
|
||||
and self.mongo_queue.empty()
|
||||
):
|
||||
logging.critical(
|
||||
f"successfully finished last increment {self.max_incstep}"
|
||||
)
|
||||
logging.critical(
|
||||
f"Broker will shut down now, current time: {time.ctime()}"
|
||||
)
|
||||
event.set()
|
||||
else:
|
||||
logging.info(f"sending increment pulse {self.incstep}")
|
||||
@@ -240,10 +263,14 @@ class Broker:
|
||||
watchdog_thread = Thread(target=self.watchdog, args=(shutdown,))
|
||||
watchdog_thread.start()
|
||||
|
||||
increment_pulse_thread = Thread(target=self.send_increment_pulse, args=(shutdown,))
|
||||
increment_pulse_thread = Thread(
|
||||
target=self.send_increment_pulse, args=(shutdown,)
|
||||
)
|
||||
increment_pulse_thread.start()
|
||||
|
||||
mongo_thread = Thread(target=self.insert_into_mongodb, args=(shutdown,))
|
||||
mongo_thread = Thread(
|
||||
target=self.insert_into_mongodb, args=(shutdown,)
|
||||
)
|
||||
mongo_thread.start()
|
||||
|
||||
try:
|
||||
|
||||
@@ -1 +1 @@
|
||||
b9a4ec463f5fa11e23a0b0fd2f142ef913ba9860
|
||||
0e80818e3e9aaf580248a07fd4eb12586301e5e9
|
||||
@@ -3,21 +3,27 @@ from fair.forward import fair_scm
|
||||
import numpy as np
|
||||
import json
|
||||
|
||||
#need to iterate through another dictionary that has counties as values
|
||||
#sum up the values set equal to new variable
|
||||
#scale it then CFT
|
||||
# need to iterate through another dictionary that has counties as values
|
||||
# sum up the values set equal to new variable
|
||||
# scale it then CFT
|
||||
def temperature_simulation(electric):
|
||||
total = sum(list(filter(None, electric.values())))
|
||||
#emissions[i]=
|
||||
emissions = np.array(total)
|
||||
#other_rf = np.zeros(emissions.size)
|
||||
|
||||
#for x in range(0, emissions.size):
|
||||
# other_rf[x] = 0.5 * np.sin(2 * np.pi * (x) / 14.0)
|
||||
total = sum(list(filter(None, electric.values())))
|
||||
# emissions[i]=
|
||||
emissions = np.array(total)
|
||||
# other_rf = np.zeros(emissions.size)
|
||||
|
||||
#emissions=emissions*6.66667*3.5714285 #scaling factors
|
||||
|
||||
C,F,T = fair.forward.fair_scm(emissions_driven=True, emissions=np.array([emissions*6.66667*3.5714285]), useMultigas=False)
|
||||
|
||||
return T
|
||||
#temperature_simulation({'co2':{'data':{5646:9.9,489247:6,234708:4.5}},'therm':[2,3,2]})
|
||||
# for x in range(0, emissions.size):
|
||||
# other_rf[x] = 0.5 * np.sin(2 * np.pi * (x) / 14.0)
|
||||
|
||||
# emissions=emissions*6.66667*3.5714285 #scaling factors
|
||||
|
||||
C, F, T = fair.forward.fair_scm(
|
||||
emissions_driven=True,
|
||||
emissions=np.array([emissions * 6.66667 * 3.5714285]),
|
||||
useMultigas=False,
|
||||
)
|
||||
|
||||
return T
|
||||
|
||||
|
||||
# temperature_simulation({'co2':{'data':{5646:9.9,489247:6,234708:4.5}},'therm':[2,3,2]})
|
||||
|
||||
@@ -1,18 +1,21 @@
|
||||
import glob
|
||||
import sys
|
||||
import fair
|
||||
|
||||
sys.path.append('/')
|
||||
from outer_wrapper import OuterWrapper
|
||||
from climate import temperature_simulation
|
||||
|
||||
#put the json file from the output of power supply into the schemas/input file
|
||||
# put the json file from the output of power supply into the schemas/input file
|
||||
|
||||
|
||||
class InnerWrapper(OuterWrapper):
|
||||
|
||||
def __init__(self):
|
||||
num_input_schemas = len(glob.glob("/opt/schemas/input/*.json"))
|
||||
super().__init__(model_id="climate", num_expected_inputs=num_input_schemas)
|
||||
#self.electric=None
|
||||
super().__init__(
|
||||
model_id="climate", num_expected_inputs=num_input_schemas
|
||||
)
|
||||
# self.electric=None
|
||||
|
||||
def configure(self, **kwargs):
|
||||
if 'co2' in kwargs.keys():
|
||||
@@ -23,11 +26,18 @@ class InnerWrapper(OuterWrapper):
|
||||
def increment(self, **kwargs):
|
||||
if 'power_output' in kwargs.keys():
|
||||
self.electric = kwargs['power_output']['co2']['data']
|
||||
else:
|
||||
else:
|
||||
print('input co2 not found')
|
||||
temperature=float(temperature_simulation(self.electric))
|
||||
|
||||
return {'climate': { 'climate': {'data': {'global_temp': temperature}, 'granularity': 'global'}}}
|
||||
temperature = float(temperature_simulation(self.electric))
|
||||
|
||||
return {
|
||||
'climate': {
|
||||
'climate': {
|
||||
'data': {'global_temp': temperature},
|
||||
'granularity': 'global',
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
def main():
|
||||
|
||||
@@ -5,14 +5,14 @@
|
||||
"type": "object",
|
||||
"properties":{
|
||||
"data": {"type": "object"},
|
||||
"granularity": {"type": "string", "value": "HUC8"}
|
||||
"granularity": {"type": "string", "value": "latlon"}
|
||||
}
|
||||
},
|
||||
"evaporation": {
|
||||
"type": "object",
|
||||
"properties":{
|
||||
"data": {"type": "object"},
|
||||
"granularity": {"type": "string", "value": "HUC8"}
|
||||
"granularity": {"type": "string", "value": "latlon"}
|
||||
}
|
||||
},
|
||||
"global_temp": {
|
||||
|
||||
@@ -10,6 +10,10 @@ import json
|
||||
|
||||
def temp_inc(init_data, year):
|
||||
json1_data = init_data
|
||||
<<<<<<< HEAD
|
||||
=======
|
||||
year = year - 1
|
||||
>>>>>>> 154bcdf2bb3d01cda303fe3e9cbbe67a926c0763
|
||||
mean_glob_temps = []
|
||||
with open("/opt/src/weights.json") as f:
|
||||
weights = json.load(f)
|
||||
@@ -28,11 +32,17 @@ def temp_inc(init_data, year):
|
||||
# Contiguous U.S bounded by (49 N, 122W), (24N 66W)
|
||||
if 49 >= float(i) >= 23 and -68 >= float(j) >= -128:
|
||||
single_year_US[i][j] = (
|
||||
<<<<<<< HEAD
|
||||
json1_data[i][j][year][0], json1_data[i][j][year][1], json1_data[i][j][year][3]-273.15
|
||||
=======
|
||||
json1_data[i][j][year][0],
|
||||
json1_data[i][j][year][1],
|
||||
json1_data[i][j][year][2] - 273.15,
|
||||
>>>>>>> 154bcdf2bb3d01cda303fe3e9cbbe67a926c0763
|
||||
)
|
||||
|
||||
# Apply weights
|
||||
weighted_sum = np.sum([a*b for a, b in zip(mean_glob_temps, weights)])
|
||||
weighted_sum = np.sum([a * b for a, b in zip(mean_glob_temps, weights)])
|
||||
|
||||
# Output: Global average (C) +
|
||||
# grid of U.S (precipitation (mm), evaporation (mm), surface temp(C))
|
||||
@@ -40,10 +50,19 @@ def temp_inc(init_data, year):
|
||||
translated_ev = {}
|
||||
for lat, lat_values in single_year_US.items():
|
||||
for lon, lon_values in lat_values.items():
|
||||
lat = float(lat)
|
||||
lon = float(lon)
|
||||
if lon < 0:
|
||||
lon += 180
|
||||
translated_pr["lat_" + str(lat) + "_lon_" + str(lon)] = lon_values[0]
|
||||
translated_ev["lat_" + str(lat) + "_lon_" + str(lon)] = lon_values[1]
|
||||
translated_pr[
|
||||
f"lat_{int(lat*100)}_lon_{int(lon*100)}"
|
||||
] = lon_values[0]
|
||||
translated_ev[
|
||||
f"lat_{int(lat*100)}_lon_{int(lon*100)}"
|
||||
] = lon_values[1]
|
||||
|
||||
return weighted_sum-273.15, translated_pr, translated_ev #single_year_US
|
||||
return (
|
||||
weighted_sum - 273.15,
|
||||
translated_pr,
|
||||
translated_ev,
|
||||
) # single_year_US
|
||||
|
||||
@@ -1,35 +1,48 @@
|
||||
import glob
|
||||
import sys
|
||||
|
||||
sys.path.append('/')
|
||||
from outer_wrapper import OuterWrapper
|
||||
from climate_model import temp_inc
|
||||
|
||||
|
||||
class InnerWrapper(OuterWrapper):
|
||||
|
||||
def __init__(self):
|
||||
num_input_schemas = len(glob.glob("/opt/schemas/input/*.json"))
|
||||
super().__init__(model_id="gfdl_cm3", num_expected_inputs=num_input_schemas)
|
||||
super().__init__(
|
||||
model_id="gfdl_cm3", num_expected_inputs=num_input_schemas
|
||||
)
|
||||
|
||||
def configure(self, **kwargs):
|
||||
self.raw_data = kwargs['rcp26data']
|
||||
if 'rcp26data' in kwargs.keys():
|
||||
self.mean_temp, self.climate_data0, self.climate_data1 = temp_inc(self.raw_data, self.incstep)
|
||||
self.global_temp, self.precipitation, self.evaporation = temp_inc(
|
||||
self.raw_data, self.incstep
|
||||
)
|
||||
else:
|
||||
print('rcp data not found')
|
||||
|
||||
def increment(self, **kwargs):
|
||||
self.global_temp, self.precipitation, self.evaporation = temp_inc(self.raw_data, self.incstep)
|
||||
self.global_temp, self.precipitation, self.evaporation = temp_inc(
|
||||
self.raw_data, self.incstep
|
||||
)
|
||||
|
||||
results ={'gfdl_cm3':
|
||||
{'global_temp':
|
||||
{'data': {'temp': self.global_temp}, 'granularity': 'global'},
|
||||
'precipitation':
|
||||
{'data': self.precipitation, 'granularity': 'climate'},
|
||||
'evaporation':
|
||||
{'data': self.evaporation, 'granularity': 'climate'}
|
||||
}
|
||||
}
|
||||
results = {
|
||||
'gfdl_cm3': {
|
||||
'global_temp': {
|
||||
'data': {'temp': self.global_temp},
|
||||
'granularity': 'global',
|
||||
},
|
||||
'precipitation': {
|
||||
'data': self.precipitation,
|
||||
'granularity': 'latlon',
|
||||
},
|
||||
'evaporation': {
|
||||
'data': self.evaporation,
|
||||
'granularity': 'latlon',
|
||||
},
|
||||
}
|
||||
}
|
||||
return results
|
||||
|
||||
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import glob
|
||||
import sys
|
||||
|
||||
sys.path.append('/')
|
||||
from outer_wrapper import OuterWrapper
|
||||
import pyhector
|
||||
@@ -7,10 +8,11 @@ import json
|
||||
|
||||
|
||||
class InnerWrapper(OuterWrapper):
|
||||
|
||||
def __init__(self):
|
||||
num_input_schemas = len(glob.glob("/opt/schemas/input/*.json"))
|
||||
super().__init__(model_id="hector", num_expected_inputs=num_input_schemas)
|
||||
super().__init__(
|
||||
model_id="hector", num_expected_inputs=num_input_schemas
|
||||
)
|
||||
|
||||
def configure(self, **kwargs):
|
||||
self.rcp = kwargs['bootstrap']['rcp']
|
||||
@@ -23,7 +25,16 @@ class InnerWrapper(OuterWrapper):
|
||||
print("rcp85")
|
||||
pandas_df = pyhector.run(pyhector.rcp85)
|
||||
|
||||
return {'climate': {'climate': {'data': json.loads(pandas_df["temperature.Tgav"].to_json()), 'granularity': 'global'}}}
|
||||
return {
|
||||
'climate': {
|
||||
'climate': {
|
||||
'data': json.loads(
|
||||
pandas_df["temperature.Tgav"].to_json()
|
||||
),
|
||||
'granularity': 'global',
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
def main():
|
||||
|
||||
@@ -1,38 +1,40 @@
|
||||
|
||||
#def populationfunction(logisticpopulation):
|
||||
# def populationfunction(logisticpopulation):
|
||||
# pop={}
|
||||
# sum = 0
|
||||
# for i in logisticpopulation.keys():
|
||||
# N = logisticpopulation[i] #this is the county population
|
||||
# k = 300000000/3007 #this scales the max capacity to the county level (there are 3007 US counties)
|
||||
# r = 1.0061 #this is the growth rate
|
||||
# pop[i] = r*N*((k-N)/k) #this is the equation
|
||||
# sum = 0
|
||||
# for i in logisticpopulation.keys():
|
||||
# N = logisticpopulation[i] #this is the county population
|
||||
# k = 300000000/3007 #this scales the max capacity to the county level (there are 3007 US counties)
|
||||
# r = 1.0061 #this is the growth rate
|
||||
# pop[i] = r*N*((k-N)/k) #this is the equation
|
||||
|
||||
# return pop
|
||||
# return pop
|
||||
|
||||
#def populationfunction(logisticpopulation):
|
||||
# def populationfunction(logisticpopulation):
|
||||
# pop={}
|
||||
# sum = 0
|
||||
# for i in logisticpopulation.keys():
|
||||
# sum += logisticpopulation[i]
|
||||
#print(sum)
|
||||
# print(sum)
|
||||
# N = logisticpopulation[i] #this is the county population
|
||||
# k = (N/sum) *300000000 #this scales the max capacity to the county level (there are 3007 US counties)
|
||||
# r = 1.0061 #this is the growth rate
|
||||
# pop[i] = r*N*((k-N)/k) #this is the equation
|
||||
# r = 1.0061 #this is the growth rate
|
||||
# pop[i] = r*N*((k-N)/k) #this is the equation
|
||||
|
||||
# return pop
|
||||
|
||||
# return pop
|
||||
|
||||
def populationfunction(population):
|
||||
pop={}
|
||||
pop = {}
|
||||
mysum = 0
|
||||
for i in population.keys():
|
||||
#or j in logisticpopulation.keys()[i]:
|
||||
mysum += population[i]
|
||||
|
||||
# or j in logisticpopulation.keys()[i]:
|
||||
mysum += population[i]
|
||||
|
||||
for i in population.keys():
|
||||
N = population[i] #this is the county population
|
||||
k = (N/mysum) *400000000 #this scales the max capacity to the county level (there are 3007 US counties)
|
||||
r = 1.0071 #this is the growth rate
|
||||
pop[i] = N + r*N*((k-N)/k) #this is the equation
|
||||
N = population[i] # this is the county population
|
||||
k = (
|
||||
N / mysum
|
||||
) * 400000000 # this scales the max capacity to the county level (there are 3007 US counties)
|
||||
r = 1.0071 # this is the growth rate
|
||||
pop[i] = N + r * N * ((k - N) / k) # this is the equation
|
||||
return pop
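
Restating the update the comments describe: each county follows a discrete logistic step, with its carrying capacity apportioned by the county's share of the national total (the 400,000,000 cap and the growth rate are the constants hard-coded above):

$$k_i = \frac{N_i}{\sum_j N_j} \cdot 4\times 10^{8}, \qquad N_i \leftarrow N_i + r\, N_i\, \frac{k_i - N_i}{k_i}, \qquad r = 1.0071$$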
|
||||
|
||||
@@ -1,30 +1,41 @@
|
||||
import glob
|
||||
import sys
|
||||
|
||||
sys.path.append('/')
|
||||
from outer_wrapper import OuterWrapper
|
||||
from LogisticGrowth import populationfunction
|
||||
|
||||
class InnerWrapper(OuterWrapper):
|
||||
|
||||
class InnerWrapper(OuterWrapper):
|
||||
def __init__(self):
|
||||
num_input_schemas = len(glob.glob("/opt/schemas/input/*.json"))
|
||||
super().__init__(model_id="logisticpopulation", num_expected_inputs=num_input_schemas)
|
||||
super().__init__(
|
||||
model_id="logisticpopulation",
|
||||
num_expected_inputs=num_input_schemas,
|
||||
)
|
||||
|
||||
def configure(self, **kwargs):
|
||||
if '2016 populations' in kwargs.keys():
|
||||
self.population = kwargs['2016 populations']
|
||||
self.population = kwargs['2016 populations']
|
||||
else:
|
||||
print('population initialization data not found')
|
||||
|
||||
def increment(self, **kwargs):
|
||||
#if 'logisticpopulation' in kwargs.keys():
|
||||
# if 'logisticpopulation' in kwargs.keys():
|
||||
# self.population = kwargs['logistispopulation']['logisticpopulation']['data']
|
||||
#else:
|
||||
# else:
|
||||
# print('input population not found')
|
||||
|
||||
population = populationfunction(self.population)
|
||||
self.population=population
|
||||
return {'logisticpopulation': {'logisticpopulation': {'data': population, 'granularity': 'county'}}}
|
||||
self.population = population
|
||||
return {
|
||||
'logisticpopulation': {
|
||||
'logisticpopulation': {
|
||||
'data': population,
|
||||
'granularity': 'county',
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
def main():
|
||||
|
||||
@@ -2,4 +2,3 @@ pandas
|
||||
numpy
|
||||
statsmodels
|
||||
scipy==1.2.1
|
||||
bokeh
|
||||
|
||||
@@ -24,12 +24,14 @@ def pop_sim(init_data):
|
||||
# of {county1_index: {2000: pop, 2001: pop, etc}, county2_index:...}
|
||||
# applies Holt linear trend method to predict one year ahead
|
||||
# outputted data is dict of {county_index: next_year_pop}
|
||||
|
||||
|
||||
temp = {}
|
||||
|
||||
for key, county in data.items():
|
||||
population = pd.Series(county)
|
||||
fit1 = Holt(np.asarray(population)).fit(smoothing_level=0.7, smoothing_slope=0.3)
|
||||
fit1 = Holt(np.asarray(population)).fit(
|
||||
smoothing_level=0.7, smoothing_slope=0.3
|
||||
)
|
||||
next_year = fit1.forecast(1)[0]
|
||||
temp[key] = next_year
|
||||
data[key][str(int(max(data[key].keys())) + 1)] = next_year
|
||||
@@ -38,4 +40,3 @@ def pop_sim(init_data):
|
||||
file.write(json.dumps(data))
|
||||
|
||||
return temp
|
||||
|
||||
|
||||
@@ -1,27 +1,37 @@
|
||||
import glob
|
||||
import sys
|
||||
|
||||
sys.path.append('/')
|
||||
from outer_wrapper import OuterWrapper
|
||||
from PopulationSimulation import pop_sim
|
||||
|
||||
class InnerWrapper(OuterWrapper):
|
||||
|
||||
class InnerWrapper(OuterWrapper):
|
||||
def __init__(self):
|
||||
num_input_schemas = len(glob.glob("/opt/schemas/input/*.json"))
|
||||
super().__init__(model_id="population", num_expected_inputs=num_input_schemas)
|
||||
super().__init__(
|
||||
model_id="population", num_expected_inputs=num_input_schemas
|
||||
)
|
||||
|
||||
def configure(self, **kwargs):
|
||||
if 'county_populations' in kwargs.keys():
|
||||
self.data = kwargs['county_populations']
|
||||
self.data = kwargs['county_populations']
|
||||
else:
|
||||
print('population initialization data not found')
|
||||
|
||||
def increment(self, **kwargs):
|
||||
data = pop_sim(self.data)
|
||||
self.data = data
|
||||
results = {'population': {'population': {'data': data, 'granularity': 'county'}}}
|
||||
results = {
|
||||
'population': {
|
||||
'population': {'data': data, 'granularity': 'county'}
|
||||
}
|
||||
}
|
||||
return results
|
||||
#is taking in the data from the previous run
|
||||
|
||||
|
||||
# is taking in the data from the previous run
|
||||
|
||||
|
||||
def main():
|
||||
wrapper = InnerWrapper()
|
||||
|
||||
@@ -1,2 +1 @@
|
||||
pandas
|
||||
bokeh
|
||||
|
||||
@@ -8,33 +8,38 @@ Created on Wed Jul 11 14:10:24 2018
|
||||
import pandas as pd
|
||||
|
||||
|
||||
def pow_dem_sim(pop,cons):
|
||||
#sets baseline initialization if no data received
|
||||
|
||||
def pow_dem_sim(pop, cons):
|
||||
# sets baseline initialization if no data received
|
||||
|
||||
# Must receive data as dict of {county_id: current_population, ...}
|
||||
# loads in static state values
|
||||
# simply multiplies current pop by state consumption per capita
|
||||
temp = {}
|
||||
|
||||
count = pd.DataFrame(pop,index=['pop'])
|
||||
|
||||
count = pd.DataFrame(pop, index=['pop'])
|
||||
count = count.T
|
||||
count.reset_index(inplace=True)
|
||||
count['State'] = count['index'].apply(lambda x: x[:-3])
|
||||
state_pops = count.groupby('State').sum().reset_index()
|
||||
count = pd.merge(count,state_pops,on='State',how='left')
|
||||
count = pd.merge(count, state_pops, on='State', how='left')
|
||||
count['perc'] = count.apply(lambda x: x.pop_x / x.pop_y, axis=1)
|
||||
|
||||
cons_pc = pd.DataFrame(cons,index=['cons'])
|
||||
cons_pc = pd.DataFrame(cons, index=['cons'])
|
||||
cons_pc = cons_pc.T
|
||||
|
||||
count = pd.merge(count,cons_pc.reset_index(), left_on='State',right_on='index',how='left')
|
||||
count['demand'] = count.apply(lambda x: (x.pop_y * x.cons) * x.perc,axis=1)
|
||||
count = count[['index_x','demand']].set_index('index_x')
|
||||
|
||||
count = pd.merge(
|
||||
count,
|
||||
cons_pc.reset_index(),
|
||||
left_on='State',
|
||||
right_on='index',
|
||||
how='left',
|
||||
)
|
||||
count['demand'] = count.apply(
|
||||
lambda x: (x.pop_y * x.cons) * x.perc, axis=1
|
||||
)
|
||||
count = count[['index_x', 'demand']].set_index('index_x')
|
||||
|
||||
for index, row in count.iterrows():
|
||||
temp[index] = row['demand']
|
||||
|
||||
|
||||
|
||||
return temp
|
||||
|
||||
|
||||
@@ -1,35 +1,59 @@
|
||||
import glob
|
||||
import sys
|
||||
|
||||
sys.path.append('/')
|
||||
from outer_wrapper import OuterWrapper
|
||||
from DemandSimulation import pow_dem_sim #for water demand, going to do something similar ##
|
||||
from DemandSimulation import (
|
||||
pow_dem_sim,
|
||||
) # for water demand, going to do something similar ##
|
||||
|
||||
|
||||
class InnerWrapper(OuterWrapper):
|
||||
|
||||
def __init__(self):
|
||||
num_input_schemas = len(glob.glob("/opt/schemas/input/*.json"))
|
||||
super().__init__(model_id="power_demand", num_expected_inputs=num_input_schemas)
|
||||
super().__init__(
|
||||
model_id="power_demand", num_expected_inputs=num_input_schemas
|
||||
)
|
||||
|
||||
def configure(self, **kwargs): #this would be the water consumption rate in here
|
||||
if 'state_consumption_per_capita' in kwargs.keys(): #instead of state, do water 2015, the json we made
|
||||
self.cons = kwargs['state_consumption_per_capita'] #copy the file name
|
||||
def configure(
|
||||
self, **kwargs
|
||||
): # this would be the water consumption rate in here
|
||||
if (
|
||||
'state_consumption_per_capita' in kwargs.keys()
|
||||
): # instead of state, do water 2015, the json we made
|
||||
self.cons = kwargs[
|
||||
'state_consumption_per_capita'
|
||||
] # copy the file name
|
||||
else:
|
||||
print('State consumption data not found')
|
||||
if '2016_populations' in kwargs.keys(): #instead of 2016 populations would put the name of the 2015 water consumption rate
|
||||
if (
|
||||
'2016_populations' in kwargs.keys()
|
||||
): # instead of 2016 populations would put the name of the 2015 water consumption rate
|
||||
self.pop = kwargs['2016_populations']
|
||||
|
||||
def increment(self, **kwargs): #this is handling all the new data that is coming from other models, the whole function would be similar besides power instead of water
|
||||
def increment(
|
||||
self, **kwargs
|
||||
): # this is handling all the new data that is coming from other models, the whole function would be similar besides power instead of water
|
||||
if 'population' in kwargs.keys():
|
||||
self.pop = kwargs['population']['population']['data'] #assume you can keep as is for now and it may work
|
||||
self.pop = kwargs['population']['population'][
|
||||
'data'
|
||||
] # assume you can keep as is for now and it may work
|
||||
else:
|
||||
print('input population not found')
|
||||
demand = pow_dem_sim(self.pop, self.cons) #instead of power demand simulation, have water demand, your inputs will be population and consumption rate, #do water demand sim, #and change inputs to the actual name of our arguments
|
||||
demand = pow_dem_sim(
|
||||
self.pop, self.cons
|
||||
) # instead of power demand simulation, have water demand, your inputs will be population and consumption rate, #do water demand sim, #and change inputs to the actual name of our arguments
|
||||
|
||||
results = {'power_demand': {'power_demand': {'data': demand, 'granularity': 'county'}}} #obviously this will all say water demand instead of power demand
|
||||
#checks to see if it has data on population, writes it to selfpop and outputs power demand
|
||||
#does not use self because it doesnt use its own previous data
|
||||
results = {
|
||||
'power_demand': {
|
||||
'power_demand': {'data': demand, 'granularity': 'county'}
|
||||
}
|
||||
} # obviously this will all say water demand instead of power demand
|
||||
# checks to see if it has data on population, writes it to selfpop and outputs power demand
|
||||
# does not use self because it doesnt use its own previous data
|
||||
return results
|
||||
|
||||
|
||||
def main():
|
||||
wrapper = InnerWrapper()
|
||||
wrapper.run()
|
||||
|
||||
File diff suppressed because one or more lines are too long
@@ -1,2 +1 @@
|
||||
pandas
|
||||
bokeh
|
||||
|
||||
@@ -5,14 +5,14 @@
|
||||
"type": "object",
|
||||
"properties":{
|
||||
"data": {"type": "object"},
|
||||
"granularity": {"type": "string", "value": "NERC"}
|
||||
"granularity": {"type": "string", "value": "county"}
|
||||
}
|
||||
},
|
||||
"thermo_water": {
|
||||
"type": "object",
|
||||
"properties":{
|
||||
"data": {"type": "object"},
|
||||
"granularity": {"type": "string", "value": "NERC"}
|
||||
"granularity": {"type": "string", "value": "HUC8"}
|
||||
}
|
||||
}
|
||||
},
|
||||
@@ -6,46 +6,65 @@ Created on Wed Jul 11 15:14:47 2018
|
||||
|
||||
import pandas as pd
|
||||
|
||||
def gen_sim(demand,prof):
|
||||
|
||||
|
||||
def gen_sim(demand, prof):
|
||||
|
||||
# multiply by county demand to yield county generation profile
|
||||
# aggregate up to state level to apply profile then project back down
|
||||
counties = pd.DataFrame(demand,index=['demand']).T.reset_index().rename(columns={'index':'county'})
|
||||
counties = (
|
||||
pd.DataFrame(demand, index=['demand'])
|
||||
.T.reset_index()
|
||||
.rename(columns={'index': 'county'})
|
||||
)
|
||||
counties['state'] = counties.county.apply(lambda x: x[:-3])
|
||||
state_demand = counties.groupby('state')['demand'].sum()
|
||||
state_prof = pd.DataFrame(prof).T.reset_index().rename(columns={'index':'state'})
|
||||
state_prof = (
|
||||
pd.DataFrame(prof).T.reset_index().rename(columns={'index': 'state'})
|
||||
)
|
||||
state_demand = state_demand.to_frame().reset_index()
|
||||
counties = pd.merge(counties, state_demand, on='state',how='left')
|
||||
counties = pd.merge(counties, state_prof, on='state',how='left')
|
||||
counties['Fuel Used (MMBtu)'] = counties.apply(lambda x: (x['MMBtu per MWh']) * (x.demand_x),axis=1)
|
||||
counties['CO2 Emissions (tons)'] = counties.apply(lambda x: (x['Tons CO2 per MWh']) * (x.demand_x),axis=1)
|
||||
counties['Water Used (Mgal)'] = counties.apply(lambda x: (x['Mgal_per_MWh']) * (x.demand_x),axis=1)
|
||||
counties = counties[['county','Fuel Used (MMBtu)','CO2 Emissions (tons)','Water Used (Mgal)']].set_index('county')
|
||||
|
||||
# data = counties.to_dict(orient='index')
|
||||
|
||||
counties = pd.merge(counties, state_demand, on='state', how='left')
|
||||
counties = pd.merge(counties, state_prof, on='state', how='left')
|
||||
counties['Fuel Used (MMBtu)'] = counties.apply(
|
||||
lambda x: (x['MMBtu per MWh']) * (x.demand_x), axis=1
|
||||
)
|
||||
counties['CO2 Emissions (tons)'] = counties.apply(
|
||||
lambda x: (x['Tons CO2 per MWh']) * (x.demand_x), axis=1
|
||||
)
|
||||
counties['Water Used (Mgal)'] = counties.apply(
|
||||
lambda x: (x['Mgal_per_MWh']) * (x.demand_x), axis=1
|
||||
)
|
||||
counties = counties[
|
||||
[
|
||||
'county',
|
||||
'Fuel Used (MMBtu)',
|
||||
'CO2 Emissions (tons)',
|
||||
'Water Used (Mgal)',
|
||||
]
|
||||
].set_index('county')
|
||||
|
||||
# data = counties.to_dict(orient='index')
|
||||
|
||||
co2 = {}
|
||||
h2o = {}
|
||||
for index, row in counties.iterrows():
|
||||
co2[index] = row['CO2 Emissions (tons)']
|
||||
h2o[index] = row['Water Used (Mgal)']
|
||||
|
||||
|
||||
return co2, h2o
|
||||
#water usage (withdrawals and consumption in the 860, we care about consumption), emissions of co2 (may also be in the 860), 923 should have total power produced by each power plant (would match regional demand), aggregating up then dividing by the state population, instead of doing on the state level, we want to instead do it on the nerc level, supply can be on nerc level,wrapper between the supply and demand does the granularity work so we don't have to worry about state vs. NERC, change the input file, he broke it out and was calculating stuff on counties, would adjust and repopulate, do on NERC and calculate on the NERC level, the point in the supply model is to make power demand equal to power supply on the nerc level, need to find the aggregation and disaggregation pairing
|
||||
#want to be able to do it in county nercs
|
||||
#1. read in P.P. data
|
||||
|
||||
|
||||
# water usage (withdrawals and consumption in the 860, we care about consumption), emissions of co2 (may also be in the 860), 923 should have total power produced by each power plant (would match regional demand), aggregating up then dividing by the state population, instead of doing on the state level, we want to instead do it on the nerc level, supply can be on nerc level,wrapper between the supply and demand does the granularity work so we don't have to worry about state vs. NERC, change the input file, he broke it out and was calculating stuff on counties, would adjust and repopulate, do on NERC and calculate on the NERC level, the point in the supply model is to make power demand equal to power supply on the nerc level, need to find the aggregation and disaggregation pairing
|
||||
# want to be able to do it in county nercs
|
||||
# 1. read in P.P. data
|
||||
# P.P. I.D./County/Nerc/type/..
|
||||
# Max Capacity / 2016 Annual Predictions/C.F.?
|
||||
# CO2/Water Consumption
|
||||
#want it to be in power plant data
|
||||
# want it to be in power plant data
|
||||
# once we read in powerplant data, we have county demand, NERC Match: we are going to aggregate the powerplant data on nerc, and we will have county demand from the power demand model, it will say how much power, SIMoN will turn the county demand into nerc demand from the wrappers, will have a NERC demand read in, the NERC demand read in needs to be fulfilled by the power plants, we will aggregate all the pp data based off of nerc and then calculate a scaling factor
|
||||
# we will calculate scaling factor for county demand NERC match, it will be a regional nerc scaling factor for each NERC, we are going to take that scaling factor and apply it back to each pp and output co2 and water consumption, we are then going to take the scaling factor and go back to the powerplant level
|
||||
#we want supply and demand on the nerc level and then to depend what it means for each individual powerplant, want supply to match demand on nerc and then break it out at county level, maybe should not let powerplants be at maximum capacity, is there a different kind of capacity, or is it just historical use
|
||||
#capacity? there is generally a capacity factor which is like the maximum loading like if you took an integral over power usage (coal = 90%), might need a distribution network
|
||||
#2. county demand nerc match
|
||||
#3. scale p.p. data
|
||||
#4. aggregate it to anything you want, you want it as tight as possible so probably countynerc
|
||||
# we want supply and demand on the nerc level and then to depend what it means for each individual powerplant, want supply to match demand on nerc and then break it out at county level, maybe should not let powerplants be at maximum capacity, is there a different kind of capacity, or is it just historical use
|
||||
# capacity? there is generally a capacity factor which is like the maximum loading like if you took an integral over power usage (coal = 90%), might need a distribution network
|
||||
# 2. county demand nerc match
|
||||
# 3. scale p.p. data
|
||||
# 4. aggregate it to anything you want, you want it as tight as possible so probably countynerc
|
||||
## will be easily able to do things and we will be happy the nerc is there, still being processed at powerplant but given a scaling factor
|
||||
|
||||
|
||||
@@ -1,14 +1,17 @@
|
||||
import glob
|
||||
import sys
|
||||
|
||||
sys.path.append('/')
|
||||
from outer_wrapper import OuterWrapper
|
||||
from GenerationSimulation import gen_sim
|
||||
|
||||
class InnerWrapper(OuterWrapper):
|
||||
|
||||
class InnerWrapper(OuterWrapper):
|
||||
def __init__(self):
|
||||
num_input_schemas = len(glob.glob("/opt/schemas/input/*.json"))
|
||||
super().__init__(model_id="power_supply", num_expected_inputs=num_input_schemas)
|
||||
super().__init__(
|
||||
model_id="power_supply", num_expected_inputs=num_input_schemas
|
||||
)
|
||||
|
||||
def configure(self, **kwargs):
|
||||
if 'state_energy_profiles' in kwargs.keys():
|
||||
@@ -25,10 +28,15 @@ class InnerWrapper(OuterWrapper):
|
||||
print('input demand not found')
|
||||
emissions, water = gen_sim(self.dem, self.prof)
|
||||
|
||||
results = {'power_output': { 'co2': {'data': emissions, 'granularity': 'county'},
|
||||
'thermo_water': {'data': water, 'granularity': 'county'}}}
|
||||
results = {
|
||||
'power_supply': {
|
||||
'co2': {'data': emissions, 'granularity': 'county'},
|
||||
'thermo_water': {'data': water, 'granularity': 'county'},
|
||||
}
|
||||
}
|
||||
return results
|
||||
|
||||
|
||||
def main():
|
||||
wrapper = InnerWrapper()
|
||||
wrapper.run()
|
||||
|
||||
@@ -1,2 +1 @@
|
||||
pandas
|
||||
bokeh
|
||||
|
||||
@@ -6,18 +6,14 @@ import json
|
||||
import pandas as pd
|
||||
|
||||
|
||||
def Water_Demand_Simulation(countypop,rate):
|
||||
def Water_Demand_Simulation(countypop, rate):
|
||||
|
||||
water={}
|
||||
water = {}
|
||||
for i in rate.keys():
|
||||
if i in countypop.keys():
|
||||
water[i]= (countypop[i]*rate[i])
|
||||
else:
|
||||
water[i]=(0)
|
||||
#with open('water_demand_2015.json', 'w') as file:
|
||||
if i in countypop.keys():
|
||||
water[i] = countypop[i] * rate[i]
|
||||
else:
|
||||
water[i] = 0
|
||||
# with open('water_demand_2015.json', 'w') as file:
|
||||
# file.write(json.dumps(data))
|
||||
return water
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
@@ -1,22 +1,28 @@
|
||||
import glob
|
||||
import sys
|
||||
|
||||
sys.path.append('/')
|
||||
from outer_wrapper import OuterWrapper
|
||||
from Water_Demand_Model import Water_Demand_Simulation
|
||||
|
||||
class InnerWrapper(OuterWrapper):
|
||||
|
||||
class InnerWrapper(OuterWrapper):
|
||||
def __init__(self):
|
||||
num_input_schemas = len(glob.glob("/opt/schemas/input/*.json"))
|
||||
super().__init__(model_id="water_demand", num_expected_inputs=num_input_schemas)
|
||||
super().__init__(
|
||||
model_id="water_demand", num_expected_inputs=num_input_schemas
|
||||
)
|
||||
|
||||
def configure(self, **kwargs):
|
||||
if 'rates' in kwargs.keys():
|
||||
self.rate=kwargs['rates']
|
||||
if '2016_populations' in kwargs.keys(): #instead of 2016 populations would put the name of the 2015 water consumption rate
|
||||
self.rate = kwargs['rates']
|
||||
if (
|
||||
'2016_populations' in kwargs.keys()
|
||||
): # instead of 2016 populations would put the name of the 2015 water consumption rate
|
||||
self.countypop = kwargs['2016_populations']
|
||||
#replace the populations with the 2015 water consumption rate
|
||||
#need to take out the extra variable
|
||||
|
||||
# replace the populations with the 2015 water consumption rate
|
||||
# need to take out the extra variable
|
||||
def increment(self, **kwargs):
|
||||
if 'population' in kwargs.keys():
|
||||
self.countypop = kwargs['population']['population']['data']
|
||||
@@ -24,7 +30,11 @@ class InnerWrapper(OuterWrapper):
|
||||
print('input population not found')
|
||||
demand = Water_Demand_Simulation(self.countypop, self.rate)
|
||||
|
||||
results = {'water_demand': {'water_demand': {'data': demand, 'granularity': 'county'}}}
|
||||
results = {
|
||||
'water_demand': {
|
||||
'water_demand': {'data': demand, 'granularity': 'county'}
|
||||
}
|
||||
}
|
||||
return results
|
||||
|
||||
|
||||
|
||||
@@ -1,21 +1,27 @@
|
||||
import glob
|
||||
import sys
|
||||
|
||||
sys.path.append('/')
|
||||
from outer_wrapper import OuterWrapper
|
||||
|
||||
|
||||
class InnerWrapper(OuterWrapper):
|
||||
|
||||
def __init__(self):
|
||||
num_input_schemas = len(glob.glob("/opt/schemas/input/*.json"))
|
||||
super().__init__(model_id="unique_model_name", num_expected_inputs=num_input_schemas)
|
||||
super().__init__(
|
||||
model_id="unique_model_name", num_expected_inputs=num_input_schemas
|
||||
)
|
||||
self.data = None
|
||||
|
||||
def configure(self, **kwargs):
|
||||
self.data = kwargs['schema_name']
|
||||
|
||||
def increment(self, **kwargs):
|
||||
return {'schema_name': {'data_variable_name': {'data': {}, 'granularity': 'county'}}}
|
||||
return {
|
||||
'schema_name': {
|
||||
'data_variable_name': {'data': {}, 'granularity': 'county'}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
def main():
|
||||
|
||||
outer_wrapper.py (247 lines)
@@ -19,7 +19,6 @@ from collections import defaultdict
|
||||
|
||||
|
||||
class Graph(nx.DiGraph):
|
||||
|
||||
def __init__(self, filename):
|
||||
"""
|
||||
constructor for the granularity graph
|
||||
@@ -31,13 +30,26 @@ class Graph(nx.DiGraph):
|
||||
super().__init__()
|
||||
|
||||
# map the translation function names to the functions
|
||||
self.functions = {"simple_sum": self.simple_sum, "simple_average": self.simple_average, "weighted_average": self.weighted_average, "distribute_uniformly": self.distribute_uniformly, "distribute_by_area": self.distribute_by_area, "distribute_identically": self.distribute_identically}
|
||||
self.functions = {
|
||||
"simple_sum": self.simple_sum,
|
||||
"simple_average": self.simple_average,
|
||||
"weighted_average": self.weighted_average,
|
||||
"distribute_uniformly": self.distribute_uniformly,
|
||||
"distribute_by_area": self.distribute_by_area,
|
||||
"distribute_identically": self.distribute_identically,
|
||||
}
|
||||
|
||||
# build the graph by loading it from the JSON file
|
||||
with open(filename, mode='r') as json_file:
|
||||
data = json.load(json_file)
|
||||
for node in data['nodes']:
|
||||
self.add_node(node['id'], name=node.get('name'), type=node.get('type'), shape=node.get('shape'), area=node.get('area'))
|
||||
self.add_node(
|
||||
node['id'],
|
||||
name=node.get('name'),
|
||||
type=node.get('type'),
|
||||
shape=node.get('shape'),
|
||||
area=node.get('area'),
|
||||
)
|
||||
for edge in data['links']:
|
||||
self.add_edge(edge['source'], edge['target'])
|
||||
|
||||
@@ -73,10 +85,17 @@ class Graph(nx.DiGraph):
|
||||
parent_area = self.nodes[ancestor]['area']
|
||||
break
|
||||
else:
|
||||
logging.error(f"none of the nodes in {ancestors} have granularity {parent_granularity}")
|
||||
parent_area = sum([self.nodes[value[0]]['area'] for value in values])
|
||||
logging.error(
|
||||
f"none of the nodes in {ancestors} have granularity {parent_granularity}"
|
||||
)
|
||||
parent_area = sum(
|
||||
[self.nodes[value[0]]['area'] for value in values]
|
||||
)
|
||||
|
||||
return sum([value[1] * self.nodes[value[0]]['area'] for value in values]) / parent_area
|
||||
return (
|
||||
sum([value[1] * self.nodes[value[0]]['area'] for value in values])
|
||||
/ parent_area
|
||||
)
|
||||
|
||||
def distribute_uniformly(self, value, instance, child_granularity):
|
||||
"""
|
||||
@@ -86,7 +105,11 @@ class Graph(nx.DiGraph):
|
||||
:param child_granularity: the intended granularity of the transformation (the child node of the instance in the abstract graph)
|
||||
:return: a dict mapping each child node to its equal share of the parent value
|
||||
"""
|
||||
children = [child for child in self.successors(instance) if self.nodes[child]['type'] == child_granularity]
|
||||
children = [
|
||||
child
|
||||
for child in self.successors(instance)
|
||||
if self.nodes[child]['type'] == child_granularity
|
||||
]
|
||||
mean = value / len(children) if children else 0
|
||||
distributed = {child: mean for child in children}
|
||||
return distributed
|
||||
@@ -99,7 +122,11 @@ class Graph(nx.DiGraph):
|
||||
:param child_granularity: the intended granularity of the transformation (the child node of the instance in the abstract graph)
|
||||
:return: a dict mapping each child node to the parent value
|
||||
"""
|
||||
children = [child for child in self.successors(instance) if self.nodes[child]['type'] == child_granularity]
|
||||
children = [
|
||||
child
|
||||
for child in self.successors(instance)
|
||||
if self.nodes[child]['type'] == child_granularity
|
||||
]
|
||||
distributed = {child: value for child in children}
|
||||
return distributed
|
||||
|
||||
@@ -111,14 +138,20 @@ class Graph(nx.DiGraph):
|
||||
:param child_granularity: the intended granularity of the transformation (the child node of the instance in the abstract graph)
|
||||
:return: a dict mapping each child node to its area-proportionate share of the parent value
|
||||
"""
|
||||
children = [child for child in self.successors(instance) if self.nodes[child]['type'] == child_granularity]
|
||||
children = [
|
||||
child
|
||||
for child in self.successors(instance)
|
||||
if self.nodes[child]['type'] == child_granularity
|
||||
]
|
||||
parent_area = self.nodes[instance]["area"]
|
||||
distributed = {child: value * self.nodes[child]["area"] / parent_area for child in children}
|
||||
distributed = {
|
||||
child: value * self.nodes[child]["area"] / parent_area
|
||||
for child in children
|
||||
}
|
||||
return distributed
|
||||
|
||||
|
||||
class OuterWrapper(ABC):
|
||||
|
||||
def __init__(self, model_id, num_expected_inputs):
|
||||
"""
|
||||
constructor for the outer wrapper, an abstract base class inherited by the inner wrapper
|
||||
@@ -148,21 +181,26 @@ class OuterWrapper(ABC):
|
||||
self.input_schemas = None
|
||||
self.output_schemas = None
|
||||
self.validated_schemas = {}
|
||||
self.generic_output_schema = '{' \
|
||||
' "type": "object",' \
|
||||
' "patternProperties": {' \
|
||||
' ".*": {' \
|
||||
' "type": "object", ' \
|
||||
' "properties": {' \
|
||||
' "data": {"type": "object"}, "granularity": {"type": "string"}' \
|
||||
' },' \
|
||||
' "required": ["data", "granularity"]' \
|
||||
' }' \
|
||||
' }' \
|
||||
'}'
|
||||
self.generic_output_schema = (
|
||||
'{'
|
||||
' "type": "object",'
|
||||
' "patternProperties": {'
|
||||
' ".*": {'
|
||||
' "type": "object", '
|
||||
' "properties": {'
|
||||
' "data": {"type": "object"}, "granularity": {"type": "string"}'
|
||||
' },'
|
||||
' "required": ["data", "granularity"]'
|
||||
' }'
|
||||
' }'
|
||||
'}'
|
||||
)
|
||||
|
||||
logging.basicConfig(level=logging.DEBUG, stream=sys.stdout,
|
||||
format='%(asctime)s - %(levelname)s - %(filename)s:%(funcName)s:%(lineno)d - %(message)s')
|
||||
logging.basicConfig(
|
||||
level=logging.DEBUG,
|
||||
stream=sys.stdout,
|
||||
format='%(asctime)s - %(levelname)s - %(filename)s:%(funcName)s:%(lineno)d - %(message)s',
|
||||
)
|
||||
|
||||
def meet(self, a, b):
|
||||
sort = sorted((a, b))
|
||||
@@ -188,9 +226,15 @@ class OuterWrapper(ABC):
|
||||
parents = defaultdict(list)
|
||||
for instance, value in data.items():
|
||||
if instance not in self.instance_graph.nodes:
|
||||
logging.warning(f"instance {instance} not in instance graph")
|
||||
logging.warning(
|
||||
f"instance {instance} not in instance graph"
|
||||
)
|
||||
continue
|
||||
parent = [parent for parent in self.instance_graph.predecessors(instance) if self.instance_graph.nodes[parent]['type'] == path[1]]
|
||||
parent = [
|
||||
parent
|
||||
for parent in self.instance_graph.predecessors(instance)
|
||||
if self.instance_graph.nodes[parent]['type'] == path[1]
|
||||
]
|
||||
assert len(parent) == 1
|
||||
parents[parent[0]].append((instance, value))
|
||||
|
||||
@@ -203,7 +247,9 @@ class OuterWrapper(ABC):
|
||||
# translate to the next granularity in the path
|
||||
return self.aggregate(translated, path[1], dest, agg_name)
|
||||
else:
|
||||
raise Exception(f"error aggregating from {src} to {dest}, no path found")
|
||||
raise Exception(
|
||||
f"error aggregating from {src} to {dest}, no path found"
|
||||
)
|
||||
|
||||
    def disaggregate(self, data, src, dest, disagg_name=None):

@@ -224,20 +270,26 @@ class OuterWrapper(ABC):
            translated = {}
            for instance, value in data.items():
                if instance not in self.instance_graph.nodes:
                    logging.warning(f"instance {instance} not in instance graph")
                    logging.warning(
                        f"instance {instance} not in instance graph"
                    )
                else:
                    trans_func = self.instance_graph.functions.get(disagg_name)
                    # for this parent, create a dict of child instances mapped to disaggregated values
                    children = trans_func(value, instance, path[1])
                    # add this parent's dict of children to the flat dict
                    translated = {**translated, **children}

            # translate to the next granularity in the path
            return self.disaggregate(translated, path[1], dest, disagg_name)
        else:
            raise Exception(f"error disaggregating from {src} to {dest}, no path found")
            raise Exception(
                f"error disaggregating from {src} to {dest}, no path found"
            )
    def translate(self, data, src, dest, variable, agg_name=None, disagg_name=None):
    def translate(
        self, data, src, dest, variable, agg_name=None, disagg_name=None
    ):
        """

        :param data: dictionary mapping instance nodes (of src granularity) to their values
@@ -265,13 +317,21 @@ class OuterWrapper(ABC):
            return self.aggregate(data, src, dest, agg_name)

        # translate between branches of the granularity graph: disaggregate down, then aggregate back up
        elif nx.has_path(self.abstract_graph, src, self.meet(src, dest)) and nx.has_path(self.abstract_graph, dest, self.meet(src, dest)):
            disaggregated = self.disaggregate(data, src, self.meet(src, dest), disagg_name)
            aggregated = self.aggregate(disaggregated, self.meet(src, dest), dest, agg_name)
        elif nx.has_path(
            self.abstract_graph, src, self.meet(src, dest)
        ) and nx.has_path(self.abstract_graph, dest, self.meet(src, dest)):
            disaggregated = self.disaggregate(
                data, src, self.meet(src, dest), disagg_name
            )
            aggregated = self.aggregate(
                disaggregated, self.meet(src, dest), dest, agg_name
            )
            return aggregated

        else:
            raise Exception(f"error translating {variable} from {src} to {dest}, no path found")
            raise Exception(
                f"error translating {variable} from {src} to {dest}, no path found"
            )
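translate() therefore routes a variable one of three ways: aggregate up when the destination is a coarser ancestor granularity, disaggregate down when it is a finer descendant, or hop through a shared finer granularity when src and dest sit on different branches. A minimal sketch of that decision, assuming a toy abstract graph with edges pointing from coarser to finer granularities; the granularity names and the common_descendant() helper are illustrative stand-ins, not the real meet() implementation:

```python
# Toy sketch of the routing logic in translate(): aggregate up, disaggregate down,
# or go down to a shared finer granularity and back up. The graph and the
# common_descendant() helper are stand-ins for illustration only.
import networkx as nx

# hypothetical abstract granularity graph: edges point from coarser to finer
abstract = nx.DiGraph()
abstract.add_edges_from([
    ("nation", "state"),
    ("nation", "basin"),
    ("state", "county"),
    ("basin", "county"),
])

def common_descendant(graph, a, b):
    # finest granularity reachable from both a and b (stand-in for meet())
    shared = nx.descendants(graph, a) & nx.descendants(graph, b)
    return max(shared, key=lambda n: len(nx.ancestors(graph, n))) if shared else None

src, dest = "state", "basin"
if nx.has_path(abstract, dest, src):
    print("aggregate up:", src, "->", dest)
elif nx.has_path(abstract, src, dest):
    print("disaggregate down:", src, "->", dest)
else:
    via = common_descendant(abstract, src, dest)
    print(f"disaggregate {src} -> {via}, then aggregate {via} -> {dest}")
```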
    def load_json_objects(self, dir_path):
        """
@@ -296,7 +356,9 @@ class OuterWrapper(ABC):
        :return:
        """

        raise NotImplementedError(f"configure() has to be implemented in the {self.model_id} inner wrapper")
        raise NotImplementedError(
            f"configure() has to be implemented in the {self.model_id} inner wrapper"
        )

    @abstractmethod
    def increment(self, **kwargs):
@@ -306,7 +368,9 @@ class OuterWrapper(ABC):
        :return:
        """

        raise NotImplementedError(f"increment() has to be implemented in the {self.model_id} inner wrapper")
        raise NotImplementedError(
            f"increment() has to be implemented in the {self.model_id} inner wrapper"
        )

    def increment_handler(self, event, incstep):
        """
@@ -317,12 +381,19 @@ class OuterWrapper(ABC):
        """

        self.increment_flag = True
        logging.info(f"about to increment, incstep {incstep}, year {self.initial_year + incstep}")
        logging.info(
            f"about to increment, incstep {incstep}, year {self.initial_year + incstep}"
        )
        self.incstep = incstep

        # validate against input schemas
        if incstep > 1 and len(self.validated_schemas) != self.num_expected_inputs:
            logging.critical(f"number of validated schemas {len(self.validated_schemas)} != num_expected_inputs {self.num_expected_inputs}")
        if (
            incstep > 1
            and len(self.validated_schemas) != self.num_expected_inputs
        ):
            logging.critical(
                f"number of validated schemas {len(self.validated_schemas)} != num_expected_inputs {self.num_expected_inputs}"
            )
            event.set()
            raise RuntimeError
@@ -337,7 +408,9 @@ class OuterWrapper(ABC):
            validate(data_msg, json.loads(self.generic_output_schema))
            validate(data_msg, self.output_schemas[schema_name])
        except Exception as e:
            logging.critical(f"message {data_msg} failed to validate schema {schema_name}")
            logging.critical(
                f"message {data_msg} failed to validate schema {schema_name}"
            )
            event.set()
            raise RuntimeError

@@ -356,7 +429,9 @@ class OuterWrapper(ABC):
        data_msg['incstep'] = self.incstep
        data_msg['year'] = self.incstep + self.initial_year
        self.pub_queue.put(data_msg)
        logging.info(f"finished increment {self.incstep}, year {self.incstep + self.initial_year}")
        logging.info(
            f"finished increment {self.incstep}, year {self.incstep + self.initial_year}"
        )
        self.incstep += 1
    def send_status(self, event):
@@ -386,7 +461,9 @@ class OuterWrapper(ABC):
            # kickstart the model for the first increment
            self.status = 'ready'

        elif len(self.validated_schemas) == self.num_expected_inputs:
        elif (
            len(self.validated_schemas) == self.num_expected_inputs
        ):
            # all input messages have been received and all input schemas have been validated
            self.status = 'ready'
@@ -451,13 +528,32 @@ class OuterWrapper(ABC):
                    src_gran = message['payload'][item]['granularity']

                    # get granularity and translation functions from the schema
                    dest_gran = schema['properties'][item]['properties']['granularity'].get('value', src_gran)
                    agg = schema['properties'][item]['properties'].get('agg', {}).get('value')
                    dagg = schema['properties'][item]['properties'].get('dagg', {}).get('value')
                    dest_gran = schema['properties'][item]['properties'][
                        'granularity'
                    ].get('value', src_gran)
                    agg = (
                        schema['properties'][item]['properties']
                        .get('agg', {})
                        .get('value')
                    )
                    dagg = (
                        schema['properties'][item]['properties']
                        .get('dagg', {})
                        .get('value')
                    )
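The lookups above expect each variable's entry in the schema to carry the destination granularity and the names of its aggregation and disaggregation functions under "value" keys. A minimal sketch of such a fragment; the variable name and function names are invented, and only the "granularity"/"agg"/"dagg" key structure comes from the code above:

```python
# Sketch of the schema fragment the lookups above expect for one variable.
# The variable name ("water_demand") and the function names are hypothetical.
schema = {
    "properties": {
        "water_demand": {
            "properties": {
                "granularity": {"type": "string", "value": "county"},
                "agg": {"value": "sum"},
                "dagg": {"value": "proportional_area"},
            }
        }
    }
}

item = "water_demand"
src_gran = "state"  # granularity reported in the incoming message

props = schema["properties"][item]["properties"]
dest_gran = props["granularity"].get("value", src_gran)  # "county"
agg = props.get("agg", {}).get("value")                  # "sum"
dagg = props.get("dagg", {}).get("value")                # "proportional_area"
print(dest_gran, agg, dagg)
```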
                    # translate the data and update the data message
                    logging.info(f"validating output: message from {name}, translating variable {item}, {src_gran} -> {dest_gran}")
                    data = self.translate(message['payload'][item]['data'], src_gran, dest_gran, item, agg_name=agg, disagg_name=dagg)
                    logging.info(
                        f"validating output: message from {name}, translating variable {item}, {src_gran} -> {dest_gran}"
                    )
                    data = self.translate(
                        message['payload'][item]['data'],
                        src_gran,
                        dest_gran,
                        item,
                        agg_name=agg,
                        disagg_name=dagg,
                    )
                    message['payload'][item]['data'] = data
                    message['payload'][item]['granularity'] = dest_gran

@@ -466,12 +562,18 @@ class OuterWrapper(ABC):
            except json.JSONDecodeError:
                logging.warning("json decode error")
            if len(matched) == 0:
                logging.info(f"message didn't match any output schemas: {message['source']}")
                logging.info(
                    f"message didn't match any output schemas: {message['source']}"
                )
            elif len(matched) == 1:
                logging.info(f"message matched an output schema: {message['source']}")
                logging.info(
                    f"message matched an output schema: {message['source']}"
                )
                sock.send_json(message)
            else:
                logging.critical(f"more than one output schema was matched: {message['source']}")
                logging.critical(
                    f"more than one output schema was matched: {message['source']}"
                )
                event.set()
        sock.close()
@@ -528,9 +630,13 @@ class OuterWrapper(ABC):
        for name, schema in self.input_schemas.items():
            try:
                validate(message['payload'], schema)
                logging.info(f"schema {name} validated incoming message: {message}")
                logging.info(
                    f"schema {name} validated incoming message: {message}"
                )
                if name in self.validated_schemas:
                    logging.error(f"schema {name} already validated a message: {message}")
                    logging.error(
                        f"schema {name} already validated a message: {message}"
                    )
                    return False
                else:
                    matched.append(schema)
@@ -542,13 +648,32 @@ class OuterWrapper(ABC):
                    src_gran = message['payload'][item]['granularity']

                    # get granularity and translation functions from the schema
                    dest_gran = schema['properties'][item]['properties']['granularity'].get('value', src_gran)
                    agg = schema['properties'][item]['properties'].get('agg', {}).get('value')
                    dagg = schema['properties'][item]['properties'].get('dagg', {}).get('value')
                    dest_gran = schema['properties'][item]['properties'][
                        'granularity'
                    ].get('value', src_gran)
                    agg = (
                        schema['properties'][item]['properties']
                        .get('agg', {})
                        .get('value')
                    )
                    dagg = (
                        schema['properties'][item]['properties']
                        .get('dagg', {})
                        .get('value')
                    )

                    # translate the data and update the data message
                    logging.info(f"validating input: message from {name}, translating variable {item}, {src_gran} -> {dest_gran}")
                    data = self.translate(message['payload'][item]['data'], src_gran, dest_gran, item, agg_name=agg, disagg_name=dagg)
                    logging.info(
                        f"validating input: message from {name}, translating variable {item}, {src_gran} -> {dest_gran}"
                    )
                    data = self.translate(
                        message['payload'][item]['data'],
                        src_gran,
                        dest_gran,
                        item,
                        agg_name=agg,
                        disagg_name=dagg,
                    )
                    message['payload'][item]['data'] = data
                    message['payload'][item]['granularity'] = dest_gran

@@ -559,7 +684,9 @@ class OuterWrapper(ABC):
                logging.warning("json decode error")

        if len(matched) == 0:
            logging.info(f"message didn't match any input schemas: {message['source']}")
            logging.info(
                f"message didn't match any input schemas: {message['source']}"
            )
        return True
    def action_worker(self, event):