mirror of
https://github.com/darkrenaissance/darkfi.git
synced 2026-01-10 15:17:57 -05:00
160 lines
5.4 KiB
Python
160 lines
5.4 KiB
Python
from collections import namedtuple
|
|
from pydrk import Api, HostApi, PropertyType, PropertySubType, Property, serial
|
|
import zmq
|
|
|
|
api = Api()
|
|
host = HostApi(api)
|
|
print("Node status:", api.hello())
|
|
|
|
def make_sub_socket():
|
|
context = zmq.Context()
|
|
socket = context.socket(zmq.SUB)
|
|
socket.setsockopt(zmq.SUBSCRIBE, b'')
|
|
socket.connect("tcp://localhost:9485")
|
|
return socket
|
|
|
|
def rename_node(node, name):
|
|
node_id = lookup_node(node)
|
|
api.rename_node(node_id, name)
|
|
|
|
def remove_all_slots(node_path, sig):
|
|
node_id = api.lookup_node_id(node_path)
|
|
for slot_id, slot in api.get_slots(node_id, sig):
|
|
print(f"{node_path}:{sig}(): Unregistering slot '{slot}':{slot_id}")
|
|
api.unregister_slot(node_id, sig, slot_id)
|
|
|
|
def register_slot(node_path, sig, tag):
|
|
#remove_all_slots(node_path, sig)
|
|
node_id = api.lookup_node_id(node_path)
|
|
api.register_slot(node_id, sig, "", tag)
|
|
|
|
def get_property(node_id, prop):
|
|
node_id = lookup_node(node_id)
|
|
return api.get_property_value(node_id, prop)
|
|
|
|
def set_property(node_id, prop, val):
|
|
node_id = lookup_node(node_id)
|
|
match val:
|
|
case float():
|
|
api.set_property_f32(node_id, prop, val)
|
|
case int():
|
|
api.set_property_u32(node_id, prop, val)
|
|
def set_property_bool(node_id, prop, val):
|
|
node_id = lookup_node(node_id)
|
|
api.set_property_bool(node_id, prop, val)
|
|
def set_property_f32(node_id, prop, val):
|
|
node_id = lookup_node(node_id)
|
|
api.set_property_f32(node_id, prop, float(val))
|
|
def set_property_u32(node_id, prop, val):
|
|
node_id = lookup_node(node_id)
|
|
api.set_property_u32(node_id, prop, int(val))
|
|
|
|
def add_property_bool(node_id, prop, val=None):
|
|
api.add_property(node_id, prop, PropertyType.BOOL)
|
|
if val is not None:
|
|
api.set_property_bool(node_id, prop, val)
|
|
def add_property_f32(node_id, prop, val=None):
|
|
api.add_property(node_id, prop, PropertyType.FLOAT32)
|
|
if val is not None:
|
|
api.set_property_f32(node_id, prop, val)
|
|
def add_property_u32(node_id, prop, val=None):
|
|
api.add_property(node_id, prop, PropertyType.UINT32)
|
|
if val is not None:
|
|
api.set_property_u32(node_id, prop, val)
|
|
|
|
def lookup_node(node_id):
|
|
if isinstance(node_id, str):
|
|
node_id = api.lookup_node_id(node_id)
|
|
return node_id
|
|
|
|
def link_node(child_id, parent_id):
|
|
child_id = lookup_node(child_id)
|
|
parent_id = lookup_node(parent_id)
|
|
api.link_node(child_id, parent_id)
|
|
def unlink_node(child_id, parent_id):
|
|
child_id = lookup_node(child_id)
|
|
parent_id = lookup_node(parent_id)
|
|
api.unlink_node(child_id, parent_id)
|
|
|
|
def unlink_from_parents(node_id):
|
|
node_id = lookup_node(node_id)
|
|
for (_, parent_id, _) in api.get_parents(node_id):
|
|
api.unlink_node(node_id, parent_id)
|
|
|
|
def remove_node_recursive(node_id):
|
|
node_id = lookup_node(node_id)
|
|
|
|
for (_, child_id, _) in api.get_children(node_id):
|
|
# Unlink the child
|
|
api.unlink_node(child_id, node_id)
|
|
# Remove the node
|
|
remove_node_recursive(child_id)
|
|
|
|
# Garbage collection
|
|
if not api.get_parents(node_id):
|
|
api.remove_node(node_id)
|
|
|
|
def garbage_collect():
|
|
dangling = api.scan_dangling()
|
|
for node_id in dangling:
|
|
remove_node_recursive(node_id)
|
|
print(f"Garbage collect: removed {len(dangling)} nodes")
|
|
|
|
KeyMods = namedtuple("KeyMods", ["shift", "ctrl", "alt", "logo"])
|
|
|
|
class EventLoop:
|
|
|
|
def __init__(self):
|
|
self.subsock = make_sub_socket()
|
|
#register_slot("/window", "resize", b"rs")
|
|
#register_slot("/window/input/mouse", "button_down", b"ck")
|
|
#register_slot("/window/input/mouse", "wheel", b"wh")
|
|
#register_slot("/window/input/mouse", "move", b"mm")
|
|
register_slot("/window/input/keyboard", "key_down", b"kd")
|
|
|
|
def run(self):
|
|
while True:
|
|
signal_data, user_data = self.subsock.recv_multipart()
|
|
cur = serial.Cursor(signal_data)
|
|
match user_data:
|
|
#case b"rs":
|
|
# w = get_property("/window", "width")
|
|
# h = get_property("/window", "height")
|
|
# self.resize_event(w, h)
|
|
#case b"ck":
|
|
# x = get_property("/window/input/mouse", "click_x")
|
|
# y = get_property("/window/input/mouse", "click_y")
|
|
# self.mouse_click(x, y)
|
|
#case b"wh":
|
|
# y = get_property("/window/input/mouse", "wheel_y")
|
|
# self.mouse_wheel(y)
|
|
#case b"mm":
|
|
# pass
|
|
case b"kd":
|
|
shift = bool(serial.read_u8(cur))
|
|
ctrl = bool(serial.read_u8(cur))
|
|
alt = bool(serial.read_u8(cur))
|
|
logo = bool(serial.read_u8(cur))
|
|
repeat = bool(serial.read_u8(cur))
|
|
keycode = serial.decode_str(cur)
|
|
|
|
keymods = KeyMods(shift, ctrl, alt, logo)
|
|
# Sometimes these get stuck when exiting the window.
|
|
# We don't need these anyway
|
|
if keycode in ("LeftShift", "LeftSuper"):
|
|
continue
|
|
self.key_down(keycode, keymods, repeat)
|
|
|
|
def resize_event(self, w, h):
|
|
pass
|
|
|
|
def mouse_click(self, x, y):
|
|
pass
|
|
|
|
def mouse_wheel(self, y):
|
|
pass
|
|
|
|
def key_down(self, keycode, keymods, repeat):
|
|
pass
|
|
|