mirror of
https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-02-14 16:55:13 -05:00
### Changes 🏗️ Added the dispute & refund handling on the system. https://github.com/user-attachments/assets/2d9c7b48-2ee1-401d-be65-5e9787c8130e ### Checklist 📋 #### For code changes: - [ ] I have clearly listed my changes in the PR description - [ ] I have made a test plan - [ ] I have tested my changes according to the test plan: <!-- Put your test plan here: --> - [ ] ... <details> <summary>Example test plan</summary> - [ ] Create from scratch and execute an agent with at least 3 blocks - [ ] Import an agent from file upload, and confirm it executes correctly - [ ] Upload agent to marketplace - [ ] Import an agent from marketplace and confirm it executes correctly - [ ] Edit an agent from monitor, and confirm it executes correctly </details> #### For configuration changes: - [ ] `.env.example` is updated or already compatible with my changes - [ ] `docker-compose.yml` is updated or already compatible with my changes - [ ] I have included a list of my configuration changes in the PR description (under **Changes**) <details> <summary>Examples of configuration changes</summary> - Changing ports - Adding new services that need to communicate with each other - Secrets or environment variable changes - New or infrastructure changes such as databases </details>
38 lines
1.1 KiB
Python
38 lines
1.1 KiB
Python
import json
|
|
from typing import Optional
|
|
|
|
from cryptography.fernet import Fernet
|
|
|
|
from backend.util.settings import Settings
|
|
|
|
ENCRYPTION_KEY = Settings().secrets.encryption_key
|
|
|
|
|
|
class JSONCryptor:
|
|
def __init__(self, key: Optional[str] = None):
|
|
# Use provided key or get from environment
|
|
self.key = key or ENCRYPTION_KEY
|
|
if not self.key:
|
|
raise ValueError(
|
|
"Encryption key must be provided or set in ENCRYPTION_KEY environment variable"
|
|
)
|
|
self.fernet = Fernet(
|
|
self.key.encode() if isinstance(self.key, str) else self.key
|
|
)
|
|
|
|
def encrypt(self, data: dict) -> str:
|
|
"""Encrypt dictionary data to string"""
|
|
json_str = json.dumps(data)
|
|
encrypted = self.fernet.encrypt(json_str.encode())
|
|
return encrypted.decode()
|
|
|
|
def decrypt(self, encrypted_str: str) -> dict:
|
|
"""Decrypt string to dictionary"""
|
|
if not encrypted_str:
|
|
return {}
|
|
try:
|
|
decrypted = self.fernet.decrypt(encrypted_str.encode())
|
|
return json.loads(decrypted.decode())
|
|
except Exception:
|
|
return {}
|