mirror of
https://github.com/danielmiessler/Fabric.git
synced 2026-01-10 06:48:04 -05:00
- Improved pattern creation, editing, and deletion functionalities. - Enhanced logging configuration for better debugging and user feedback. - Updated input validation and sanitization processes to ensure safe pattern processing. - Streamlined session state initialization for improved performance. - Added new UI components for better user experience in pattern management and output analysis.
1737 lines
72 KiB
Python
1737 lines
72 KiB
Python
import shutil
|
|
import json
|
|
import os
|
|
import streamlit as st
|
|
from subprocess import run, CalledProcessError
|
|
from dotenv import load_dotenv
|
|
import re
|
|
import time
|
|
import logging
|
|
from typing import Dict, List, Optional, Tuple
|
|
from datetime import datetime
|
|
import sys
|
|
import pandas as pd
|
|
import matplotlib.pyplot as plt
|
|
import seaborn as sns
|
|
import numpy as np
|
|
|
|
# Create formatters
|
|
console_formatter = logging.Formatter(
|
|
'\033[92m%(asctime)s\033[0m - ' # Green timestamp
|
|
'\033[94m%(levelname)s\033[0m - ' # Blue level
|
|
'\033[95m[%(funcName)s]\033[0m ' # Purple function name
|
|
'%(message)s' # Regular message
|
|
)
|
|
file_formatter = logging.Formatter('%(asctime)s - %(levelname)s - [%(funcName)s] %(message)s')
|
|
|
|
# Configure root logger
|
|
logger = logging.getLogger(__name__)
|
|
logger.setLevel(logging.INFO)
|
|
|
|
# Clear any existing handlers
|
|
logger.handlers = []
|
|
|
|
# Console Handler
|
|
console_handler = logging.StreamHandler(sys.stdout)
|
|
console_handler.setFormatter(console_formatter)
|
|
console_handler.setLevel(logging.INFO)
|
|
logger.addHandler(console_handler)
|
|
|
|
# File Handler
|
|
log_dir = os.path.expanduser("~/.config/fabric/logs")
|
|
os.makedirs(log_dir, exist_ok=True)
|
|
log_file = os.path.join(log_dir, f"fabric_ui_{datetime.now().strftime('%Y%m%d')}.log")
|
|
file_handler = logging.FileHandler(log_file)
|
|
file_handler.setFormatter(file_formatter)
|
|
file_handler.setLevel(logging.DEBUG) # More detailed logging in file
|
|
logger.addHandler(file_handler)
|
|
|
|
# Log startup message
|
|
logger.info("🚀 Fabric UI Starting Up")
|
|
logger.info(f"💾 Log file: {log_file}")
|
|
|
|
# Global variables
|
|
pattern_dir = os.path.expanduser("~/.config/fabric/patterns")
|
|
MAX_RETRIES = 3
|
|
RETRY_DELAY = 1 # seconds
|
|
|
|
def initialize_session_state():
|
|
"""Initialize necessary session state attributes.
|
|
|
|
Error handling:
|
|
- Ensures all required session state variables are initialized
|
|
- Loads saved outputs from persistent storage
|
|
- Handles missing or corrupted saved output files
|
|
"""
|
|
logger.info("Initializing session state")
|
|
default_configs = {
|
|
# Configuration state
|
|
"config_loaded": False,
|
|
"vendors": {},
|
|
"available_models": [],
|
|
"selected_vendor": None,
|
|
"selected_model": None,
|
|
|
|
# Pattern execution state
|
|
"input_content": "",
|
|
"selected_patterns": [],
|
|
"chat_output": [],
|
|
"current_view": "run",
|
|
|
|
# Pattern creation state
|
|
"wizard_step": "Basic Info",
|
|
"session_name": "",
|
|
"context_name": "",
|
|
|
|
# Model configuration
|
|
"config": {
|
|
"vendor": "",
|
|
"model": "",
|
|
"context_length": "2048"
|
|
},
|
|
|
|
# Model caching
|
|
"cached_models": None,
|
|
"last_model_fetch": 0,
|
|
|
|
# UI state
|
|
"active_tab": 0,
|
|
|
|
# Output management
|
|
"output_logs": [],
|
|
"starred_outputs": [],
|
|
"starring_output": None,
|
|
"temp_star_name": ""
|
|
}
|
|
|
|
for key, value in default_configs.items():
|
|
if key not in st.session_state:
|
|
st.session_state[key] = value
|
|
|
|
# Load saved outputs if they exist
|
|
load_saved_outputs()
|
|
|
|
def parse_models_output(output: str) -> Dict[str, List[str]]:
|
|
"""Parse the output of fabric --listmodels command."""
|
|
logger.debug("Parsing models output")
|
|
providers = {}
|
|
current_provider = None
|
|
|
|
lines = output.split('\n')
|
|
for line in lines:
|
|
line = line.strip()
|
|
if not line:
|
|
continue
|
|
|
|
if line == "Available models:":
|
|
continue
|
|
|
|
if not line.startswith('\t') and not line.startswith('['):
|
|
current_provider = line.strip()
|
|
providers[current_provider] = []
|
|
elif current_provider and (line.startswith('\t') or line.startswith('[')):
|
|
model = line.strip()
|
|
if '[' in model and ']' in model:
|
|
model = model.split(']', 1)[1].strip()
|
|
providers[current_provider].append(model)
|
|
|
|
logger.debug(f"Found providers: {list(providers.keys())}")
|
|
return providers
|
|
|
|
def safe_run_command(command: List[str], retry: bool = True) -> Tuple[bool, str, str]:
|
|
"""Safely run a command with retries."""
|
|
cmd_str = " ".join(command)
|
|
logger.info(f"Executing command: {cmd_str}")
|
|
|
|
for attempt in range(MAX_RETRIES if retry else 1):
|
|
try:
|
|
logger.debug(f"Attempt {attempt + 1}/{MAX_RETRIES if retry else 1}")
|
|
result = run(command, capture_output=True, text=True)
|
|
if result.returncode == 0:
|
|
logger.debug("Command executed successfully")
|
|
return True, result.stdout, ""
|
|
if attempt == MAX_RETRIES - 1 or not retry:
|
|
logger.error(f"Command failed with return code {result.returncode}: {result.stderr}")
|
|
return False, "", result.stderr
|
|
except Exception as e:
|
|
if attempt == MAX_RETRIES - 1 or not retry:
|
|
logger.error(f"Command execution failed: {str(e)}")
|
|
return False, "", str(e)
|
|
logger.debug(f"Retrying in {RETRY_DELAY} seconds...")
|
|
time.sleep(RETRY_DELAY)
|
|
logger.error("Max retries exceeded")
|
|
return False, "", "Max retries exceeded"
|
|
|
|
def fetch_models_once() -> Dict[str, List[str]]:
|
|
"""Fetch models once and cache the results."""
|
|
logger.info("Fetching models")
|
|
current_time = time.time()
|
|
cache_timeout = 300 # 5 minutes
|
|
|
|
if (st.session_state.cached_models is not None and
|
|
current_time - st.session_state.last_model_fetch < cache_timeout):
|
|
logger.debug("Using cached models")
|
|
return st.session_state.cached_models
|
|
|
|
logger.debug("Cache expired or not available, fetching new models")
|
|
success, stdout, stderr = safe_run_command(["fabric", "--listmodels"])
|
|
if not success:
|
|
logger.error(f"Failed to fetch models: {stderr}")
|
|
st.error(f"Failed to fetch models: {stderr}")
|
|
return {}
|
|
|
|
providers = parse_models_output(stdout)
|
|
logger.info(f"Found {len(providers)} providers")
|
|
st.session_state.cached_models = providers
|
|
st.session_state.last_model_fetch = current_time
|
|
return providers
|
|
|
|
def get_configured_providers() -> Dict[str, List[str]]:
|
|
"""Get list of configured providers using fabric --listmodels."""
|
|
return fetch_models_once()
|
|
|
|
def update_provider_selection(new_provider: str) -> None:
|
|
"""Update provider and reset related states."""
|
|
logger.info(f"Updating provider selection to: {new_provider}")
|
|
if new_provider != st.session_state.config["vendor"]:
|
|
logger.debug("Provider changed, resetting model selection")
|
|
st.session_state.config["vendor"] = new_provider
|
|
st.session_state.selected_vendor = new_provider
|
|
st.session_state.config["model"] = None
|
|
st.session_state.selected_model = None
|
|
st.session_state.available_models = []
|
|
if "model_select" in st.session_state:
|
|
del st.session_state.model_select
|
|
logger.debug("Model state reset completed")
|
|
|
|
def load_configuration() -> bool:
|
|
"""Load environment variables and initialize configuration."""
|
|
logger.info("Loading configuration")
|
|
try:
|
|
env_path = os.path.expanduser("~/.config/fabric/.env")
|
|
logger.debug(f"Looking for .env file at: {env_path}")
|
|
|
|
if not os.path.exists(env_path):
|
|
logger.error(f"Configuration file not found at {env_path}")
|
|
st.error(f"Configuration file not found at {env_path}")
|
|
return False
|
|
|
|
load_dotenv(dotenv_path=env_path)
|
|
logger.debug("Environment variables loaded")
|
|
|
|
with st.spinner("Loading providers and models..."):
|
|
providers = get_configured_providers()
|
|
|
|
if not providers:
|
|
logger.error("No providers configured")
|
|
st.error("No providers configured. Please run 'fabric --setup' first.")
|
|
return False
|
|
|
|
default_vendor = os.getenv("DEFAULT_VENDOR")
|
|
default_model = os.getenv("DEFAULT_MODEL")
|
|
context_length = os.getenv("DEFAULT_MODEL_CONTEXT_LENGTH", "2048")
|
|
|
|
logger.debug(f"Default configuration - Vendor: {default_vendor}, Model: {default_model}")
|
|
|
|
if not default_vendor or default_vendor not in providers:
|
|
default_vendor = next(iter(providers))
|
|
default_model = providers[default_vendor][0] if providers[default_vendor] else None
|
|
logger.info(f"Using fallback configuration - Vendor: {default_vendor}, Model: {default_model}")
|
|
|
|
st.session_state.config = {
|
|
"vendor": default_vendor,
|
|
"model": default_model,
|
|
"context_length": context_length
|
|
}
|
|
st.session_state.vendors = providers
|
|
st.session_state.config_loaded = True
|
|
|
|
logger.info("Configuration loaded successfully")
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.error(f"Configuration error: {str(e)}", exc_info=True)
|
|
st.error(f"Configuration error: {str(e)}")
|
|
return False
|
|
|
|
def load_models_and_providers() -> None:
|
|
"""Load models and providers from fabric configuration."""
|
|
try:
|
|
st.sidebar.header("Model and Provider Selection")
|
|
|
|
providers: Dict[str, List[str]] = fetch_models_once()
|
|
|
|
if not providers:
|
|
st.sidebar.error("No providers configured")
|
|
return
|
|
|
|
current_vendor = st.session_state.config.get("vendor", "")
|
|
available_providers = list(providers.keys())
|
|
|
|
try:
|
|
provider_index = available_providers.index(current_vendor) if current_vendor in available_providers else 0
|
|
except ValueError:
|
|
provider_index = 0
|
|
logger.warning(f"Current vendor {current_vendor} not found in available providers")
|
|
|
|
selected_provider = st.sidebar.selectbox(
|
|
"Provider",
|
|
available_providers,
|
|
index=provider_index,
|
|
key="provider_select",
|
|
on_change=lambda: update_provider_selection(st.session_state.provider_select)
|
|
)
|
|
|
|
if selected_provider != st.session_state.config.get("vendor"):
|
|
update_provider_selection(selected_provider)
|
|
st.sidebar.success(f"Using {selected_provider}")
|
|
|
|
available_models = providers.get(selected_provider, [])
|
|
if not available_models:
|
|
st.sidebar.warning(f"No models available for {selected_provider}")
|
|
return
|
|
|
|
current_model = st.session_state.config.get("model")
|
|
try:
|
|
model_index = available_models.index(current_model) if current_model in available_models else 0
|
|
except ValueError:
|
|
model_index = 0
|
|
logger.warning(f"Current model {current_model} not found in available models for {selected_provider}")
|
|
|
|
model_key = f"model_select_{selected_provider}"
|
|
selected_model = st.sidebar.selectbox(
|
|
"Model",
|
|
available_models,
|
|
index=model_index,
|
|
key=model_key
|
|
)
|
|
|
|
if selected_model != st.session_state.config.get("model"):
|
|
logger.debug(f"Updating model selection to: {selected_model}")
|
|
st.session_state.config["model"] = selected_model
|
|
st.session_state.selected_model = selected_model
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error loading models and providers: {str(e)}", exc_info=True)
|
|
st.sidebar.error(f"Error loading models and providers: {str(e)}")
|
|
st.session_state.selected_model = None
|
|
st.session_state.config["model"] = None
|
|
|
|
def get_pattern_metadata(pattern_name):
|
|
"""Get pattern metadata from system.md."""
|
|
pattern_path = os.path.join(pattern_dir, pattern_name, "system.md")
|
|
if os.path.exists(pattern_path):
|
|
with open(pattern_path, "r") as f:
|
|
return f.read()
|
|
return None
|
|
|
|
def get_patterns():
|
|
"""Get the list of available patterns from the specified directory."""
|
|
if not os.path.exists(pattern_dir):
|
|
st.error(f"Pattern directory not found: {pattern_dir}")
|
|
return []
|
|
try:
|
|
patterns = [item for item in os.listdir(pattern_dir)
|
|
if os.path.isdir(os.path.join(pattern_dir, item))]
|
|
return patterns
|
|
except PermissionError:
|
|
st.error(f"Permission error accessing pattern directory: {pattern_dir}")
|
|
return []
|
|
except Exception as e:
|
|
st.error(f"An unexpected error occurred: {e}")
|
|
return []
|
|
|
|
def create_pattern(pattern_name: str, content: Optional[str] = None) -> Tuple[bool, str]:
|
|
"""Create a new pattern with necessary files and structure."""
|
|
new_pattern_path = None
|
|
try:
|
|
# Validate pattern name
|
|
if not pattern_name:
|
|
logger.error("Pattern name cannot be empty")
|
|
return False, "Pattern name cannot be empty."
|
|
|
|
# Check if pattern already exists
|
|
new_pattern_path = os.path.join(pattern_dir, pattern_name)
|
|
if os.path.exists(new_pattern_path):
|
|
logger.error(f"Pattern {pattern_name} already exists")
|
|
return False, "Pattern already exists."
|
|
|
|
# Create pattern directory
|
|
os.makedirs(new_pattern_path)
|
|
logger.info(f"Created pattern directory: {new_pattern_path}")
|
|
|
|
# If content is provided, use fabric create_pattern to structure it
|
|
if content:
|
|
logger.info(f"Structuring content for pattern '{pattern_name}' using Fabric")
|
|
try:
|
|
# Get current model and provider configuration
|
|
current_provider = st.session_state.config.get("vendor")
|
|
current_model = st.session_state.config.get("model")
|
|
|
|
if not current_provider or not current_model:
|
|
raise ValueError("Please select a provider and model first.")
|
|
|
|
# Execute fabric create_pattern with input content
|
|
cmd = ["fabric", "--pattern", "create_pattern"]
|
|
if current_provider and current_model:
|
|
cmd.extend(["--vendor", current_provider, "--model", current_model])
|
|
|
|
logger.debug(f"Running command: {' '.join(cmd)}")
|
|
logger.debug(f"Input content:\n{content}")
|
|
|
|
# Execute pattern
|
|
result = run(cmd, input=content, capture_output=True, text=True, check=True)
|
|
structured_content = result.stdout.strip()
|
|
|
|
if not structured_content:
|
|
raise ValueError("No output received from create_pattern")
|
|
|
|
# Save the structured content to system.md
|
|
system_file = os.path.join(new_pattern_path, "system.md")
|
|
with open(system_file, "w") as f:
|
|
f.write(structured_content)
|
|
|
|
# Validate the created pattern
|
|
is_valid, validation_message = validate_pattern(pattern_name)
|
|
if not is_valid:
|
|
raise ValueError(f"Pattern validation failed: {validation_message}")
|
|
|
|
logger.info(f"Successfully created pattern '{pattern_name}' with structured content")
|
|
|
|
except CalledProcessError as e:
|
|
error_msg = f"Error running create_pattern: {e.stderr}"
|
|
logger.error(error_msg)
|
|
if os.path.exists(new_pattern_path):
|
|
shutil.rmtree(new_pattern_path)
|
|
return False, error_msg
|
|
|
|
except Exception as e:
|
|
error_msg = f"Unexpected error during content structuring: {str(e)}"
|
|
logger.error(error_msg)
|
|
if os.path.exists(new_pattern_path):
|
|
shutil.rmtree(new_pattern_path)
|
|
return False, error_msg
|
|
else:
|
|
# Create minimal template for manual editing
|
|
logger.info(f"Creating minimal template for pattern '{pattern_name}'")
|
|
system_file = os.path.join(new_pattern_path, "system.md")
|
|
with open(system_file, "w") as f:
|
|
f.write("# IDENTITY and PURPOSE\n\n# STEPS\n\n# OUTPUT INSTRUCTIONS\n")
|
|
|
|
# Validate the created pattern
|
|
is_valid, validation_message = validate_pattern(pattern_name)
|
|
if not is_valid:
|
|
logger.warning(f"Pattern created but validation failed: {validation_message}")
|
|
|
|
return True, f"Pattern '{pattern_name}' created successfully."
|
|
|
|
except Exception as e:
|
|
error_msg = f"Error creating pattern: {str(e)}"
|
|
logger.error(error_msg)
|
|
# Clean up on any error
|
|
if new_pattern_path and os.path.exists(new_pattern_path):
|
|
shutil.rmtree(new_pattern_path)
|
|
return False, error_msg
|
|
|
|
def delete_pattern(pattern_name):
|
|
"""Delete an existing pattern."""
|
|
try:
|
|
if not pattern_name:
|
|
return False, "Pattern name cannot be empty."
|
|
|
|
pattern_path = os.path.join(pattern_dir, pattern_name)
|
|
if not os.path.exists(pattern_path):
|
|
return False, "Pattern does not exist."
|
|
|
|
shutil.rmtree(pattern_path)
|
|
return True, f"Pattern '{pattern_name}' deleted successfully."
|
|
except Exception as e:
|
|
return False, f"Error deleting pattern: {str(e)}"
|
|
|
|
def pattern_creation_wizard():
|
|
"""Multi-step wizard for creating a new pattern."""
|
|
st.header("Create New Pattern")
|
|
|
|
pattern_name = st.text_input("Pattern Name")
|
|
if pattern_name:
|
|
edit_mode = st.radio(
|
|
"Edit Mode",
|
|
["Simple Editor", "Advanced (Wizard)"],
|
|
key="pattern_creation_edit_mode",
|
|
horizontal=True
|
|
)
|
|
|
|
if edit_mode == "Simple Editor":
|
|
new_content = st.text_area("Enter Pattern Content", height=400)
|
|
|
|
if st.button("Create Pattern", type="primary"):
|
|
success, message = create_pattern(pattern_name, new_content)
|
|
if success:
|
|
st.success(message)
|
|
st.experimental_rerun()
|
|
else:
|
|
st.error(message)
|
|
|
|
else:
|
|
sections = ["IDENTITY", "GOAL", "OUTPUT", "OUTPUT INSTRUCTIONS"]
|
|
current_section = st.radio(
|
|
"Edit Section",
|
|
sections,
|
|
key="pattern_creation_section_select"
|
|
)
|
|
|
|
if current_section == "IDENTITY":
|
|
identity = st.text_area("Define the IDENTITY", height=200)
|
|
st.session_state.new_pattern_identity = identity
|
|
|
|
elif current_section == "GOAL":
|
|
goal = st.text_area("Define the GOAL", height=200)
|
|
st.session_state.new_pattern_goal = goal
|
|
|
|
elif current_section == "OUTPUT":
|
|
output = st.text_area("Define the OUTPUT", height=200)
|
|
st.session_state.new_pattern_output = output
|
|
|
|
elif current_section == "OUTPUT INSTRUCTIONS":
|
|
instructions = st.text_area("Define the OUTPUT INSTRUCTIONS", height=200)
|
|
st.session_state.new_pattern_instructions = instructions
|
|
|
|
pattern_content = f"""# IDENTITY
|
|
{st.session_state.get('new_pattern_identity', '')}
|
|
|
|
# GOAL
|
|
{st.session_state.get('new_pattern_goal', '')}
|
|
|
|
# OUTPUT
|
|
{st.session_state.get('new_pattern_output', '')}
|
|
|
|
# OUTPUT INSTRUCTIONS
|
|
{st.session_state.get('new_pattern_instructions', '')}"""
|
|
|
|
if st.button("Create Pattern", type="primary"):
|
|
success, message = create_pattern(pattern_name, pattern_content)
|
|
if success:
|
|
st.success(message)
|
|
for key in ["new_pattern_identity", "new_pattern_goal", "new_pattern_output", "new_pattern_instructions"]:
|
|
if key in st.session_state:
|
|
del st.session_state[key]
|
|
st.experimental_rerun()
|
|
else:
|
|
st.error(message)
|
|
else:
|
|
st.info("Enter a pattern name to create a new pattern")
|
|
|
|
def bulk_edit_patterns(patterns_to_edit, field_to_update, new_value):
|
|
"""Perform bulk edits on multiple patterns."""
|
|
results = []
|
|
for pattern in patterns_to_edit:
|
|
try:
|
|
pattern_path = os.path.join(pattern_dir, pattern)
|
|
system_file = os.path.join(pattern_path, "system.md")
|
|
|
|
if not os.path.exists(system_file):
|
|
results.append((pattern, False, "system.md not found"))
|
|
continue
|
|
|
|
with open(system_file, "r") as f:
|
|
content = f.read()
|
|
|
|
if field_to_update == "purpose":
|
|
sections = content.split("#")
|
|
updated_sections = []
|
|
for section in sections:
|
|
if section.strip().startswith("IDENTITY and PURPOSE"):
|
|
lines = section.split("\n")
|
|
for i, line in enumerate(lines):
|
|
if "You are an AI assistant designed to" in line:
|
|
lines[i] = f"You are an AI assistant designed to {new_value}."
|
|
updated_sections.append("\n".join(lines))
|
|
else:
|
|
updated_sections.append(section)
|
|
|
|
new_content = "#".join(updated_sections)
|
|
with open(system_file, "w") as f:
|
|
f.write(new_content)
|
|
results.append((pattern, True, "Updated successfully"))
|
|
else:
|
|
results.append((pattern, False, f"Field {field_to_update} not supported for bulk edit"))
|
|
|
|
except Exception as e:
|
|
results.append((pattern, False, str(e)))
|
|
|
|
return results
|
|
|
|
def pattern_creation_ui():
|
|
"""UI component for creating patterns with simple and wizard modes."""
|
|
pattern_name = st.text_input("Pattern Name")
|
|
if not pattern_name:
|
|
st.info("Enter a pattern name to create a new pattern")
|
|
return
|
|
|
|
system_content = """# IDENTITY and PURPOSE
|
|
|
|
You are an AI assistant designed to {purpose}.
|
|
|
|
# STEPS
|
|
|
|
- Step 1
|
|
- Step 2
|
|
- Step 3
|
|
|
|
# OUTPUT INSTRUCTIONS
|
|
|
|
- Output format instructions here
|
|
"""
|
|
new_content = st.text_area("Edit Pattern Content", system_content, height=400)
|
|
|
|
if st.button("Create Pattern", type="primary"):
|
|
if not pattern_name:
|
|
st.error("Pattern name cannot be empty.")
|
|
else:
|
|
success, message = create_pattern(pattern_name)
|
|
if success:
|
|
system_file = os.path.join(pattern_dir, pattern_name, "system.md")
|
|
with open(system_file, "w") as f:
|
|
f.write(new_content)
|
|
st.success(f"Pattern '{pattern_name}' created successfully!")
|
|
st.experimental_rerun()
|
|
else:
|
|
st.error(message)
|
|
|
|
def pattern_management_ui():
|
|
"""UI component for pattern management."""
|
|
st.sidebar.title("Pattern Management")
|
|
|
|
def save_output_log(pattern_name: str, input_content: str, output_content: str, timestamp: str):
|
|
"""Save pattern execution log."""
|
|
log_entry = {
|
|
"timestamp": timestamp,
|
|
"pattern_name": pattern_name,
|
|
"input": input_content,
|
|
"output": output_content,
|
|
"is_starred": False,
|
|
"custom_name": ""
|
|
}
|
|
st.session_state.output_logs.append(log_entry)
|
|
# Save outputs after each new log entry
|
|
save_outputs()
|
|
|
|
def star_output(log_index: int, custom_name: str = "") -> bool:
|
|
"""Star/favorite an output log.
|
|
|
|
Args:
|
|
log_index: Index of the output log to star
|
|
custom_name: Optional custom name for the starred output
|
|
|
|
Returns:
|
|
bool: True if output was starred successfully, False otherwise
|
|
"""
|
|
try:
|
|
if 0 <= log_index < len(st.session_state.output_logs):
|
|
log_entry = st.session_state.output_logs[log_index].copy()
|
|
log_entry["is_starred"] = True
|
|
log_entry["custom_name"] = custom_name or f"Starred Output #{len(st.session_state.starred_outputs) + 1}"
|
|
|
|
# Check if this output is already starred (by timestamp)
|
|
if not any(s["timestamp"] == log_entry["timestamp"] for s in st.session_state.starred_outputs):
|
|
st.session_state.starred_outputs.append(log_entry)
|
|
save_outputs() # Save after starring
|
|
return True
|
|
|
|
return False
|
|
except Exception as e:
|
|
logger.error(f"Error starring output: {str(e)}")
|
|
return False
|
|
|
|
def unstar_output(log_index: int):
|
|
"""Remove an output from starred/favorites."""
|
|
if 0 <= log_index < len(st.session_state.starred_outputs):
|
|
st.session_state.starred_outputs.pop(log_index)
|
|
# Save outputs after unstarring
|
|
save_outputs()
|
|
|
|
def validate_input_content(input_text: str) -> Tuple[bool, str]:
|
|
"""Validate input content for potentially problematic characters or patterns.
|
|
|
|
Args:
|
|
input_text: The input text to validate
|
|
|
|
Returns:
|
|
Tuple[bool, str]: (is_valid, error_message)
|
|
"""
|
|
if not input_text or input_text.isspace():
|
|
return False, "Input content cannot be empty or only whitespace."
|
|
|
|
# Check for minimum length
|
|
if len(input_text.strip()) < 2:
|
|
return False, "Input content must be at least 2 characters long."
|
|
|
|
# Check for maximum length (e.g., 100KB)
|
|
if len(input_text.encode('utf-8')) > 100 * 1024:
|
|
return False, "Input content exceeds maximum size of 100KB."
|
|
|
|
# Check for high concentration of special characters
|
|
special_chars = set('!@#$%^&*()_+[]{}|\\;:\'",.<>?`~')
|
|
special_char_count = sum(1 for c in input_text if c in special_chars)
|
|
special_char_ratio = special_char_count / len(input_text)
|
|
|
|
if special_char_ratio > 0.3: # More than 30% special characters
|
|
return False, "Input contains too many special characters. Please check your input."
|
|
|
|
# Check for control characters
|
|
control_chars = set(chr(i) for i in range(32) if i not in [9, 10, 13]) # Allow tab, newline, carriage return
|
|
if any(c in control_chars for c in input_text):
|
|
return False, "Input contains invalid control characters."
|
|
|
|
# Check for proper UTF-8 encoding
|
|
try:
|
|
input_text.encode('utf-8').decode('utf-8')
|
|
except UnicodeError:
|
|
return False, "Input contains invalid Unicode characters."
|
|
|
|
return True, ""
|
|
|
|
def sanitize_input_content(input_text: str) -> str:
|
|
"""Sanitize input content by removing or replacing problematic characters.
|
|
|
|
Args:
|
|
input_text: The input text to sanitize
|
|
|
|
Returns:
|
|
str: Sanitized input text
|
|
"""
|
|
# Remove null bytes
|
|
text = input_text.replace('\0', '')
|
|
|
|
# Replace control characters with spaces (except newlines and tabs)
|
|
allowed_chars = {'\n', '\t', '\r'}
|
|
sanitized_chars = []
|
|
for c in text:
|
|
if c in allowed_chars or ord(c) >= 32:
|
|
sanitized_chars.append(c)
|
|
else:
|
|
sanitized_chars.append(' ')
|
|
|
|
# Join characters and normalize whitespace
|
|
text = ''.join(sanitized_chars)
|
|
text = ' '.join(text.split())
|
|
|
|
return text
|
|
|
|
def execute_patterns(patterns_to_run: List[str], chain_mode: bool = False, initial_input: Optional[str] = None) -> List[str]:
|
|
"""Execute the selected patterns and capture their outputs."""
|
|
logger.info(f"Executing {len(patterns_to_run)} patterns")
|
|
|
|
st.session_state.chat_output = []
|
|
all_outputs = []
|
|
current_input = initial_input or st.session_state.input_content
|
|
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
|
|
|
# Validate configuration
|
|
current_provider = st.session_state.config.get("vendor")
|
|
current_model = st.session_state.config.get("model")
|
|
|
|
if not current_provider or not current_model:
|
|
error_msg = "Please select a provider and model first."
|
|
logger.error(error_msg)
|
|
st.error(error_msg)
|
|
return all_outputs
|
|
|
|
# Validate input content
|
|
is_valid, error_message = validate_input_content(current_input)
|
|
if not is_valid:
|
|
logger.error(f"Input validation failed: {error_message}")
|
|
st.error(f"Input validation failed: {error_message}")
|
|
return all_outputs
|
|
|
|
# Sanitize input content
|
|
try:
|
|
sanitized_input = sanitize_input_content(current_input)
|
|
if sanitized_input != current_input:
|
|
logger.info("Input content was sanitized")
|
|
st.warning("Input content was automatically sanitized for better compatibility.")
|
|
|
|
current_input = sanitized_input
|
|
except Exception as e:
|
|
logger.error(f"Error sanitizing input: {str(e)}")
|
|
st.error(f"Error processing input: {str(e)}")
|
|
return all_outputs
|
|
|
|
execution_info = f"**Using Model:** {current_provider} - {current_model}"
|
|
all_outputs.append(execution_info)
|
|
logger.info(f"Using model: {current_model} from provider: {current_provider}")
|
|
|
|
try:
|
|
for pattern in patterns_to_run:
|
|
logger.info(f"Running pattern: {pattern}")
|
|
try:
|
|
cmd = ["fabric", "--pattern", pattern]
|
|
logger.debug(f"Executing command: {' '.join(cmd)}")
|
|
|
|
message = current_input if chain_mode else st.session_state.input_content
|
|
logger.debug(f"Input for pattern {pattern}:\n{message}")
|
|
|
|
# Ensure input_data is a string
|
|
input_data = str(message)
|
|
|
|
# Run the command with text=True and string input
|
|
result = run(
|
|
cmd,
|
|
input=input_data,
|
|
capture_output=True,
|
|
text=True,
|
|
check=True
|
|
)
|
|
|
|
pattern_output = result.stdout.strip()
|
|
logger.debug(f"Raw output from pattern {pattern}:\n{pattern_output}")
|
|
|
|
if pattern_output:
|
|
# Format output as markdown
|
|
output_msg = f"""### {pattern}
|
|
|
|
{pattern_output}"""
|
|
all_outputs.append(output_msg)
|
|
# Save to output logs with markdown formatting
|
|
save_output_log(pattern, message, pattern_output, timestamp)
|
|
if chain_mode:
|
|
current_input = pattern_output
|
|
else:
|
|
logger.warning(f"Pattern {pattern} generated no output")
|
|
all_outputs.append(f"### {pattern}\n\nNo output generated.")
|
|
|
|
except UnicodeEncodeError as e:
|
|
error_msg = f"### {pattern}\n\n❌ Error: Input contains invalid characters: {str(e)}"
|
|
logger.error(f"Unicode encoding error for pattern {pattern}: {str(e)}")
|
|
all_outputs.append(error_msg)
|
|
if chain_mode:
|
|
break
|
|
|
|
except CalledProcessError as e:
|
|
error_msg = f"### {pattern}\n\n❌ Error executing: {e.stderr.strip()}"
|
|
logger.error(f"Pattern {pattern} failed: {e.stderr.strip()}")
|
|
all_outputs.append(error_msg)
|
|
if chain_mode:
|
|
break
|
|
|
|
except Exception as e:
|
|
error_msg = f"### {pattern}\n\n❌ Failed to execute: {str(e)}"
|
|
logger.error(f"Pattern {pattern} failed: {str(e)}", exc_info=True)
|
|
all_outputs.append(error_msg)
|
|
if chain_mode:
|
|
break
|
|
|
|
except Exception as e:
|
|
error_msg = f"### Error\n\n❌ Error in pattern execution: {str(e)}"
|
|
logger.error(error_msg, exc_info=True)
|
|
st.error(error_msg)
|
|
|
|
logger.info("Pattern execution completed")
|
|
return all_outputs
|
|
|
|
def validate_pattern(pattern_name):
|
|
"""Validate a pattern's structure and content."""
|
|
try:
|
|
pattern_path = os.path.join(pattern_dir, pattern_name)
|
|
|
|
if not os.path.exists(os.path.join(pattern_path, "system.md")):
|
|
return False, f"Missing required file: system.md."
|
|
|
|
with open(os.path.join(pattern_path, "system.md")) as f:
|
|
content = f.read()
|
|
required_sections = [
|
|
"# IDENTITY",
|
|
"# STEPS",
|
|
"# OUTPUT"
|
|
]
|
|
missing_sections = []
|
|
for section in required_sections:
|
|
if section.lower() not in content.lower():
|
|
missing_sections.append(section)
|
|
|
|
if missing_sections:
|
|
return True, f"Warning: Missing sections in system.md: {', '.join(missing_sections)}"
|
|
|
|
return True, "Pattern is valid."
|
|
except Exception as e:
|
|
return False, f"Error validating pattern: {str(e)}"
|
|
|
|
def pattern_editor(pattern_name):
|
|
"""Edit pattern content with simple and advanced editing options."""
|
|
if not pattern_name:
|
|
return
|
|
|
|
pattern_path = os.path.join(pattern_dir, pattern_name)
|
|
system_file = os.path.join(pattern_path, "system.md")
|
|
user_file = os.path.join(pattern_path, "user.md")
|
|
|
|
st.markdown(f"### Editing Pattern: {pattern_name}")
|
|
is_valid, message = validate_pattern(pattern_name)
|
|
if not is_valid:
|
|
st.error(message)
|
|
elif message != "Pattern is valid.":
|
|
st.warning(message)
|
|
else:
|
|
st.success("Pattern structure is valid")
|
|
|
|
edit_mode = st.radio(
|
|
"Edit Mode",
|
|
["Simple Editor", "Advanced (Wizard)"],
|
|
key=f"edit_mode_{pattern_name}",
|
|
horizontal=True
|
|
)
|
|
|
|
if edit_mode == "Simple Editor":
|
|
if os.path.exists(system_file):
|
|
with open(system_file) as f:
|
|
content = f.read()
|
|
new_content = st.text_area("Edit system.md", content, height=600)
|
|
if st.button("Save system.md"):
|
|
with open(system_file, "w") as f:
|
|
f.write(new_content)
|
|
st.success("Saved successfully!")
|
|
else:
|
|
st.error("system.md file not found")
|
|
|
|
if os.path.exists(user_file):
|
|
with open(user_file) as f:
|
|
content = f.read()
|
|
new_content = st.text_area("Edit user.md", content, height=300)
|
|
if st.button("Save user.md"):
|
|
with open(user_file, "w") as f:
|
|
f.write(new_content)
|
|
st.success("Saved successfully!")
|
|
|
|
else:
|
|
if os.path.exists(system_file):
|
|
with open(system_file) as f:
|
|
content = f.read()
|
|
|
|
sections = content.split("#")
|
|
edited_sections = []
|
|
|
|
for section in sections:
|
|
if not section.strip():
|
|
continue
|
|
|
|
lines = section.strip().split("\n", 1)
|
|
if len(lines) > 1:
|
|
title, content = lines
|
|
else:
|
|
title, content = lines[0], ""
|
|
|
|
st.markdown(f"#### {title}")
|
|
new_content = st.text_area(
|
|
f"Edit {title} section",
|
|
value=content.strip(),
|
|
height=200,
|
|
key=f"section_{title}"
|
|
)
|
|
edited_sections.append(f"# {title}\n\n{new_content}")
|
|
|
|
if st.button("Save Changes"):
|
|
new_content = "\n\n".join(edited_sections)
|
|
with open(system_file, "w") as f:
|
|
f.write(new_content)
|
|
st.success("Changes saved successfully!")
|
|
|
|
is_valid, message = validate_pattern(pattern_name)
|
|
if not is_valid:
|
|
st.error(message)
|
|
elif message != "Pattern is valid.":
|
|
st.warning(message)
|
|
else:
|
|
st.error("system.md file not found")
|
|
|
|
def get_outputs_dir() -> str:
|
|
"""Get the directory for storing outputs."""
|
|
outputs_dir = os.path.expanduser("~/.config/fabric/outputs")
|
|
os.makedirs(outputs_dir, exist_ok=True)
|
|
return outputs_dir
|
|
|
|
def save_outputs():
|
|
"""Save pattern outputs and starred outputs to files.
|
|
|
|
Error handling:
|
|
- Creates output directory if it doesn't exist
|
|
- Handles file write permissions
|
|
- Handles JSON serialization errors
|
|
- Logs all errors for debugging
|
|
"""
|
|
logger.info("Saving outputs to persistent storage")
|
|
outputs_dir = get_outputs_dir()
|
|
|
|
output_logs_file = os.path.join(outputs_dir, "output_logs.json")
|
|
starred_outputs_file = os.path.join(outputs_dir, "starred_outputs.json")
|
|
|
|
try:
|
|
# Save output logs
|
|
with open(output_logs_file, "w") as f:
|
|
json.dump(st.session_state.output_logs, f, indent=2)
|
|
logger.debug(f"Saved output logs to {output_logs_file}")
|
|
|
|
# Save starred outputs
|
|
with open(starred_outputs_file, "w") as f:
|
|
json.dump(st.session_state.starred_outputs, f, indent=2)
|
|
logger.debug(f"Saved starred outputs to {starred_outputs_file}")
|
|
|
|
except PermissionError as e:
|
|
error_msg = f"Permission denied when saving outputs: {str(e)}"
|
|
logger.error(error_msg)
|
|
st.error(error_msg)
|
|
except json.JSONEncodeError as e:
|
|
error_msg = f"Error encoding outputs to JSON: {str(e)}"
|
|
logger.error(error_msg)
|
|
st.error(error_msg)
|
|
except Exception as e:
|
|
error_msg = f"Unexpected error saving outputs: {str(e)}"
|
|
logger.error(error_msg)
|
|
st.error(error_msg)
|
|
|
|
def load_saved_outputs():
|
|
"""Load saved pattern outputs from files.
|
|
|
|
Error handling:
|
|
- Handles missing output files
|
|
- Handles corrupted JSON files
|
|
- Handles file read permissions
|
|
- Initializes empty state if files don't exist
|
|
"""
|
|
logger.info("Loading saved outputs")
|
|
outputs_dir = get_outputs_dir()
|
|
output_logs_file = os.path.join(outputs_dir, "output_logs.json")
|
|
starred_outputs_file = os.path.join(outputs_dir, "starred_outputs.json")
|
|
|
|
try:
|
|
# Load output logs
|
|
if os.path.exists(output_logs_file):
|
|
with open(output_logs_file, "r") as f:
|
|
st.session_state.output_logs = json.load(f)
|
|
logger.debug(f"Loaded output logs from {output_logs_file}")
|
|
|
|
# Load starred outputs
|
|
if os.path.exists(starred_outputs_file):
|
|
with open(starred_outputs_file, "r") as f:
|
|
st.session_state.starred_outputs = json.load(f)
|
|
logger.debug(f"Loaded starred outputs from {starred_outputs_file}")
|
|
|
|
except json.JSONDecodeError as e:
|
|
error_msg = f"Error decoding saved outputs (corrupted files): {str(e)}"
|
|
logger.error(error_msg)
|
|
st.error(error_msg)
|
|
# Initialize empty state
|
|
st.session_state.output_logs = []
|
|
st.session_state.starred_outputs = []
|
|
except PermissionError as e:
|
|
error_msg = f"Permission denied when loading outputs: {str(e)}"
|
|
logger.error(error_msg)
|
|
st.error(error_msg)
|
|
except Exception as e:
|
|
error_msg = f"Unexpected error loading saved outputs: {str(e)}"
|
|
logger.error(error_msg)
|
|
st.error(error_msg)
|
|
# Initialize empty state
|
|
st.session_state.output_logs = []
|
|
st.session_state.starred_outputs = []
|
|
|
|
def handle_star_name_input(log_index: int, name: str):
|
|
"""Handle the starring process when a name is input.
|
|
|
|
Args:
|
|
log_index: Index of the output to star
|
|
name: Name to give the starred output
|
|
"""
|
|
try:
|
|
if star_output(log_index, name):
|
|
st.success("Output starred successfully!")
|
|
else:
|
|
st.error("Failed to star output. Please try again.")
|
|
except Exception as e:
|
|
logger.error(f"Error handling star name input: {str(e)}")
|
|
st.error(f"Error starring output: {str(e)}")
|
|
|
|
def execute_pattern_chain(patterns_sequence: List[str], initial_input: str) -> Dict:
|
|
"""Execute a sequence of patterns in a chain, passing output from each to the next.
|
|
|
|
Args:
|
|
patterns_sequence: List of pattern names to execute in sequence
|
|
initial_input: Initial input text to start the chain
|
|
|
|
Returns:
|
|
Dict containing results from each stage of the chain
|
|
"""
|
|
logger.info(f"Starting pattern chain execution with {len(patterns_sequence)} patterns")
|
|
chain_results = {
|
|
"sequence": patterns_sequence,
|
|
"stages": [],
|
|
"final_output": None,
|
|
"metadata": {
|
|
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
|
|
"success": False
|
|
}
|
|
}
|
|
|
|
current_input = initial_input
|
|
|
|
try:
|
|
for i, pattern in enumerate(patterns_sequence, 1):
|
|
logger.info(f"Chain Stage {i}: Executing pattern '{pattern}'")
|
|
stage_result = {
|
|
"pattern": pattern,
|
|
"input": current_input,
|
|
"output": None,
|
|
"success": False,
|
|
"error": None
|
|
}
|
|
|
|
try:
|
|
cmd = ["fabric", "--pattern", pattern]
|
|
result = run(cmd, input=current_input, capture_output=True, text=True, check=True)
|
|
output = result.stdout.strip()
|
|
|
|
if output:
|
|
stage_result["output"] = output
|
|
stage_result["success"] = True
|
|
current_input = output # Use this output as input for next pattern
|
|
logger.debug(f"Stage {i} completed successfully")
|
|
else:
|
|
stage_result["error"] = "Pattern generated no output"
|
|
logger.warning(f"Pattern {pattern} generated no output")
|
|
|
|
except CalledProcessError as e:
|
|
error_msg = f"Error executing pattern: {e.stderr.strip()}"
|
|
stage_result["error"] = error_msg
|
|
logger.error(error_msg)
|
|
break
|
|
|
|
except Exception as e:
|
|
error_msg = f"Unexpected error: {str(e)}"
|
|
stage_result["error"] = error_msg
|
|
logger.error(error_msg)
|
|
break
|
|
|
|
chain_results["stages"].append(stage_result)
|
|
|
|
# Save stage output to logs
|
|
save_output_log(
|
|
pattern,
|
|
stage_result["input"],
|
|
stage_result["output"] or stage_result["error"],
|
|
chain_results["metadata"]["timestamp"]
|
|
)
|
|
|
|
# Set final output and success status
|
|
successful_stages = [s for s in chain_results["stages"] if s["success"]]
|
|
if successful_stages:
|
|
chain_results["final_output"] = successful_stages[-1]["output"]
|
|
chain_results["metadata"]["success"] = True
|
|
|
|
except Exception as e:
|
|
logger.error(f"Chain execution failed: {str(e)}", exc_info=True)
|
|
chain_results["metadata"]["error"] = str(e)
|
|
|
|
return chain_results
|
|
|
|
def enhance_input_preview():
|
|
"""Display a preview of the input content with basic statistics.
|
|
|
|
Shows:
|
|
- Input text preview
|
|
- Character count
|
|
- Word count
|
|
"""
|
|
if 'input_content' in st.session_state and st.session_state.input_content:
|
|
with st.expander("Input Preview", expanded=True):
|
|
st.markdown("### Current Input")
|
|
st.code(st.session_state.input_content, language="text")
|
|
|
|
# Basic statistics
|
|
char_count = len(st.session_state.input_content)
|
|
word_count = len(st.session_state.input_content.split())
|
|
|
|
col1, col2 = st.columns(2)
|
|
with col1:
|
|
st.metric("Characters", char_count)
|
|
with col2:
|
|
st.metric("Words", word_count)
|
|
|
|
def get_clipboard_content() -> Tuple[bool, str, str]:
|
|
"""Get content from clipboard with proper error handling.
|
|
|
|
Returns:
|
|
Tuple[bool, str, str]: (success, content, error_message)
|
|
"""
|
|
try:
|
|
result = run(
|
|
["xclip", "-selection", "clipboard", "-o"],
|
|
capture_output=True,
|
|
text=True,
|
|
check=True
|
|
)
|
|
content = result.stdout
|
|
# Validate the content is proper UTF-8
|
|
try:
|
|
content.encode('utf-8').decode('utf-8')
|
|
return True, content, ""
|
|
except UnicodeError:
|
|
return False, "", "Clipboard contains invalid Unicode characters"
|
|
except FileNotFoundError:
|
|
return False, "", "xclip is not installed. Please install it with: sudo apt-get install xclip"
|
|
except CalledProcessError as e:
|
|
return False, "", f"Failed to read clipboard: {e.stderr}"
|
|
except Exception as e:
|
|
return False, "", f"Unexpected error reading clipboard: {str(e)}"
|
|
|
|
def set_clipboard_content(content: str) -> Tuple[bool, str]:
|
|
"""Set content to clipboard with proper error handling.
|
|
|
|
Args:
|
|
content: The content to copy to clipboard
|
|
|
|
Returns:
|
|
Tuple[bool, str]: (success, error_message)
|
|
"""
|
|
try:
|
|
# Validate content is proper UTF-8 before attempting to copy
|
|
try:
|
|
input_bytes = content.encode('utf-8')
|
|
except UnicodeError:
|
|
return False, "Content contains invalid Unicode characters"
|
|
|
|
run(
|
|
["xclip", "-selection", "clipboard"],
|
|
input=input_bytes,
|
|
check=True
|
|
)
|
|
return True, ""
|
|
except FileNotFoundError:
|
|
return False, "xclip is not installed. Please install it with: sudo apt-get install xclip"
|
|
except CalledProcessError as e:
|
|
return False, f"Failed to copy to clipboard: {e.stderr}"
|
|
except Exception as e:
|
|
return False, f"Unexpected error copying to clipboard: {str(e)}"
|
|
|
|
def main():
|
|
"""Main function to run the Streamlit app."""
|
|
logger.info("Starting Fabric Pattern Studio")
|
|
try:
|
|
# Set page config
|
|
st.set_page_config(
|
|
page_title="Fabric Pattern Studio",
|
|
page_icon="🧬",
|
|
layout="wide",
|
|
initial_sidebar_state="expanded"
|
|
)
|
|
|
|
# Add title with gradient styling and footer signature
|
|
st.markdown("""
|
|
<style>
|
|
[data-testid="stHeader"] {
|
|
background-color: rgba(0,0,0,0);
|
|
}
|
|
.fabric-header {
|
|
padding: 1rem;
|
|
margin-bottom: 1rem;
|
|
background: linear-gradient(90deg, rgba(155, 108, 255, 0.1) 0%, rgba(76, 181, 255, 0.1) 100%);
|
|
border-radius: 8px;
|
|
}
|
|
.fabric-title {
|
|
font-size: 2.5em;
|
|
margin: 0;
|
|
background: linear-gradient(90deg, #9B6CFF 0%, #4CB5FF 100%);
|
|
-webkit-background-clip: text;
|
|
-webkit-text-fill-color: transparent;
|
|
font-weight: 600;
|
|
text-align: center;
|
|
}
|
|
.assistant-container {
|
|
position: fixed;
|
|
right: 20px;
|
|
bottom: 40px;
|
|
display: flex;
|
|
flex-direction: column;
|
|
align-items: center;
|
|
gap: 8px;
|
|
z-index: 1000;
|
|
}
|
|
.assistant-avatar {
|
|
width: 42px;
|
|
height: 42px;
|
|
background: rgba(155, 108, 255, 0.05);
|
|
border: 2px solid rgba(155, 108, 255, 0.1);
|
|
border-radius: 50%;
|
|
display: flex;
|
|
align-items: center;
|
|
justify-content: center;
|
|
cursor: pointer;
|
|
transition: all 0.3s cubic-bezier(0.4, 0, 0.2, 1);
|
|
backdrop-filter: blur(8px);
|
|
-webkit-backdrop-filter: blur(8px);
|
|
}
|
|
.assistant-avatar:hover {
|
|
background: rgba(155, 108, 255, 0.1);
|
|
border-color: rgba(155, 108, 255, 0.2);
|
|
transform: translateY(-2px);
|
|
}
|
|
.assistant-avatar::before {
|
|
content: "🤖";
|
|
font-size: 20px;
|
|
opacity: 0.7;
|
|
transition: opacity 0.3s ease;
|
|
}
|
|
.assistant-avatar:hover::before {
|
|
opacity: 0.9;
|
|
}
|
|
.signature {
|
|
position: fixed;
|
|
right: 10px;
|
|
bottom: 10px;
|
|
font-size: 0.7em;
|
|
color: rgba(155, 108, 255, 0.3);
|
|
z-index: 999;
|
|
text-decoration: none;
|
|
font-family: monospace;
|
|
}
|
|
.signature:hover {
|
|
color: rgba(155, 108, 255, 0.8);
|
|
transition: color 0.3s ease;
|
|
}
|
|
.stApp {
|
|
background: linear-gradient(180deg, rgba(25, 25, 35, 0.95) 0%, rgba(35, 35, 45, 0.95) 100%);
|
|
}
|
|
</style>
|
|
<div class="fabric-header">
|
|
<h1 class="fabric-title">Pattern Studio</h1>
|
|
</div>
|
|
<div class="assistant-container">
|
|
<div class="assistant-avatar" onclick="window.open('https://github.com/danielmiessler/fabric', '_blank')"></div>
|
|
</div>
|
|
<a href="https://github.com/sosacrazy126" target="_blank" class="signature">made by zo6</a>
|
|
""", unsafe_allow_html=True)
|
|
|
|
initialize_session_state()
|
|
|
|
if not st.session_state.config_loaded:
|
|
logger.info("Loading initial configuration")
|
|
success = load_configuration()
|
|
if not success:
|
|
logger.error("Failed to load configuration")
|
|
st.error("Failed to load configuration. Please check your .env file.")
|
|
st.stop()
|
|
|
|
with st.sidebar:
|
|
# Add GitHub link
|
|
st.markdown("""
|
|
<div style='text-align: center; margin-bottom: 1rem;'>
|
|
<a href="https://github.com/danielmiessler/fabric" target="_blank">
|
|
<img src="https://img.shields.io/github/stars/danielmiessler/fabric?style=social" alt="GitHub Repo">
|
|
</a>
|
|
</div>
|
|
""",
|
|
unsafe_allow_html=True
|
|
)
|
|
|
|
st.title("Configuration")
|
|
load_models_and_providers()
|
|
|
|
st.markdown("---")
|
|
st.title("Navigation")
|
|
view = st.radio(
|
|
"Select View",
|
|
["Run Patterns", "Pattern Management", "Analysis Dashboard"],
|
|
key="view_selector"
|
|
)
|
|
logger.debug(f"Selected view: {view}")
|
|
|
|
if view != st.session_state.get("current_view"):
|
|
st.session_state["current_view"] = view
|
|
|
|
if view == "Run Patterns":
|
|
patterns = get_patterns()
|
|
logger.debug(f"Available patterns: {patterns}")
|
|
|
|
if not patterns:
|
|
logger.warning("No patterns available")
|
|
st.warning("No patterns available. Create a pattern first.")
|
|
return
|
|
|
|
tabs = st.tabs(["Run", "Analysis"])
|
|
|
|
with tabs[0]:
|
|
st.header("Run Patterns")
|
|
selected_patterns = st.multiselect(
|
|
"Select Patterns to Run",
|
|
patterns,
|
|
default=st.session_state.selected_patterns,
|
|
key="selected_patterns_widget"
|
|
)
|
|
st.session_state.selected_patterns = selected_patterns
|
|
|
|
if selected_patterns:
|
|
for pattern in selected_patterns:
|
|
with st.expander(f"📝 {pattern} Details", expanded=False):
|
|
metadata = get_pattern_metadata(pattern)
|
|
if metadata:
|
|
st.markdown(metadata)
|
|
else:
|
|
st.info("No description available")
|
|
|
|
st.subheader("Input")
|
|
input_method = st.radio(
|
|
"Input Method",
|
|
["Clipboard", "Manual Input"],
|
|
horizontal=True
|
|
)
|
|
|
|
if input_method == "Clipboard":
|
|
col_load, col_preview = st.columns([2, 1])
|
|
with col_load:
|
|
if st.button("📋 Load from Clipboard", use_container_width=True):
|
|
success, content, error = get_clipboard_content()
|
|
if success:
|
|
# Validate clipboard content
|
|
is_valid, error_message = validate_input_content(content)
|
|
if not is_valid:
|
|
st.error(f"Invalid clipboard content: {error_message}")
|
|
else:
|
|
# Sanitize clipboard content
|
|
sanitized_content = sanitize_input_content(content)
|
|
if sanitized_content != content:
|
|
st.warning("Clipboard content was automatically sanitized for better compatibility.")
|
|
|
|
st.session_state.input_content = sanitized_content
|
|
st.session_state.show_preview = True
|
|
st.success("Content loaded from clipboard!")
|
|
else:
|
|
st.error(error)
|
|
|
|
with col_preview:
|
|
if st.button("👁 Toggle Preview", use_container_width=True):
|
|
st.session_state.show_preview = not st.session_state.get('show_preview', False)
|
|
else:
|
|
st.session_state.input_content = st.text_area(
|
|
"Enter Input Text",
|
|
value=st.session_state.get('input_content', ''),
|
|
height=200
|
|
)
|
|
|
|
if st.session_state.get('show_preview', False) or input_method == "Manual Input":
|
|
if st.session_state.get('input_content'):
|
|
enhance_input_preview()
|
|
|
|
# Move chain mode checkbox before the run button
|
|
chain_mode = st.checkbox(
|
|
"Chain Mode",
|
|
help="Execute patterns in sequence, passing output of each pattern as input to the next"
|
|
)
|
|
|
|
if chain_mode and len(selected_patterns) > 1:
|
|
st.info("Patterns will be executed in the order selected above")
|
|
st.markdown("##### Drag to reorder patterns:")
|
|
# Convert patterns list to DataFrame for data editor
|
|
patterns_df = pd.DataFrame({
|
|
"Pattern": selected_patterns
|
|
})
|
|
|
|
edited_df = st.data_editor(
|
|
patterns_df,
|
|
use_container_width=True,
|
|
key="pattern_reorder",
|
|
hide_index=True,
|
|
column_config={
|
|
"Pattern": st.column_config.TextColumn(
|
|
"Pattern",
|
|
help="Drag to reorder patterns"
|
|
)
|
|
}
|
|
)
|
|
|
|
# Update selected patterns if order changed
|
|
new_patterns = edited_df["Pattern"].tolist()
|
|
if new_patterns != selected_patterns:
|
|
st.session_state.selected_patterns = new_patterns
|
|
|
|
col1, col2 = st.columns([3, 1])
|
|
with col1:
|
|
if st.button("🚀 Run Patterns", type="primary", use_container_width=True):
|
|
if not st.session_state.input_content:
|
|
st.warning("Please provide input content.")
|
|
else:
|
|
with st.spinner("Running patterns..."):
|
|
if chain_mode:
|
|
# Execute pattern chain
|
|
chain_results = execute_pattern_chain(
|
|
selected_patterns,
|
|
st.session_state.input_content
|
|
)
|
|
|
|
# Display chain results
|
|
st.markdown("## Chain Execution Results")
|
|
|
|
# Show sequence
|
|
st.markdown("### Pattern Sequence")
|
|
st.code(" → ".join(chain_results["sequence"]))
|
|
|
|
# Show each stage
|
|
st.markdown("### Execution Stages")
|
|
for i, stage in enumerate(chain_results["stages"], 1):
|
|
with st.expander(f"Stage {i}: {stage['pattern']}", expanded=False):
|
|
st.markdown("#### Input")
|
|
st.code(stage["input"])
|
|
st.markdown("#### Output")
|
|
if stage["success"]:
|
|
st.markdown(stage["output"])
|
|
else:
|
|
st.error(stage["error"])
|
|
|
|
# Show final output
|
|
if chain_results["metadata"]["success"]:
|
|
st.markdown("### Final Output")
|
|
st.markdown(chain_results["final_output"])
|
|
st.session_state.chat_output.append(chain_results["final_output"])
|
|
else:
|
|
st.error("Chain execution failed. Check the stages above for details.")
|
|
else:
|
|
# Normal pattern execution
|
|
outputs = execute_patterns(selected_patterns)
|
|
st.session_state.chat_output.extend(outputs)
|
|
|
|
# Display outputs after execution
|
|
if st.session_state.chat_output:
|
|
st.markdown("---")
|
|
st.header("Pattern Outputs")
|
|
for message in st.session_state.chat_output:
|
|
st.markdown(message)
|
|
st.markdown("---") # Add separator between outputs
|
|
|
|
# Output Actions
|
|
col1, col2 = st.columns(2)
|
|
with col1:
|
|
if st.button("📋 Copy All Outputs"):
|
|
all_outputs = "\n\n".join(st.session_state.chat_output)
|
|
success, error = set_clipboard_content(all_outputs)
|
|
if success:
|
|
st.success("All outputs copied to clipboard!")
|
|
else:
|
|
st.error(error)
|
|
|
|
with col2:
|
|
if st.button("❌ Clear Outputs"):
|
|
st.session_state.chat_output = []
|
|
st.success("Outputs cleared!")
|
|
st.experimental_rerun()
|
|
|
|
with col2:
|
|
st.write("") # Empty space for layout balance
|
|
|
|
else:
|
|
st.info("Select one or more patterns to run.")
|
|
|
|
with tabs[1]:
|
|
st.header("Output Analysis")
|
|
if st.session_state.chat_output:
|
|
# Display pattern outputs in chronological order
|
|
for i, output in enumerate(reversed(st.session_state.chat_output), 1):
|
|
with st.expander(f"Output #{i}", expanded=False):
|
|
st.markdown(output)
|
|
else:
|
|
st.info("Run some patterns to see output analysis.")
|
|
|
|
elif view == "Pattern Management":
|
|
create_tab, edit_tab, delete_tab = st.tabs(["Create", "Edit", "Delete"])
|
|
|
|
with create_tab:
|
|
st.header("Create New Pattern")
|
|
creation_mode = st.radio(
|
|
"Creation Mode",
|
|
["Simple Editor", "Advanced (Wizard)"],
|
|
key="creation_mode_main",
|
|
horizontal=True
|
|
)
|
|
|
|
if creation_mode == "Simple Editor":
|
|
pattern_creation_ui()
|
|
else:
|
|
pattern_creation_wizard()
|
|
|
|
with edit_tab:
|
|
st.header("Edit Patterns")
|
|
patterns = get_patterns()
|
|
if not patterns:
|
|
st.warning("No patterns available. Create a pattern first.")
|
|
else:
|
|
selected_pattern = st.selectbox("Select Pattern to Edit", [""] + patterns)
|
|
if selected_pattern:
|
|
pattern_editor(selected_pattern)
|
|
|
|
with delete_tab:
|
|
st.header("Delete Patterns")
|
|
patterns = get_patterns()
|
|
if not patterns:
|
|
st.warning("No patterns available.")
|
|
else:
|
|
patterns_to_delete = st.multiselect(
|
|
"Select Patterns to Delete",
|
|
patterns,
|
|
key="delete_patterns_selector"
|
|
)
|
|
|
|
if patterns_to_delete:
|
|
st.warning(f"You are about to delete {len(patterns_to_delete)} pattern(s):")
|
|
for pattern in patterns_to_delete:
|
|
st.markdown(f"- {pattern}")
|
|
|
|
confirm_delete = st.checkbox("I understand that this action cannot be undone")
|
|
|
|
if st.button("🗑️ Delete Selected Patterns", type="primary", disabled=not confirm_delete):
|
|
if confirm_delete:
|
|
for pattern in patterns_to_delete:
|
|
success, message = delete_pattern(pattern)
|
|
if success:
|
|
st.success(f"✓ {pattern}: {message}")
|
|
else:
|
|
st.error(f"✗ {pattern}: {message}")
|
|
st.experimental_rerun()
|
|
else:
|
|
st.error("Please confirm deletion by checking the box above.")
|
|
else:
|
|
st.info("Select one or more patterns to delete.")
|
|
|
|
else:
|
|
st.header("Pattern Output History")
|
|
|
|
# Create tabs for All Outputs and Starred Outputs
|
|
all_tab, starred_tab = st.tabs(["All Outputs", "⭐ Starred"])
|
|
|
|
with all_tab:
|
|
if not st.session_state.output_logs:
|
|
st.info("No pattern outputs recorded yet. Run some patterns to see their logs here.")
|
|
else:
|
|
for i, log in enumerate(reversed(st.session_state.output_logs)):
|
|
with st.expander(
|
|
f"Output #{len(st.session_state.output_logs)-i} - {log['pattern_name']} ({log['timestamp']})",
|
|
expanded=False
|
|
):
|
|
st.markdown("### Input")
|
|
st.code(log["input"], language="text")
|
|
st.markdown("### Output")
|
|
st.markdown(log["output"])
|
|
|
|
# Check if this output is already starred
|
|
is_starred = any(s["timestamp"] == log["timestamp"] for s in st.session_state.starred_outputs)
|
|
|
|
col1, col2 = st.columns([1, 4])
|
|
with col1:
|
|
if not is_starred:
|
|
if st.button("⭐ Star", key=f"star_{i}", use_container_width=True):
|
|
st.session_state.starring_output = len(st.session_state.output_logs) - i - 1
|
|
st.session_state.temp_star_name = ""
|
|
else:
|
|
st.write("⭐ Starred")
|
|
|
|
with col2:
|
|
if st.button("📋 Copy Output", key=f"copy_{i}"):
|
|
success, error = set_clipboard_content(log["output"])
|
|
if success:
|
|
st.success("Output copied to clipboard!")
|
|
else:
|
|
st.error(error)
|
|
|
|
# Show starring form inside the expander if this is the output being starred
|
|
if st.session_state.starring_output == len(st.session_state.output_logs) - i - 1:
|
|
st.markdown("---")
|
|
with st.form(key=f"star_name_form_{i}"):
|
|
name_input = st.text_input(
|
|
"Enter a name for this output (optional):",
|
|
key=f"star_name_input_{i}"
|
|
)
|
|
col1, col2 = st.columns(2)
|
|
with col1:
|
|
submit = st.form_submit_button("Save", use_container_width=True)
|
|
with col2:
|
|
cancel = st.form_submit_button("Cancel", use_container_width=True)
|
|
|
|
if submit:
|
|
handle_star_name_input(st.session_state.starring_output, name_input)
|
|
# Reset starring state after handling
|
|
st.session_state.starring_output = None
|
|
st.experimental_rerun()
|
|
elif cancel:
|
|
# Reset starring state
|
|
st.session_state.starring_output = None
|
|
st.experimental_rerun()
|
|
|
|
# Remove the old starring form from the bottom
|
|
st.markdown("---")
|
|
|
|
with starred_tab:
|
|
if not st.session_state.starred_outputs:
|
|
st.info("No starred outputs yet. Star some outputs to see them here!")
|
|
else:
|
|
for i, starred in enumerate(st.session_state.starred_outputs):
|
|
with st.expander(
|
|
f"⭐ {starred.get('custom_name', f'Starred Output #{i+1}')} ({starred['timestamp']})",
|
|
expanded=False
|
|
):
|
|
col1, col2 = st.columns([3, 1])
|
|
with col1:
|
|
st.markdown(f"### {starred.get('custom_name', f'Starred Output #{i+1}')}")
|
|
with col2:
|
|
if st.button("✏️ Edit Name", key=f"edit_name_{i}"):
|
|
st.session_state[f"editing_name_{i}"] = True
|
|
|
|
if st.session_state.get(f"editing_name_{i}", False):
|
|
new_name = st.text_input(
|
|
"Enter new name:",
|
|
value=starred.get('custom_name', ''),
|
|
key=f"new_name_{i}"
|
|
)
|
|
col1, col2 = st.columns([1, 1])
|
|
with col1:
|
|
if st.button("Save", key=f"save_name_{i}"):
|
|
st.session_state.starred_outputs[i]['custom_name'] = new_name
|
|
del st.session_state[f"editing_name_{i}"]
|
|
st.success("Name updated!")
|
|
st.experimental_rerun()
|
|
with col2:
|
|
if st.button("Cancel", key=f"cancel_name_{i}"):
|
|
del st.session_state[f"editing_name_{i}"]
|
|
st.experimental_rerun()
|
|
|
|
st.markdown("### Pattern")
|
|
st.code(starred["pattern_name"], language="text")
|
|
st.markdown("### Input")
|
|
st.code(starred["input"], language="text") # Display input as code block
|
|
st.markdown("### Output")
|
|
st.markdown(starred["output"]) # Display output as markdown
|
|
|
|
col1, col2 = st.columns([1, 4])
|
|
with col1:
|
|
if st.button("❌ Remove Star", key=f"unstar_{i}"):
|
|
unstar_output(i)
|
|
st.success("Output unstarred!")
|
|
st.experimental_rerun()
|
|
|
|
with col2:
|
|
if st.button("📋 Copy Output", key=f"copy_starred_{i}"):
|
|
try:
|
|
run(["xclip", "-selection", "clipboard"], input=starred["output"].encode(), check=True)
|
|
st.success("Output copied to clipboard!")
|
|
except Exception as e:
|
|
st.error(f"Error copying to clipboard: {e}")
|
|
|
|
if st.button("Clear All Starred"):
|
|
if st.checkbox("Confirm clearing all starred outputs"):
|
|
st.session_state.starred_outputs = []
|
|
save_outputs() # Save after clearing
|
|
st.success("All starred outputs cleared!")
|
|
st.experimental_rerun()
|
|
|
|
except Exception as e:
|
|
logger.error("Unexpected error in main function", exc_info=True)
|
|
st.error(f"An unexpected error occurred: {str(e)}")
|
|
st.stop()
|
|
|
|
if __name__ == "__main__":
|
|
logger.info("Application startup")
|
|
main() |