import subprocess
import os
import sys
import signal
import logging
import platform
import asyncio
import socket
import secrets  # used for the temporary installer filename in install_ollama
import httpx
import psutil
import shutil
import pipmaster as pm
from typing import Dict, Optional, List, Tuple
from pathlib import Path
from app.core.binary_manager import LLAMACPP_DIR

logger = logging.getLogger(__name__)


class InstanceSupervisor:
    def __init__(self):
        self.processes: Dict[int, subprocess.Popen] = {}

    async def get_instance_state(self, instance) -> Tuple[str, Optional[int]]:
        """
        Determines the true state of an instance.
        Returns: (state, pid)
        States: 'RUNNING' (Managed), 'SYSTEM' (Unmanaged/Service), 'STOPPED', 'CONFLICT' (Port busy by non-ollama)
        """
        # 1. Check if we are currently managing this process
        managed_proc = self.processes.get(instance.id)
        if managed_proc and managed_proc.poll() is None:
            return "RUNNING", managed_proc.pid

        # 2. Check whether the port is physically occupied
        pid_on_port = self._get_pid_on_port(instance.port)
        if pid_on_port:
            # 3. Verify that the occupant actually responds like an Ollama instance.
            # _is_ollama_responding is async and must be awaited; _get_pid_on_port
            # above is synchronous, so only this probe yields to the event loop.
            if await self._is_ollama_responding(instance.port):
                return "SYSTEM", pid_on_port
            return "CONFLICT", pid_on_port

        return "STOPPED", None

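    # Example of how a caller might branch on the returned state (sketch; the
    # `instance` object with id/port attributes is the same shape used throughout
    # this class):
    #
    #     state, pid = await supervisor.get_instance_state(instance)
    #     if state == "CONFLICT":
    #         ...  # refuse to start and surface the foreign PID to the admin UI
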
    def _get_pid_on_port(self, port: int) -> Optional[int]:
        """Finds the PID of the process listening on a specific port."""
        try:
            for conn in psutil.net_connections(kind='inet'):
                if conn.laddr.port == port and conn.status == psutil.CONN_LISTEN:
                    return conn.pid
        except (psutil.AccessDenied, psutil.NoSuchProcess):
            # Fallback for systems where net_connections requires root
            return None
        return None

    async def _is_ollama_responding(self, port: int) -> bool:
        """Probes the port to check whether a local backend is accepting connections, without blocking the event loop."""
        def probe():
            try:
                # Short timeout to prevent UI lag
                with socket.create_connection(('127.0.0.1', port), timeout=0.2):
                    return True
            except (socket.timeout, ConnectionRefusedError, OSError):
                return False

        from fastapi.concurrency import run_in_threadpool
        return await run_in_threadpool(probe)

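    # Note: the probe above only verifies that *something* accepts TCP connections
    # on the port; it does not confirm an Ollama API. A stricter check could query
    # the HTTP API instead. A minimal sketch (assuming Ollama's standard
    # /api/version endpoint and the httpx client imported at module level):
    #
    #     async def _is_ollama_api(self, port: int) -> bool:
    #         try:
    #             async with httpx.AsyncClient(timeout=0.5) as client:
    #                 resp = await client.get(f"http://127.0.0.1:{port}/api/version")
    #                 return resp.status_code == 200
    #         except httpx.HTTPError:
    #             return False
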
    async def start_instance(self, instance) -> Tuple[bool, str]:
        """
        Starts the managed instance and returns (success, message).
        """
        state, pid = await self.get_instance_state(instance)
        if state in ("RUNNING", "SYSTEM", "CONFLICT"):
            return False, f"Port {instance.port} is already occupied ({state})."

        env = os.environ.copy()
        if instance.gpu_ids:
            env["CUDA_VISIBLE_DEVICES"] = str(instance.gpu_ids)

        is_win = platform.system() == "Windows"
        cmd = []

        if instance.backend_type == "ollama":
            binary = "ollama.exe" if is_win else "ollama"
            cmd = [binary, "serve"]
            env["OLLAMA_HOST"] = f"127.0.0.1:{instance.port}"
            if instance.model_path:
                env["OLLAMA_MODELS"] = str(Path(instance.model_path).absolute())

        elif instance.backend_type == "llamacpp":
            # Target the internal binary from Binary Hub
            binary = str(LLAMACPP_DIR / ("llama-server.exe" if is_win else "llama-server"))
            if not Path(binary).exists():
                logger.error(f"Llama.cpp binary not found at {binary}. Please check Binary Hub.")
                return False, f"Binary Error: llama-server not found at '{binary}'. Please check Binary Hub."

            cmd = [
                binary,
                "--model", str(instance.model_path),
                "--port", str(instance.port),
                "--ctx-size", str(instance.ctx_size or 4096),
                "--threads", str(instance.threads or 8),
                "--n-gpu-layers", str(instance.n_gpu_layers or 99),
                "--host", "127.0.0.1"
            ]

        elif instance.backend_type == "vllm":
            # JIT Dependency Check via PipMaster
            if not self.is_vllm_installed():
                logger.info(f"Instance '{instance.name}' requires vLLM. Installing...")
                success, msg = await self.install_vllm()
                if not success:
                    return False, f"Dependency Error: Failed to install vLLM automatically. {msg}"

            cmd = [
                sys.executable, "-m", "vllm.entrypoints.openai.api_server",
                "--model", str(instance.model_path),
                "--port", str(instance.port),
                "--tensor-parallel-size", str(instance.tensor_parallel_size or 1),
                "--host", "127.0.0.1"
            ]

        try:
            logger.info(f"Launching {instance.backend_type} instance '{instance.name}' on port {instance.port}")
            # We use PIPE for stderr to capture immediate crashes
            proc = subprocess.Popen(
                cmd,
                env=env,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.PIPE,
                text=True,
                creationflags=subprocess.CREATE_NEW_PROCESS_GROUP if is_win else 0
            )
            self.processes[instance.id] = proc

            # Wait for the instance to actually start listening on the port
            for attempt in range(30):
                await asyncio.sleep(1)

                # Check for premature exit
                if proc.poll() is not None:
                    stderr_output = proc.stderr.read()
                    logger.error(f"Instance '{instance.name}' crashed on startup:\n{stderr_output}")
                    if instance.id in self.processes:
                        del self.processes[instance.id]

                    # Distinguish common errors
                    if "FileNotFoundError" in stderr_output or "No such file" in stderr_output:
                        return False, f"Model Error: File not found at '{instance.model_path}'"
                    if "address already in use" in stderr_output.lower():
                        return False, f"Network Error: Port {instance.port} is blocked."

                    return False, f"Crash Log: {stderr_output[:200]}..."

                if await self._is_ollama_responding(instance.port):
                    logger.info(f"Instance '{instance.name}' is ready on port {instance.port}")
                    return True, "Started successfully."

            logger.warning(f"Instance '{instance.name}' slow to respond. Proceeding as background task.")
            return True, "Instance started but still initializing."

        except Exception as e:
            logger.error(f"Supervisor Fault: {e}")
            return False, f"System Error: {str(e)}"

    async def stop_instance(self, instance_id: int):
        proc = self.processes.get(instance_id)
        if not proc:
            return False

        if platform.system() == "Windows":
            # CTRL_BREAK_EVENT only reaches the child because start_instance launched
            # it with CREATE_NEW_PROCESS_GROUP.
            proc.send_signal(signal.CTRL_BREAK_EVENT)
        else:
            proc.terminate()

        try:
            proc.wait(timeout=5)
        except subprocess.TimeoutExpired:
            proc.kill()

        if instance_id in self.processes:
            del self.processes[instance_id]
        return True

    def is_ollama_installed(self) -> bool:
        """Checks if the ollama binary is reachable in the system PATH."""
        binary = "ollama.exe" if platform.system() == "Windows" else "ollama"
        return shutil.which(binary) is not None

    def is_vllm_installed(self) -> bool:
        """Checks if vLLM is installed in the current Python environment."""
        return pm.is_installed("vllm")

    def is_openllm_installed(self) -> bool:
        """Checks if OpenLLM is installed in the current Python environment."""
        return pm.is_installed("openllm")

    async def install_openllm(self, upgrade=False) -> Tuple[bool, str]:
        """Attempts to install or update OpenLLM."""
        from app.core.events import event_manager, ProxyEvent
        task_name = "Updating" if upgrade else "Installing"
        req_id = "sys_openllm"
        try:
            event_manager.emit(ProxyEvent("received", req_id, "OpenLLM", "Local", "Admin", error_message=f"{task_name} OpenLLM..."))
            cmd = f"{sys.executable} -m pip install {'--upgrade' if upgrade else ''} openllm"
            process = await asyncio.create_subprocess_shell(cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT)
            while True:
                line = await process.stdout.readline()
                if not line:
                    break
                event_manager.emit(ProxyEvent("active", req_id, "OpenLLM", "Local", "Admin", error_message=line.decode().strip()))
            await process.wait()
            if process.returncode == 0:
                msg = f"OpenLLM {'updated' if upgrade else 'installed'} successfully."
                event_manager.emit(ProxyEvent("completed", req_id, "OpenLLM", "Local", "Admin", error_message=msg))
                return True, msg
            err = "pip install failed."
            event_manager.emit(ProxyEvent("error", req_id, "OpenLLM", "Local", "Admin", error_message=err))
            return False, err
        except Exception as e:
            event_manager.emit(ProxyEvent("error", req_id, "OpenLLM", "Local", "Admin", error_message=str(e)))
            return False, str(e)

    async def install_vllm(self, upgrade=False) -> Tuple[bool, str]:
        """Attempts to install or update vLLM."""
        from app.core.events import event_manager, ProxyEvent
        task_name = "Updating" if upgrade else "Installing"
        req_id = "sys_vllm"

        try:
            event_manager.emit(ProxyEvent("received", req_id, "vLLM", "Local", "Admin", error_message=f"{task_name} vLLM..."))

            cmd = f"{sys.executable} -m pip install {'--upgrade' if upgrade else ''} vllm"
            process = await asyncio.create_subprocess_shell(
                cmd,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.STDOUT
            )

            while True:
                line = await process.stdout.readline()
                if not line:
                    break
                event_manager.emit(ProxyEvent("active", req_id, "vLLM", "Local", "Admin", error_message=line.decode().strip()))

            await process.wait()

            if process.returncode == 0:
                msg = f"vLLM {'updated' if upgrade else 'installed'} successfully."
                event_manager.emit(ProxyEvent("completed", req_id, "vLLM", "Local", "Admin", error_message=msg))
                return True, msg

            err = "pip install failed."
            event_manager.emit(ProxyEvent("error", req_id, "vLLM", "Local", "Admin", error_message=err))
            return False, err
        except Exception as e:
            event_manager.emit(ProxyEvent("error", req_id, "vLLM", "Local", "Admin", error_message=str(e)))
            return False, str(e)

    async def install_ollama(self) -> Tuple[bool, str]:
        """Attempts to install/update Ollama based on the operating system."""
        from app.core.events import event_manager, ProxyEvent
        sys_name = platform.system()
        req_id = "sys_ollama"

        try:
            if sys_name in ("Linux", "Darwin"):
                event_manager.emit(ProxyEvent("received", req_id, "Ollama", "Local", "Admin", error_message="Downloading Ollama installation script..."))
                cmd = "curl -fsSL https://ollama.com/install.sh | sh"

                process = await asyncio.create_subprocess_shell(
                    cmd,
                    stdout=asyncio.subprocess.PIPE,
                    stderr=asyncio.subprocess.STDOUT
                )

                # Stream logs to UI
                while True:
                    line = await process.stdout.readline()
                    if not line:
                        break
                    event_manager.emit(ProxyEvent("active", req_id, "Ollama", "Local", "Admin", error_message=line.decode().strip()))

                await process.wait()

                if process.returncode == 0:
                    msg = "Ollama installed successfully via shell script."
                    event_manager.emit(ProxyEvent("completed", req_id, "Ollama", "Local", "Admin", error_message=msg))
                    return True, msg
                return False, "Installation script failed."

            elif sys_name == "Windows":
                event_manager.emit(ProxyEvent("received", req_id, "Ollama", "Local", "Admin", error_message="Downloading OllamaSetup.exe to Temp..."))
                installer_url = "https://ollama.com/download/OllamaSetup.exe"
                temp_dir = Path(os.environ.get("TEMP", "."))
                target_path = temp_dir / f"OllamaSetup_{secrets.token_hex(4)}.exe"

                async with httpx.AsyncClient(follow_redirects=True, timeout=300.0) as client:
                    async with client.stream("GET", installer_url) as resp:
                        resp.raise_for_status()
                        with open(target_path, "wb") as f:
                            async for chunk in resp.aiter_bytes():
                                f.write(chunk)

                event_manager.emit(ProxyEvent("active", req_id, "Ollama", "Local", "Admin", error_message="Download finished. Launching UI..."))
                # Use DETACHED_PROCESS on Windows so the installer can outlive the hub
                subprocess.Popen([str(target_path)], shell=True, creationflags=subprocess.CREATE_NEW_CONSOLE | subprocess.DETACHED_PROCESS)

                msg = "Windows Installer UI visible. Please follow the instructions on your desktop."
                event_manager.emit(ProxyEvent("completed", req_id, "Ollama", "Local", "Admin", error_message=msg))
                return True, msg

            return False, f"Automatic installation not supported on {sys_name}."
        except Exception as e:
            event_manager.emit(ProxyEvent("error", req_id, "Ollama", "Local", "Admin", error_message=str(e)))
            return False, str(e)

    async def discover_local_instances(self, managed_ports: List[int], start_port: int, end_port: int) -> List[dict]:
        discovered = []
        for port in range(start_port, end_port + 1):
            if port in managed_ports:
                continue

            # Use psutil to check the process name if possible
            pid = self._get_pid_on_port(port)
            if pid:
                try:
                    p = psutil.Process(pid)
                    p_name = p.name().lower()
                    # Catch Windows 'ollama app.exe' and Linux/Mac 'ollama'
                    if "ollama" in p_name and await self._is_ollama_responding(port):
                        discovered.append({"port": port, "pid": pid, "name": p.name()})
                except (psutil.NoSuchProcess, psutil.AccessDenied):
                    # Fallback to pure socket probe
                    if await self._is_ollama_responding(port):
                        discovered.append({"port": port, "pid": "Unknown", "name": "System Ollama"})
        return discovered

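    # Example call (sketch; the port range values are illustrative only):
    #
    #     found = await supervisor.discover_local_instances(
    #         managed_ports=[11434], start_port=11430, end_port=11450
    #     )
    #     # -> e.g. [{"port": 11435, "pid": 1234, "name": "ollama"}]
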
    async def download_hf_model(self, repo_id: str, filenames: List[str], local_dir: str, task_id: str, sender: str):
        """Background task to download one or more files from a HF repo."""
        from app.core.events import event_manager, ProxyEvent
        pm.ensure_packages(["huggingface_hub"], verbose=True)
        from huggingface_hub import hf_hub_download

        dest_path = Path(local_dir)
        dest_path.mkdir(parents=True, exist_ok=True)

        event_manager.emit(ProxyEvent("active", task_id, repo_id, "HF-Hub", sender, error_message=f"Starting download of {len(filenames)} file(s)..."))

        try:
            for i, filename in enumerate(filenames):
                event_manager.emit(ProxyEvent("active", task_id, repo_id, "HF-Hub", sender, error_message=f"Downloading [{i+1}/{len(filenames)}]: {filename}"))

                # hf_hub_download is blocking, so it is wrapped in run_in_threadpool
                # to keep the event loop responsive.
                def _download():
                    return hf_hub_download(
                        repo_id=repo_id,
                        filename=filename,
                        local_dir=str(dest_path),
                        local_dir_use_symlinks=False
                    )

                from fastapi.concurrency import run_in_threadpool
                await run_in_threadpool(_download)

            final_path = str((dest_path / filenames[0]).absolute())
            event_manager.emit(ProxyEvent("completed", task_id, repo_id, "HF-Hub", sender, error_message=f"Success! Model saved to: {final_path}"))
            return final_path
        except Exception as e:
            logger.error(f"HF Download Failed: {e}")
            event_manager.emit(ProxyEvent("error", task_id, repo_id, "HF-Hub", sender, error_message=f"Download failed: {str(e)}"))
            raise


supervisor = InstanceSupervisor()
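
# Illustrative usage sketch (not part of the original module): shows how the
# supervisor singleton could be exercised from an async context. The
# SimpleNamespace below is a hypothetical stand-in for the real instance record,
# which normally comes from the application's database layer.
if __name__ == "__main__":
    from types import SimpleNamespace

    async def _demo():
        fake_instance = SimpleNamespace(id=0, name="demo", port=11434)
        state, pid = await supervisor.get_instance_state(fake_instance)
        print(f"Instance state: {state} (pid={pid})")

    asyncio.run(_demo())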