mirror of
https://github.com/darkrenaissance/darkfi.git
synced 2026-01-09 22:57:59 -05:00
215 lines
6.9 KiB
Python
Executable File
215 lines
6.9 KiB
Python
Executable File
#!/usr/bin/env python
|
|
|
|
# This file is part of DarkFi (https://dark.fi)
|
|
#
|
|
# Copyright (C) 2020-2026 Dyne.org foundation
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU Affero General Public License as
|
|
# published by the Free Software Foundation, either version 3 of the
|
|
# License, or (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
|
|
import sys, toml, json, urwid, asyncio, logging
|
|
import src.util
|
|
|
|
from os.path import exists, join
|
|
from pathlib import Path
|
|
from src.model import Model
|
|
from src.rpc import JsonRpc
|
|
from src.view import View
|
|
|
|
class Dnetview:
    """Terminal viewer that collects node state over JSON-RPC and shows it with urwid.

    One `subscribe` task per configured node pushes ``{name: (type, payload)}``
    dicts onto ``self.queue``; `update_info` drains that queue into the
    `Model`, and the `View` renders the model inside an urwid main loop that
    shares the same asyncio event loop.
    """

    def __init__(self):
        """Create the event loop, load the configuration and build model/view."""
        self.ev = asyncio.new_event_loop()
        asyncio.set_event_loop(self.ev)
        self.queue = asyncio.Queue()

        # Named os_name so the local does not shadow the stdlib module name.
        os_name = src.util.get_os()
        config_path = src.util.user_config_dir('darkfi', os_name)

        suffix = '.toml'
        filename = 'dnet_config'
        path = join(config_path, filename + suffix)
        self.config = src.util.spawn_config(path)

        self.model = Model()
        self.view = View(self.model)
        # Every JsonRpc created in start_connect_slots, kept for shutdown().
        self.rpc_conns = []

    async def subscribe(self, rpc, node):
        """Connect to `node` and keep feeding its updates into the queue.

        `node` is a mapping with the keys 'name', 'host', 'port' and 'type'.
        'NORMAL' nodes are queried once with `get_info` and then read line by
        line from `rpc.reader`; 'LILITH' nodes are polled once per second with
        `lilith_spawns`. Any failure is logged and followed by a reconnect.
        This coroutine only ends by cancellation (or for an unknown type).
        """
        name = node['name']
        port = node['port']
        node_type = node['type']
        info = {}

        await self.connect_loop(rpc, node, info)

        if node_type == 'NORMAL':
            await self.get_info(rpc, node, info)
            while True:
                await asyncio.sleep(0.01)
                try:
                    # The read sits inside the try so that a transport error
                    # takes the same reconnect path as an undecodable line.
                    # NOTE(review): assumes rpc.reader is an asyncio
                    # StreamReader -- confirm in src/rpc.py.
                    line = await rpc.reader.readline()
                    info[name] = (node_type, json.loads(line))
                    # Enqueue a snapshot: `info` is mutated again later.
                    await self.queue.put(dict(info))
                except Exception as e:
                    logging.debug(f'{name} RPC on port {port} disconnected {e}')
                    # Attempt reconnection
                    await self.connect_loop(rpc, node, info)
                    await self.get_info(rpc, node, info)

        elif node_type == 'LILITH':
            while True:
                await asyncio.sleep(1)
                try:
                    await self.lilith_spawns(rpc, node, info)
                except Exception as e:
                    logging.debug(f'{name} RPC on port {port} disconnected {e}')
                    await self.connect_loop(rpc, node, info)

        else:
            logging.warning(f"{name} has unknown node type '{node_type}', ignoring it")

    async def lilith_spawns(self, rpc, node, info):
        """Request 'spawns' from a lilith node and enqueue the reply."""
        name = node['name']
        node_type = node['type']

        data = await rpc._make_request('spawns', [])
        info[name] = (node_type, data)
        await self.queue.put(dict(info))

    async def get_info(self, rpc, node, info):
        """Request 'p2p.get_info', enqueue the reply, then enable dnet events.

        An 'error' member in the reply is logged but the reply is still
        enqueued, and the dnet switch/subscription calls are still made.
        """
        name = node['name']
        node_type = node['type']

        data = await rpc._make_request('p2p.get_info', [])

        if 'error' in data:
            logging.error(f"Error calling 'p2p.get_info' for '{name}'. Is it disabled with 'rpc_disabled_methods' in {name}_config.toml?")

        info[name] = (node_type, data)

        await self.queue.put(dict(info))
        await rpc.dnet_switch(True)
        await rpc.dnet_subscribe_events()

    async def connect_loop(self, rpc, node, info):
        """Retry `rpc.start(host, port)` every two seconds until it succeeds.

        After each failed attempt an empty payload for this node is enqueued,
        which `update_info` reports to the model as an offline node.
        """
        name = node['name']
        host = node['host']
        port = node['port']
        node_type = node['type']

        while True:
            try:
                await rpc.start(host, port)
            except Exception:
                logging.debug(f'Cannot connect to {host} RPC on port {port}')
                info[name] = (node_type, {})
                await self.queue.put(dict(info))

                # Sleep for 2s before trying again
                await asyncio.sleep(2)
            else:
                logging.debug(f'Started {host} RPC on port {port}')
                return

    async def start_connect_slots(self, nodes):
        """Run one `subscribe` task per node plus the `update_info` consumer.

        `nodes` is an iterable of node mappings; None is treated as empty so a
        configuration without a 'nodes' entry does not crash at startup.
        """
        async with asyncio.TaskGroup() as tg:
            for node in nodes or ():
                rpc = JsonRpc()
                self.rpc_conns.append(rpc)
                tg.create_task(self.subscribe(rpc, node))
            tg.create_task(self.update_info())

    async def update_info(self):
        """Forever take queued updates and apply them to the model."""
        while True:
            data = await self.queue.get()
            try:
                self._apply_update(data)
            finally:
                # Always balance get() so queue.join() cannot hang.
                self.queue.task_done()

    def _apply_update(self, data):
        """Route one queued ``{name: (type, payload)}`` dict to the model."""
        if not data:
            return

        # We parse in this manner so we can differentiate between
        # normal nodes and lilith nodes when they are offline.
        key, (node_type, values) = next(iter(data.items()))
        info = {key: values}

        if not values:
            if node_type == 'LILITH':
                self.model.add_offline(info, True)
            if node_type == 'NORMAL':
                self.model.add_offline(info, False)
            return

        if not isinstance(values, dict):
            # The membership tests below are only meaningful on a JSON object.
            logging.error(f'Unexpected payload: {data}')
            return

        if 'error' in values:
            logging.error(f'{data}')

        if 'result' in values:
            result = values['result']
            # A scalar result (None, bool, number) cannot contain either key.
            if isinstance(result, (dict, list, str)):
                if 'spawns' in result:
                    self.model.add_lilith(info)
                if 'channels' in result:
                    self.model.add_node(info)

        if 'params' in values:
            self.model.add_event(info)

    def main(self):
        """Configure logging, start the background tasks and run the UI.

        Blocks until the urwid loop exits, then runs `shutdown` on the event
        loop to close the RPC connections.
        """
        logging.basicConfig(filename='dnet.log',
                            format='%(asctime)s %(levelname)s:%(name)s:%(message)s',
                            encoding='utf-8',
                            level=logging.DEBUG)

        # Set urwid log to ERROR, effectively silencing it.
        urwid_logger = logging.getLogger('urwid')
        urwid_logger.setLevel(logging.ERROR)

        nodes = self.config.get('nodes')

        loop = urwid.MainLoop(self.view.ui, self.view.palette,
                              unhandled_input=self.unhandled_input,
                              event_loop=urwid.AsyncioEventLoop(
                                  loop=self.ev))

        # Hold references for as long as the UI runs: the event loop itself
        # only keeps weak references to tasks.
        background = [
            self.ev.create_task(self.start_connect_slots(nodes)),
            self.ev.create_task(self.view.update_view(self.ev, loop)),
        ]

        try:
            loop.run()
        finally:
            self.ev.run_until_complete(self.shutdown())
            del background

    def unhandled_input(self, key):
        """Quit on 'q': cancel every task on our loop and stop urwid.

        Tuple keys (urwid mouse events) and every other key are ignored.
        """
        if isinstance(key, tuple):
            return
        # Equality, not `in ('q')`: that is a substring test on the string 'q'.
        if key == 'q':
            for task in asyncio.all_tasks(self.ev):
                task.cancel()
            raise urwid.ExitMainLoop()

    async def shutdown(self):
        """Best-effort: switch dnet off and stop every RPC connection.

        The two steps are guarded separately so a connection that is already
        dead (dnet_switch fails) still gets its stop() call. Failures are
        logged at debug level and never abort the shutdown.
        """
        for rpc in self.rpc_conns:
            try:
                await rpc.dnet_switch(False)
            except Exception as e:
                logging.debug(f'Could not switch off dnet during shutdown: {e}')
            try:
                await rpc.stop()
            except Exception as e:
                logging.debug(f'Could not stop RPC connection during shutdown: {e}')
|
|
|
|
if __name__ == '__main__':
    # Script entry point: build the viewer and hand control to its main loop.
    Dnetview().main()
|