import shutil import json import os import streamlit as st from subprocess import run, CalledProcessError from dotenv import load_dotenv import re import time import logging from typing import Dict, List, Optional, Tuple from datetime import datetime import sys import pandas as pd import matplotlib.pyplot as plt import seaborn as sns import numpy as np # Create formatters console_formatter = logging.Formatter( "\033[92m%(asctime)s\033[0m - " # Green timestamp "\033[94m%(levelname)s\033[0m - " # Blue level "\033[95m[%(funcName)s]\033[0m " # Purple function name "%(message)s" # Regular message ) file_formatter = logging.Formatter( "%(asctime)s - %(levelname)s - [%(funcName)s] %(message)s" ) # Configure root logger logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) # Clear any existing handlers logger.handlers = [] # Console Handler console_handler = logging.StreamHandler(sys.stdout) console_handler.setFormatter(console_formatter) console_handler.setLevel(logging.INFO) logger.addHandler(console_handler) # File Handler log_dir = os.path.expanduser("~/.config/fabric/logs") os.makedirs(log_dir, exist_ok=True) log_file = os.path.join(log_dir, f"fabric_ui_{datetime.now().strftime('%Y%m%d')}.log") file_handler = logging.FileHandler(log_file) file_handler.setFormatter(file_formatter) file_handler.setLevel(logging.DEBUG) # More detailed logging in file logger.addHandler(file_handler) # Detect operating system PLATFORM = sys.platform # Log startup message logger.info("šŸš€ Fabric UI Starting Up") logger.info(f"šŸ’¾ Log file: {log_file}") logger.info(f"šŸ–„ļø Platform detected: {PLATFORM}") # Global variables pattern_dir = os.path.expanduser("~/.config/fabric/patterns") MAX_RETRIES = 3 RETRY_DELAY = 1 # seconds def initialize_session_state(): """Initialize necessary session state attributes. Error handling: - Ensures all required session state variables are initialized - Loads saved outputs from persistent storage - Handles missing or corrupted saved output files """ logger.info("Initializing session state") default_configs = { # Configuration state "config_loaded": False, "vendors": {}, "available_models": [], "selected_vendor": None, "selected_model": None, # Pattern execution state "input_content": "", "selected_patterns": [], "chat_output": [], "current_view": "run", # Pattern creation state "wizard_step": "Basic Info", "session_name": "", "context_name": "", # Model configuration "config": {"vendor": "", "model": "", "context_length": "2048"}, # Model caching "cached_models": None, "last_model_fetch": 0, # UI state "active_tab": 0, # Output management "output_logs": [], "starred_outputs": [], "starring_output": None, "temp_star_name": "", } for key, value in default_configs.items(): if key not in st.session_state: st.session_state[key] = value # Load saved outputs if they exist load_saved_outputs() def parse_models_output(output: str) -> Dict[str, List[str]]: """Parse the output of fabric --listmodels command.""" logger.debug("Parsing models output") providers = {} current_provider = None lines = output.split("\n") for line in lines: line = line.strip() if not line: continue if line == "Available models:": continue if not line.startswith("\t") and not line.startswith("["): current_provider = line.strip() providers[current_provider] = [] elif current_provider and (line.startswith("\t") or line.startswith("[")): model = line.strip() if "[" in model and "]" in model: model = model.split("]", 1)[1].strip() providers[current_provider].append(model) logger.debug(f"Found providers: {list(providers.keys())}") return providers def safe_run_command(command: List[str], retry: bool = True) -> Tuple[bool, str, str]: """Safely run a command with retries.""" cmd_str = " ".join(command) logger.info(f"Executing command: {cmd_str}") for attempt in range(MAX_RETRIES if retry else 1): try: logger.debug(f"Attempt {attempt + 1}/{MAX_RETRIES if retry else 1}") result = run(command, capture_output=True, text=True) if result.returncode == 0: logger.debug("Command executed successfully") return True, result.stdout, "" if attempt == MAX_RETRIES - 1 or not retry: logger.error( f"Command failed with return code {result.returncode}: {result.stderr}" ) return False, "", result.stderr except Exception as e: if attempt == MAX_RETRIES - 1 or not retry: logger.error(f"Command execution failed: {str(e)}") return False, "", str(e) logger.debug(f"Retrying in {RETRY_DELAY} seconds...") time.sleep(RETRY_DELAY) logger.error("Max retries exceeded") return False, "", "Max retries exceeded" def fetch_models_once() -> Dict[str, List[str]]: """Fetch models once and cache the results.""" logger.info("Fetching models") current_time = time.time() cache_timeout = 300 # 5 minutes if ( st.session_state.cached_models is not None and current_time - st.session_state.last_model_fetch < cache_timeout ): logger.debug("Using cached models") return st.session_state.cached_models logger.debug("Cache expired or not available, fetching new models") success, stdout, stderr = safe_run_command(["fabric", "--listmodels"]) if not success: logger.error(f"Failed to fetch models: {stderr}") st.error(f"Failed to fetch models: {stderr}") return {} providers = parse_models_output(stdout) logger.info(f"Found {len(providers)} providers") st.session_state.cached_models = providers st.session_state.last_model_fetch = current_time return providers def get_configured_providers() -> Dict[str, List[str]]: """Get list of configured providers using fabric --listmodels.""" return fetch_models_once() def update_provider_selection(new_provider: str) -> None: """Update provider and reset related states.""" logger.info(f"Updating provider selection to: {new_provider}") if new_provider != st.session_state.config["vendor"]: logger.debug("Provider changed, resetting model selection") st.session_state.config["vendor"] = new_provider st.session_state.selected_vendor = new_provider st.session_state.config["model"] = None st.session_state.selected_model = None st.session_state.available_models = [] if "model_select" in st.session_state: del st.session_state.model_select logger.debug("Model state reset completed") def load_configuration() -> bool: """Load environment variables and initialize configuration.""" logger.info("Loading configuration") try: env_path = os.path.expanduser("~/.config/fabric/.env") logger.debug(f"Looking for .env file at: {env_path}") if not os.path.exists(env_path): logger.error(f"Configuration file not found at {env_path}") st.error(f"Configuration file not found at {env_path}") return False load_dotenv(dotenv_path=env_path) logger.debug("Environment variables loaded") with st.spinner("Loading providers and models..."): providers = get_configured_providers() if not providers: logger.error("No providers configured") st.error("No providers configured. Please run 'fabric --setup' first.") return False default_vendor = os.getenv("DEFAULT_VENDOR") default_model = os.getenv("DEFAULT_MODEL") context_length = os.getenv("DEFAULT_MODEL_CONTEXT_LENGTH", "2048") logger.debug( f"Default configuration - Vendor: {default_vendor}, Model: {default_model}" ) if not default_vendor or default_vendor not in providers: default_vendor = next(iter(providers)) default_model = ( providers[default_vendor][0] if providers[default_vendor] else None ) logger.info( f"Using fallback configuration - Vendor: {default_vendor}, Model: {default_model}" ) st.session_state.config = { "vendor": default_vendor, "model": default_model, "context_length": context_length, } st.session_state.vendors = providers st.session_state.config_loaded = True logger.info("Configuration loaded successfully") return True except Exception as e: logger.error(f"Configuration error: {str(e)}", exc_info=True) st.error(f"Configuration error: {str(e)}") return False def load_models_and_providers() -> None: """Load models and providers from fabric configuration.""" try: st.sidebar.header("Model and Provider Selection") providers: Dict[str, List[str]] = fetch_models_once() if not providers: st.sidebar.error("No providers configured") return current_vendor = st.session_state.config.get("vendor", "") available_providers = list(providers.keys()) try: provider_index = ( available_providers.index(current_vendor) if current_vendor in available_providers else 0 ) except ValueError: provider_index = 0 logger.warning( f"Current vendor {current_vendor} not found in available providers" ) selected_provider = st.sidebar.selectbox( "Provider", available_providers, index=provider_index, key="provider_select", on_change=lambda: update_provider_selection( st.session_state.provider_select ), ) if selected_provider != st.session_state.config.get("vendor"): update_provider_selection(selected_provider) st.sidebar.success(f"Using {selected_provider}") available_models = providers.get(selected_provider, []) if not available_models: st.sidebar.warning(f"No models available for {selected_provider}") return current_model = st.session_state.config.get("model") try: model_index = ( available_models.index(current_model) if current_model in available_models else 0 ) except ValueError: model_index = 0 logger.warning( f"Current model {current_model} not found in available models for {selected_provider}" ) model_key = f"model_select_{selected_provider}" selected_model = st.sidebar.selectbox( "Model", available_models, index=model_index, key=model_key ) if selected_model != st.session_state.config.get("model"): logger.debug(f"Updating model selection to: {selected_model}") st.session_state.config["model"] = selected_model st.session_state.selected_model = selected_model except Exception as e: logger.error(f"Error loading models and providers: {str(e)}", exc_info=True) st.sidebar.error(f"Error loading models and providers: {str(e)}") st.session_state.selected_model = None st.session_state.config["model"] = None def get_pattern_metadata(pattern_name): """Get pattern metadata from system.md.""" pattern_path = os.path.join(pattern_dir, pattern_name, "system.md") if os.path.exists(pattern_path): with open(pattern_path, "r") as f: return f.read() return None def get_patterns(): """Get the list of available patterns from the specified directory.""" if not os.path.exists(pattern_dir): st.error(f"Pattern directory not found: {pattern_dir}") return [] try: patterns = [ item for item in os.listdir(pattern_dir) if os.path.isdir(os.path.join(pattern_dir, item)) ] return patterns except PermissionError: st.error(f"Permission error accessing pattern directory: {pattern_dir}") return [] except Exception as e: st.error(f"An unexpected error occurred: {e}") return [] def create_pattern( pattern_name: str, content: Optional[str] = None ) -> Tuple[bool, str]: """Create a new pattern with necessary files and structure.""" new_pattern_path = None try: # Validate pattern name if not pattern_name: logger.error("Pattern name cannot be empty") return False, "Pattern name cannot be empty." # Check if pattern already exists new_pattern_path = os.path.join(pattern_dir, pattern_name) if os.path.exists(new_pattern_path): logger.error(f"Pattern {pattern_name} already exists") return False, "Pattern already exists." # Create pattern directory os.makedirs(new_pattern_path) logger.info(f"Created pattern directory: {new_pattern_path}") # If content is provided, use fabric create_pattern to structure it if content: logger.info( f"Structuring content for pattern '{pattern_name}' using Fabric" ) try: # Get current model and provider configuration current_provider = st.session_state.config.get("vendor") current_model = st.session_state.config.get("model") if not current_provider or not current_model: raise ValueError("Please select a provider and model first.") # Execute fabric create_pattern with input content cmd = ["fabric", "--pattern", "create_pattern"] if current_provider and current_model: cmd.extend(["--vendor", current_provider, "--model", current_model]) logger.debug(f"Running command: {' '.join(cmd)}") logger.debug(f"Input content:\n{content}") # Execute pattern result = run( cmd, input=content, capture_output=True, text=True, check=True ) structured_content = result.stdout.strip() if not structured_content: raise ValueError("No output received from create_pattern") # Save the structured content to system.md system_file = os.path.join(new_pattern_path, "system.md") with open(system_file, "w") as f: f.write(structured_content) # Validate the created pattern is_valid, validation_message = validate_pattern(pattern_name) if not is_valid: raise ValueError(f"Pattern validation failed: {validation_message}") logger.info( f"Successfully created pattern '{pattern_name}' with structured content" ) except CalledProcessError as e: error_msg = f"Error running create_pattern: {e.stderr}" logger.error(error_msg) if os.path.exists(new_pattern_path): shutil.rmtree(new_pattern_path) return False, error_msg except Exception as e: error_msg = f"Unexpected error during content structuring: {str(e)}" logger.error(error_msg) if os.path.exists(new_pattern_path): shutil.rmtree(new_pattern_path) return False, error_msg else: # Create minimal template for manual editing logger.info(f"Creating minimal template for pattern '{pattern_name}'") system_file = os.path.join(new_pattern_path, "system.md") with open(system_file, "w") as f: f.write("# IDENTITY and PURPOSE\n\n# STEPS\n\n# OUTPUT INSTRUCTIONS\n") # Validate the created pattern is_valid, validation_message = validate_pattern(pattern_name) if not is_valid: logger.warning( f"Pattern created but validation failed: {validation_message}" ) return True, f"Pattern '{pattern_name}' created successfully." except Exception as e: error_msg = f"Error creating pattern: {str(e)}" logger.error(error_msg) # Clean up on any error if new_pattern_path and os.path.exists(new_pattern_path): shutil.rmtree(new_pattern_path) return False, error_msg def delete_pattern(pattern_name): """Delete an existing pattern.""" try: if not pattern_name: return False, "Pattern name cannot be empty." pattern_path = os.path.join(pattern_dir, pattern_name) if not os.path.exists(pattern_path): return False, "Pattern does not exist." shutil.rmtree(pattern_path) return True, f"Pattern '{pattern_name}' deleted successfully." except Exception as e: return False, f"Error deleting pattern: {str(e)}" def pattern_creation_wizard(): """Multi-step wizard for creating a new pattern.""" st.header("Create New Pattern") pattern_name = st.text_input("Pattern Name") if pattern_name: edit_mode = st.radio( "Edit Mode", ["Simple Editor", "Advanced (Wizard)"], key="pattern_creation_edit_mode", horizontal=True, ) if edit_mode == "Simple Editor": new_content = st.text_area("Enter Pattern Content", height=400) if st.button("Create Pattern", type="primary"): success, message = create_pattern(pattern_name, new_content) if success: st.success(message) st.experimental_rerun() else: st.error(message) else: sections = ["IDENTITY", "GOAL", "OUTPUT", "OUTPUT INSTRUCTIONS"] current_section = st.radio( "Edit Section", sections, key="pattern_creation_section_select" ) if current_section == "IDENTITY": identity = st.text_area("Define the IDENTITY", height=200) st.session_state.new_pattern_identity = identity elif current_section == "GOAL": goal = st.text_area("Define the GOAL", height=200) st.session_state.new_pattern_goal = goal elif current_section == "OUTPUT": output = st.text_area("Define the OUTPUT", height=200) st.session_state.new_pattern_output = output elif current_section == "OUTPUT INSTRUCTIONS": instructions = st.text_area( "Define the OUTPUT INSTRUCTIONS", height=200 ) st.session_state.new_pattern_instructions = instructions pattern_content = f"""# IDENTITY {st.session_state.get('new_pattern_identity', '')} # GOAL {st.session_state.get('new_pattern_goal', '')} # OUTPUT {st.session_state.get('new_pattern_output', '')} # OUTPUT INSTRUCTIONS {st.session_state.get('new_pattern_instructions', '')}""" if st.button("Create Pattern", type="primary"): success, message = create_pattern(pattern_name, pattern_content) if success: st.success(message) for key in [ "new_pattern_identity", "new_pattern_goal", "new_pattern_output", "new_pattern_instructions", ]: if key in st.session_state: del st.session_state[key] st.experimental_rerun() else: st.error(message) else: st.info("Enter a pattern name to create a new pattern") def bulk_edit_patterns(patterns_to_edit, field_to_update, new_value): """Perform bulk edits on multiple patterns.""" results = [] for pattern in patterns_to_edit: try: pattern_path = os.path.join(pattern_dir, pattern) system_file = os.path.join(pattern_path, "system.md") if not os.path.exists(system_file): results.append((pattern, False, "system.md not found")) continue with open(system_file, "r") as f: content = f.read() if field_to_update == "purpose": sections = content.split("#") updated_sections = [] for section in sections: if section.strip().startswith("IDENTITY and PURPOSE"): lines = section.split("\n") for i, line in enumerate(lines): if "You are an AI assistant designed to" in line: lines[i] = ( f"You are an AI assistant designed to {new_value}." ) updated_sections.append("\n".join(lines)) else: updated_sections.append(section) new_content = "#".join(updated_sections) with open(system_file, "w") as f: f.write(new_content) results.append((pattern, True, "Updated successfully")) else: results.append( ( pattern, False, f"Field {field_to_update} not supported for bulk edit", ) ) except Exception as e: results.append((pattern, False, str(e))) return results def pattern_creation_ui(): """UI component for creating patterns with simple and wizard modes.""" pattern_name = st.text_input("Pattern Name") if not pattern_name: st.info("Enter a pattern name to create a new pattern") return system_content = """# IDENTITY and PURPOSE You are an AI assistant designed to {purpose}. # STEPS - Step 1 - Step 2 - Step 3 # OUTPUT INSTRUCTIONS - Output format instructions here """ new_content = st.text_area("Edit Pattern Content", system_content, height=400) if st.button("Create Pattern", type="primary"): if not pattern_name: st.error("Pattern name cannot be empty.") else: success, message = create_pattern(pattern_name) if success: system_file = os.path.join(pattern_dir, pattern_name, "system.md") with open(system_file, "w") as f: f.write(new_content) st.success(f"Pattern '{pattern_name}' created successfully!") st.experimental_rerun() else: st.error(message) def pattern_management_ui(): """UI component for pattern management.""" st.sidebar.title("Pattern Management") def save_output_log( pattern_name: str, input_content: str, output_content: str, timestamp: str ): """Save pattern execution log.""" log_entry = { "timestamp": timestamp, "pattern_name": pattern_name, "input": input_content, "output": output_content, "is_starred": False, "custom_name": "", } st.session_state.output_logs.append(log_entry) # Save outputs after each new log entry save_outputs() def star_output(log_index: int, custom_name: str = "") -> bool: """Star/favorite an output log. Args: log_index: Index of the output log to star custom_name: Optional custom name for the starred output Returns: bool: True if output was starred successfully, False otherwise """ try: if 0 <= log_index < len(st.session_state.output_logs): log_entry = st.session_state.output_logs[log_index].copy() log_entry["is_starred"] = True log_entry["custom_name"] = ( custom_name or f"Starred Output #{len(st.session_state.starred_outputs) + 1}" ) # Check if this output is already starred (by timestamp) if not any( s["timestamp"] == log_entry["timestamp"] for s in st.session_state.starred_outputs ): st.session_state.starred_outputs.append(log_entry) save_outputs() # Save after starring return True return False except Exception as e: logger.error(f"Error starring output: {str(e)}") return False def unstar_output(log_index: int): """Remove an output from starred/favorites.""" if 0 <= log_index < len(st.session_state.starred_outputs): st.session_state.starred_outputs.pop(log_index) # Save outputs after unstarring save_outputs() def validate_input_content(input_text: str) -> Tuple[bool, str]: """Validate input content for potentially problematic characters or patterns. Args: input_text: The input text to validate Returns: Tuple[bool, str]: (is_valid, error_message) """ if not input_text or input_text.isspace(): return False, "Input content cannot be empty or only whitespace." # Check for minimum length if len(input_text.strip()) < 2: return False, "Input content must be at least 2 characters long." # Check for maximum length (e.g., 100KB) if len(input_text.encode("utf-8")) > 100 * 1024: return False, "Input content exceeds maximum size of 100KB." # Check for high concentration of special characters special_chars = set("!@#$%^&*()_+[]{}|\\;:'\",.<>?`~") special_char_count = sum(1 for c in input_text if c in special_chars) special_char_ratio = special_char_count / len(input_text) if special_char_ratio > 0.3: # More than 30% special characters return ( False, "Input contains too many special characters. Please check your input.", ) # Check for control characters control_chars = set( chr(i) for i in range(32) if i not in [9, 10, 13] ) # Allow tab, newline, carriage return if any(c in control_chars for c in input_text): return False, "Input contains invalid control characters." # Check for proper UTF-8 encoding try: input_text.encode("utf-8").decode("utf-8") except UnicodeError: return False, "Input contains invalid Unicode characters." return True, "" def sanitize_input_content(input_text: str) -> str: """Sanitize input content by removing or replacing problematic characters. Args: input_text: The input text to sanitize Returns: str: Sanitized input text """ # Remove null bytes text = input_text.replace("\0", "") # Replace control characters with spaces (except newlines and tabs) allowed_chars = {"\n", "\t", "\r"} sanitized_chars = [] for c in text: if c in allowed_chars or ord(c) >= 32: sanitized_chars.append(c) else: sanitized_chars.append(" ") # Join characters and normalize whitespace text = "".join(sanitized_chars) text = " ".join(text.split()) return text def execute_patterns( patterns_to_run: List[str], chain_mode: bool = False, initial_input: Optional[str] = None, ) -> List[str]: """Execute the selected patterns and capture their outputs.""" logger.info(f"Executing {len(patterns_to_run)} patterns") st.session_state.chat_output = [] all_outputs = [] current_input = initial_input or st.session_state.input_content timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") # Validate configuration current_provider = st.session_state.config.get("vendor") current_model = st.session_state.config.get("model") if not current_provider or not current_model: error_msg = "Please select a provider and model first." logger.error(error_msg) st.error(error_msg) return all_outputs # Validate input content is_valid, error_message = validate_input_content(current_input) if not is_valid: logger.error(f"Input validation failed: {error_message}") st.error(f"Input validation failed: {error_message}") return all_outputs # Sanitize input content try: sanitized_input = sanitize_input_content(current_input) if sanitized_input != current_input: logger.info("Input content was sanitized") st.warning( "Input content was automatically sanitized for better compatibility." ) current_input = sanitized_input except Exception as e: logger.error(f"Error sanitizing input: {str(e)}") st.error(f"Error processing input: {str(e)}") return all_outputs execution_info = f"**Using Model:** {current_provider} - {current_model}" all_outputs.append(execution_info) logger.info(f"Using model: {current_model} from provider: {current_provider}") try: for pattern in patterns_to_run: logger.info(f"Running pattern: {pattern}") try: cmd = ["fabric", "--pattern", pattern] logger.debug(f"Executing command: {' '.join(cmd)}") message = ( current_input if chain_mode else st.session_state.input_content ) logger.debug(f"Input for pattern {pattern}:\n{message}") # Ensure input_data is a string input_data = str(message) # Run the command with text=True and string input result = run( cmd, input=input_data, capture_output=True, text=True, check=True ) pattern_output = result.stdout.strip() logger.debug(f"Raw output from pattern {pattern}:\n{pattern_output}") if pattern_output: # Format output as markdown output_msg = f"""### {pattern} {pattern_output}""" all_outputs.append(output_msg) # Save to output logs with markdown formatting save_output_log(pattern, message, pattern_output, timestamp) if chain_mode: current_input = pattern_output else: logger.warning(f"Pattern {pattern} generated no output") all_outputs.append(f"### {pattern}\n\nNo output generated.") except UnicodeEncodeError as e: error_msg = f"### {pattern}\n\nāŒ Error: Input contains invalid characters: {str(e)}" logger.error(f"Unicode encoding error for pattern {pattern}: {str(e)}") all_outputs.append(error_msg) if chain_mode: break except CalledProcessError as e: error_msg = f"### {pattern}\n\nāŒ Error executing: {e.stderr.strip()}" logger.error(f"Pattern {pattern} failed: {e.stderr.strip()}") all_outputs.append(error_msg) if chain_mode: break except Exception as e: error_msg = f"### {pattern}\n\nāŒ Failed to execute: {str(e)}" logger.error(f"Pattern {pattern} failed: {str(e)}", exc_info=True) all_outputs.append(error_msg) if chain_mode: break except Exception as e: error_msg = f"### Error\n\nāŒ Error in pattern execution: {str(e)}" logger.error(error_msg, exc_info=True) st.error(error_msg) logger.info("Pattern execution completed") return all_outputs def validate_pattern(pattern_name): """Validate a pattern's structure and content.""" try: pattern_path = os.path.join(pattern_dir, pattern_name) if not os.path.exists(os.path.join(pattern_path, "system.md")): return False, f"Missing required file: system.md." with open(os.path.join(pattern_path, "system.md")) as f: content = f.read() required_sections = ["# IDENTITY", "# STEPS", "# OUTPUT"] missing_sections = [] for section in required_sections: if section.lower() not in content.lower(): missing_sections.append(section) if missing_sections: return ( True, f"Warning: Missing sections in system.md: {', '.join(missing_sections)}", ) return True, "Pattern is valid." except Exception as e: return False, f"Error validating pattern: {str(e)}" def pattern_editor(pattern_name): """Edit pattern content with simple and advanced editing options.""" if not pattern_name: return pattern_path = os.path.join(pattern_dir, pattern_name) system_file = os.path.join(pattern_path, "system.md") user_file = os.path.join(pattern_path, "user.md") st.markdown(f"### Editing Pattern: {pattern_name}") is_valid, message = validate_pattern(pattern_name) if not is_valid: st.error(message) elif message != "Pattern is valid.": st.warning(message) else: st.success("Pattern structure is valid") edit_mode = st.radio( "Edit Mode", ["Simple Editor", "Advanced (Wizard)"], key=f"edit_mode_{pattern_name}", horizontal=True, ) if edit_mode == "Simple Editor": if os.path.exists(system_file): with open(system_file) as f: content = f.read() new_content = st.text_area("Edit system.md", content, height=600) if st.button("Save system.md"): with open(system_file, "w") as f: f.write(new_content) st.success("Saved successfully!") else: st.error("system.md file not found") if os.path.exists(user_file): with open(user_file) as f: content = f.read() new_content = st.text_area("Edit user.md", content, height=300) if st.button("Save user.md"): with open(user_file, "w") as f: f.write(new_content) st.success("Saved successfully!") else: if os.path.exists(system_file): with open(system_file) as f: content = f.read() sections = content.split("#") edited_sections = [] for section in sections: if not section.strip(): continue lines = section.strip().split("\n", 1) if len(lines) > 1: title, content = lines else: title, content = lines[0], "" st.markdown(f"#### {title}") new_content = st.text_area( f"Edit {title} section", value=content.strip(), height=200, key=f"section_{title}", ) edited_sections.append(f"# {title}\n\n{new_content}") if st.button("Save Changes"): new_content = "\n\n".join(edited_sections) with open(system_file, "w") as f: f.write(new_content) st.success("Changes saved successfully!") is_valid, message = validate_pattern(pattern_name) if not is_valid: st.error(message) elif message != "Pattern is valid.": st.warning(message) else: st.error("system.md file not found") def get_outputs_dir() -> str: """Get the directory for storing outputs.""" outputs_dir = os.path.expanduser("~/.config/fabric/outputs") os.makedirs(outputs_dir, exist_ok=True) return outputs_dir def save_outputs(): """Save pattern outputs and starred outputs to files. Error handling: - Creates output directory if it doesn't exist - Handles file write permissions - Handles JSON serialization errors - Logs all errors for debugging """ logger.info("Saving outputs to persistent storage") outputs_dir = get_outputs_dir() output_logs_file = os.path.join(outputs_dir, "output_logs.json") starred_outputs_file = os.path.join(outputs_dir, "starred_outputs.json") try: # Save output logs with open(output_logs_file, "w") as f: json.dump(st.session_state.output_logs, f, indent=2) logger.debug(f"Saved output logs to {output_logs_file}") # Save starred outputs with open(starred_outputs_file, "w") as f: json.dump(st.session_state.starred_outputs, f, indent=2) logger.debug(f"Saved starred outputs to {starred_outputs_file}") except PermissionError as e: error_msg = f"Permission denied when saving outputs: {str(e)}" logger.error(error_msg) st.error(error_msg) except json.JSONEncodeError as e: error_msg = f"Error encoding outputs to JSON: {str(e)}" logger.error(error_msg) st.error(error_msg) except Exception as e: error_msg = f"Unexpected error saving outputs: {str(e)}" logger.error(error_msg) st.error(error_msg) def load_saved_outputs(): """Load saved pattern outputs from files. Error handling: - Handles missing output files - Handles corrupted JSON files - Handles file read permissions - Initializes empty state if files don't exist """ logger.info("Loading saved outputs") outputs_dir = get_outputs_dir() output_logs_file = os.path.join(outputs_dir, "output_logs.json") starred_outputs_file = os.path.join(outputs_dir, "starred_outputs.json") try: # Load output logs if os.path.exists(output_logs_file): with open(output_logs_file, "r") as f: st.session_state.output_logs = json.load(f) logger.debug(f"Loaded output logs from {output_logs_file}") # Load starred outputs if os.path.exists(starred_outputs_file): with open(starred_outputs_file, "r") as f: st.session_state.starred_outputs = json.load(f) logger.debug(f"Loaded starred outputs from {starred_outputs_file}") except json.JSONDecodeError as e: error_msg = f"Error decoding saved outputs (corrupted files): {str(e)}" logger.error(error_msg) st.error(error_msg) # Initialize empty state st.session_state.output_logs = [] st.session_state.starred_outputs = [] except PermissionError as e: error_msg = f"Permission denied when loading outputs: {str(e)}" logger.error(error_msg) st.error(error_msg) except Exception as e: error_msg = f"Unexpected error loading saved outputs: {str(e)}" logger.error(error_msg) st.error(error_msg) # Initialize empty state st.session_state.output_logs = [] st.session_state.starred_outputs = [] def handle_star_name_input(log_index: int, name: str): """Handle the starring process when a name is input. Args: log_index: Index of the output to star name: Name to give the starred output """ try: if star_output(log_index, name): st.success("Output starred successfully!") else: st.error("Failed to star output. Please try again.") except Exception as e: logger.error(f"Error handling star name input: {str(e)}") st.error(f"Error starring output: {str(e)}") def execute_pattern_chain(patterns_sequence: List[str], initial_input: str) -> Dict: """Execute a sequence of patterns in a chain, passing output from each to the next. Args: patterns_sequence: List of pattern names to execute in sequence initial_input: Initial input text to start the chain Returns: Dict containing results from each stage of the chain """ logger.info( f"Starting pattern chain execution with {len(patterns_sequence)} patterns" ) chain_results = { "sequence": patterns_sequence, "stages": [], "final_output": None, "metadata": { "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "success": False, }, } current_input = initial_input try: for i, pattern in enumerate(patterns_sequence, 1): logger.info(f"Chain Stage {i}: Executing pattern '{pattern}'") stage_result = { "pattern": pattern, "input": current_input, "output": None, "success": False, "error": None, } try: cmd = ["fabric", "--pattern", pattern] result = run( cmd, input=current_input, capture_output=True, text=True, check=True ) output = result.stdout.strip() if output: stage_result["output"] = output stage_result["success"] = True current_input = output # Use this output as input for next pattern logger.debug(f"Stage {i} completed successfully") else: stage_result["error"] = "Pattern generated no output" logger.warning(f"Pattern {pattern} generated no output") except CalledProcessError as e: error_msg = f"Error executing pattern: {e.stderr.strip()}" stage_result["error"] = error_msg logger.error(error_msg) break except Exception as e: error_msg = f"Unexpected error: {str(e)}" stage_result["error"] = error_msg logger.error(error_msg) break chain_results["stages"].append(stage_result) # Save stage output to logs save_output_log( pattern, stage_result["input"], stage_result["output"] or stage_result["error"], chain_results["metadata"]["timestamp"], ) # Set final output and success status successful_stages = [s for s in chain_results["stages"] if s["success"]] if successful_stages: chain_results["final_output"] = successful_stages[-1]["output"] chain_results["metadata"]["success"] = True except Exception as e: logger.error(f"Chain execution failed: {str(e)}", exc_info=True) chain_results["metadata"]["error"] = str(e) return chain_results def enhance_input_preview(): """Display a preview of the input content with basic statistics. Shows: - Input text preview - Character count - Word count """ if "input_content" in st.session_state and st.session_state.input_content: with st.expander("Input Preview", expanded=True): st.markdown("### Current Input") st.code(st.session_state.input_content, language="text") # Basic statistics char_count = len(st.session_state.input_content) word_count = len(st.session_state.input_content.split()) col1, col2 = st.columns(2) with col1: st.metric("Characters", char_count) with col2: st.metric("Words", word_count) def get_clipboard_content() -> Tuple[bool, str, str]: """Get content from clipboard with proper error handling. Returns: Tuple[bool, str, str]: (success, content, error_message) """ try: # macOS - use pbpaste if PLATFORM == "darwin": result = run(["pbpaste"], capture_output=True, text=True, check=True) # Windows - fallback to pyperclip if available elif PLATFORM == "win32": try: import pyperclip content = pyperclip.paste() return True, content, "" except ImportError: return ( False, "", "The pyperclip module is required for clipboard operations on Windows.\nPlease install it with: pip install pyperclip", ) except Exception as e: return False, "", f"Windows clipboard error: {str(e)}" # Linux - use xclip else: result = run( ["xclip", "-selection", "clipboard", "-o"], capture_output=True, text=True, check=True, ) content = result.stdout # Validate the content is proper UTF-8 try: content.encode("utf-8").decode("utf-8") return True, content, "" except UnicodeError: return False, "", "Clipboard contains invalid Unicode characters" except FileNotFoundError: if PLATFORM == "darwin": return ( False, "", "Could not access clipboard. Please ensure you have the proper permissions.", ) elif PLATFORM == "win32": return ( False, "", "Windows clipboard access failed. Try installing pyperclip with: pip install pyperclip", ) else: return ( False, "", "xclip is not installed. Please install it with: sudo apt-get install xclip", ) except CalledProcessError as e: return False, "", f"Failed to read clipboard: {e.stderr}" except Exception as e: return False, "", f"Unexpected error reading clipboard: {str(e)}" def set_clipboard_content(content: str) -> Tuple[bool, str]: """Set content to clipboard with proper error handling. Args: content: The content to copy to clipboard Returns: Tuple[bool, str]: (success, error_message) """ try: # Validate content is proper UTF-8 before attempting to copy try: input_bytes = content.encode("utf-8") except UnicodeError: return False, "Content contains invalid Unicode characters" # macOS - use pbcopy if PLATFORM == "darwin": run(["pbcopy"], input=input_bytes, check=True) # Windows - fallback to pyperclip if available elif PLATFORM == "win32": try: import pyperclip pyperclip.copy(content) except ImportError: return ( False, "The pyperclip module is required for clipboard operations on Windows.\nPlease install it with: pip install pyperclip", ) except Exception as e: return False, f"Windows clipboard error: {str(e)}" # Linux - use xclip else: run(["xclip", "-selection", "clipboard"], input=input_bytes, check=True) return True, "" except FileNotFoundError: if PLATFORM == "darwin": return ( False, "Could not access clipboard. Please ensure you have the proper permissions.", ) elif PLATFORM == "win32": return ( False, "Windows clipboard access failed. Try installing pyperclip with: pip install pyperclip", ) else: return ( False, "xclip is not installed. Please install it with: sudo apt-get install xclip", ) except CalledProcessError as e: return False, f"Failed to copy to clipboard: {e.stderr}" except Exception as e: return False, f"Unexpected error copying to clipboard: {str(e)}" def main(): """Main function to run the Streamlit app.""" logger.info("Starting Fabric Pattern Studio") try: # Set page config st.set_page_config( page_title="Fabric Pattern Studio", page_icon="🧬", layout="wide", initial_sidebar_state="expanded", ) # Add title with gradient styling and footer signature st.markdown( """

Pattern Studio

made by zo6 """, unsafe_allow_html=True, ) initialize_session_state() if not st.session_state.config_loaded: logger.info("Loading initial configuration") success = load_configuration() if not success: logger.error("Failed to load configuration") st.error("Failed to load configuration. Please check your .env file.") st.stop() with st.sidebar: # Add GitHub link st.markdown( """
GitHub Repo
""", unsafe_allow_html=True, ) st.title("Configuration") load_models_and_providers() st.markdown("---") st.title("Navigation") view = st.radio( "Select View", ["Run Patterns", "Pattern Management", "Analysis Dashboard"], key="view_selector", ) logger.debug(f"Selected view: {view}") if view != st.session_state.get("current_view"): st.session_state["current_view"] = view if view == "Run Patterns": patterns = get_patterns() logger.debug(f"Available patterns: {patterns}") if not patterns: logger.warning("No patterns available") st.warning("No patterns available. Create a pattern first.") return tabs = st.tabs(["Run", "Analysis"]) with tabs[0]: st.header("Run Patterns") selected_patterns = st.multiselect( "Select Patterns to Run", patterns, default=st.session_state.selected_patterns, key="selected_patterns_widget", ) st.session_state.selected_patterns = selected_patterns if selected_patterns: for pattern in selected_patterns: with st.expander(f"šŸ“ {pattern} Details", expanded=False): metadata = get_pattern_metadata(pattern) if metadata: st.markdown(metadata) else: st.info("No description available") st.subheader("Input") input_method = st.radio( "Input Method", ["Clipboard", "Manual Input"], horizontal=True ) if input_method == "Clipboard": col_load, col_preview = st.columns([2, 1]) with col_load: if st.button( "šŸ“‹ Load from Clipboard", use_container_width=True ): success, content, error = get_clipboard_content() if success: # Validate clipboard content is_valid, error_message = validate_input_content( content ) if not is_valid: st.error( f"Invalid clipboard content: {error_message}" ) else: # Sanitize clipboard content sanitized_content = sanitize_input_content( content ) if sanitized_content != content: st.warning( "Clipboard content was automatically sanitized for better compatibility." ) st.session_state.input_content = ( sanitized_content ) st.session_state.show_preview = True st.success("Content loaded from clipboard!") else: st.error(error) with col_preview: if st.button("šŸ‘ Toggle Preview", use_container_width=True): st.session_state.show_preview = ( not st.session_state.get("show_preview", False) ) else: st.session_state.input_content = st.text_area( "Enter Input Text", value=st.session_state.get("input_content", ""), height=200, ) if ( st.session_state.get("show_preview", False) or input_method == "Manual Input" ): if st.session_state.get("input_content"): enhance_input_preview() # Move chain mode checkbox before the run button chain_mode = st.checkbox( "Chain Mode", help="Execute patterns in sequence, passing output of each pattern as input to the next", ) if chain_mode and len(selected_patterns) > 1: st.info("Patterns will be executed in the order selected above") st.markdown("##### Drag to reorder patterns:") # Convert patterns list to DataFrame for data editor patterns_df = pd.DataFrame({"Pattern": selected_patterns}) edited_df = st.data_editor( patterns_df, use_container_width=True, key="pattern_reorder", hide_index=True, column_config={ "Pattern": st.column_config.TextColumn( "Pattern", help="Drag to reorder patterns" ) }, ) # Update selected patterns if order changed new_patterns = edited_df["Pattern"].tolist() if new_patterns != selected_patterns: st.session_state.selected_patterns = new_patterns col1, col2 = st.columns([3, 1]) with col1: if st.button( "šŸš€ Run Patterns", type="primary", use_container_width=True ): if not st.session_state.input_content: st.warning("Please provide input content.") else: with st.spinner("Running patterns..."): if chain_mode: # Execute pattern chain chain_results = execute_pattern_chain( selected_patterns, st.session_state.input_content, ) # Display chain results st.markdown("## Chain Execution Results") # Show sequence st.markdown("### Pattern Sequence") st.code(" → ".join(chain_results["sequence"])) # Show each stage st.markdown("### Execution Stages") for i, stage in enumerate( chain_results["stages"], 1 ): with st.expander( f"Stage {i}: {stage['pattern']}", expanded=False, ): st.markdown("#### Input") st.code(stage["input"]) st.markdown("#### Output") if stage["success"]: st.markdown(stage["output"]) else: st.error(stage["error"]) # Show final output if chain_results["metadata"]["success"]: st.markdown("### Final Output") st.markdown(chain_results["final_output"]) st.session_state.chat_output.append( chain_results["final_output"] ) else: st.error( "Chain execution failed. Check the stages above for details." ) else: # Normal pattern execution outputs = execute_patterns(selected_patterns) st.session_state.chat_output.extend(outputs) # Display outputs after execution if st.session_state.chat_output: st.markdown("---") st.header("Pattern Outputs") for message in st.session_state.chat_output: st.markdown(message) st.markdown("---") # Add separator between outputs # Output Actions col1, col2 = st.columns(2) with col1: if st.button("šŸ“‹ Copy All Outputs"): all_outputs = "\n\n".join(st.session_state.chat_output) success, error = set_clipboard_content(all_outputs) if success: st.success("All outputs copied to clipboard!") else: st.error(error) with col2: if st.button("āŒ Clear Outputs"): st.session_state.chat_output = [] st.success("Outputs cleared!") st.experimental_rerun() with col2: st.write("") # Empty space for layout balance else: st.info("Select one or more patterns to run.") with tabs[1]: st.header("Output Analysis") if st.session_state.chat_output: # Display pattern outputs in chronological order for i, output in enumerate( reversed(st.session_state.chat_output), 1 ): with st.expander(f"Output #{i}", expanded=False): st.markdown(output) else: st.info("Run some patterns to see output analysis.") elif view == "Pattern Management": create_tab, edit_tab, delete_tab = st.tabs(["Create", "Edit", "Delete"]) with create_tab: st.header("Create New Pattern") creation_mode = st.radio( "Creation Mode", ["Simple Editor", "Advanced (Wizard)"], key="creation_mode_main", horizontal=True, ) if creation_mode == "Simple Editor": pattern_creation_ui() else: pattern_creation_wizard() with edit_tab: st.header("Edit Patterns") patterns = get_patterns() if not patterns: st.warning("No patterns available. Create a pattern first.") else: selected_pattern = st.selectbox( "Select Pattern to Edit", [""] + patterns ) if selected_pattern: pattern_editor(selected_pattern) with delete_tab: st.header("Delete Patterns") patterns = get_patterns() if not patterns: st.warning("No patterns available.") else: patterns_to_delete = st.multiselect( "Select Patterns to Delete", patterns, key="delete_patterns_selector", ) if patterns_to_delete: st.warning( f"You are about to delete {len(patterns_to_delete)} pattern(s):" ) for pattern in patterns_to_delete: st.markdown(f"- {pattern}") confirm_delete = st.checkbox( "I understand that this action cannot be undone" ) if st.button( "šŸ—‘ļø Delete Selected Patterns", type="primary", disabled=not confirm_delete, ): if confirm_delete: for pattern in patterns_to_delete: success, message = delete_pattern(pattern) if success: st.success(f"āœ“ {pattern}: {message}") else: st.error(f"āœ— {pattern}: {message}") st.experimental_rerun() else: st.error( "Please confirm deletion by checking the box above." ) else: st.info("Select one or more patterns to delete.") else: st.header("Pattern Output History") # Create tabs for All Outputs and Starred Outputs all_tab, starred_tab = st.tabs(["All Outputs", "⭐ Starred"]) with all_tab: if not st.session_state.output_logs: st.info( "No pattern outputs recorded yet. Run some patterns to see their logs here." ) else: for i, log in enumerate(reversed(st.session_state.output_logs)): with st.expander( f"Output #{len(st.session_state.output_logs)-i} - {log['pattern_name']} ({log['timestamp']})", expanded=False, ): st.markdown("### Input") st.code(log["input"], language="text") st.markdown("### Output") st.markdown(log["output"]) # Check if this output is already starred is_starred = any( s["timestamp"] == log["timestamp"] for s in st.session_state.starred_outputs ) col1, col2 = st.columns([1, 4]) with col1: if not is_starred: if st.button( "⭐ Star", key=f"star_{i}", use_container_width=True, ): st.session_state.starring_output = ( len(st.session_state.output_logs) - i - 1 ) st.session_state.temp_star_name = "" else: st.write("⭐ Starred") with col2: if st.button("šŸ“‹ Copy Output", key=f"copy_{i}"): success, error = set_clipboard_content( log["output"] ) if success: st.success("Output copied to clipboard!") else: st.error(error) # Show starring form inside the expander if this is the output being starred if ( st.session_state.starring_output == len(st.session_state.output_logs) - i - 1 ): st.markdown("---") with st.form(key=f"star_name_form_{i}"): name_input = st.text_input( "Enter a name for this output (optional):", key=f"star_name_input_{i}", ) col1, col2 = st.columns(2) with col1: submit = st.form_submit_button( "Save", use_container_width=True ) with col2: cancel = st.form_submit_button( "Cancel", use_container_width=True ) if submit: handle_star_name_input( st.session_state.starring_output, name_input ) # Reset starring state after handling st.session_state.starring_output = None st.experimental_rerun() elif cancel: # Reset starring state st.session_state.starring_output = None st.experimental_rerun() # Remove the old starring form from the bottom st.markdown("---") with starred_tab: if not st.session_state.starred_outputs: st.info( "No starred outputs yet. Star some outputs to see them here!" ) else: for i, starred in enumerate(st.session_state.starred_outputs): with st.expander( f"⭐ {starred.get('custom_name', f'Starred Output #{i+1}')} ({starred['timestamp']})", expanded=False, ): col1, col2 = st.columns([3, 1]) with col1: st.markdown( f"### {starred.get('custom_name', f'Starred Output #{i+1}')}" ) with col2: if st.button("āœļø Edit Name", key=f"edit_name_{i}"): st.session_state[f"editing_name_{i}"] = True if st.session_state.get(f"editing_name_{i}", False): new_name = st.text_input( "Enter new name:", value=starred.get("custom_name", ""), key=f"new_name_{i}", ) col1, col2 = st.columns([1, 1]) with col1: if st.button("Save", key=f"save_name_{i}"): st.session_state.starred_outputs[i][ "custom_name" ] = new_name del st.session_state[f"editing_name_{i}"] st.success("Name updated!") st.experimental_rerun() with col2: if st.button("Cancel", key=f"cancel_name_{i}"): del st.session_state[f"editing_name_{i}"] st.experimental_rerun() st.markdown("### Pattern") st.code(starred["pattern_name"], language="text") st.markdown("### Input") st.code( starred["input"], language="text" ) # Display input as code block st.markdown("### Output") st.markdown(starred["output"]) # Display output as markdown col1, col2 = st.columns([1, 4]) with col1: if st.button("āŒ Remove Star", key=f"unstar_{i}"): unstar_output(i) st.success("Output unstarred!") st.experimental_rerun() with col2: if st.button("šŸ“‹ Copy Output", key=f"copy_starred_{i}"): try: run( ["xclip", "-selection", "clipboard"], input=starred["output"].encode(), check=True, ) st.success("Output copied to clipboard!") except Exception as e: st.error(f"Error copying to clipboard: {e}") if st.button("Clear All Starred"): if st.checkbox("Confirm clearing all starred outputs"): st.session_state.starred_outputs = [] save_outputs() # Save after clearing st.success("All starred outputs cleared!") st.experimental_rerun() except Exception as e: logger.error("Unexpected error in main function", exc_info=True) st.error(f"An unexpected error occurred: {str(e)}") st.stop() if __name__ == "__main__": logger.info("Application startup") main()