diff --git a/bin/deg/README.md b/bin/deg/README.md
new file mode 100644
index 000000000..721a3a6a6
--- /dev/null
+++ b/bin/deg/README.md
@@ -0,0 +1,75 @@
+# deg (Debugging Event Graph)
+
+A simple tui to explore darkfi's Event Graph state. Displays:
+
+1. Active components of EventGraph.
+2. Live protocol msgs.
+3. Update active components.
+
+## Run
+
+### Using a venv
+
+`Deg` requires Python 3.12.0. Make sure Python is installed and on the
+latest version.
+
+Depending on your setup you may need to install a virtual environment
+for Python. Do so as follows:
+
+```shell
+% python -m venv python-env
+% source python-env/bin/activate
+```
+
+Then install the requirements:
+
+```shell
+% pip install -r requirements.txt
+```
+
+Run deg:
+
+```shell
+% ./deg
+```
+
+You will need to reactivate the venv in your current terminal session
+each time you use `deg` as follows:
+
+```shell
+% source python-env/bin/activate
+```
+
+### Without a venv
+
+If you don't require a venv, install the requirements and run `deg` as follows:
+
+```shell
+% pip install -r requirements.txt
+% python main.py
+```
+
+## Config
+
+On first run, `deg` will create a config file in the config directory
+specific to your operating system.
+
+To use `deg` you will need to open the config file and modify it. Enter
+the RPC ports of the nodes you want to connect to and title them as you
+see fit. The default config file uses localhost, but you can replace
+this with hostnames or external IP addresses. You must also specify
+whether it is a `NORMAL` or a `LILITH` node.
+
+## Usage
+
+Navigate up and down using the arrow keys. Scroll the message log using
+`PageUp` and `PageDown`. Type `q` to quit.
+
+## Logging
+
+deg creates a log file in `bin/deg/deg.log`. To see json data and
+other debug info, tail the file like so:
+
+```shell
+tail -f deg.log
+```
diff --git a/bin/deg/deg b/bin/deg/deg
new file mode 100755
index 000000000..c32f3b162
--- /dev/null
+++ b/bin/deg/deg
@@ -0,0 +1,142 @@
+#!/usr/bin/python3
+
+# This file is part of DarkFi (https://dark.fi)
+#
+# Copyright (C) 2020-2024 Dyne.org foundation
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see .
+
+import json, urwid, asyncio, logging
+import src.util
+
+from os.path import join
+from src.model import Model
+from src.rpc import JsonRpc
+from src.view import View
+
+class Degview:
+
+ def __init__(self):
+ self.ev = asyncio.new_event_loop()
+ asyncio.set_event_loop(self.ev)
+ self.queue = asyncio.Queue()
+
+ os = src.util.get_os()
+ config_path = src.util.user_config_dir('darkfi', os)
+
+ suffix = '.toml'
+ filename = 'deg_config'
+ path = join(config_path, filename + suffix)
+ self.config = src.util.spawn_config(path)
+
+ self.model = Model()
+ self.view = View(self.model)
+
+ async def subscribe(self, rpc, node):
+ name = node['name']
+ host = node['host']
+ port = node['port']
+ type = node['type']
+ info = {}
+
+ while True:
+ try:
+ await rpc.start(host, port)
+ logging.debug(f'Started {name} RPC on port {port}')
+ break
+ except Exception as e:
+ info[name] = {}
+ await self.queue.put(info)
+ continue
+
+ if type == 'NORMAL':
+ data = await rpc._make_request('eventgraph.get_info', [])
+ info[name] = data
+
+ await self.queue.put(info)
+ await rpc.deg_switch(True)
+ await rpc.deg_subscribe_events()
+
+ while True:
+ await asyncio.sleep(0.01)
+ data = await rpc.reader.readline()
+ try:
+ data = json.loads(data)
+ info[name] = data
+ await self.queue.put(info)
+ except:
+ info[name] = {}
+ await self.queue.put(info)
+
+ await rpc.deg_switch(False)
+
+ await rpc.stop()
+
+ async def start_connect_slots(self, nodes):
+ tasks = []
+ async with asyncio.TaskGroup() as tg:
+ for i, node in enumerate(nodes):
+ rpc = JsonRpc()
+ subscribe = tg.create_task(self.subscribe(
+ rpc, node))
+ nodes = tg.create_task(self.update_info())
+
+ async def update_info(self):
+ while True:
+ info = await self.queue.get()
+ values = list(info.values())[0]
+
+ if not values:
+ self.model.add_offline(info)
+
+ logging.debug(f"values: {values}")
+
+ if 'result' in values:
+ result = values.get('result')
+ if 'eventgraph_info' in result:
+ self.model.add_eg(info)
+
+ if 'params' in values:
+ self.model.add_event(info)
+
+ self.queue.task_done()
+
+ def main(self):
+ logging.basicConfig(filename='deg.log',
+ encoding='utf-8',
+ level=logging.DEBUG)
+ nodes = self.config.get('nodes')
+
+ loop = urwid.MainLoop(self.view.ui, self.view.palette,
+ unhandled_input=self.unhandled_input,
+ event_loop=urwid.AsyncioEventLoop(
+ loop=self.ev))
+
+ self.ev.create_task(self.start_connect_slots(nodes))
+ self.ev.create_task(self.view.update_view(self.ev, loop))
+
+ loop.run()
+
+ def unhandled_input(self, key):
+ if isinstance(key, tuple):
+ return
+ if key in ('q'):
+ for task in asyncio.all_tasks():
+ task.cancel()
+ raise urwid.ExitMainLoop()
+
+
+if __name__ == '__main__':
+ deg = Degview()
+ deg.main()
diff --git a/bin/deg/deg_config.toml b/bin/deg/deg_config.toml
new file mode 100644
index 000000000..d54b41788
--- /dev/null
+++ b/bin/deg/deg_config.toml
@@ -0,0 +1,11 @@
+[[nodes]]
+name = "darkirc"
+host = "localhost"
+port = 26660
+type = "NORMAL"
+
+[[nodes]]
+name = "genev"
+host = "localhost"
+port = 28880
+type = "NORMAL"
diff --git a/bin/deg/requirements.txt b/bin/deg/requirements.txt
new file mode 100644
index 000000000..71fc29e80
--- /dev/null
+++ b/bin/deg/requirements.txt
@@ -0,0 +1,2 @@
+toml==0.10.2
+urwid==2.2.3
diff --git a/bin/deg/src/__init__.py b/bin/deg/src/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/bin/deg/src/model.py b/bin/deg/src/model.py
new file mode 100644
index 000000000..3d97e1086
--- /dev/null
+++ b/bin/deg/src/model.py
@@ -0,0 +1,89 @@
+# This file is part of DarkFi (https://dark.fi)
+#
+# Copyright (C) 2020-2024 Dyne.org foundation
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see .
+
+import logging, time
+import datetime as dt
+from collections import defaultdict as dd
+
+
+class Model:
+
+ def __init__(self):
+ self.nodes = {}
+
+ def add_eg(self, node):
+ name = list(node.keys())[0]
+ values = list(node.values())[0]
+ info = values['result']['eventgraph_info']
+
+ self.nodes[name] = {}
+ self.nodes[name]['current_genesis'] = {}
+ self.nodes[name]['broadcasted_ids'] = {}
+ self.nodes[name]['synced'] = {}
+ self.nodes[name]['event'] = {}
+ self.nodes[name]['unreferenced_tips'] = {}
+ self.nodes[name]['msgs'] = dd(list)
+
+ if info['current_genesis']:
+ self.nodes[name]['current_genesis'] = info['current_genesis']
+
+ if info['broadcasted_ids']:
+ self.nodes[name]['broadcasted_ids'] = info['broadcasted_ids']
+
+ if info['synced']:
+ self.nodes[name]['synced'] = info['synced']
+
+ if info['unreferenced_tips']:
+ self.nodes[name]['unreferenced_tips'] = info['unreferenced_tips']
+
+ def add_offline(self, node):
+ name = list(node.keys())[0]
+ values = list(node.values())[0]
+ self.nodes[name] = values
+
+ def add_event(self, event):
+ name = list(event.keys())[0]
+ values = list(event.values())[0]
+ params = values.get('params')
+ event = params[0].get('event')
+ info = params[0].get('info')
+
+ t = time.localtime()
+ current_time = time.strftime('%H:%M:%S', t)
+
+ match event:
+ case 'send':
+ nano = info.get('time')
+ cmd = info.get('cmd')
+ ev_info = info.get('info')
+ t = (dt.datetime
+ .fromtimestamp(int(nano)/1000000000)
+ .strftime('%H:%M:%S'))
+ msgs = self.nodes[name]['msgs']
+ msgs[name].append((t, "send", cmd, ev_info))
+ case 'recv':
+ nano = info.get('time')
+ cmd = info.get('cmd')
+ ev_info = info.get('info')
+ t = (dt.datetime
+ .fromtimestamp(int(nano)/1000000000)
+ .strftime('%H:%M:%S'))
+ msgs = self.nodes[name]['msgs']
+ msgs[name].append((t, "recv", cmd, ev_info))
+
+ def __repr__(self):
+ return f'{self.nodes}'
diff --git a/bin/deg/src/rpc.py b/bin/deg/src/rpc.py
new file mode 100644
index 000000000..f00e81161
--- /dev/null
+++ b/bin/deg/src/rpc.py
@@ -0,0 +1,74 @@
+# This file is part of DarkFi (https://dark.fi)
+#
+# Copyright (C) 2020-2024 Dyne.org foundation
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see .
+
+import json
+import random
+import logging
+import asyncio
+
+
+class JsonRpc:
+
+ async def start(self, host, port):
+ logging.info(f"trying to connect to {host}:{port}")
+ reader, writer = await asyncio.open_connection(host, port)
+ self.reader = reader
+ self.writer = writer
+
+ async def stop(self):
+ self.writer.close()
+ await self.writer.wait_closed()
+
+ async def _make_request(self, method, params):
+ ident = random.randint(0, 2**16)
+ request = {
+ "jsonrpc": "2.0",
+ "method": method,
+ "params": params,
+ "id": ident,
+ }
+
+ message = json.dumps(request) + "\n"
+ self.writer.write(message.encode())
+ await self.writer.drain()
+ data = await self.reader.readline()
+ message = data.decode().strip()
+ response = json.loads(message)
+ return response
+
+ async def _subscribe(self, method, params):
+ ident = random.randint(0, 2**16)
+ request = {
+ "jsonrpc": "2.0",
+ "method": method,
+ "params": params,
+ "id": ident,
+ }
+
+ message = json.dumps(request) + "\n"
+ self.writer.write(message.encode())
+ await self.writer.drain()
+ logging.debug("Subscribed")
+
+ async def ping(self):
+ return await self._make_request("ping", [])
+
+ async def deg_switch(self, state):
+ return await self._make_request("deg.switch", [state])
+
+ async def deg_subscribe_events(self):
+ return await self._subscribe("deg.subscribe_events", [])
diff --git a/bin/deg/src/scroll.py b/bin/deg/src/scroll.py
new file mode 100644
index 000000000..450294428
--- /dev/null
+++ b/bin/deg/src/scroll.py
@@ -0,0 +1,576 @@
+# This file is part of DarkFi (https://dark.fi)
+#
+# Copyright (C) 2020-2024 Dyne.org foundation
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see .
+
+import urwid
+from urwid.widget import (BOX, FLOW, FIXED)
+
+# Scroll actions
+SCROLL_LINE_UP = 'line up'
+SCROLL_LINE_DOWN = 'line down'
+SCROLL_PAGE_UP = 'page up'
+SCROLL_PAGE_DOWN = 'page down'
+SCROLL_TO_TOP = 'to top'
+SCROLL_TO_END = 'to end'
+
+# Scrollbar positions
+SCROLLBAR_LEFT = 'left'
+SCROLLBAR_RIGHT = 'right'
+
+# Add support for ScrollBar class (see stig.tui.scroll)
+# https://github.com/urwid/urwid/issues/226
+class ListBox_patched(urwid.ListBox):
+
+ def __init__(self, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ self._rows_max = None
+
+ def _invalidate(self):
+ super()._invalidate()
+ self._rows_max = None
+
+ def get_scrollpos(self, size, focus=False):
+ """Current scrolling position
+ Lower limit is 0, upper limit is the highest index of `body`.
+ """
+ middle, top, bottom = self.calculate_visible(size, focus)
+ if middle is None:
+ return 0
+ else:
+ offset_rows, _, focus_pos, _, _ = middle
+ maxcol, maxrow = size
+ flow_size = (maxcol,)
+
+ body = self.body
+ if hasattr(body, 'positions'):
+ # For body[pos], pos can be anything, not just an int. In that
+ # case, the positions() method returns an interable of valid
+ # positions.
+ positions = tuple(self.body.positions())
+ focus_index = positions.index(focus_pos)
+ widgets_above_focus = (body[pos] for pos in positions[:focus_index])
+ else:
+ # Treat body like a normal list
+ widgets_above_focus = (w for w in body[:focus_pos])
+
+ rows_above_focus = sum(w.rows(flow_size) for w in widgets_above_focus)
+ rows_above_top = rows_above_focus - offset_rows
+ return rows_above_top
+
+ def rows_max(self, size, focus=False):
+ if self._rows_max is None:
+ flow_size = (size[0],)
+ body = self.body
+ if hasattr(body, 'positions'):
+ self._rows_max = sum(body[pos].rows(flow_size) for pos in body.positions())
+ else:
+ self._rows_max = sum(w.rows(flow_size) for w in self.body)
+ return self._rows_max
+
+urwid.ListBox = ListBox_patched
+
+class Scrollable(urwid.WidgetDecoration):
+
+ def sizing(self):
+ return frozenset([BOX,])
+
+ def selectable(self):
+ return True
+
+ def __init__(self, widget):
+ """Box widget that makes a fixed or flow widget vertically scrollable
+
+ TODO: Focusable widgets are handled, including switching focus, but
+ possibly not intuitively, depending on the arrangement of widgets. When
+ switching focus to a widget that is ouside of the visible part of the
+ original widget, the canvas scrolls up/down to the focused widget. It
+ would be better to scroll until the next focusable widget is in sight
+ first. But for that to work we must somehow obtain a list of focusable
+ rows in the original canvas.
+ """
+ if not any(s in widget.sizing() for s in (FIXED, FLOW)):
+ raise ValueError('Not a fixed or flow widget: %r' % widget)
+ self._trim_top = 0
+ self._scroll_action = None
+ self._forward_keypress = None
+ self._old_cursor_coords = None
+ self._rows_max_cached = 0
+ self.__super.__init__(widget)
+
+ def render(self, size, focus=False):
+ maxcol, maxrow = size
+
+ # Render complete original widget
+ ow = self._original_widget
+ ow_size = self._get_original_widget_size(size)
+ canv_full = ow.render(ow_size, focus)
+
+ # Make full canvas editable
+ canv = urwid.CompositeCanvas(canv_full)
+ canv_cols, canv_rows = canv.cols(), canv.rows()
+
+ if canv_cols <= maxcol:
+ pad_width = maxcol - canv_cols
+ if pad_width > 0:
+ # Canvas is narrower than available horizontal space
+ canv.pad_trim_left_right(0, pad_width)
+
+ if canv_rows <= maxrow:
+ fill_height = maxrow - canv_rows
+ if fill_height > 0:
+ # Canvas is lower than available vertical space
+ canv.pad_trim_top_bottom(0, fill_height)
+
+ if canv_cols <= maxcol and canv_rows <= maxrow:
+ # Canvas is small enough to fit without trimming
+ return canv
+
+ self._adjust_trim_top(canv, size)
+
+ # Trim canvas if necessary
+ trim_top = self._trim_top
+ trim_end = canv_rows - maxrow - trim_top
+ trim_right = canv_cols - maxcol
+ if trim_top > 0:
+ canv.trim(trim_top)
+ if trim_end > 0:
+ canv.trim_end(trim_end)
+ if trim_right > 0:
+ canv.pad_trim_left_right(0, -trim_right)
+
+ # Disable cursor display if cursor is outside of visible canvas parts
+ if canv.cursor is not None:
+ curscol, cursrow = canv.cursor
+ if cursrow >= maxrow or cursrow < 0:
+ canv.cursor = None
+
+ # Figure out whether we should forward keypresses to original widget
+ if canv.cursor is not None:
+ # Trimmed canvas contains the cursor, e.g. in an Edit widget
+ self._forward_keypress = True
+ else:
+ if canv_full.cursor is not None:
+ # Full canvas contains the cursor, but scrolled out of view
+ self._forward_keypress = False
+ else:
+ # Original widget does not have a cursor, but may be selectable
+
+ # FIXME: Using ow.selectable() is bad because the original
+ # widget may be selectable because it's a container widget with
+ # a key-grabbing widget that is scrolled out of view.
+ # ow.selectable() returns True anyway because it doesn't know
+ # how we trimmed our canvas.
+ #
+ # To fix this, we need to resolve ow.focus and somehow
+ # ask canv whether it contains bits of the focused widget. I
+ # can't see a way to do that.
+ if ow.selectable():
+ self._forward_keypress = True
+ else:
+ self._forward_keypress = False
+
+ return canv
+
+ def keypress(self, size, key):
+ # Maybe offer key to original widget
+ if self._forward_keypress:
+ ow = self._original_widget
+ ow_size = self._get_original_widget_size(size)
+
+ # Remember previous cursor position if possible
+ if hasattr(ow, 'get_cursor_coords'):
+ self._old_cursor_coords = ow.get_cursor_coords(ow_size)
+
+ key = ow.keypress(ow_size, key)
+ if key is None:
+ return None
+
+ # Handle up/down, page up/down, etc
+ command_map = self._command_map
+ if command_map[key] == urwid.CURSOR_UP:
+ self._scroll_action = SCROLL_LINE_UP
+ elif command_map[key] == urwid.CURSOR_DOWN:
+ self._scroll_action = SCROLL_LINE_DOWN
+
+ elif command_map[key] == urwid.CURSOR_PAGE_UP:
+ self._scroll_action = SCROLL_PAGE_UP
+ elif command_map[key] == urwid.CURSOR_PAGE_DOWN:
+ self._scroll_action = SCROLL_PAGE_DOWN
+
+ elif command_map[key] == urwid.CURSOR_MAX_LEFT: # 'home'
+ self._scroll_action = SCROLL_TO_TOP
+ elif command_map[key] == urwid.CURSOR_MAX_RIGHT: # 'end'
+ self._scroll_action = SCROLL_TO_END
+
+ else:
+ return key
+
+ self._invalidate()
+
+ def mouse_event(self, size, event, button, col, row, focus):
+ ow = self._original_widget
+ if hasattr(ow, 'mouse_event'):
+ ow_size = self._get_original_widget_size(size)
+ row += self._trim_top
+ return ow.mouse_event(ow_size, event, button, col, row, focus)
+ else:
+ return False
+
+ def _adjust_trim_top(self, canv, size):
+ """Adjust self._trim_top according to self._scroll_action"""
+ action = self._scroll_action
+ self._scroll_action = None
+
+ maxcol, maxrow = size
+ trim_top = self._trim_top
+ canv_rows = canv.rows()
+
+ if trim_top < 0:
+ # Negative trim_top values use bottom of canvas as reference
+ trim_top = canv_rows - maxrow + trim_top + 1
+
+ if canv_rows <= maxrow:
+ self._trim_top = 0 # Reset scroll position
+ return
+
+ def ensure_bounds(new_trim_top):
+ return max(0, min(canv_rows - maxrow, new_trim_top))
+
+ if action == SCROLL_LINE_UP:
+ self._trim_top = ensure_bounds(trim_top - 1)
+ elif action == SCROLL_LINE_DOWN:
+ self._trim_top = ensure_bounds(trim_top + 1)
+
+ elif action == SCROLL_PAGE_UP:
+ self._trim_top = ensure_bounds(trim_top - maxrow+1)
+ elif action == SCROLL_PAGE_DOWN:
+ self._trim_top = ensure_bounds(trim_top + maxrow-1)
+
+ elif action == SCROLL_TO_TOP:
+ self._trim_top = 0
+ elif action == SCROLL_TO_END:
+ self._trim_top = canv_rows - maxrow
+
+ else:
+ self._trim_top = ensure_bounds(trim_top)
+
+ # If the cursor was moved by the most recent keypress, adjust trim_top
+ # so that the new cursor position is within the displayed canvas part.
+ # But don't do this if the cursor is at the top/bottom edge so we can still scroll out
+ if self._old_cursor_coords is not None and self._old_cursor_coords != canv.cursor:
+ self._old_cursor_coords = None
+ curscol, cursrow = canv.cursor
+ if cursrow < self._trim_top:
+ self._trim_top = cursrow
+ elif cursrow >= self._trim_top + maxrow:
+ self._trim_top = max(0, cursrow - maxrow + 1)
+
+ def _get_original_widget_size(self, size):
+ ow = self._original_widget
+ sizing = ow.sizing()
+ if FIXED in sizing:
+ return ()
+ elif FLOW in sizing:
+ return (size[0],)
+
+ def get_scrollpos(self, size=None, focus=False):
+ """Current scrolling position
+
+ Lower limit is 0, upper limit is the maximum number of rows with the
+ given maxcol minus maxrow.
+
+ NOTE: The returned value may be too low or too high if the position has
+ changed but the widget wasn't rendered yet.
+ """
+ return self._trim_top
+
+ def set_scrollpos(self, position):
+ """Set scrolling position
+
+ If `position` is positive it is interpreted as lines from the top.
+ If `position` is negative it is interpreted as lines from the bottom.
+
+ Values that are too high or too low values are automatically adjusted
+ during rendering.
+ """
+ self._trim_top = int(position)
+ self._invalidate()
+
+ def rows_max(self, size=None, focus=False):
+ """Return the number of rows for `size`
+
+ If `size` is not given, the currently rendered number of rows is returned.
+ """
+ if size is not None:
+ ow = self._original_widget
+ ow_size = self._get_original_widget_size(size)
+ sizing = ow.sizing()
+ if FIXED in sizing:
+ self._rows_max_cached = ow.pack(ow_size, focus)[1]
+ elif FLOW in sizing:
+ self._rows_max_cached = ow.rows(ow_size, focus)
+ else:
+ raise RuntimeError('Not a flow/box widget: %r' % self._original_widget)
+ return self._rows_max_cached
+
+
+DEFAULT_THUMB_CHAR = '\u2588'
+DEFAULT_TROUGH_CHAR = " "
+DEFAULT_SIDE = SCROLLBAR_RIGHT
+
+
+class ScrollBar(urwid.WidgetDecoration):
+
+ _thumb_char = DEFAULT_THUMB_CHAR
+ _trough_char = DEFAULT_TROUGH_CHAR
+ _thumb_indicator_top = None
+ _thumb_indicator_bottom = None
+ _scroll_bar_side = DEFAULT_SIDE
+
+ def sizing(self):
+ return frozenset((BOX,))
+
+ def selectable(self):
+ return True
+
+ def __init__(self, widget,
+ thumb_char=None, trough_char=None,
+ thumb_indicator_top=None, thumb_indicator_bottom=None,
+ side=DEFAULT_SIDE, width=1,
+ always_visible=False):
+ """Box widget that adds a scrollbar to `widget`
+
+ `widget` must be a box widget with the following methods:
+ - `get_scrollpos` takes the arguments `size` and `focus` and returns
+ the index of the first visible row.
+ - `set_scrollpos` (optional; needed for mouse click support) takes the
+ index of the first visible row.
+ - `rows_max` takes `size` and `focus` and returns the total number of
+ rows `widget` can render.
+
+ `thumb_char` is the character used for the scrollbar handle.
+ `trough_char` is used for the space above and below the handle.
+ `side` must be 'left' or 'right'.
+ `width` specifies the number of columns the scrollbar uses.
+ `always_visible` will always draw the scrollbar, even when unnecessary.
+ """
+ if BOX not in widget.sizing():
+ raise ValueError('Not a box widget: %r' % widget)
+ self.__super.__init__(widget)
+ if thumb_char is not None:
+ self._thumb_char = thumb_char
+ if trough_char is not None:
+ self._trough_char = trough_char
+ if thumb_indicator_top is not None:
+ self._thumb_indicator_top = thumb_indicator_top
+ if thumb_indicator_bottom is not None:
+ self._thumb_indicator_bottom = thumb_indicator_bottom
+
+ self.scrollbar_side = side
+ self.scrollbar_width = max(1, width)
+ self.always_visible = always_visible
+ self._original_widget_size = (0, 0)
+
+ def render(self, size, focus=False):
+ maxcol, maxrow = size
+
+ sb_width = self._scrollbar_width
+ ow_size = (max(0, maxcol - sb_width), maxrow)
+ sb_width = maxcol - ow_size[0]
+
+ ow = self._original_widget
+ ow_base = self.scrolling_base_widget
+ if not self.always_visible:
+ ow_rows_max = ow_base.rows_max(size, focus)
+ if ow_rows_max <= maxrow:
+ # Canvas fits without scrolling - no scrollbar needed
+ self._original_widget_size = size
+ return ow.render(size, focus)
+ ow_rows_max = ow_base.rows_max(ow_size, focus)
+
+ ow_canv = ow.render(ow_size, focus)
+ self._original_widget_size = ow_size
+
+ pos = ow_base.get_scrollpos(ow_size, focus)
+ posmax = ow_rows_max - maxrow
+
+ # Thumb shrinks/grows according to the ratio of
+ # /
+ thumb_weight = min(1, maxrow / max(1, ow_rows_max))
+ thumb_height = max(1, round(thumb_weight * maxrow))
+
+ # Thumb may only touch top/bottom if the first/last row is visible
+ top_weight = float(pos) / max(1, posmax)
+ top_height = int((maxrow-thumb_height) * top_weight)
+ if top_height == 0 and top_weight > 0:
+ top_height = 1
+
+ # Bottom part is remaining space
+ bottom_height = maxrow - thumb_height - top_height
+ assert thumb_height + top_height + bottom_height == maxrow
+
+ # Create scrollbar canvas
+ # Creating SolidCanvases of correct height may result in "cviews do not
+ # fill gaps in shard_tail!" or "cviews overflow gaps in shard_tail!"
+ # exceptions. Stacking the same SolidCanvas is a workaround.
+ # https://github.com/urwid/urwid/issues/226#issuecomment-437176837
+
+ thumb_top = thumb_bottom = None
+ if (self._thumb_indicator_top
+ or self._thumb_indicator_bottom) and hasattr(ow.body, "positions"):
+ if hasattr(ow.body, "focus"):
+ pos = ow.body.focus
+ elif hasattr(ow.body, "get_focus"):
+ pos = ow.body.get_focus()[1]
+
+ try:
+ head = next(iter(ow.body.positions()))
+ except StopIteration:
+ head = None
+ if pos == head:
+ if isinstance(self._thumb_indicator_top, tuple):
+ attr, char = self._thumb_indicator_top
+ else:
+ attr, char = None, self._thumb_indicator_top
+
+ if char:
+ thumb_top = urwid.Text(
+ (attr, char * sb_width),
+ wrap="any"
+ ).render((sb_width,))
+ if thumb_height:
+ thumb_height -= 1
+ try:
+ tail = next(iter(ow.body.positions(reverse=True)))
+ except StopIteration:
+ tail = None
+ if pos == tail:
+ if isinstance(self._thumb_indicator_bottom, tuple):
+ attr, char = self._thumb_indicator_bottom
+ else:
+ attr, char = None, self._thumb_indicator_bottom
+
+ if char:
+ thumb_bottom = urwid.Text(
+ (attr, char * sb_width),
+ wrap="any"
+ ).render((sb_width,))
+ if thumb_height:
+ thumb_height -= 1
+
+ if isinstance(self._trough_char, tuple):
+ trough_attr, trough_char = self._trough_char
+ else:
+ trough_attr, trough_char = None, self._trough_char
+
+ top = urwid.Text(
+ (trough_attr, trough_char * top_height * sb_width),
+ wrap="any"
+ ).render((sb_width,))
+
+ if isinstance(self._thumb_char, tuple):
+ thumb_attr, thumb_char = self._thumb_char
+ else:
+ thumb_attr, thumb_char = (None, self._thumb_char)
+ thumb = urwid.Text(
+ (thumb_attr, thumb_char * thumb_height * sb_width),
+ wrap="any"
+ ).render((sb_width,))
+
+ bottom = urwid.Text(
+ (trough_attr, trough_char * bottom_height * sb_width),
+ wrap="any"
+ ).render((sb_width,))
+
+
+ sb_canv = urwid.CanvasCombine(
+ [ (top, None, False)] * (1 if top_height else 0) +
+ [ (thumb_top, None, False)] * (1 if thumb_top else 0) +
+ [ (thumb, None, False)] * (1 if thumb_height else 0) +
+ [ (thumb_bottom, None, False)] * (1 if thumb_bottom else 0) +
+ [ (bottom, None, False)] * (1 if bottom_height else 0)
+ )
+
+ combinelist = [(ow_canv, None, True, ow_size[0]),
+ (sb_canv, None, False, sb_width)]
+ if self._scrollbar_side != SCROLLBAR_LEFT:
+ return urwid.CanvasJoin(combinelist)
+ else:
+ return urwid.CanvasJoin(reversed(combinelist))
+
+ @property
+ def scrollbar_width(self):
+ """Columns the scrollbar uses"""
+ return max(1, self._scrollbar_width)
+
+ @scrollbar_width.setter
+ def scrollbar_width(self, width):
+ self._scrollbar_width = max(1, int(width))
+ self._invalidate()
+
+ @property
+ def scrollbar_side(self):
+ """Where to display the scrollbar; must be 'left' or 'right'"""
+ return self._scrollbar_side
+
+ @scrollbar_side.setter
+ def scrollbar_side(self, side):
+ if side not in (SCROLLBAR_LEFT, SCROLLBAR_RIGHT):
+ raise ValueError('scrollbar_side must be "left" or "right", not %r' % side)
+ self._scrollbar_side = side
+ self._invalidate()
+
+ @property
+ def scrolling_base_widget(self):
+ """Nearest `original_widget` that is compatible with the scrolling API"""
+ def orig_iter(w):
+ while hasattr(w, 'original_widget'):
+ w = w.original_widget
+ yield w
+ yield w
+
+ def is_scrolling_widget(w):
+ return hasattr(w, 'get_scrollpos') and hasattr(w, 'rows_max')
+
+ for w in orig_iter(self):
+ if is_scrolling_widget(w):
+ return w
+ raise ValueError('Not compatible to be wrapped by ScrollBar: %r' % w)
+
+ def keypress(self, size, key):
+ return self._original_widget.keypress(self._original_widget_size, key)
+
+ def mouse_event(self, size, event, button, col, row, focus):
+ ow = self._original_widget
+ ow_size = self._original_widget_size
+ handled = False
+ if hasattr(ow, 'mouse_event'):
+ handled = ow.mouse_event(ow_size, event, button, col, row, focus)
+
+ if not handled and hasattr(ow, 'set_scrollpos'):
+ if button == 4: # scroll wheel up
+ pos = ow.get_scrollpos(ow_size)
+ ow.set_scrollpos(pos - 1)
+ return True
+ elif button == 5: # scroll wheel down
+ pos = ow.get_scrollpos(ow_size)
+ ow.set_scrollpos(pos + 1)
+ return True
+
+ return False
+
+
+__all__ = ["Scrollable", "ScrollBar"]
diff --git a/bin/deg/src/util.py b/bin/deg/src/util.py
new file mode 100644
index 000000000..3e74565d8
--- /dev/null
+++ b/bin/deg/src/util.py
@@ -0,0 +1,67 @@
+# This file is part of DarkFi (https://dark.fi)
+#
+# Copyright (C) 2020-2024 Dyne.org foundation
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see .
+
+import os
+import sys
+import toml
+import platform
+
+def get_os():
+ if sys.platform.startswith('java'):
+ os_name = platform.java_ver()[3][0]
+ if os_name.startswith('Windows'):
+ system = 'win32'
+ elif os_name.startswith('Mac'):
+ system = 'macOS'
+ else:
+ system = 'linux'
+ else:
+ system = sys.platform
+ return system
+
+def user_config_dir(appname, system):
+ if system == "win32":
+ path = windows_dir(appname)
+ elif system == 'macOS':
+ path = os.path.expanduser('~/Library/Preferences/')
+ path = os.path.join(path, appname)
+ else:
+ path = os.getenv('XDG_CONFIG_HOME', os.path.expanduser("~/.config"))
+ path = os.path.join(path, appname)
+ return path
+
+def windows_dir(appname):
+ appauthor = appname
+ const = "CSIDL_APPDATA"
+ path = os.path.normpath(_get_win_folder(const))
+ path = os.path.join(path, appname)
+ return path
+
+def spawn_config(path):
+ file_exists = os.path.exists(path)
+ if file_exists:
+ with open(path) as f:
+ cfg = toml.load(f)
+ return cfg
+ else:
+ with open('deg_config.toml') as f:
+ cfg = toml.load(f)
+ with open(path, 'w') as f:
+ toml.dump(cfg, f)
+ print(f"Config file created in {path}. Please review it and try again.")
+ sys.exit(0)
+
diff --git a/bin/deg/src/view.py b/bin/deg/src/view.py
new file mode 100644
index 000000000..e304ae918
--- /dev/null
+++ b/bin/deg/src/view.py
@@ -0,0 +1,273 @@
+# This file is part of DarkFi (https://dark.fi)
+#
+# Copyright (C) 2020-2024 Dyne.org foundation
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see .
+
+import urwid
+import logging
+import asyncio
+
+from src.scroll import ScrollBar, Scrollable
+
+class DegWidget(urwid.WidgetWrap):
+ def __init__(self, node_name, session):
+ self.node_name = node_name
+ self.session = session
+
+ def selectable(self):
+ return True
+
+ def keypress(self, size, key):
+ return key
+
+ def update(self, txt):
+ super().__init__(txt)
+ self._w = urwid.AttrWrap(self._w, None)
+ self._w.focus_attr = 'line'
+
+
+class Node(DegWidget):
+ def set_txt(self, is_empty: bool):
+ if is_empty:
+ txt = urwid.Text(f"{self.node_name} (offline)")
+ super().update(txt)
+ else:
+ txt = urwid.Text(f"{self.node_name}")
+ super().update(txt)
+
+
+class Session(DegWidget):
+ def set_txt(self):
+ txt = urwid.Text(f" {self.session}")
+ super().update(txt)
+
+
+class View():
+ palette = [
+ ('body','light gray','default', 'standout'),
+ ('line','dark cyan','default','standout'),
+ ]
+
+ def __init__(self, model):
+ self.model = model
+ info_text = urwid.Text("")
+ self.pile = urwid.Pile([info_text])
+ scroll = ScrollBar(Scrollable(self.pile))
+ rightbox = urwid.LineBox(scroll)
+ self.listbox_content = []
+ self.listwalker = urwid.SimpleListWalker(self.listbox_content)
+ self.listw = self.listwalker.contents
+ self.list = urwid.ListBox(self.listwalker)
+ leftbox = urwid.LineBox(self.list)
+ columns = urwid.Columns([leftbox, rightbox], focus_column=0)
+ self.ui = urwid.Frame(urwid.AttrWrap( columns, 'body' ))
+ self.known_nodes = []
+ self.live_nodes = []
+ self.dead_nodes = []
+ self.refresh = False
+
+ #-----------------------------------------------------------------
+ # Render deg.get_info() RPC call
+ #-----------------------------------------------------------------
+ def draw_info(self, node_name, info):
+ logging.debug(f'info {info}')
+ # info = info['result']
+ # if 'eventgraph_info' in info:
+ # logging.debug(f'info {info}')
+ node = Node(node_name, "node")
+ node.set_txt(False)
+ self.listw.append(node)
+
+ # info = info['eventgraph_info']
+
+ if 'current_genesis' in info and info['current_genesis']:
+ session = Session(node_name, "current_genesis")
+ session.set_txt()
+ self.listw.append(session)
+
+ if 'broadcasted_ids' in info and info['broadcasted_ids']:
+ session = Session(node_name, "broadcasted_ids")
+ session.set_txt()
+ self.listw.append(session)
+
+ if 'synced' in info and info['synced']:
+ session = Session(node_name, "synced")
+ session.set_txt()
+ self.listw.append(session)
+
+ if 'unreferenced_tips' in info and info['unreferenced_tips']:
+ session = Session(node_name, "unreferenced_tips")
+ session.set_txt()
+ self.listw.append(session)
+
+
+ def draw_empty(self, node_name, info):
+ node = Node(node_name, "node")
+ node.set_txt(True)
+ self.listw.append(node)
+
+ #-----------------------------------------------------------------
+ # Render deg.subscribe_events() RPC call
+ # Right hand menu only
+ #-----------------------------------------------------------------
+ def fill_right_box(self):
+ self.pile.contents.clear()
+ focus_w = self.list.get_focus()
+ if focus_w[0] is None:
+ return
+ session = focus_w[0].session
+
+ if session == "node":
+ node_name = focus_w[0].node_name
+ info = self.model.nodes.get(node_name)
+ if info['msgs']:
+ msg = info['msgs'].get(node_name)
+ for m in msg:
+ time = m[0]
+ event = m[1]
+ event_info = m[2]
+ msg = m[3]
+ self.pile.contents.append((urwid.Text(
+ f"{time}: {event} {event_info}: {msg}"),
+ self.pile.options()))
+
+ if session == "current_genesis":
+ key = "current_genesis"
+ node_name = focus_w[0].node_name
+ info = self.model.nodes.get(node_name)
+
+ if key in info:
+ ev = info.get(key)
+ self.pile.contents.append((
+ urwid.Text(f" {ev}"),
+ self.pile.options()))
+
+ if session == "broadcasted_ids":
+ key = "broadcasted_ids"
+ node_name = focus_w[0].node_name
+ info = self.model.nodes.get(node_name)
+
+ if key in info:
+ ev = list(info.get(key))
+ if info['msgs']:
+ msg = info['msgs'].get(node_name)
+ for m in msg:
+ event = m[1]
+ event_info = m[2]
+ msg = m[3]
+ if event_info == "EventPut" and event == "send" and msg not in ev:
+ ev.extend(msg)
+ self.pile.contents.append((
+ urwid.Text(f" {ev}"),
+ self.pile.options()))
+
+ if session == "synced":
+ key = "synced"
+ node_name = focus_w[0].node_name
+ info = self.model.nodes.get(node_name)
+
+ if key in info:
+ ev = info.get(key)
+ self.pile.contents.append((
+ urwid.Text(f" {ev}"),
+ self.pile.options()))
+
+ if session == "unreferenced_tips":
+ key = "unreferenced_tips"
+ node_name = focus_w[0].node_name
+ info = self.model.nodes.get(node_name)
+
+ if key in info:
+ ev = info.get(key)
+ self.pile.contents.append((
+ urwid.Text(f" {ev}"),
+ self.pile.options()))
+
+ #-----------------------------------------------------------------
+ # Sort through node info, checking whether we are already
+ # tracking this node or if the node's state has changed.
+ #-----------------------------------------------------------------
+ def sort(self, nodes):
+ for name, info in nodes:
+ if bool(info) and name not in self.live_nodes:
+ self.live_nodes.append(name)
+ if not bool(info) and name not in self.dead_nodes:
+ self.dead_nodes.append(name)
+ if bool(info) and name in self.dead_nodes:
+ logging.debug("Refresh: dead node online.")
+ self.refresh = True
+ if not bool(info) and name in self.live_nodes:
+ logging.debug("Refresh: online node offline.")
+ self.refresh = True
+
+ #-----------------------------------------------------------------
+ # Checks whether we are already displaying this node, and draw
+ # it if not.
+ #-----------------------------------------------------------------
+ async def display(self, nodes):
+ for name, info in nodes:
+ if name in self.live_nodes and name not in self.known_nodes:
+ self.draw_info(name, info)
+ if name in self.dead_nodes and name not in self.known_nodes:
+ self.draw_empty(name, info)
+ if self.refresh:
+ logging.debug("Refresh initiated.")
+ await asyncio.sleep(0.1)
+ self.known_nodes.clear()
+ self.live_nodes.clear()
+ self.dead_nodes.clear()
+ self.refresh = False
+ self.listw.clear()
+ logging.debug("Refresh complete.")
+ #-----------------------------------------------------------------
+ # Handle events.
+ #-----------------------------------------------------------------
+ def draw_events(self, nodes):
+ for name, info in nodes:
+ if bool(info) and name in self.known_nodes:
+ # info = info ['result']
+ self.fill_right_box()
+
+ # logging.debug(f"info: {info}")
+
+ if 'unreferenced_tips' in info:
+ val = info['unreferenced_tips']
+ if not bool(val) or not val == None:
+ continue
+ logging.debug(f"Refresh: unreferenced_tips online")
+ self.refresh = True
+
+ async def update_view(self, evloop: asyncio.AbstractEventLoop,
+ loop: urwid.MainLoop):
+ while True:
+ await asyncio.sleep(0.1)
+
+ nodes = self.model.nodes.items()
+ evloop.call_soon(loop.draw_screen)
+
+ # We first ensure that we are keeping track
+ # of all the displayed widgets.
+ for index, item in enumerate(self.listw):
+ # Keep track of known nodes.
+ if item.node_name not in self.known_nodes:
+ self.known_nodes.append(item.node_name)
+
+ self.sort(nodes)
+
+ await self.display(nodes)
+
+ # logging.debug(f"nodes: {nodes}")
+
+ self.draw_events(nodes)