mirror of
https://github.com/ParisNeo/lollms_hub.git
synced 2026-05-04 03:01:01 -04:00
This commit updates several core components related to administration, proxy handling, bot management, and application startup sequence. **Changes include:** * **`app/api/v1/routes/admin.py`**: Added handling for Redis password updates in the admin settings endpoint. * **`app/api/v1/routes/proxy.py`**: Updated the shared vectorizer function to cache model description embeddings for performance. * **`app/core/bot_manager.py`**: Refined the logic for generating unique request IDs for bots. * **`app/main.py`**: Adjusted the application lifespan hook to ensure necessary directories are created during startup.
490 lines
24 KiB
Python
490 lines
24 KiB
Python
import asyncio
|
|
import logging
|
|
import json
|
|
import secrets
|
|
from typing import Dict, Any, List, Optional
|
|
from sqlalchemy import select
|
|
from app.database.session import AsyncSessionLocal
|
|
from app.database.models import BotConfig
|
|
from app.core.encryption import decrypt_data
|
|
import pipmaster as pm
|
|
|
|
# Module-level logger used for all bot-manager diagnostics below.
logger = logging.getLogger(__name__)
|
|
|
|
class BotManager:
    """Run and supervise chat-platform bots (Telegram, Discord, Slack) for the Hub.

    Each started ``BotConfig`` gets one asyncio task that keeps a platform
    client connected and routes incoming messages through the workflow engine
    via ``_process_bot_request``. A short rolling chat history is kept in
    memory per resolved user identity.
    """

    # Number of most recent messages kept per user in ``chat_histories``.
    HISTORY_WINDOW = 10
    # Maximum resolve/proxy rounds per request (each memory search costs one).
    MAX_AGENT_TURNS = 3
    # Directories (relative to the working directory, matching the
    # "/static/..." -> "app/static/..." mapping) from which artifact files may
    # be uploaded to chat platforms. Artifact paths come from model output,
    # which chat users can influence, so anything outside is refused.
    ARTIFACT_ROOTS = ("app/static",)
    # Artifacts with these extensions are sent to Telegram as photos.
    IMAGE_EXTENSIONS = ('.png', '.jpg', '.jpeg', '.gif', '.webp')
    # Telegram rejects messages longer than 4096 characters.
    TELEGRAM_CHUNK_LIMIT = 4000
    # Discord rejects messages longer than 2000 characters.
    DISCORD_CHUNK_LIMIT = 1900

    def __init__(self, app):
        self.app = app
        # bot_id -> task running that bot's platform client
        self.active_tasks: Dict[int, asyncio.Task] = {}
        # Polled by the Slack/Telegram run loops; set by shutdown().
        self._shutdown = False
        # user identifier -> rolling message list; "content" may be a string
        # or a list of multimodal parts.
        self.chat_histories: Dict[str, List[Dict[str, Any]]] = {}

    # ------------------------------------------------------------------ #
    # Settings helpers
    # ------------------------------------------------------------------ #
    def _get_settings(self):
        """Return the Hub settings object, or None if none is available.

        ``app.state.settings`` may not be populated yet while singletons are
        being initialised, so fall back to ``app.main.shared_state``.
        """
        settings = getattr(self.app.state, 'settings', None)
        if not settings:
            from app.main import shared_state
            settings = getattr(shared_state, 'settings', None)
        return settings

    def _debug_enabled(self) -> bool:
        """Return True when the Hub's ``enable_debug_mode`` setting is on."""
        settings = self._get_settings()
        return bool(settings and getattr(settings, 'enable_debug_mode', False))

    # ------------------------------------------------------------------ #
    # Lifecycle
    # ------------------------------------------------------------------ #
    async def start_all_active_bots(self):
        """Start every bot whose configuration is marked active.

        Does nothing when bot mode is disabled. A failure while starting one
        bot is logged and does not prevent the remaining bots from starting.
        """
        settings = self._get_settings()
        if not settings or not settings.enable_bot_mode:
            logger.debug("Bot Manager: enable_bot_mode is disabled. Skipping startup.")
            return

        async with AsyncSessionLocal() as db:
            res = await db.execute(select(BotConfig).filter(BotConfig.is_active == True))  # noqa: E712
            configs = res.scalars().all()
            for cfg in configs:
                try:
                    await self.start_bot(cfg)
                except Exception as e:
                    logger.error(f"Bot Manager: failed to start bot {cfg.id}: {e}")

    async def start_bot(self, config: "BotConfig"):
        """Spawn the platform task for ``config`` unless one is already running.

        A task that has already finished (crashed or returned) is replaced, so
        a dead bot can be restarted. WhatsApp is webhook-driven and gets no
        task; unknown platforms are logged and ignored.
        """
        existing = self.active_tasks.get(config.id)
        if existing is not None and not existing.done():
            return

        token = decrypt_data(config.encrypted_token)
        if not token:
            logger.warning(f"Bot {config.id}: no usable token configured; not starting.")
            return

        if config.platform == 'whatsapp':
            logger.warning("WhatsApp requires a public webhook. Configure your Meta App to point to /api/bot/whatsapp/webhook")
            return

        runners = {
            'telegram': self._run_telegram_bot,
            'discord': self._run_discord_bot,
            'slack': self._run_slack_bot,
        }
        runner = runners.get(config.platform)
        if runner is None:
            logger.warning(f"Bot {config.id}: unsupported platform '{config.platform}'.")
            return
        self.active_tasks[config.id] = asyncio.create_task(runner(config, token))

    async def stop_bot(self, bot_id: int):
        """Cancel the task for ``bot_id`` (if any) and wait for it to finish.

        Errors raised by a task that had already crashed are logged rather
        than propagated, so stopping a dead bot never fails.
        """
        task = self.active_tasks.pop(bot_id, None)
        if task is None:
            return
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass
        except Exception as e:
            logger.error(f"Bot {bot_id} terminated with error: {e}")

    async def shutdown(self):
        """Signal all run loops to exit and stop every active bot."""
        self._shutdown = True
        for bid in list(self.active_tasks.keys()):
            await self.stop_bot(bid)

    # ------------------------------------------------------------------ #
    # Shared helpers
    # ------------------------------------------------------------------ #
    @staticmethod
    def _split_message(text: str, limit: int = 1900) -> List[str]:
        """Split ``text`` into chunks of at most ``limit`` characters.

        Splits preferably at the last newline inside the limit so formatting
        is not broken; otherwise splits hard at ``limit``. Chunks are stripped.
        Text that already fits is returned unchanged as a single chunk.
        """
        if len(text) <= limit:
            return [text]

        chunks = []
        while len(text) > limit:
            split_idx = text.rfind('\n', 0, limit)
            # A newline at index 0 would yield an empty chunk; split hard instead.
            if split_idx <= 0:
                split_idx = limit
            chunks.append(text[:split_idx].strip())
            text = text[split_idx:].strip()
        if text:
            chunks.append(text)
        return chunks

    @staticmethod
    def _is_within(path: str, root: str) -> bool:
        """Return True if absolute ``path`` is ``root`` or lies under it."""
        import os
        try:
            return os.path.commonpath([path, root]) == root
        except ValueError:
            # Different drives (Windows) or mixed absolute/relative paths.
            return False

    @classmethod
    def _extract_artifacts(cls, response: str) -> tuple:
        """Strip ``<artifact ...>`` tags from a bot response.

        Returns ``(clean_text, file_paths)`` where ``file_paths`` are resolved
        paths of referenced files that exist and lie inside one of
        ``ARTIFACT_ROOTS``. ``/static/...`` URLs are mapped to ``app/static``.
        Files outside the allowed roots are refused and logged.
        """
        import os
        import re

        response = response or ""
        raw_paths = re.findall(r'<artifact\s+.*?path=["\'](.*?)["\'].*?>', response)
        clean = re.sub(r'<artifact\s+.*?/>', '', response).strip()
        clean = re.sub(r'<artifact\s+.*?>.*?</artifact>', '', clean).strip()

        roots = [os.path.realpath(root) for root in cls.ARTIFACT_ROOTS]
        files = []
        for path in raw_paths:
            local_path = os.path.join("app", path.lstrip('/')) if path.startswith('/static/') else path
            if not os.path.isfile(local_path):
                continue
            resolved = os.path.realpath(local_path)
            if not any(cls._is_within(resolved, root) for root in roots):
                logger.warning(f"Refusing to upload artifact outside allowed roots: {path}")
                continue
            files.append(resolved)
        return clean, files

    def _remember(self, user_identifier: str, message: Dict[str, Any]) -> None:
        """Append ``message`` to the user's history and keep the last HISTORY_WINDOW.

        The list is trimmed in place so concurrent requests for the same user
        keep appending to the same list object.
        """
        history = self.chat_histories.setdefault(user_identifier, [])
        history.append(message)
        if len(history) > self.HISTORY_WINDOW:
            del history[:-self.HISTORY_WINDOW]

    def _make_local_request(self, platform_name: str, notify_cb=None):
        """Build a fresh mock request for one bot message.

        A per-message instance prevents state (source_platform, depth, ...)
        leaking between concurrent bot conversations. When ``notify_cb`` is
        given, the engine's ``stream_callback`` is bridged to it with markup
        tags stripped and the text wrapped in underscores (italic in chat) to
        distinguish progress notes from the final answer.
        """
        import re

        local_request = self.app.state.dummy_request.__class__()
        local_request.app = self.app
        local_request.state.source_platform = platform_name

        if notify_cb:
            async def bridge_cb(text):
                clean_text = re.sub(r'<[^>]*>', '', text).strip()
                if clean_text:
                    await notify_cb(f"_{clean_text}_")
            local_request.state.stream_callback = bridge_cb
        return local_request

    @staticmethod
    async def _resolve_user_identifier(db, username: str, platform_name: str) -> str:
        """Map a platform handle to the identifier used for history/memory.

        If a Hub account has the same username, its database id is used so
        memory is shared across Web/API/bots; otherwise the identifier is
        ``"<platform>_<username>"``.

        NOTE(review): matching purely on username lets any chat user who picks
        a Hub user's name read/write that user's memory. Consider requiring an
        explicit account link before merging identities.
        """
        from app.crud import user_crud

        hub_user = await user_crud.get_user_by_username(db, username)
        if hub_user:
            user_identifier = str(hub_user.id)
            logger.info(f"Bot Handshake: Resolved platform user '{username}' to Hub ID {user_identifier}")
            return user_identifier
        return f"{platform_name}_{username}"

    @staticmethod
    async def _build_user_content(user_text, attachments):
        """Build the user message content, inlining images and documents.

        Without attachments the plain ``user_text`` is returned. Otherwise a
        list of parts is built: text, base64 data-URL images, and (first) the
        text extracted from document attachments. Falls back to ``user_text``
        if no part could be produced.
        """
        if not attachments:
            return user_text

        import base64
        from app.core import knowledge_importer as kit

        class MockFile:
            """Minimal upload-file stand-in accepted by the knowledge importer."""

            def __init__(self, name, data, ct):
                self.filename = name
                self._data = data
                self.content_type = ct

            async def read(self):
                return self._data

        content = []
        if user_text:
            content.append({"type": "text", "text": user_text})

        doc_files = []
        for att in attachments:
            if att['type'] == 'image':
                b64 = base64.b64encode(att['data']).decode('utf-8')
                mime = att.get('mime', 'image/jpeg')
                content.append({"type": "image_url", "image_url": {"url": f"data:{mime};base64,{b64}"}})
            elif att['type'] == 'doc':
                doc_files.append(MockFile(att['filename'], att['data'], att.get('mime', 'application/octet-stream')))

        if doc_files:
            try:
                extracted = await kit.extract_local_file_content(doc_files)
                if extracted:
                    content.insert(0, {"type": "text", "text": f"Attached Documents:\n{extracted}\n\n"})
            except Exception as e:
                logger.error(f"Doc extraction failed: {e}")

        return content or user_text

    # ------------------------------------------------------------------ #
    # Core request routing
    # ------------------------------------------------------------------ #
    async def _process_bot_request(self, user_text, username, platform_name, target_workflow, attachments=None, notify_cb=None):
        """Route one bot message through the workflow engine and return the reply.

        Args:
            user_text: Message text (may be empty when only attachments exist).
            username: Platform handle of the sender.
            platform_name: "telegram", "discord", "slack", ...
            target_workflow: Workflow/model name the bot is bound to.
            attachments: Optional list of dicts with ``type`` ("image"/"doc"),
                ``data`` bytes and optional ``mime``/``filename``.
            notify_cb: Optional async callable receiving progress text.

        Returns:
            The cleaned reply text, or a user-facing error string.
        """
        import copy
        import re
        from app.api.v1.routes.proxy import _resolve_target, _reverse_proxy
        from app.core.memory_manager import CognitiveMemoryManager
        from app.crud import server_crud

        async with AsyncSessionLocal() as db:
            req_id = f"bot_{platform_name}_{secrets.token_hex(4)}"
            local_request = self._make_local_request(platform_name, notify_cb)

            user_identifier = await self._resolve_user_identifier(db, username, platform_name)
            content = await self._build_user_content(user_text, attachments)
            self._remember(user_identifier, {"role": "user", "content": content})

            # Long-term memory is handled by the workflow's agent node; here we
            # only pass the rolling short-term window.
            messages = copy.deepcopy(self.chat_histories[user_identifier])

            # Agentic retrieval loop: memory search/dig tags trigger another round.
            for turn in range(self.MAX_AGENT_TURNS):
                real_model, final_msgs = await _resolve_target(
                    db,
                    target_workflow,
                    messages,
                    request=local_request,
                    request_id=req_id,
                    sender=username,
                )

                # The workflow produced a final result itself (e.g. AGENT or
                # COMPOSER node): return it without calling a model server.
                if real_model == "__result__":
                    final_content = final_msgs[-1]["content"] if final_msgs else "Empty workflow result."
                    if self._debug_enabled():
                        logger.info(f"[DEBUG] BOT WORKFLOW RESULT:\n{final_content}")
                    clean_res = await CognitiveMemoryManager.process_tags(db, user_identifier, target_workflow, final_content)
                    self._remember(user_identifier, {"role": "assistant", "content": clean_res})
                    return clean_res

                servers = await server_crud.get_servers_with_model(db, real_model)
                if not servers:
                    logger.error(f"Bot check failed: No servers found for resolved model '{real_model}'")
                    return f"❌ Error: Compute nodes offline for '{real_model}'."

                resp, _ = await _reverse_proxy(
                    local_request, "chat", servers,
                    json.dumps({"model": real_model, "messages": final_msgs, "stream": False}).encode(),
                    is_subrequest=True, request_id=req_id, model=target_workflow, sender=username,
                )

                if not hasattr(resp, 'body'):
                    return "⚠️ Error: Request failed."

                data = json.loads(resp.body.decode())
                # "message" may be present but null in error payloads.
                raw_response = (data.get("message") or {}).get("content", "Empty response.")

                if self._debug_enabled():
                    logger.info(f"[DEBUG] BOT RAW RESPONSE (Turn {turn}):\n{raw_response}")

                clean_response = await CognitiveMemoryManager.process_tags(db, user_identifier, target_workflow, raw_response)

                # <memory_dig regex="..."/>: regex search over immutable ROM entries.
                dig_match = re.search(r'<memory_dig\s+regex=["\']([^"\']+)["\']\s*(?:/>|></memory_dig>)', raw_response)
                if dig_match:
                    # NOTE(review): imported here because the module header only
                    # imports BotConfig; assumed to live in app.database.models
                    # alongside it — confirm.
                    from app.database.models import MemoryEntry

                    pattern = dig_match.group(1)
                    res_rom = await db.execute(
                        select(MemoryEntry).filter(
                            MemoryEntry.agent_name == "lollms",
                            MemoryEntry.is_immutable == True,  # noqa: E712
                            MemoryEntry.content.op('REGEXP')(pattern),
                        ).limit(3)
                    )
                    found = res_rom.scalars().all()
                    memory_text = "\n".join(f"RECOVERED ROM: {f.title} - {f.content}" for f in found)
                    messages.append({"role": "assistant", "content": raw_response})
                    messages.append({"role": "user", "content": f"INTERNAL ROM SEARCH RESULTS:\n{memory_text}\n\nApply this knowledge to your final answer."})
                    continue

                # <memory_search category="..."/>: category lookup in user memory.
                search_match = re.search(r'<memory_search\s+category=["\']([^"\']+)["\']\s*(?:/>|></memory_search>)', raw_response)
                if search_match:
                    category = search_match.group(1)
                    search_results = await CognitiveMemoryManager.search_category(db, user_identifier, target_workflow, category)
                    messages.append({"role": "assistant", "content": raw_response})
                    messages.append({"role": "user", "content": f"SYSTEM MEMORY RESULTS FOR '{category}':\n{search_results}\n\nNow, continue answering or perform memory operations."})
                    continue

                if not clean_response.strip():
                    clean_response = "Memory updated."

                self._remember(user_identifier, {"role": "assistant", "content": clean_response})
                return clean_response

            return "⚠️ Error: Too many memory turns."

    # ------------------------------------------------------------------ #
    # Discord
    # ------------------------------------------------------------------ #
    async def _run_discord_bot(self, config: "BotConfig", token: str):
        """Run a discord.py client for ``config`` until the task is cancelled.

        The client is always closed on exit so a stopped bot disconnects.
        """
        pm.ensure_packages(["discord.py"], verbose=True)
        import discord

        intents = discord.Intents.default()
        intents.messages = True
        intents.message_content = True
        client = discord.Client(intents=intents)

        @client.event
        async def on_message(message):
            if message.author == client.user:
                return
            # Ignore other bots too: two Hub bots in one channel would
            # otherwise answer each other indefinitely.
            if message.author.bot:
                return

            attachments = []
            for att in message.attachments:
                try:
                    data = await att.read()
                    mime = att.content_type or 'application/octet-stream'
                    if mime.startswith('image/'):
                        attachments.append({"type": "image", "data": data, "mime": mime})
                    else:
                        attachments.append({"type": "doc", "data": data, "filename": att.filename, "mime": mime})
                except Exception as e:
                    logger.error(f"Failed to read discord attachment: {e}")

            if not message.content and not attachments:
                return

            # Typing indicator is best-effort: missing permissions or a slow
            # loop must not block the answer.
            typing_ctx = None
            try:
                ctx = message.channel.typing()
                await asyncio.wait_for(ctx.__aenter__(), timeout=2.0)
                typing_ctx = ctx
            except discord.errors.Forbidden:
                logger.warning(f"Bot lacks 'Send Messages' or 'Read History' in {message.channel}. Continuing without typing indicator.")
            except Exception as e:
                logger.error(f"Unexpected error in typing indicator: {e}")

            try:
                # Progress is shown in one status message that is edited in place.
                status_msg = None
                status_history = []

                async def discord_notifier(text):
                    nonlocal status_msg
                    if "Task finalized" in text:
                        return
                    status_history.append(text)
                    # Keep the tail so edits stay under Discord's length limit.
                    content = "\n".join(status_history)[-self.DISCORD_CHUNK_LIMIT:]
                    try:
                        if status_msg is None:
                            status_msg = await message.channel.send(content)
                        else:
                            await status_msg.edit(content=content)
                    except Exception as e:
                        logger.debug(f"Discord status update failed: {e}")

                try:
                    response = await self._process_bot_request(
                        message.content, message.author.name, "discord",
                        config.target_workflow, attachments, notify_cb=discord_notifier,
                    )
                except Exception as e:
                    logger.error(f"Bot execution error: {e}")
                    response = f"⚠️ Internal Error: {str(e)[:100]}..."

                clean_response, artifact_paths = self._extract_artifacts(response)
                discord_files = [discord.File(p) for p in artifact_paths]
                if not clean_response and discord_files:
                    clean_response = "Here are your files:"

                if status_msg is not None:
                    try:
                        await status_msg.delete()
                    except Exception as e:
                        logger.debug(f"Could not delete discord status message: {e}")

                parts = self._split_message(clean_response, limit=self.DISCORD_CHUNK_LIMIT)
                last_index = len(parts) - 1
                for i, part in enumerate(parts):
                    # Files are attached to the final chunk only.
                    files = discord_files if i == last_index else None
                    if not part and not files:
                        continue
                    try:
                        if files:
                            await message.reply(part, files=files)
                        else:
                            await message.reply(part)
                    except discord.errors.Forbidden:
                        logger.error(f"CRITICAL: Failed to reply to {message.author}. Check bot permissions in this channel.")
                        break
            finally:
                if typing_ctx is not None:
                    try:
                        await typing_ctx.__aexit__(None, None, None)
                    except Exception:
                        pass

        try:
            await client.start(token)
        except Exception as e:
            logger.error(f"Discord Bot Error: {e}")
        finally:
            if not client.is_closed():
                await client.close()

    # ------------------------------------------------------------------ #
    # Slack
    # ------------------------------------------------------------------ #
    async def _run_slack_bot(self, config: "BotConfig", token: str):
        """Run a Slack Socket Mode client for ``config`` until shutdown/cancel.

        Requires an app-level token (xapp-...) stored encrypted under
        ``extra_settings["app_token"]``. The socket is closed on exit.
        """
        # The async Socket Mode client is built on aiohttp.
        pm.ensure_packages(["slack_sdk", "aiohttp"], verbose=True)
        from slack_sdk.web.async_client import AsyncWebClient
        from slack_sdk.socket_mode.aiohttp import SocketModeClient
        from slack_sdk.socket_mode.response import SocketModeResponse
        from slack_sdk.socket_mode.request import SocketModeRequest

        encrypted_app_token = (config.extra_settings or {}).get("app_token")
        app_token = decrypt_data(encrypted_app_token) if encrypted_app_token else None
        if not app_token:
            logger.error("Slack requires an App Token (xapp-...) in Extra Settings.")
            return

        web_client = AsyncWebClient(token=token)
        sm_client = SocketModeClient(app_token=app_token, web_client=web_client)

        async def process(client, req: SocketModeRequest):
            if req.type != "events_api":
                return
            # Ack every envelope immediately; unacked envelopes are redelivered.
            await client.send_socket_mode_response(SocketModeResponse(envelope_id=req.envelope_id))

            event = req.payload.get("event", {})
            if event.get("type") != "message" or event.get("bot_id"):
                return
            user = event.get("user")
            if not user:
                # e.g. message_changed / message_deleted carry no top-level user.
                return

            channel = event["channel"]
            thread_ts = event.get("ts")
            try:
                response = await self._process_bot_request(event.get("text", ""), user, "slack", config.target_workflow, None)
                clean_response, artifact_paths = self._extract_artifacts(response)
                await web_client.chat_postMessage(channel=channel, text=clean_response or "Files attached:", thread_ts=thread_ts)
                for path in artifact_paths:
                    await web_client.files_upload_v2(channel=channel, file=path, thread_ts=thread_ts)
            except Exception as e:
                logger.error(f"Slack bot execution error: {e}")

        sm_client.socket_mode_request_listeners.append(process)
        try:
            await sm_client.connect()
            while not self._shutdown:
                await asyncio.sleep(1)
        finally:
            await sm_client.close()

    # ------------------------------------------------------------------ #
    # Telegram
    # ------------------------------------------------------------------ #
    async def _run_telegram_bot(self, config: "BotConfig", token: str):
        """Run a python-telegram-bot polling application until shutdown/cancel."""
        pm.ensure_packages(["python-telegram-bot"], verbose=True)
        from telegram import Update
        from telegram.error import BadRequest
        from telegram.ext import Application, MessageHandler, filters, ContextTypes

        async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
            message = update.message
            if not message:
                return

            user_text = message.caption or message.text or ""
            chat_id = message.chat_id
            sender = message.from_user
            # Users without a @username get a per-user id so their histories
            # are not merged under one shared name.
            if sender is not None and sender.username:
                username = sender.username
            elif sender is not None:
                username = f"tg_{sender.id}"
            else:
                username = "tg_user"

            attachments = []
            if message.photo:
                try:
                    photo_file = await message.photo[-1].get_file()
                    data = await photo_file.download_as_bytearray()
                    attachments.append({"type": "image", "data": data, "mime": "image/jpeg"})
                except Exception as e:
                    logger.error(f"TG Photo error: {e}")
            if message.document:
                try:
                    doc_file = await message.document.get_file()
                    data = await doc_file.download_as_bytearray()
                    attachments.append({"type": "doc", "data": data, "filename": message.document.file_name, "mime": message.document.mime_type or 'application/octet-stream'})
                except Exception as e:
                    logger.error(f"TG Doc error: {e}")

            if not user_text and not attachments:
                return

            placeholder = await message.reply_text("✨ Thinking...")

            async def send_text(chunk: str, edit: bool):
                # Model output often has unbalanced Markdown, which Telegram
                # rejects; retry the same chunk as plain text.
                for parse_mode in ('Markdown', None):
                    try:
                        if edit:
                            await placeholder.edit_text(chunk, parse_mode=parse_mode)
                        else:
                            await context.bot.send_message(chat_id=chat_id, text=chunk, parse_mode=parse_mode)
                        return
                    except BadRequest:
                        if parse_mode is None:
                            raise

            try:
                response = await self._process_bot_request(user_text, username, "telegram", config.target_workflow, attachments)
                clean_response, artifact_paths = self._extract_artifacts(response)
                if not clean_response and artifact_paths:
                    clean_response = "Here are your files:"

                chunks = [c for c in self._split_message(clean_response, limit=self.TELEGRAM_CHUNK_LIMIT) if c]
                if chunks:
                    for i, chunk in enumerate(chunks):
                        await send_text(chunk, edit=(i == 0))
                else:
                    await placeholder.delete()

                for path in artifact_paths:
                    with open(path, 'rb') as f:
                        if path.lower().endswith(self.IMAGE_EXTENSIONS):
                            await context.bot.send_photo(chat_id=chat_id, photo=f)
                        else:
                            await context.bot.send_document(chat_id=chat_id, document=f)
            except Exception as e:
                logger.error(f"Bot execution error: {e}")
                try:
                    await placeholder.edit_text(f"⚠️ Internal Error: {str(e)[:100]}...")
                except Exception as edit_err:
                    logger.debug(f"Could not report error to Telegram: {edit_err}")

        application = Application.builder().token(token).build()
        # Photos/documents carry their text in the caption, so TEXT alone
        # would never deliver them to the handler.
        application.add_handler(MessageHandler(
            (filters.TEXT | filters.PHOTO | filters.Document.ALL) & ~filters.COMMAND,
            handle_message,
        ))

        try:
            await application.initialize()
            await application.start()
            await application.updater.start_polling()
            while not self._shutdown:
                await asyncio.sleep(1)
        except Exception as e:
            logger.error(f"Telegram Bot Error: {e}")
        finally:
            # Only stop what actually started; stopping a non-running
            # updater/application raises and would mask the original error.
            try:
                if application.updater is not None and application.updater.running:
                    await application.updater.stop()
                if application.running:
                    await application.stop()
                await application.shutdown()
            except Exception as e:
                logger.error(f"Telegram Bot shutdown error: {e}")