python black formatting (black simon -S -l 79)
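For reference, the flags in that invocation: -S (--skip-string-normalization) keeps the existing quote style instead of rewriting strings to double quotes, and -l 79 (--line-length 79) wraps at the PEP 8 limit rather than black's default of 88. With the package directory named simon, as in the title, the format and a dry-run check look like:

    black simon -S -l 79
    black --check simon -S -l 79    # report files that would change, modify nothing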

Former-commit-id: f599b58ae5f4562ad00034ecd5b26a51e8ae6fbf
Author: Michael T. Kelbaugh
Date: 2020-02-07 11:57:40 -05:00
parent be62d62a02
commit 8fa18b6d5a
19 changed files with 532 additions and 236 deletions

View File

@@ -9,7 +9,6 @@ import logging
class Broker:
def __init__(self):
"""
constructor for the broker
@@ -24,14 +23,17 @@ class Broker:
self.incstep = 1
self.max_incstep = 50
self.initial_year = 2016
self.boot_timer = 60 # units: seconds
self.watchdog_timer = 60 # units: seconds
self.boot_timer = 60 # units: seconds
self.watchdog_timer = 60 # units: seconds
self.client = None
self.mongo_queue = Queue()
self.broker_id = 'broker'
logging.basicConfig(level=logging.DEBUG, stream=sys.stdout,
format='%(asctime)s - %(levelname)s - %(filename)s:%(funcName)s:%(lineno)d - %(message)s')
logging.basicConfig(
level=logging.DEBUG,
stream=sys.stdout,
format='%(asctime)s - %(levelname)s - %(filename)s:%(funcName)s:%(lineno)d - %(message)s',
)
logging.info(self.models)
def insert_into_mongodb(self, event):
@@ -119,7 +121,10 @@ class Broker:
except zmq.ZMQError:
continue
logging.info(json.dumps(message))
if message.get('source') in self.models and message.get('signal') == 'status':
if (
message.get('source') in self.models
and message.get('signal') == 'status'
):
self.models[message.get('source')] = message
self.model_tracker.add(message.get('source'))
if message.get('signal') == 'data':
@@ -173,7 +178,11 @@ class Broker:
"""
while not event.is_set():
for i in range(self.boot_timer if self.status == 'booting' else self.watchdog_timer):
for i in range(
self.boot_timer
if self.status == 'booting'
else self.watchdog_timer
):
time.sleep(1)
if self.model_tracker == set(self.models.keys()):
self.status = 'booted'
@@ -181,8 +190,12 @@ class Broker:
break
else:
missing_models = set(self.models.keys()) - self.model_tracker
logging.critical(f"Timed out waiting for {missing_models}{' to initialize' if self.status == 'booting' else ''}")
logging.critical(f"Broker will shut down now, current time: {time.ctime()}")
logging.critical(
f"Timed out waiting for {missing_models}{' to initialize' if self.status == 'booting' else ''}"
)
logging.critical(
f"Broker will shut down now, current time: {time.ctime()}"
)
event.set()
def send_increment_pulse(self, event):
@@ -198,12 +211,22 @@ class Broker:
# check to send an increment pulse
for model, status in self.models.items():
if status.get('status') != 'ready' or status.get('incstep') != self.incstep:
if (
status.get('status') != 'ready'
or status.get('incstep') != self.incstep
):
break
else:
if self.incstep > self.max_incstep and self.mongo_queue.empty():
logging.critical(f"successfully finished last increment {self.max_incstep}")
logging.critical(f"Broker will shut down now, current time: {time.ctime()}")
if (
self.incstep > self.max_incstep
and self.mongo_queue.empty()
):
logging.critical(
f"successfully finished last increment {self.max_incstep}"
)
logging.critical(
f"Broker will shut down now, current time: {time.ctime()}"
)
event.set()
else:
logging.info(f"sending increment pulse {self.incstep}")
@@ -240,10 +263,14 @@ class Broker:
watchdog_thread = Thread(target=self.watchdog, args=(shutdown,))
watchdog_thread.start()
increment_pulse_thread = Thread(target=self.send_increment_pulse, args=(shutdown,))
increment_pulse_thread = Thread(
target=self.send_increment_pulse, args=(shutdown,)
)
increment_pulse_thread.start()
mongo_thread = Thread(target=self.insert_into_mongodb, args=(shutdown,))
mongo_thread = Thread(
target=self.insert_into_mongodb, args=(shutdown,)
)
mongo_thread.start()
try:

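The broker's watchdog above uses Python's for ... else: the else suite runs only when the loop completes without hitting break, which makes it a natural "timed out" branch. A minimal standalone sketch of that pattern (wait_until and its arguments are illustrative names, not the broker's API):

    import time

    def wait_until(predicate, timeout_seconds):
        # poll once per second, like the broker's watchdog loop
        for _ in range(timeout_seconds):
            time.sleep(1)
            if predicate():
                break
        else:
            # reached only when the loop exhausts without break, i.e. on timeout
            return False
        return True

    deadline = time.time() + 2
    print(wait_until(lambda: time.time() >= deadline, timeout_seconds=5))  # True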
View File

@@ -3,21 +3,27 @@ from fair.forward import fair_scm
import numpy as np
import json
#need to iterate through another dictionary that has counties as values
#sum up the values, set equal to a new variable
#scale it then CFT
# need to iterate through another dictionary that has counties as values
# sum up the values, set equal to a new variable
# scale it then CFT
def temperature_simulation(electric):
total = sum(list(filter(None, electric.values())))
#emissions[i]=
emissions = np.array(total)
#other_rf = np.zeros(emissions.size)
#for x in range(0, emissions.size):
#    other_rf[x] = 0.5 * np.sin(2 * np.pi * (x) / 14.0)
#emissions=emissions*6.66667*3.5714285 #scaling factors
C,F,T = fair.forward.fair_scm(emissions_driven=True, emissions=np.array([emissions*6.66667*3.5714285]), useMultigas=False)
return T
#temperature_simulation({'co2':{'data':{5646:9.9,489247:6,234708:4.5}},'therm':[2,3,2]})
total = sum(list(filter(None, electric.values())))
# emissions[i]=
emissions = np.array(total)
# other_rf = np.zeros(emissions.size)
# for x in range(0, emissions.size):
#     other_rf[x] = 0.5 * np.sin(2 * np.pi * (x) / 14.0)
# emissions=emissions*6.66667*3.5714285 #scaling factors
C, F, T = fair.forward.fair_scm(
    emissions_driven=True,
    emissions=np.array([emissions * 6.66667 * 3.5714285]),
    useMultigas=False,
)
return T
# temperature_simulation({'co2':{'data':{5646:9.9,489247:6,234708:4.5}},'therm':[2,3,2]})

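A note on the file above: it imports fair_scm directly (from fair.forward import fair_scm) but then calls fair.forward.fair_scm(...), which raises NameError unless import fair is also in scope. A minimal sketch of the CO2-only call using the imported name; the emissions values are illustrative, and the 6.66667 * 3.5714285 scaling factors are this model's, not FaIR's:

    import numpy as np
    from fair.forward import fair_scm

    emissions = 10.0 * np.ones(5) * 6.66667 * 3.5714285  # five years of scaled CO2 emissions
    C, F, T = fair_scm(
        emissions_driven=True,  # drive the model with emissions, not concentrations
        emissions=emissions,
        useMultigas=False,      # CO2-only mode
    )
    print(float(T[-1]))  # temperature anomaly at the final timestep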
View File

@@ -1,18 +1,21 @@
import glob
import sys
import fair
sys.path.append('/')
from outer_wrapper import OuterWrapper
from climate import temperature_simulation
#put the json file from the output of power supply into the schemas/input file
# put the json file from the output of power supply into the schemas/input file
class InnerWrapper(OuterWrapper):
def __init__(self):
num_input_schemas = len(glob.glob("/opt/schemas/input/*.json"))
super().__init__(model_id="climate", num_expected_inputs=num_input_schemas)
#self.electric=None
super().__init__(
model_id="climate", num_expected_inputs=num_input_schemas
)
# self.electric=None
def configure(self, **kwargs):
if 'co2' in kwargs.keys():
@@ -23,11 +26,18 @@ class InnerWrapper(OuterWrapper):
def increment(self, **kwargs):
if 'power_output' in kwargs.keys():
self.electric = kwargs['power_output']['co2']['data']
else:
else:
print('input co2 not found')
temperature=float(temperature_simulation(self.electric))
return {'climate': { 'climate': {'data': {'global_temp': temperature}, 'granularity': 'global'}}}
temperature = float(temperature_simulation(self.electric))
return {
'climate': {
'climate': {
'data': {'global_temp': temperature},
'granularity': 'global',
}
}
}
def main():

View File

@@ -10,7 +10,7 @@ import json
def temp_inc(init_data, year):
json1_data = init_data
year = year-1
year = year - 1
mean_glob_temps = []
with open("/opt/src/weights.json") as f:
weights = json.load(f)
@@ -29,11 +29,13 @@ def temp_inc(init_data, year):
# Contiguous U.S bounded by (49 N, 122W), (24N 66W)
if 49 >= float(i) >= 23 and -68 >= float(j) >= -128:
single_year_US[i][j] = (
json1_data[i][j][year][0], json1_data[i][j][year][1], json1_data[i][j][year][2]-273.15
json1_data[i][j][year][0],
json1_data[i][j][year][1],
json1_data[i][j][year][2] - 273.15,
)
# Apply weights
weighted_sum = np.sum([a*b for a, b in zip(mean_glob_temps, weights)])
weighted_sum = np.sum([a * b for a, b in zip(mean_glob_temps, weights)])
# Output: Global average (C) +
# grid of U.S (precipitation (mm), evaporation (mm), surface temp(C))
@@ -44,7 +46,15 @@ def temp_inc(init_data, year):
lon = float(lon)
if lon < 0:
lon += 180
translated_pr["lat_" + str(lat) + "_lon_" + str(lon)] = lon_values[0]
translated_ev["lat_" + str(lat) + "_lon_" + str(lon)] = lon_values[1]
translated_pr["lat_" + str(lat) + "_lon_" + str(lon)] = lon_values[
0
]
translated_ev["lat_" + str(lat) + "_lon_" + str(lon)] = lon_values[
1
]
return weighted_sum-273.15, translated_pr, translated_ev #single_year_US
return (
weighted_sum - 273.15,
translated_pr,
translated_ev,
) # single_year_US

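The loop above flattens a nested {lat: {lon: (precip, evap, temp)}} grid into flat dicts keyed "lat_<lat>_lon_<lon>". A minimal sketch of that reshaping with made-up data (adding 180 rather than 360 to negative longitudes can collide with genuine eastern-hemisphere points, but the sketch keeps the file's arithmetic):

    grid = {"40.0": {"-100.0": (2.1, 1.3, 290.15)}}  # (precip mm, evap mm, temp K)

    translated_pr, translated_ev = {}, {}
    for lat, lon_dict in grid.items():
        for lon, lon_values in lon_dict.items():
            lon = float(lon)
            if lon < 0:
                lon += 180  # mirrors the sign handling in temp_inc above
            translated_pr["lat_" + str(lat) + "_lon_" + str(lon)] = lon_values[0]
            translated_ev["lat_" + str(lat) + "_lon_" + str(lon)] = lon_values[1]

    print(translated_pr)  # {'lat_40.0_lon_80.0': 2.1}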
View File

@@ -1,35 +1,48 @@
import glob
import sys
sys.path.append('/')
from outer_wrapper import OuterWrapper
from climate_model import temp_inc
class InnerWrapper(OuterWrapper):
def __init__(self):
num_input_schemas = len(glob.glob("/opt/schemas/input/*.json"))
super().__init__(model_id="gfdl_cm3", num_expected_inputs=num_input_schemas)
super().__init__(
model_id="gfdl_cm3", num_expected_inputs=num_input_schemas
)
def configure(self, **kwargs):
self.raw_data = kwargs['rcp26data']
if 'rcp26data' in kwargs.keys():
self.mean_temp, self.climate_data0, self.climate_data1 = temp_inc(self.raw_data, self.incstep)
self.mean_temp, self.climate_data0, self.climate_data1 = temp_inc(
self.raw_data, self.incstep
)
else:
print('rcp data not found')
def increment(self, **kwargs):
self.global_temp, self.precipitation, self.evaporation = temp_inc(self.raw_data, self.incstep)
self.global_temp, self.precipitation, self.evaporation = temp_inc(
self.raw_data, self.incstep
)
results ={'gfdl_cm3':
{'global_temp':
{'data': {'temp': self.global_temp}, 'granularity': 'global'},
'precipitation':
{'data': self.precipitation, 'granularity': 'climate'},
'evaporation':
{'data': self.evaporation, 'granularity': 'climate'}
}
}
results = {
'gfdl_cm3': {
'global_temp': {
'data': {'temp': self.global_temp},
'granularity': 'global',
},
'precipitation': {
'data': self.precipitation,
'granularity': 'climate',
},
'evaporation': {
'data': self.evaporation,
'granularity': 'climate',
},
}
}
return results

View File

@@ -1,5 +1,6 @@
import glob
import sys
sys.path.append('/')
from outer_wrapper import OuterWrapper
import pyhector
@@ -7,10 +8,11 @@ import json
class InnerWrapper(OuterWrapper):
def __init__(self):
num_input_schemas = len(glob.glob("/opt/schemas/input/*.json"))
super().__init__(model_id="hector", num_expected_inputs=num_input_schemas)
super().__init__(
model_id="hector", num_expected_inputs=num_input_schemas
)
def configure(self, **kwargs):
self.rcp = kwargs['bootstrap']['rcp']
@@ -23,7 +25,16 @@ class InnerWrapper(OuterWrapper):
print("rcp85")
pandas_df = pyhector.run(pyhector.rcp85)
return {'climate': {'climate': {'data': json.loads(pandas_df["temperature.Tgav"].to_json()), 'granularity': 'global'}}}
return {
'climate': {
'climate': {
'data': json.loads(
pandas_df["temperature.Tgav"].to_json()
),
'granularity': 'global',
}
}
}
def main():

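pyhector returns a pandas DataFrame indexed by year, and the wrapper above serializes one column of it straight to JSON. A minimal sketch, assuming pyhector and its bundled RCP scenarios are installed:

    import json
    import pyhector

    df = pyhector.run(pyhector.rcp26)     # Hector outputs, one row per year
    tgav = df["temperature.Tgav"]         # global mean temperature anomaly
    payload = json.loads(tgav.to_json())  # {"<year>": <temp>, ...}
    print(list(payload.items())[:3])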
View File

@@ -1,38 +1,40 @@
#def populationfunction(logisticpopulation):
# def populationfunction(logisticpopulation):
# pop={}
# sum = 0
# for i in logisticpopulation.keys():
# N = logisticpopulation[i] #this is the county population
# k = 300000000/3007 #this scales the max capacity to the county level (there are 3007 US counties)
# r = 1.0061 #this is the growth rate
# pop[i] = r*N*((k-N)/k) #this is the equation
# sum = 0
# for i in logisticpopulation.keys():
# N = logisticpopulation[i] #this is the county population
# k = 300000000/3007 #this scales the max capacity to the county level (there are 3007 US counties)
# r = 1.0061 #this is the growth rate
# pop[i] = r*N*((k-N)/k) #this is the equation
# return pop
# return pop
#def populationfunction(logisticpopulation):
# def populationfunction(logisticpopulation):
# pop={}
# sum = 0
# for i in logisticpopulation.keys():
# sum += logisticpopulation[i]
#print(sum)
# print(sum)
# N = logisticpopulation[i] #this is the county population
# k = (N/sum) *300000000 #this scales the max capacity to the county level (there are 3007 US counties)
# r = 1.0061 #this is the growth rate
# pop[i] = r*N*((k-N)/k) #this is the equation
# r = 1.0061 #this is the growth rate
# pop[i] = r*N*((k-N)/k) #this is the equation
# return pop
# return pop
def populationfunction(population):
pop={}
pop = {}
mysum = 0
for i in population.keys():
#or j in logisticpopulation.keys()[i]:
mysum += population[i]
# or j in logisticpopulation.keys()[i]:
mysum += population[i]
for i in population.keys():
N = population[i] #this is the county population
k = (N/mysum) *400000000 #this scales the max capacity to the county level (there are 3007 US counties)
r = 1.0071 #this is the growth rate
pop[i] = N + r*N*((k-N)/k) #this is the equation
N = population[i] # this is the county population
k = (
N / mysum
) * 400000000 # this scales the max capacity to the county level (there are 3007 US counties)
r = 1.0071 # this is the growth rate
pop[i] = N + r * N * ((k - N) / k) # this is the equation
return pop

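The update rule above is one discrete step of logistic growth, N + r*N*(k - N)/k, with each county's carrying capacity k set to its share of a 400M national cap. Note that with r near 1 this roughly doubles a small population per step, so r is a multiplier rather than a percentage growth rate. A one-county worked sketch with illustrative numbers:

    def logistic_step(N, k, r):
        # one discrete step of dN = r*N*(k - N)/k
        return N + r * N * ((k - N) / k)

    total = 1_000_000                # illustrative national population
    N = 50_000                       # one county's population
    k = (N / total) * 400_000_000    # county share of the national cap, as above
    r = 1.0071                       # growth rate used by the model
    print(logistic_step(N, k, r))    # ~100229.1: nearly doubles, since k >> N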
View File

@@ -1,30 +1,41 @@
import glob
import sys
sys.path.append('/')
from outer_wrapper import OuterWrapper
from LogisticGrowth import populationfunction
class InnerWrapper(OuterWrapper):
class InnerWrapper(OuterWrapper):
def __init__(self):
num_input_schemas = len(glob.glob("/opt/schemas/input/*.json"))
super().__init__(model_id="logisticpopulation", num_expected_inputs=num_input_schemas)
super().__init__(
model_id="logisticpopulation",
num_expected_inputs=num_input_schemas,
)
def configure(self, **kwargs):
if '2016 populations' in kwargs.keys():
self.population = kwargs['2016 populations']
self.population = kwargs['2016 populations']
else:
print('population initialization data not found')
def increment(self, **kwargs):
#if 'logisticpopulation' in kwargs.keys():
# if 'logisticpopulation' in kwargs.keys():
# self.population = kwargs['logistispopulation']['logisticpopulation']['data']
#else:
# else:
# print('input population not found')
population = populationfunction(self.population)
self.population=population
return {'logisticpopulation': {'logisticpopulation': {'data': population, 'granularity': 'county'}}}
self.population = population
return {
'logisticpopulation': {
'logisticpopulation': {
'data': population,
'granularity': 'county',
}
}
}
def main():

View File

@@ -24,12 +24,14 @@ def pop_sim(init_data):
# of {county1_index: {2000: pop, 2001: pop, etc}, county2_index:...}
# applies Holt linear trend method to predict one year ahead
# outputted data is dict of {county_index: next_year_pop}
temp = {}
for key, county in data.items():
population = pd.Series(county)
fit1 = Holt(np.asarray(population)).fit(smoothing_level=0.7, smoothing_slope=0.3)
fit1 = Holt(np.asarray(population)).fit(
smoothing_level=0.7, smoothing_slope=0.3
)
next_year = fit1.forecast(1)[0]
temp[key] = next_year
data[key][str(int(max(data[key].keys())) + 1)] = next_year
@@ -38,4 +40,3 @@ def pop_sim(init_data):
file.write(json.dumps(data))
return temp

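Holt here is statsmodels' linear-trend exponential smoothing; note that newer statsmodels releases renamed smoothing_slope to smoothing_trend. A minimal sketch with made-up county populations:

    import numpy as np
    from statsmodels.tsa.holtwinters import Holt

    history = np.array([10000.0, 10250.0, 10490.0, 10760.0, 11000.0])
    fit1 = Holt(history).fit(smoothing_level=0.7, smoothing_slope=0.3)
    next_year = fit1.forecast(1)[0]  # one-step-ahead population forecast
    print(round(next_year))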
View File

@@ -1,27 +1,37 @@
import glob
import sys
sys.path.append('/')
from outer_wrapper import OuterWrapper
from PopulationSimulation import pop_sim
class InnerWrapper(OuterWrapper):
class InnerWrapper(OuterWrapper):
def __init__(self):
num_input_schemas = len(glob.glob("/opt/schemas/input/*.json"))
super().__init__(model_id="population", num_expected_inputs=num_input_schemas)
super().__init__(
model_id="population", num_expected_inputs=num_input_schemas
)
def configure(self, **kwargs):
if 'county_populations' in kwargs.keys():
self.data = kwargs['county_populations']
self.data = kwargs['county_populations']
else:
print('population initialization data not found')
def increment(self, **kwargs):
data = pop_sim(self.data)
self.data = data
results = {'population': {'population': {'data': data, 'granularity': 'county'}}}
results = {
'population': {
'population': {'data': data, 'granularity': 'county'}
}
}
return results
#is taking in the data from the previous run
# is taking in the data from the previous run
def main():
wrapper = InnerWrapper()

View File

@@ -8,33 +8,38 @@ Created on Wed Jul 11 14:10:24 2018
import pandas as pd
def pow_dem_sim(pop,cons):
#sets baseline initialization if no data received
def pow_dem_sim(pop, cons):
# sets baseline initialization if no data received
# Must receive data as dict of {county_id: current_population, ...}
# loads in static state values
# simply multiplies current pop by state consumption per capita
temp = {}
count = pd.DataFrame(pop,index=['pop'])
count = pd.DataFrame(pop, index=['pop'])
count = count.T
count.reset_index(inplace=True)
count['State'] = count['index'].apply(lambda x: x[:-3])
state_pops = count.groupby('State').sum().reset_index()
count = pd.merge(count,state_pops,on='State',how='left')
count = pd.merge(count, state_pops, on='State', how='left')
count['perc'] = count.apply(lambda x: x.pop_x / x.pop_y, axis=1)
cons_pc = pd.DataFrame(cons,index=['cons'])
cons_pc = pd.DataFrame(cons, index=['cons'])
cons_pc = cons_pc.T
count = pd.merge(count,cons_pc.reset_index(), left_on='State',right_on='index',how='left')
count['demand'] = count.apply(lambda x: (x.pop_y * x.cons) * x.perc,axis=1)
count = count[['index_x','demand']].set_index('index_x')
count = pd.merge(
count,
cons_pc.reset_index(),
left_on='State',
right_on='index',
how='left',
)
count['demand'] = count.apply(
lambda x: (x.pop_y * x.cons) * x.perc, axis=1
)
count = count[['index_x', 'demand']].set_index('index_x')
for index, row in count.iterrows():
temp[index] = row['demand']
return temp

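Algebraically, the pipeline above collapses: demand = (state_pop * cons_per_capita) * (county_pop / state_pop) is just county_pop * cons_per_capita, with each county's state read from its id prefix (x[:-3]). A dict-only sketch of the same result with illustrative ids and rates:

    pop = {"NY001": 100_000, "NY002": 300_000, "CA001": 250_000}
    cons = {"NY": 6.5, "CA": 7.2}  # consumption per capita (illustrative units)

    demand = {
        county: people * cons[county[:-3]]  # strip the 3-char suffix to get the state
        for county, people in pop.items()
    }
    print(demand)  # {'NY001': 650000.0, 'NY002': 1950000.0, 'CA001': 1800000.0}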
View File

@@ -1,35 +1,59 @@
import glob
import sys
sys.path.append('/')
from outer_wrapper import OuterWrapper
from DemandSimulation import pow_dem_sim #for water demand, going to do something similar ##
from DemandSimulation import (
pow_dem_sim,
) # for water demand, going to do something similar ##
class InnerWrapper(OuterWrapper):
def __init__(self):
num_input_schemas = len(glob.glob("/opt/schemas/input/*.json"))
super().__init__(model_id="power_demand", num_expected_inputs=num_input_schemas)
super().__init__(
model_id="power_demand", num_expected_inputs=num_input_schemas
)
def configure(self, **kwargs): #this would be the water consumption rate in here
if 'state_consumption_per_capita' in kwargs.keys(): #instead of state, do water 2015, the json we made
self.cons = kwargs['state_consumption_per_capita'] #copy the file name
def configure(
self, **kwargs
): # this would be the water consumption rate in here
if (
'state_consumption_per_capita' in kwargs.keys()
): # instead of state, do water 2015, the json we made
self.cons = kwargs[
'state_consumption_per_capita'
] # copy the file name
else:
print('State consumption data not found')
if '2016_populations' in kwargs.keys(): #instead of 2016 populations would put the name of the 2015 water consumption rate
if (
'2016_populations' in kwargs.keys()
): # instead of 2016 populations would put the name of the 2015 water consumption rate
self.pop = kwargs['2016_populations']
def increment(self, **kwargs): #this is handling all the new data that is coming from other models, the whole function would be similar besides power instead of water
def increment(
self, **kwargs
): # this is handling all the new data that is coming from other models, the whole function would be similar besides power instead of water
if 'population' in kwargs.keys():
self.pop = kwargs['population']['population']['data'] #assume you can keep as is for now and it may work
self.pop = kwargs['population']['population'][
'data'
] # assume you can keep as is for now and it may work
else:
print('input population not found')
demand = pow_dem_sim(self.pop, self.cons) #instead of power demand simulation, have water demand, your inputs will be population and consumption rate, #do water demand sim, #and change inputs to the actual name of our arguments
demand = pow_dem_sim(
self.pop, self.cons
) # instead of power demand simulation, have water demand, your inputs will be population and consumption rate, #do water demand sim, #and change inputs to the actual name of our arguments
results = {'power_demand': {'power_demand': {'data': demand, 'granularity': 'county'}}} #obviously this will all say water demand instead of power demand
#checks to see if it has data on population, writes it to self.pop and outputs power demand
#does not use self because it doesn't use its own previous data
results = {
'power_demand': {
'power_demand': {'data': demand, 'granularity': 'county'}
}
} # obviously this will all say water demand instead of power demand
# checks to see if it has data on population, writes it to self.pop and outputs power demand
# does not use self because it doesn't use its own previous data
return results
def main():
wrapper = InnerWrapper()
wrapper.run()

View File

@@ -5,14 +5,14 @@
"type": "object",
"properties":{
"data": {"type": "object"},
"granularity": {"type": "string", "value": "NERC"}
"granularity": {"type": "string", "value": "county"}
}
},
"thermo_water": {
"type": "object",
"properties":{
"data": {"type": "object"},
"granularity": {"type": "string", "value": "NERC"}
"granularity": {"type": "string", "value": "HUC8"}
}
}
},

View File

@@ -6,46 +6,65 @@ Created on Wed Jul 11 15:14:47 2018
import pandas as pd
def gen_sim(demand,prof):
def gen_sim(demand, prof):
# multiply by county demand to yield county generation profile
# aggregate up to state level to apply profile then project back down
counties = pd.DataFrame(demand,index=['demand']).T.reset_index().rename(columns={'index':'county'})
counties = (
pd.DataFrame(demand, index=['demand'])
.T.reset_index()
.rename(columns={'index': 'county'})
)
counties['state'] = counties.county.apply(lambda x: x[:-3])
state_demand = counties.groupby('state')['demand'].sum()
state_prof = pd.DataFrame(prof).T.reset_index().rename(columns={'index':'state'})
state_prof = (
pd.DataFrame(prof).T.reset_index().rename(columns={'index': 'state'})
)
state_demand = state_demand.to_frame().reset_index()
counties = pd.merge(counties, state_demand, on='state',how='left')
counties = pd.merge(counties, state_prof, on='state',how='left')
counties['Fuel Used (MMBtu)'] = counties.apply(lambda x: (x['MMBtu per MWh']) * (x.demand_x),axis=1)
counties['CO2 Emissions (tons)'] = counties.apply(lambda x: (x['Tons CO2 per MWh']) * (x.demand_x),axis=1)
counties['Water Used (Mgal)'] = counties.apply(lambda x: (x['Mgal_per_MWh']) * (x.demand_x),axis=1)
counties = counties[['county','Fuel Used (MMBtu)','CO2 Emissions (tons)','Water Used (Mgal)']].set_index('county')
# data = counties.to_dict(orient='index')
counties = pd.merge(counties, state_demand, on='state', how='left')
counties = pd.merge(counties, state_prof, on='state', how='left')
counties['Fuel Used (MMBtu)'] = counties.apply(
lambda x: (x['MMBtu per MWh']) * (x.demand_x), axis=1
)
counties['CO2 Emissions (tons)'] = counties.apply(
lambda x: (x['Tons CO2 per MWh']) * (x.demand_x), axis=1
)
counties['Water Used (Mgal)'] = counties.apply(
lambda x: (x['Mgal_per_MWh']) * (x.demand_x), axis=1
)
counties = counties[
[
'county',
'Fuel Used (MMBtu)',
'CO2 Emissions (tons)',
'Water Used (Mgal)',
]
].set_index('county')
# data = counties.to_dict(orient='index')
co2 = {}
h2o = {}
for index, row in counties.iterrows():
co2[index] = row['CO2 Emissions (tons)']
h2o[index] = row['Water Used (Mgal)']
return co2, h2o
#water usage (withdrawals and consumption in the 860, we care about consumption), emissions of co2 (may also be in the 860), 923 should have total power produced by each power plant (would match regional demand), aggregating up then dividing by the state population, instead of doing on the state level, we want to instead do it on the nerc level, supply can be on nerc level,wrapper between the supply and demand does the granularity work so we don't have to worry about state vs. NERC, change the input file, he broke it out and was calculating stuff on counties, would adjust and repopulate, do on NERC and calculate on the NERC level, the point in the supply model is to make power demand equal to power supply on the nerc level, need to find the aggregation and disaggregation pairing
#want to be able to do it in county nercs
#1. read in P.P. data
# water usage (withdrawals and consumption in the 860, we care about consumption), emissions of co2 (may also be in the 860), 923 should have total power produced by each power plant (would match regional demand), aggregating up then dividing by the state population, instead of doing on the state level, we want to instead do it on the nerc level, supply can be on nerc level,wrapper between the supply and demand does the granularity work so we don't have to worry about state vs. NERC, change the input file, he broke it out and was calculating stuff on counties, would adjust and repopulate, do on NERC and calculate on the NERC level, the point in the supply model is to make power demand equal to power supply on the nerc level, need to find the aggregation and disaggregation pairing
# want to be able to do it in county nercs
# 1. read in P.P. data
# P.P. I.D./County/Nerc/type/..
# Max Capacity / 2016 Annual Predictions/C.F.?
# CO2/Water Consumption
#want it to be in power plant data
# want it to be in power plant data
# once we read in powerplant data, we have county demand, NERC Match: we are going to aggregate the powerplant data on nerc, and we will have county demand from the power demand model, it will say how much power, SIMON will turn the county demand into nerc demand from the wrappers, will have a NERC demand read in, the NERC demand read in needs to be fulfilled by the power plants, we will aggregate all the pp data based off of nerc and then calculate a scaling factor
# we will calculate scaling factor for county demand NERC match, it will be a regional nerc scaling factor for each NERC, we are going to take that scaling factor and apply it back to each pp and output co2 and water consumption, we are then going to take the scaling factor and go back to the powerplant level
#we want supply and demand on the nerc level and then to depend what it means for each individual powerplant, want supply to match demand on nerc and then break it out at county level, maybe should not let powerplants be at maximum capacity, is there a different kind of capacity, or is it just historical use
#capacity? there is generally a capacity factor which is like the maximum loading like if you took an integral over power usage (coal = 90%), might need a distribution network
#2. county demand nerc match
#3. scale p.p. data
#4. aggregate it to anything you want, you want it as tight as possible so probably countynerc
# we want supply and demand on the nerc level and then to depend what it means for each individual powerplant, want supply to match demand on nerc and then break it out at county level, maybe should not let powerplants be at maximum capacity, is there a different kind of capacity, or is it just historical use
# capacity? there is generally a capacity factor which is like the maximum loading like if you took an integral over power usage (coal = 90%), might need a distribution network
# 2. county demand nerc match
# 3. scale p.p. data
# 4. aggregate it to anything you want, you want it as tight as possible so probably countynerc
## will be easily able to do things and we will be happy the nerc is there, still being processed at powerplant but given a scaling factor

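The notes above sketch the intended redesign: aggregate plant-level generation to NERC regions, compute a per-region scaling factor demand/supply, apply it back to each plant, and only then compute emissions and water use. A minimal sketch of that scaling step (all ids, regions, and numbers illustrative):

    plants = [
        {"id": "pp1", "nerc": "RFC", "gen_mwh": 600.0, "tons_co2_per_mwh": 0.9},
        {"id": "pp2", "nerc": "RFC", "gen_mwh": 400.0, "tons_co2_per_mwh": 0.3},
        {"id": "pp3", "nerc": "WECC", "gen_mwh": 500.0, "tons_co2_per_mwh": 0.5},
    ]
    nerc_demand = {"RFC": 1200.0, "WECC": 450.0}  # MWh, from the demand model

    # aggregate plant generation up to the NERC level
    supply = {}
    for p in plants:
        supply[p["nerc"]] = supply.get(p["nerc"], 0.0) + p["gen_mwh"]

    # per-region factor that makes scaled supply equal demand
    scale = {region: nerc_demand[region] / supply[region] for region in supply}

    # apply the factor back at the plant level, then compute emissions
    for p in plants:
        scaled = p["gen_mwh"] * scale[p["nerc"]]
        print(p["id"], scaled, scaled * p["tons_co2_per_mwh"])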
View File

@@ -1,14 +1,17 @@
import glob
import sys
sys.path.append('/')
from outer_wrapper import OuterWrapper
from GenerationSimulation import gen_sim
class InnerWrapper(OuterWrapper):
class InnerWrapper(OuterWrapper):
def __init__(self):
num_input_schemas = len(glob.glob("/opt/schemas/input/*.json"))
super().__init__(model_id="power_supply", num_expected_inputs=num_input_schemas)
super().__init__(
model_id="power_supply", num_expected_inputs=num_input_schemas
)
def configure(self, **kwargs):
if 'state_energy_profiles' in kwargs.keys():
@@ -25,10 +28,15 @@ class InnerWrapper(OuterWrapper):
print('input demand not found')
emissions, water = gen_sim(self.dem, self.prof)
results = {'power_supply': { 'co2': {'data': emissions, 'granularity': 'county'},
'thermo_water': {'data': water, 'granularity': 'county'}}}
results = {
'power_supply': {
'co2': {'data': emissions, 'granularity': 'county'},
'thermo_water': {'data': water, 'granularity': 'county'},
}
}
return results
def main():
wrapper = InnerWrapper()
wrapper.run()

View File

@@ -6,18 +6,14 @@ import json
import pandas as pd
def Water_Demand_Simulation(countypop,rate):
def Water_Demand_Simulation(countypop, rate):
water={}
water = {}
for i in rate.keys():
if i in countypop.keys():
water[i]= (countypop[i]*rate[i])
else:
water[i]=(0)
#with open('water_demand_2015.json', 'w') as file:
if i in countypop.keys():
water[i] = countypop[i] * rate[i]
else:
water[i] = 0
# with open('water_demand_2015.json', 'w') as file:
# file.write(json.dumps(data))
return water

View File

@@ -1,22 +1,28 @@
import glob
import sys
sys.path.append('/')
from outer_wrapper import OuterWrapper
from Water_Demand_Model import Water_Demand_Simulation
class InnerWrapper(OuterWrapper):
class InnerWrapper(OuterWrapper):
def __init__(self):
num_input_schemas = len(glob.glob("/opt/schemas/input/*.json"))
super().__init__(model_id="water_demand", num_expected_inputs=num_input_schemas)
super().__init__(
model_id="water_demand", num_expected_inputs=num_input_schemas
)
def configure(self, **kwargs):
if 'rates' in kwargs.keys():
self.rate=kwargs['rates']
if '2016_populations' in kwargs.keys(): #instead of 2016 populations would put the name of the 2015 water consumption rate
self.rate = kwargs['rates']
if (
'2016_populations' in kwargs.keys()
): # instead of 2016 populations would put the name of the 2015 water consumption rate
self.countypop = kwargs['2016_populations']
#replace the populations with the 2015 water consumption rate
#need to take out the extra variable
# replace the populations with the 2015 water consumption rate
# need to take out the extra variable
def increment(self, **kwargs):
if 'population' in kwargs.keys():
self.countypop = kwargs['population']['population']['data']
@@ -24,7 +30,11 @@ class InnerWrapper(OuterWrapper):
print('input population not found')
demand = Water_Demand_Simulation(self.countypop, self.rate)
results = {'water_demand': {'water_demand': {'data': demand, 'granularity': 'county'}}}
results = {
'water_demand': {
'water_demand': {'data': demand, 'granularity': 'county'}
}
}
return results

View File

@@ -1,21 +1,27 @@
import glob
import sys
sys.path.append('/')
from outer_wrapper import OuterWrapper
class InnerWrapper(OuterWrapper):
def __init__(self):
num_input_schemas = len(glob.glob("/opt/schemas/input/*.json"))
super().__init__(model_id="unique_model_name", num_expected_inputs=num_input_schemas)
super().__init__(
model_id="unique_model_name", num_expected_inputs=num_input_schemas
)
self.data = None
def configure(self, **kwargs):
self.data = kwargs['schema_name']
def increment(self, **kwargs):
return {'schema_name': {'data_variable_name': {'data': {}, 'granularity': 'county'}}}
return {
'schema_name': {
'data_variable_name': {'data': {}, 'granularity': 'county'}
}
}
def main():

View File

@@ -19,7 +19,6 @@ from collections import defaultdict
class Graph(nx.DiGraph):
def __init__(self, filename):
"""
constructor for the granularity graph
@@ -31,13 +30,26 @@ class Graph(nx.DiGraph):
super().__init__()
# map the translation function names to the functions
self.functions = {"simple_sum": self.simple_sum, "simple_average": self.simple_average, "weighted_average": self.weighted_average, "distribute_uniformly": self.distribute_uniformly, "distribute_by_area": self.distribute_by_area, "distribute_identically": self.distribute_identically}
self.functions = {
"simple_sum": self.simple_sum,
"simple_average": self.simple_average,
"weighted_average": self.weighted_average,
"distribute_uniformly": self.distribute_uniformly,
"distribute_by_area": self.distribute_by_area,
"distribute_identically": self.distribute_identically,
}
# build the graph by loading it from the JSON file
with open(filename, mode='r') as json_file:
data = json.load(json_file)
for node in data['nodes']:
self.add_node(node['id'], name=node.get('name'), type=node.get('type'), shape=node.get('shape'), area=node.get('area'))
self.add_node(
node['id'],
name=node.get('name'),
type=node.get('type'),
shape=node.get('shape'),
area=node.get('area'),
)
for edge in data['links']:
self.add_edge(edge['source'], edge['target'])
@@ -73,10 +85,17 @@ class Graph(nx.DiGraph):
parent_area = self.nodes[ancestor]['area']
break
else:
logging.error(f"none of the nodes in {ancestors} have granularity {parent_granularity}")
parent_area = sum([self.nodes[value[0]]['area'] for value in values])
logging.error(
f"none of the nodes in {ancestors} have granularity {parent_granularity}"
)
parent_area = sum(
[self.nodes[value[0]]['area'] for value in values]
)
return sum([value[1] * self.nodes[value[0]]['area'] for value in values]) / parent_area
return (
sum([value[1] * self.nodes[value[0]]['area'] for value in values])
/ parent_area
)
def distribute_uniformly(self, value, instance, child_granularity):
"""
@@ -86,7 +105,11 @@ class Graph(nx.DiGraph):
:param child_granularity: the intended granularity of the transformation (the child node of the instance in the abstract graph)
:return: a dict mapping each child node to its equal share of the parent value
"""
children = [child for child in self.successors(instance) if self.nodes[child]['type'] == child_granularity]
children = [
child
for child in self.successors(instance)
if self.nodes[child]['type'] == child_granularity
]
mean = value / len(children) if children else 0
distributed = {child: mean for child in children}
return distributed
@@ -99,7 +122,11 @@ class Graph(nx.DiGraph):
:param child_granularity: the intended granularity of the transformation (the child node of the instance in the abstract graph)
:return: a dict mapping each child node to the parent value
"""
children = [child for child in self.successors(instance) if self.nodes[child]['type'] == child_granularity]
children = [
child
for child in self.successors(instance)
if self.nodes[child]['type'] == child_granularity
]
distributed = {child: value for child in children}
return distributed
@@ -111,14 +138,20 @@ class Graph(nx.DiGraph):
:param child_granularity: the intended granularity of the transformation (the child node of the instance in the abstract graph)
:return: a dict mapping each child node to its area-proportionate share of the parent value
"""
children = [child for child in self.successors(instance) if self.nodes[child]['type'] == child_granularity]
children = [
child
for child in self.successors(instance)
if self.nodes[child]['type'] == child_granularity
]
parent_area = self.nodes[instance]["area"]
distributed = {child: value * self.nodes[child]["area"] / parent_area for child in children}
distributed = {
child: value * self.nodes[child]["area"] / parent_area
for child in children
}
return distributed
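distribute_by_area above splits a parent value among child nodes in proportion to their areas. The same arithmetic, stripped of the networkx graph (areas illustrative):

    parent_area = 100.0
    child_areas = {"county_a": 60.0, "county_b": 25.0, "county_c": 15.0}

    value = 40.0  # parent-level quantity to distribute
    distributed = {
        child: value * area / parent_area for child, area in child_areas.items()
    }
    print(distributed)  # {'county_a': 24.0, 'county_b': 10.0, 'county_c': 6.0}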
class OuterWrapper(ABC):
def __init__(self, model_id, num_expected_inputs):
"""
constructor for the outer wrapper, an abstract base class inherited by the inner wrapper
@@ -148,21 +181,26 @@ class OuterWrapper(ABC):
self.input_schemas = None
self.output_schemas = None
self.validated_schemas = {}
self.generic_output_schema = '{' \
' "type": "object",' \
' "patternProperties": {' \
' ".*": {' \
' "type": "object", ' \
' "properties": {' \
' "data": {"type": "object"}, "granularity": {"type": "string"}' \
' },' \
' "required": ["data", "granularity"]' \
' }' \
' }' \
'}'
self.generic_output_schema = (
'{'
' "type": "object",'
' "patternProperties": {'
' ".*": {'
' "type": "object", '
' "properties": {'
' "data": {"type": "object"}, "granularity": {"type": "string"}'
' },'
' "required": ["data", "granularity"]'
' }'
' }'
'}'
)
logging.basicConfig(level=logging.DEBUG, stream=sys.stdout,
format='%(asctime)s - %(levelname)s - %(filename)s:%(funcName)s:%(lineno)d - %(message)s')
logging.basicConfig(
level=logging.DEBUG,
stream=sys.stdout,
format='%(asctime)s - %(levelname)s - %(filename)s:%(funcName)s:%(lineno)d - %(message)s',
)
def meet(self, a, b):
sort = sorted((a, b))
@@ -188,9 +226,15 @@ class OuterWrapper(ABC):
parents = defaultdict(list)
for instance, value in data.items():
if instance not in self.instance_graph.nodes:
logging.warning(f"instance {instance} not in instance graph")
logging.warning(
f"instance {instance} not in instance graph"
)
continue
parent = [parent for parent in self.instance_graph.predecessors(instance) if self.instance_graph.nodes[parent]['type'] == path[1]]
parent = [
parent
for parent in self.instance_graph.predecessors(instance)
if self.instance_graph.nodes[parent]['type'] == path[1]
]
assert len(parent) == 1
parents[parent[0]].append((instance, value))
@@ -203,7 +247,9 @@ class OuterWrapper(ABC):
# translate to the next granularity in the path
return self.aggregate(translated, path[1], dest, agg_name)
else:
raise Exception(f"error aggregating from {src} to {dest}, no path found")
raise Exception(
f"error aggregating from {src} to {dest}, no path found"
)
def disaggregate(self, data, src, dest, disagg_name=None):
@@ -224,20 +270,26 @@ class OuterWrapper(ABC):
translated = {}
for instance, value in data.items():
if instance not in self.instance_graph.nodes:
logging.warning(f"instance {instance} not in instance graph")
logging.warning(
f"instance {instance} not in instance graph"
)
else:
trans_func = self.instance_graph.functions.get(disagg_name)
# for this parent, create a dict of child instances mapped to disaggregated values
children = trans_func(value, instance, path[1])
# add this parent's dict of children to the flat dict
translated = {**translated, **children}
# translate to the next granularity in the path
return self.disaggregate(translated, path[1], dest, disagg_name)
else:
raise Exception(f"error disaggregating from {src} to {dest}, no path found")
raise Exception(
f"error disaggregating from {src} to {dest}, no path found"
)
def translate(self, data, src, dest, variable, agg_name=None, disagg_name=None):
def translate(
self, data, src, dest, variable, agg_name=None, disagg_name=None
):
"""
:param data: dictionary mapping instance nodes (of src granularity) to their values
@@ -265,13 +317,21 @@ class OuterWrapper(ABC):
return self.aggregate(data, src, dest, agg_name)
# translate between branches of the granularity graph: disaggregate down, then aggregate back up
elif nx.has_path(self.abstract_graph, src, self.meet(src, dest)) and nx.has_path(self.abstract_graph, dest, self.meet(src, dest)):
disaggregated = self.disaggregate(data, src, self.meet(src, dest), disagg_name)
aggregated = self.aggregate(disaggregated, self.meet(src, dest), dest, agg_name)
elif nx.has_path(
self.abstract_graph, src, self.meet(src, dest)
) and nx.has_path(self.abstract_graph, dest, self.meet(src, dest)):
disaggregated = self.disaggregate(
data, src, self.meet(src, dest), disagg_name
)
aggregated = self.aggregate(
disaggregated, self.meet(src, dest), dest, agg_name
)
return aggregated
else:
raise Exception(f"error translating {variable} from {src} to {dest}, no path found")
raise Exception(
f"error translating {variable} from {src} to {dest}, no path found"
)
def load_json_objects(self, dir_path):
"""
@@ -296,7 +356,9 @@ class OuterWrapper(ABC):
:return:
"""
raise NotImplementedError(f"configure() has to be implemented in the {self.model_id} inner wrapper")
raise NotImplementedError(
f"configure() has to be implemented in the {self.model_id} inner wrapper"
)
@abstractmethod
def increment(self, **kwargs):
@@ -306,7 +368,9 @@ class OuterWrapper(ABC):
:return:
"""
raise NotImplementedError(f"increment() has to be implemented in the {self.model_id} inner wrapper")
raise NotImplementedError(
f"increment() has to be implemented in the {self.model_id} inner wrapper"
)
def increment_handler(self, event, incstep):
"""
@@ -317,12 +381,19 @@ class OuterWrapper(ABC):
"""
self.increment_flag = True
logging.info(f"about to increment, incstep {incstep}, year {self.initial_year + incstep}")
logging.info(
f"about to increment, incstep {incstep}, year {self.initial_year + incstep}"
)
self.incstep = incstep
# validate against input schemas
if incstep > 1 and len(self.validated_schemas) != self.num_expected_inputs:
logging.critical(f"number of validated schemas {len(self.validated_schemas)} != num_expected_inputs {self.num_expected_inputs}")
if (
incstep > 1
and len(self.validated_schemas) != self.num_expected_inputs
):
logging.critical(
f"number of validated schemas {len(self.validated_schemas)} != num_expected_inputs {self.num_expected_inputs}"
)
event.set()
raise RuntimeError
@@ -337,7 +408,9 @@ class OuterWrapper(ABC):
validate(data_msg, json.loads(self.generic_output_schema))
validate(data_msg, self.output_schemas[schema_name])
except Exception as e:
logging.critical(f"message {data_msg} failed to validate schema {schema_name}")
logging.critical(
f"message {data_msg} failed to validate schema {schema_name}"
)
event.set()
raise RuntimeError
@@ -356,7 +429,9 @@ class OuterWrapper(ABC):
data_msg['incstep'] = self.incstep
data_msg['year'] = self.incstep + self.initial_year
self.pub_queue.put(data_msg)
logging.info(f"finished increment {self.incstep}, year {self.incstep + self.initial_year}")
logging.info(
f"finished increment {self.incstep}, year {self.incstep + self.initial_year}"
)
self.incstep += 1
def send_status(self, event):
@@ -386,7 +461,9 @@ class OuterWrapper(ABC):
# kickstart the model for the first increment
self.status = 'ready'
elif len(self.validated_schemas) == self.num_expected_inputs:
elif (
len(self.validated_schemas) == self.num_expected_inputs
):
# all input messages have been received and all input schemas have been validated
self.status = 'ready'
@@ -451,13 +528,32 @@ class OuterWrapper(ABC):
src_gran = message['payload'][item]['granularity']
# get granularity and translation functions from the schema
dest_gran = schema['properties'][item]['properties']['granularity'].get('value', src_gran)
agg = schema['properties'][item]['properties'].get('agg', {}).get('value')
dagg = schema['properties'][item]['properties'].get('dagg', {}).get('value')
dest_gran = schema['properties'][item]['properties'][
'granularity'
].get('value', src_gran)
agg = (
schema['properties'][item]['properties']
.get('agg', {})
.get('value')
)
dagg = (
schema['properties'][item]['properties']
.get('dagg', {})
.get('value')
)
# translate the data and update the data message
logging.info(f"validating output: message from {name}, translating variable {item}, {src_gran} -> {dest_gran}")
data = self.translate(message['payload'][item]['data'], src_gran, dest_gran, item, agg_name=agg, disagg_name=dagg)
logging.info(
f"validating output: message from {name}, translating variable {item}, {src_gran} -> {dest_gran}"
)
data = self.translate(
message['payload'][item]['data'],
src_gran,
dest_gran,
item,
agg_name=agg,
disagg_name=dagg,
)
message['payload'][item]['data'] = data
message['payload'][item]['granularity'] = dest_gran
@@ -466,12 +562,18 @@ class OuterWrapper(ABC):
except json.JSONDecodeError:
logging.warning("json decode error")
if len(matched) == 0:
logging.info(f"message didn't match any output schemas: {message['source']}")
logging.info(
f"message didn't match any output schemas: {message['source']}"
)
elif len(matched) == 1:
logging.info(f"message matched an output schema: {message['source']}")
logging.info(
f"message matched an output schema: {message['source']}"
)
sock.send_json(message)
else:
logging.critical(f"more than one output schema was matched: {message['source']}")
logging.critical(
f"more than one output schema was matched: {message['source']}"
)
event.set()
sock.close()
@@ -528,9 +630,13 @@ class OuterWrapper(ABC):
for name, schema in self.input_schemas.items():
try:
validate(message['payload'], schema)
logging.info(f"schema {name} validated incoming message: {message}")
logging.info(
f"schema {name} validated incoming message: {message}"
)
if name in self.validated_schemas:
logging.error(f"schema {name} already validated a message: {message}")
logging.error(
f"schema {name} already validated a message: {message}"
)
return False
else:
matched.append(schema)
@@ -542,13 +648,32 @@ class OuterWrapper(ABC):
src_gran = message['payload'][item]['granularity']
# get granularity and translation functions from the schema
dest_gran = schema['properties'][item]['properties']['granularity'].get('value', src_gran)
agg = schema['properties'][item]['properties'].get('agg', {}).get('value')
dagg = schema['properties'][item]['properties'].get('dagg', {}).get('value')
dest_gran = schema['properties'][item]['properties'][
'granularity'
].get('value', src_gran)
agg = (
schema['properties'][item]['properties']
.get('agg', {})
.get('value')
)
dagg = (
schema['properties'][item]['properties']
.get('dagg', {})
.get('value')
)
# translate the data and update the data message
logging.info(f"validating input: message from {name}, translating variable {item}, {src_gran} -> {dest_gran}")
data = self.translate(message['payload'][item]['data'], src_gran, dest_gran, item, agg_name=agg, disagg_name=dagg)
logging.info(
f"validating input: message from {name}, translating variable {item}, {src_gran} -> {dest_gran}"
)
data = self.translate(
message['payload'][item]['data'],
src_gran,
dest_gran,
item,
agg_name=agg,
disagg_name=dagg,
)
message['payload'][item]['data'] = data
message['payload'][item]['granularity'] = dest_gran
@@ -559,7 +684,9 @@ class OuterWrapper(ABC):
logging.warning("json decode error")
if len(matched) == 0:
logging.info(f"message didn't match any input schemas: {message['source']}")
logging.info(
f"message didn't match any input schemas: {message['source']}"
)
return True
def action_worker(self, event):
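For context on the validation calls above: generic_output_schema simply requires every top-level variable in a payload to carry a data object and a granularity string, and jsonschema's validate raises on anything else. A minimal sketch with the schema inlined as a dict and illustrative messages:

    from jsonschema import ValidationError, validate

    generic_output_schema = {
        "type": "object",
        "patternProperties": {
            ".*": {
                "type": "object",
                "properties": {
                    "data": {"type": "object"},
                    "granularity": {"type": "string"},
                },
                "required": ["data", "granularity"],
            }
        },
    }

    good = {"water_demand": {"data": {"NY001": 1.2}, "granularity": "county"}}
    bad = {"water_demand": {"data": {"NY001": 1.2}}}  # granularity missing

    for msg in (good, bad):
        try:
            validate(msg, generic_output_schema)
            print("validated")
        except ValidationError as e:
            print("rejected:", e.message)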