Files
darkfi/bin/app/gui/api.py
2025-01-21 09:31:52 +01:00

160 lines
5.4 KiB
Python

from collections import namedtuple
from pydrk import Api, HostApi, PropertyType, PropertySubType, Property, serial
import zmq
api = Api()
host = HostApi(api)
print("Node status:", api.hello())
def make_sub_socket():
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.setsockopt(zmq.SUBSCRIBE, b'')
socket.connect("tcp://localhost:9485")
return socket
def rename_node(node, name):
node_id = lookup_node(node)
api.rename_node(node_id, name)
def remove_all_slots(node_path, sig):
node_id = api.lookup_node_id(node_path)
for slot_id, slot in api.get_slots(node_id, sig):
print(f"{node_path}:{sig}(): Unregistering slot '{slot}':{slot_id}")
api.unregister_slot(node_id, sig, slot_id)
def register_slot(node_path, sig, tag):
#remove_all_slots(node_path, sig)
node_id = api.lookup_node_id(node_path)
api.register_slot(node_id, sig, "", tag)
def get_property(node_id, prop):
node_id = lookup_node(node_id)
return api.get_property_value(node_id, prop)
def set_property(node_id, prop, val):
node_id = lookup_node(node_id)
match val:
case float():
api.set_property_f32(node_id, prop, val)
case int():
api.set_property_u32(node_id, prop, val)
def set_property_bool(node_id, prop, val):
node_id = lookup_node(node_id)
api.set_property_bool(node_id, prop, val)
def set_property_f32(node_id, prop, val):
node_id = lookup_node(node_id)
api.set_property_f32(node_id, prop, float(val))
def set_property_u32(node_id, prop, val):
node_id = lookup_node(node_id)
api.set_property_u32(node_id, prop, int(val))
def add_property_bool(node_id, prop, val=None):
api.add_property(node_id, prop, PropertyType.BOOL)
if val is not None:
api.set_property_bool(node_id, prop, val)
def add_property_f32(node_id, prop, val=None):
api.add_property(node_id, prop, PropertyType.FLOAT32)
if val is not None:
api.set_property_f32(node_id, prop, val)
def add_property_u32(node_id, prop, val=None):
api.add_property(node_id, prop, PropertyType.UINT32)
if val is not None:
api.set_property_u32(node_id, prop, val)
def lookup_node(node_id):
if isinstance(node_id, str):
node_id = api.lookup_node_id(node_id)
return node_id
def link_node(child_id, parent_id):
child_id = lookup_node(child_id)
parent_id = lookup_node(parent_id)
api.link_node(child_id, parent_id)
def unlink_node(child_id, parent_id):
child_id = lookup_node(child_id)
parent_id = lookup_node(parent_id)
api.unlink_node(child_id, parent_id)
def unlink_from_parents(node_id):
node_id = lookup_node(node_id)
for (_, parent_id, _) in api.get_parents(node_id):
api.unlink_node(node_id, parent_id)
def remove_node_recursive(node_id):
node_id = lookup_node(node_id)
for (_, child_id, _) in api.get_children(node_id):
# Unlink the child
api.unlink_node(child_id, node_id)
# Remove the node
remove_node_recursive(child_id)
# Garbage collection
if not api.get_parents(node_id):
api.remove_node(node_id)
def garbage_collect():
dangling = api.scan_dangling()
for node_id in dangling:
remove_node_recursive(node_id)
print(f"Garbage collect: removed {len(dangling)} nodes")
KeyMods = namedtuple("KeyMods", ["shift", "ctrl", "alt", "logo"])
class EventLoop:
def __init__(self):
self.subsock = make_sub_socket()
#register_slot("/window", "resize", b"rs")
#register_slot("/window/input/mouse", "button_down", b"ck")
#register_slot("/window/input/mouse", "wheel", b"wh")
#register_slot("/window/input/mouse", "move", b"mm")
register_slot("/window/input/keyboard", "key_down", b"kd")
def run(self):
while True:
signal_data, user_data = self.subsock.recv_multipart()
cur = serial.Cursor(signal_data)
match user_data:
#case b"rs":
# w = get_property("/window", "width")
# h = get_property("/window", "height")
# self.resize_event(w, h)
#case b"ck":
# x = get_property("/window/input/mouse", "click_x")
# y = get_property("/window/input/mouse", "click_y")
# self.mouse_click(x, y)
#case b"wh":
# y = get_property("/window/input/mouse", "wheel_y")
# self.mouse_wheel(y)
#case b"mm":
# pass
case b"kd":
shift = bool(serial.read_u8(cur))
ctrl = bool(serial.read_u8(cur))
alt = bool(serial.read_u8(cur))
logo = bool(serial.read_u8(cur))
repeat = bool(serial.read_u8(cur))
keycode = serial.decode_str(cur)
keymods = KeyMods(shift, ctrl, alt, logo)
# Sometimes these get stuck when exiting the window.
# We don't need these anyway
if keycode in ("LeftShift", "LeftSuper"):
continue
self.key_down(keycode, keymods, repeat)
def resize_event(self, w, h):
pass
def mouse_click(self, x, y):
pass
def mouse_wheel(self, y):
pass
def key_down(self, keycode, keymods, repeat):
pass