app: add forgotten vid submodule

This commit is contained in:
darkfi
2025-12-26 02:22:26 -03:00
parent a6d8104b97
commit 2129017031
2 changed files with 577 additions and 0 deletions

327
bin/app/src/ui/vid/mod.rs Normal file
View File

@@ -0,0 +1,327 @@
/* This file is part of DarkFi (https://dark.fi)
*
* Copyright (C) 2020-2025 Dyne.org foundation
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use async_trait::async_trait;
use parking_lot::Mutex as SyncMutex;
use rand::{rngs::OsRng, Rng};
use std::sync::{mpsc, Arc};
use tracing::instrument;
use crate::{
gfx::{
anim::Frame, gfxtag, DrawCall, DrawInstruction, DrawMesh, ManagedSeqAnimPtr,
ManagedTexturePtr, Rectangle, RenderApi,
},
mesh::{MeshBuilder, MeshInfo, COLOR_WHITE},
prop::{BatchGuardPtr, PropertyAtomicGuard, PropertyRect, PropertyStr, PropertyUint32, Role},
scene::{Pimpl, SceneNodeWeak},
ExecutorPtr,
};
use super::{DrawUpdate, OnModify, UIObject};
mod ivf;
mod threads;
use threads::{spawn_decoder_thread, spawn_loader_demuxer_thread};
macro_rules! t { ($($arg:tt)*) => { trace!(target: "ui:video", $($arg)*); } }
pub type VideoPtr = Arc<Video>;
#[derive(Clone)]
pub struct Av1VideoData {
textures: Vec<Option<ManagedTexturePtr>>,
anim: ManagedSeqAnimPtr,
textures_pub: async_broadcast::Sender<(usize, ManagedTexturePtr)>,
textures_sub: async_broadcast::Receiver<(usize, ManagedTexturePtr)>,
}
impl Av1VideoData {
fn new(len: usize, render_api: &RenderApi) -> Self {
let (textures_pub, textures_sub) = async_broadcast::broadcast(len);
let anim = render_api.new_anim(len, false, gfxtag!("video"));
Self { textures: vec![None; len], anim, textures_pub, textures_sub }
}
}
pub struct Video {
node: SceneNodeWeak,
render_api: RenderApi,
tasks: SyncMutex<Vec<smol::Task<()>>>,
load_tasks: SyncMutex<Vec<smol::Task<()>>>,
ex: ExecutorPtr,
dc_key: u64,
vid_data: Arc<SyncMutex<Option<Av1VideoData>>>,
_load_handle: SyncMutex<Option<std::thread::JoinHandle<()>>>,
_decoder_handle: SyncMutex<Option<std::thread::JoinHandle<()>>>,
rect: PropertyRect,
uv: PropertyRect,
z_index: PropertyUint32,
priority: PropertyUint32,
path: PropertyStr,
parent_rect: SyncMutex<Option<Rectangle>>,
}
impl Video {
pub async fn new(node: SceneNodeWeak, render_api: RenderApi, ex: ExecutorPtr) -> Pimpl {
t!("Video::new()");
let node_ref = &node.upgrade().unwrap();
let rect = PropertyRect::wrap(node_ref, Role::Internal, "rect").unwrap();
let uv = PropertyRect::wrap(node_ref, Role::Internal, "uv").unwrap();
let z_index = PropertyUint32::wrap(node_ref, Role::Internal, "z_index", 0).unwrap();
let priority = PropertyUint32::wrap(node_ref, Role::Internal, "priority", 0).unwrap();
let path = PropertyStr::wrap(node_ref, Role::Internal, "path", 0).unwrap();
let self_ = Arc::new(Self {
node,
render_api,
tasks: SyncMutex::new(vec![]),
load_tasks: SyncMutex::new(vec![]),
ex,
dc_key: OsRng.gen(),
vid_data: Arc::new(SyncMutex::new(None)),
_load_handle: SyncMutex::new(None),
_decoder_handle: SyncMutex::new(None),
rect,
uv,
z_index,
priority,
path,
parent_rect: SyncMutex::new(None),
});
Pimpl::Video(self_)
}
async fn reload(self: Arc<Self>, batch: BatchGuardPtr) {
self.load_video();
self.redraw(batch).await;
}
fn load_video(&self) {
let path = self.path.get();
// Thread 1 -> thread 2 channel: raw AV1 encoded frames
let (frame_tx, frame_rx) = mpsc::channel();
// Thread 1 (loader + demuxer):
// loads chunks, demuxes IVF -> AV1 frames, initializes vid_data
let loader_handle = spawn_loader_demuxer_thread(
path,
frame_tx,
self.vid_data.clone(),
self.render_api.clone(),
);
// Thread 2 (decoder):
// blocks on frame_rx, decodes AV1 -> RGB, creates textures directly
let decoder_handle =
spawn_decoder_thread(frame_rx, self.vid_data.clone(), self.render_api.clone());
*self._load_handle.lock() = Some(loader_handle);
*self._decoder_handle.lock() = Some(decoder_handle);
}
#[instrument(target = "ui::video")]
async fn redraw(self: Arc<Self>, batch: BatchGuardPtr) {
let Some(parent_rect) = self.parent_rect.lock().clone() else { return };
let atom = &mut batch.spawn();
let Some(draw_update) = self.get_draw_calls(atom, parent_rect).await else {
error!(target: "ui:video", "Video failed to draw");
return
};
self.render_api.replace_draw_calls(batch.id, draw_update.draw_calls);
}
fn regen_mesh(&self) -> MeshInfo {
let rect = self.rect.get();
let uv = self.uv.get();
let mesh_rect = Rectangle::from([0., 0., rect.w, rect.h]);
let mut mesh = MeshBuilder::new(gfxtag!("img"));
mesh.draw_box(&mesh_rect, COLOR_WHITE, &uv);
mesh.alloc(&self.render_api)
}
async fn get_draw_calls(
&self,
atom: &mut PropertyAtomicGuard,
parent_rect: Rectangle,
) -> Option<DrawUpdate> {
self.rect.eval(atom, &parent_rect).ok()?;
let rect = self.rect.get();
self.uv.eval(atom, &rect).ok()?;
let mesh = self.regen_mesh();
let (vid_data, tsubs) = {
let vid_data_lock = self.vid_data.lock();
let Some(vid_data) = vid_data_lock.as_ref() else {
// Video not loaded yet, skip draw
return None;
};
let tsubs = vec![vid_data.textures_sub.clone(); vid_data.textures.len()];
// Clone the data before the lock is released
let vid_data_clone = Av1VideoData {
textures: vid_data.textures.clone(),
anim: vid_data.anim.clone(),
textures_pub: vid_data.textures_pub.clone(),
textures_sub: vid_data.textures_sub.clone(),
};
(vid_data_clone, tsubs)
};
assert_eq!(vid_data.textures.len(), tsubs.len());
let mut load_tasks = self.load_tasks.lock();
load_tasks.clear();
let mut loaded_n_frames = 0;
let total_frames = vid_data.textures.len();
for (texture_idx, (mut texture, mut tsub)) in
vid_data.textures.into_iter().zip(tsubs.into_iter()).enumerate()
{
let vertex_buffer = mesh.vertex_buffer.clone();
let index_buffer = mesh.index_buffer.clone();
let Some(texture) = texture.take() else {
let anim = vid_data.anim.clone();
let task = self.ex.spawn(async move {
while let Ok((frame_idx, texture)) = tsub.recv().await {
if frame_idx != texture_idx {
continue
}
let mesh = DrawMesh {
vertex_buffer,
index_buffer,
texture: Some(texture),
num_elements: mesh.num_elements,
};
let dc = DrawCall {
instrs: vec![DrawInstruction::Draw(mesh)],
dcs: vec![],
z_index: 0,
debug_str: "video",
};
anim.update(frame_idx, Frame::new(40, dc));
break
}
});
load_tasks.push(task);
continue
};
let mesh = DrawMesh {
vertex_buffer,
index_buffer,
texture: Some(texture),
num_elements: mesh.num_elements,
};
let dc = DrawCall {
instrs: vec![DrawInstruction::Draw(mesh)],
dcs: vec![],
z_index: 0,
debug_str: "video",
};
vid_data.anim.update(texture_idx, Frame::new(40, dc));
loaded_n_frames += 1;
}
debug!(target: "ui::video", "Loaded {loaded_n_frames} / {total_frames} frames");
Some(DrawUpdate {
key: self.dc_key,
draw_calls: vec![(
self.dc_key,
DrawCall::new(
vec![
DrawInstruction::Move(rect.pos()),
DrawInstruction::Animation(vid_data.anim.id),
],
vec![],
self.z_index.get(),
"vid",
),
)],
})
}
}
#[async_trait]
impl UIObject for Video {
fn priority(&self) -> u32 {
self.priority.get()
}
fn init(&self) {
self.load_video();
}
async fn start(self: Arc<Self>, ex: ExecutorPtr) {
let me = Arc::downgrade(&self);
let mut on_modify = OnModify::new(ex, self.node.clone(), me.clone());
on_modify.when_change(self.rect.prop(), Self::redraw);
on_modify.when_change(self.uv.prop(), Self::redraw);
on_modify.when_change(self.z_index.prop(), Self::redraw);
on_modify.when_change(self.path.prop(), Self::reload);
*self.tasks.lock() = on_modify.tasks;
}
fn stop(&self) {
self.tasks.lock().clear();
*self.parent_rect.lock() = None;
*self.vid_data.lock() = None;
// Threads terminate naturally when channels close
}
#[instrument(target = "ui::video")]
async fn draw(
&self,
parent_rect: Rectangle,
atom: &mut PropertyAtomicGuard,
) -> Option<DrawUpdate> {
*self.parent_rect.lock() = Some(parent_rect);
self.get_draw_calls(atom, parent_rect).await
}
}
impl Drop for Video {
fn drop(&mut self) {
let atom = self.render_api.make_guard(gfxtag!("Video::drop"));
self.render_api.replace_draw_calls(atom.batch_id, vec![(self.dc_key, Default::default())]);
}
}
impl std::fmt::Debug for Video {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "{:?}", self.node.upgrade().unwrap())
}
}

View File

@@ -0,0 +1,250 @@
/* This file is part of DarkFi (https://dark.fi)
*
* Copyright (C) 2020-2025 Dyne.org foundation
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use miniquad::TextureFormat;
use parking_lot::Mutex as SyncMutex;
use rav1d::{
Decoder as Rav1dDecoder, Picture as Rav1dPicture, PlanarImageComponent, Rav1dError,
Settings as Rav1dSettings,
};
use std::sync::{
mpsc::{Receiver, Sender},
Arc,
};
use crate::{
gfx::{gfxtag, RenderApi},
util::spawn_thread,
};
use super::{ivf::IvfStreamingDemuxer, Av1VideoData};
macro_rules! d { ($($arg:tt)*) => { debug!(target: "ui:video", $($arg)*); } }
macro_rules! w { ($($arg:tt)*) => { warn!(target: "ui:video", $($arg)*); } }
/// Spawn the loader-demuxer thread (Thread 1 of 2)
///
/// This thread loads video file chunks sequentially and demuxes AV1 frames.
///
/// # Thread Coordination
/// - Loads chunks from disk using format `path.{000, 001, 002, ...}` where `{frame}` is replaced
/// - Demuxes IVF container to extract raw AV1 frames
/// - Initializes vid_data with header info from first chunk
/// - Sends frames to decoder thread via `frame_tx` channel
/// - Signals completion by dropping `frame_tx`
///
/// # Arguments
/// * `path` - Base path with `{frame}` placeholder, e.g. `"assets/video.ivf.{frame}"`
/// * `frame_tx` - Channel sender for raw AV1 frames to decoder thread
/// * `vid_data` - Shared video data storage to initialize
/// * `render_api` - Render API for creating animation
///
/// # Returns
/// JoinHandle for the spawned thread
pub fn spawn_loader_demuxer_thread(
path: String,
frame_tx: Sender<Vec<u8>>,
vid_data: Arc<SyncMutex<Option<Av1VideoData>>>,
render_api: RenderApi,
) -> std::thread::JoinHandle<()> {
spawn_thread("video-loader-demuxer", move || {
let mut chunk_idx: usize = 0;
let mut demuxer: Option<IvfStreamingDemuxer> = None;
loop {
// Replace {frame} placeholder with zero-padded chunk number
let chunk_path = path.replace("{frame}", &format!("{chunk_idx:03}"));
d!("Loading video chunk: {chunk_path}");
// Load chunk asynchronously via miniquad callback
let data = Arc::new(SyncMutex::new(None));
let data2 = data.clone();
miniquad::fs::load_file(&chunk_path, {
let chunk_path = chunk_path.clone();
move |res| match res {
Ok(chunk) => *data2.lock() = Some(chunk),
Err(err) => {
error!("Failed to load chunk {chunk_path}: {err}");
}
}
});
let data = std::mem::take(&mut *data.lock());
// Empty data means file not found - end of chunk sequence
let Some(data) = data else {
// Close channel to signal decoder thread
drop(frame_tx);
d!("Video demuxer finished");
return
};
if let Some(demuxer) = demuxer.as_mut() {
demuxer.feed_data(data);
} else {
// First chunk: initialize demuxer from IVF header
let dem = IvfStreamingDemuxer::from_first_chunk(data).unwrap();
let num_frames = dem.header.num_frames as usize;
demuxer = Some(dem);
// Initialize vid_data with header info
*vid_data.lock() = Some(Av1VideoData::new(num_frames, &render_api));
}
let demuxer = demuxer.as_mut().unwrap();
// Extract all complete frames from this chunk
while let Some(frame) = demuxer.try_read_frame() {
frame_tx.send(frame).unwrap();
d!("Sent video chunk {chunk_idx}");
}
chunk_idx += 1;
}
})
}
/// Spawn the decoder thread (Thread 2 of 2)
///
/// This thread decodes AV1 frames and creates GPU textures directly.
///
/// # Thread Coordination
/// - Receives raw AV1 frames from loader-demuxer thread via `frame_rx` channel
/// - Uses optimistic decoding strategy:
/// 1. Drain all pending frames with `try_recv()`
/// 2. When queue is empty, block on `recv()` for next frame
/// 3. Repeat - this minimizes latency by never waiting when data is available
/// - Creates textures directly via render_api and stores in vid_data
/// - Triggers draw updates when frames are ready
/// - On channel close, flushes decoder
///
/// # Arguments
/// * `frame_rx` - Channel receiver for raw AV1 frames from loader-demuxer thread
/// * `vid_data` - Shared video data storage to update with textures
/// * `render_api` - Render API for creating textures
/// * `dc_key` - Draw call key for triggering updates
///
/// # Returns
/// JoinHandle for the spawned thread
pub fn spawn_decoder_thread(
frame_rx: Receiver<Vec<u8>>,
vid_data: Arc<SyncMutex<Option<Av1VideoData>>>,
render_api: RenderApi,
) -> std::thread::JoinHandle<()> {
spawn_thread("video-decoder", move || {
let mut settings = Rav1dSettings::new();
// 0 is auto detect
settings.set_n_threads(4);
// 0 is auto
settings.set_max_frame_delay(0);
let mut decoder = Rav1dDecoder::with_settings(&settings).unwrap();
//let mut decoder = Rav1dDecoder::new().unwrap();
let mut frame_idx = 0;
loop {
// Blocking receive - returns Err when channel closes
let Ok(av1_frame) = frame_rx.recv() else {
// Channel closed - drain decoder (like dav1dplay)
while let Ok(pic) = decoder.get_picture() {
process(&mut frame_idx, &pic, &vid_data, &render_api);
}
assert_eq!(frame_idx, vid_data.lock().as_ref().unwrap().textures.len());
return;
};
let mut try_again = match decoder.send_data(av1_frame, None, None, None) {
Ok(()) => false,
Err(Rav1dError::TryAgain) => true,
Err(_) => continue,
};
// Try to get decoded pictures
loop {
match decoder.get_picture() {
Ok(pic) => process(&mut frame_idx, &pic, &vid_data, &render_api),
Err(Rav1dError::TryAgain) => {
try_again = true;
break
}
Err(_) => break,
}
}
// If we have pending data, retry sending it
if try_again {
while let Err(Rav1dError::TryAgain) = decoder.send_pending_data() {
let Ok(pic) = decoder.get_picture() else { continue };
process(&mut frame_idx, &pic, &vid_data, &render_api);
}
}
}
d!("Video decode finished, total frames: {frame_idx}");
})
}
fn process(
frame_idx: &mut usize,
pic: &Rav1dPicture,
vid_data: &SyncMutex<Option<Av1VideoData>>,
render_api: &RenderApi,
) {
// rav1d stores data as planar GBR (Y=G, U=B, V=R)
let g_plane = pic.plane(PlanarImageComponent::Y);
let b_plane = pic.plane(PlanarImageComponent::U);
let r_plane = pic.plane(PlanarImageComponent::V);
let g_stride = pic.stride(PlanarImageComponent::Y) as usize;
let b_stride = pic.stride(PlanarImageComponent::U) as usize;
let r_stride = pic.stride(PlanarImageComponent::V) as usize;
let width = pic.width() as usize;
let height = pic.height() as usize;
let mut buf = Vec::with_capacity(width * height * 3);
// Pack planar RGB into RGB format
for y in 0..height {
for x in 0..width {
let g_idx = y * g_stride + x;
let b_idx = y * b_stride + x;
let r_idx = y * r_stride + x;
buf.push(r_plane[r_idx]);
buf.push(g_plane[g_idx]);
buf.push(b_plane[b_idx]);
}
}
// Create texture with RGB data
let tex = render_api.new_texture(
width as u16,
height as u16,
buf,
TextureFormat::RGB8,
gfxtag!("video"),
);
{
// Store in vid_data
let mut vd_guard = vid_data.lock();
let vd = vd_guard.as_mut().unwrap();
vd.textures[*frame_idx] = Some(tex.clone());
let _ = vd.textures_pub.try_broadcast((*frame_idx, tex));
}
//d!("Loaded video frame {frame_idx}");
*frame_idx += 1;
}