add Dockerfile with selfhosting instructions
18
.dockerignore
Normal file
@@ -0,0 +1,18 @@
|
||||
**/.DS_Store
|
||||
**/__pycache__/
|
||||
**/*.pyc
|
||||
**/*.pyo
|
||||
**/*.pyd
|
||||
|
||||
frontend/node_modules/
|
||||
frontend/dist/
|
||||
|
||||
backend/dist/
|
||||
backend/urls.db
|
||||
backend/config.json
|
||||
|
||||
.pytest_cache/
|
||||
.venv/
|
||||
venv/
|
||||
node_modules/
|
||||
|
||||
42
Dockerfile
Normal file
@@ -0,0 +1,42 @@
|
||||
FROM node:20-alpine AS frontend-build
|
||||
|
||||
WORKDIR /src
|
||||
|
||||
# Copy only what the Vite build needs first (better layer caching)
|
||||
COPY frontend/package.json frontend/package-lock.json ./frontend/
|
||||
RUN cd frontend && npm ci
|
||||
|
||||
COPY frontend ./frontend
|
||||
COPY backend ./backend
|
||||
|
||||
RUN cd frontend && npm run build
|
||||
|
||||
|
||||
FROM python:3.10-slim AS runtime
|
||||
|
||||
WORKDIR /app
|
||||
|
||||
ENV PYTHONDONTWRITEBYTECODE=1 \
|
||||
PYTHONUNBUFFERED=1
|
||||
|
||||
RUN apt-get update \
|
||||
&& apt-get install -y --no-install-recommends ca-certificates \
|
||||
&& rm -rf /var/lib/apt/lists/*
|
||||
|
||||
COPY backend/requirements.txt /app/backend/requirements.txt
|
||||
RUN pip install --no-cache-dir -r /app/backend/requirements.txt
|
||||
|
||||
COPY backend /app/backend
|
||||
COPY --from=frontend-build /src/backend/dist /app/backend/dist
|
||||
|
||||
COPY docker/entrypoint.sh /entrypoint.sh
|
||||
RUN chmod +x /entrypoint.sh
|
||||
|
||||
# Default DB location inside the container; mount a volume at /data and set TNYR_DB_PATH=/data/urls.db
|
||||
VOLUME ["/data"]
|
||||
|
||||
EXPOSE 5502
|
||||
|
||||
ENTRYPOINT ["/entrypoint.sh"]
|
||||
|
||||
|
||||
72
README.md
@@ -31,35 +31,56 @@ A secure, self-hosted URL shortener with custom passwordless encryption. Perfect
|
||||
|
||||
## Self Hosting and Development
|
||||
|
||||
### Prerequisites
|
||||
- Python 3.9+
|
||||
- Node.js 16+
|
||||
### Self-host (recommended): Docker / Docker Compose
|
||||
|
||||
### Instructions
|
||||
#### Prerequisites
|
||||
- Docker (and optionally Docker Compose)
|
||||
|
||||
1. **Deploy with your domain:**
|
||||
```bash
|
||||
./deploy.sh your-domain.com
|
||||
```
|
||||
#### 1) Generate salts (optional; only if you hosted tnyr.me before **Dec 30, 2025**)
|
||||
|
||||
2. **Install Python dependencies:**
|
||||
```bash
|
||||
pip install -r requirements.txt
|
||||
```
|
||||
```bash
|
||||
python3 backend/generate_salts.py --env
|
||||
```
|
||||
|
||||
3. **Setup the config:**
|
||||
```bash
|
||||
cp config_template.json config.json
|
||||
python generate_salts.py
|
||||
```
|
||||
You will see two salts, which you can use in the config.
|
||||
#### 2) Run with Docker Compose (no secrets required)
|
||||
|
||||
4. **Start Server**
|
||||
```bash
|
||||
python main.py
|
||||
```
|
||||
```bash
|
||||
mkdir -p data
|
||||
export TNYR_PUBLIC_URL=https://example.com # (or http://1.2.3.4:5502)
|
||||
docker compose up -d --build
|
||||
```
|
||||
|
||||
5. Access at `http://localhost:5000`
|
||||
Required env vars:
|
||||
- `TNYR_PUBLIC_URL` (example `https://example.com`, `http://1.2.3.4:5502`)
|
||||
|
||||
Optional env vars:
|
||||
- `TNYR_DB_PATH` (defaults to `/data/urls.db` in the container via compose)
|
||||
- `TNYR_DELETION_TOKEN` (set to enable `POST /delete-url`)
|
||||
- **Legacy link support (only if you hosted tnyr.me before Dec 30, 2025)**:
|
||||
- `TNYR_SALT1_HEX` (16 bytes = 32 hex chars)
|
||||
- `TNYR_SALT2_HEX` (16 bytes = 32 hex chars)
|
||||
- `TNYR_ARGON2_TIME_COST` (default `3`)
|
||||
- `TNYR_ARGON2_MEMORY_COST` (default `65536`)
|
||||
- `TNYR_ARGON2_PARALLELISM` (default `1`)
|
||||
- `TNYR_ARGON2_HASH_LENGTH` (default `32`)
|
||||
|
||||
**Note**: Creating new legacy links via `POST /shorten-server` is disabled. The legacy env vars above are only for resolving existing old `/<id>` links.
|
||||
|
||||
#### 3) Or run with plain Docker
|
||||
|
||||
```bash
|
||||
docker build -t tnyr .
|
||||
docker run --rm \
|
||||
-p 5502:5502 \
|
||||
-v "$PWD/data:/data" \
|
||||
-e TNYR_PUBLIC_URL="${TNYR_PUBLIC_URL:-http://localhost:5502}" \
|
||||
-e TNYR_DB_PATH="/data/urls.db" \
|
||||
tnyr
|
||||
```
|
||||
|
||||
Then open `http://localhost:5502`.
|
||||
|
||||
**Note**: On container start, the entrypoint will initialize the SQLite schema (if missing) and replace `%VITE_PUBLIC_URL%` / `%VITE_DOMAIN%` placeholders in `backend/dist` based on `TNYR_PUBLIC_URL`.
|
||||
|
||||
### Development
|
||||
|
||||
@@ -73,6 +94,11 @@ A secure, self-hosted URL shortener with custom passwordless encryption. Perfect
|
||||
```bash
|
||||
cd backend
|
||||
pip install -r requirements.txt
|
||||
# Required:
|
||||
export TNYR_PUBLIC_URL=https://example.com
|
||||
# Optional (legacy server-side mode only):
|
||||
# export TNYR_SALT1_HEX=... # 32 hex chars
|
||||
# export TNYR_SALT2_HEX=... # 32 hex chars
|
||||
python main.py
|
||||
```
|
||||
|
||||
|
||||
@@ -1,200 +0,0 @@
|
||||
# Backend Testing Guide
|
||||
|
||||
This document explains the comprehensive test suite for the URL shortener backend.
|
||||
|
||||
## 🧪 Test Overview
|
||||
|
||||
The test suite covers all critical functionality:
|
||||
|
||||
- **Cryptographic Functions** (`test_crypto.py`) - AES encryption, Argon2 key derivation
|
||||
- **API Endpoints** (`test_api.py`) - All Flask routes and error handling
|
||||
- **Database Operations** (`test_database.py`) - SQLite schema, CRUD operations, constraints
|
||||
- **Utility Functions** (`test_utils.py`) - ID generation, configuration validation
|
||||
|
||||
## 📦 Installation
|
||||
|
||||
Install testing dependencies:
|
||||
|
||||
```bash
|
||||
pip install -r requirements.txt
|
||||
```
|
||||
|
||||
## 🚀 Running Tests
|
||||
|
||||
### Quick Start
|
||||
```bash
|
||||
# Run all tests
|
||||
pytest
|
||||
|
||||
# Or use the test runner
|
||||
python run_tests.py
|
||||
```
|
||||
|
||||
### Specific Test Categories
|
||||
```bash
|
||||
# Cryptographic tests only
|
||||
python run_tests.py crypto
|
||||
|
||||
# API endpoint tests only
|
||||
python run_tests.py api
|
||||
|
||||
# Database tests only
|
||||
python run_tests.py database
|
||||
|
||||
# Utility function tests only
|
||||
python run_tests.py utils
|
||||
```
|
||||
|
||||
### Test Options
|
||||
```bash
|
||||
# Run with coverage report
|
||||
python run_tests.py coverage
|
||||
|
||||
# Fast mode (stop on first failure)
|
||||
python run_tests.py fast
|
||||
|
||||
# Verbose output
|
||||
python run_tests.py verbose
|
||||
|
||||
# See all available options
|
||||
python run_tests.py help
|
||||
```
|
||||
|
||||
## 📊 Coverage Requirements
|
||||
|
||||
The test suite maintains **80%+ code coverage** and covers:
|
||||
|
||||
### Cryptographic Security ⚡
|
||||
- ✅ AES-256-CBC encryption/decryption roundtrips
|
||||
- ✅ Argon2 key derivation consistency and security
|
||||
- ✅ Invalid key length handling
|
||||
- ✅ Corruption detection (wrong keys, corrupted data)
|
||||
- ✅ Unicode URL handling
|
||||
- ✅ Edge cases (empty strings, large data)
|
||||
|
||||
### API Robustness 🌐
|
||||
- ✅ All endpoint success paths
|
||||
- ✅ Comprehensive error handling (400, 404, 409, 500)
|
||||
- ✅ Input validation and sanitization
|
||||
- ✅ JSON parsing error handling
|
||||
- ✅ HTTP method validation
|
||||
- ✅ URL prefix normalization
|
||||
- ✅ ID collision handling with retry logic
|
||||
|
||||
### Database Integrity 🗄️
|
||||
- ✅ Schema creation and validation
|
||||
- ✅ Primary key and NOT NULL constraints
|
||||
- ✅ BLOB data integrity across various byte patterns
|
||||
- ✅ Concurrent access patterns
|
||||
- ✅ Performance characteristics
|
||||
- ✅ Data type validation (TEXT, BLOB)
|
||||
|
||||
### ID Generation 🎲
|
||||
- ✅ Character set compliance (no confusing chars: 0, O, I, l)
|
||||
- ✅ Length consistency
|
||||
- ✅ Uniqueness verification (statistical)
|
||||
- ✅ Configuration flexibility
|
||||
|
||||
## 🔧 Test Configuration
|
||||
|
||||
Tests use isolated configuration:
|
||||
- **In-memory SQLite database** (no interference with production data)
|
||||
- **Reduced Argon2 parameters** (faster test execution)
|
||||
- **Deterministic test salts** (reproducible results)
|
||||
|
||||
## 📁 Test Files Structure
|
||||
|
||||
```
|
||||
backend/
|
||||
├── tests/
|
||||
│ ├── conftest.py # Test fixtures and configuration
|
||||
│ ├── test_crypto.py # Cryptographic function tests
|
||||
│ ├── test_api.py # API endpoint tests
|
||||
│ ├── test_database.py # Database operation tests
|
||||
│ └── test_utils.py # Utility function tests
|
||||
├── pytest.ini # Pytest configuration
|
||||
├── run_tests.py # Test runner script
|
||||
└── README_TESTING.md # This documentation
|
||||
```
|
||||
|
||||
## ⚠️ Security Test Notes
|
||||
|
||||
**Critical Security Areas Tested:**
|
||||
1. **Encryption Integrity** - Ensures no data corruption or key leakage
|
||||
2. **ID Collision Handling** - Prevents duplicate shortened URLs
|
||||
3. **Input Validation** - Protects against malformed requests
|
||||
4. **Database Constraints** - Maintains data integrity
|
||||
|
||||
**Known Test Limitations:**
|
||||
- Tests use reduced Argon2 parameters for speed (production uses stronger settings)
|
||||
- Some timing-based attacks are not covered (out of scope for unit tests)
|
||||
- Network-level security is tested in integration/E2E tests
|
||||
|
||||
## 🐛 Debugging Failed Tests
|
||||
|
||||
### Common Issues
|
||||
|
||||
1. **Import Errors**: Ensure you're in the backend directory
|
||||
2. **Missing Dependencies**: Run `pip install -r requirements.txt`
|
||||
3. **Database Errors**: Tests use in-memory DB, but check file permissions
|
||||
4. **Config Mocking Issues**: Restart test session if config patches interfere
|
||||
|
||||
### Debug Commands
|
||||
```bash
|
||||
# Run single test with full output
|
||||
pytest tests/test_crypto.py::TestCryptographicFunctions::test_encrypt_decrypt_roundtrip -v -s
|
||||
|
||||
# Run with Python debugger on failure
|
||||
pytest --pdb tests/test_api.py
|
||||
|
||||
# Show all print statements
|
||||
pytest -s
|
||||
```
|
||||
|
||||
## 📈 Performance Expectations
|
||||
|
||||
**Test Execution Times:**
|
||||
- Full test suite: < 30 seconds
|
||||
- Crypto tests: < 10 seconds
|
||||
- API tests: < 15 seconds
|
||||
- Database tests: < 10 seconds
|
||||
- Utils tests: < 5 seconds
|
||||
|
||||
**Coverage Targets:**
|
||||
- Overall: ≥ 80%
|
||||
- Critical crypto functions: 100%
|
||||
- API endpoints: ≥ 90%
|
||||
- Database operations: ≥ 85%
|
||||
|
||||
## 🔄 Continuous Integration
|
||||
|
||||
For CI/CD pipelines, use:
|
||||
|
||||
```bash
|
||||
# CI-friendly command with XML output
|
||||
pytest --junitxml=test-results.xml --cov=main --cov-report=xml
|
||||
|
||||
# Fail on low coverage
|
||||
pytest --cov=main --cov-fail-under=80
|
||||
```
|
||||
|
||||
## 🆘 Getting Help
|
||||
|
||||
If tests fail unexpectedly:
|
||||
|
||||
1. **Check test output** for specific assertion failures
|
||||
2. **Run individual test files** to isolate issues
|
||||
3. **Verify dependencies** are correctly installed
|
||||
4. **Check file permissions** for database operations
|
||||
5. **Review recent code changes** that might affect tested functionality
|
||||
|
||||
## 🎯 Best Practices
|
||||
|
||||
When adding new functionality:
|
||||
|
||||
1. **Write tests first** (TDD approach)
|
||||
2. **Test error conditions** as thoroughly as success cases
|
||||
3. **Use descriptive test names** that explain what is being tested
|
||||
4. **Mock external dependencies** (filesystem, network)
|
||||
5. **Maintain test isolation** (no shared state between tests)
|
||||
6. **Update this documentation** when adding new test categories
|
||||
@@ -1,24 +0,0 @@
|
||||
{
|
||||
"salts": {
|
||||
"salt1_var": "REPLACE_ME",
|
||||
"salt2_var": "REPLACE_ME"
|
||||
},
|
||||
"argon2": {
|
||||
"time_cost": 3,
|
||||
"memory_cost": 65536,
|
||||
"parallelism": 1,
|
||||
"hash_length": 32
|
||||
},
|
||||
"database": {
|
||||
"path": "urls.db"
|
||||
},
|
||||
"id_generation": {
|
||||
"length": 10,
|
||||
"allowed_chars": "abcdefghijkmnopqrstuvwxyzABCDEFGHJKLMNPQRSTUVWXYZ123456789"
|
||||
},
|
||||
"domain": {
|
||||
"name": "your-domain.com",
|
||||
"api_base_url": "https://your-domain.com"
|
||||
},
|
||||
"deletion_token": "EITHER_REPLACE_ME_OR_REMOVE_ME_TO_DISABLE_DELETION"
|
||||
}
|
||||
110
backend/dist/assets/index-BPyHGJ-Z.js
vendored
1
backend/dist/assets/index-Dlj26pVa.css
vendored
BIN
backend/dist/favicon/apple-touch-icon.png
vendored
|
Before Width: | Height: | Size: 6.7 KiB |
BIN
backend/dist/favicon/favicon-96x96.png
vendored
|
Before Width: | Height: | Size: 3.8 KiB |
BIN
backend/dist/favicon/favicon.ico
vendored
|
Before Width: | Height: | Size: 15 KiB |
3
backend/dist/favicon/favicon.svg
vendored
|
Before Width: | Height: | Size: 83 KiB |
21
backend/dist/favicon/site.webmanifest
vendored
@@ -1,21 +0,0 @@
|
||||
{
|
||||
"name": "tnyr.me",
|
||||
"short_name": "tnyr.me",
|
||||
"icons": [
|
||||
{
|
||||
"src": "/assets/favicon/web-app-manifest-192x192.png",
|
||||
"sizes": "192x192",
|
||||
"type": "image/png",
|
||||
"purpose": "maskable"
|
||||
},
|
||||
{
|
||||
"src": "/assets/favicon/web-app-manifest-512x512.png",
|
||||
"sizes": "512x512",
|
||||
"type": "image/png",
|
||||
"purpose": "maskable"
|
||||
}
|
||||
],
|
||||
"theme_color": "#ffffff",
|
||||
"background_color": "#ffffff",
|
||||
"display": "standalone"
|
||||
}
|
||||
BIN
backend/dist/favicon/web-app-manifest-192x192.png
vendored
|
Before Width: | Height: | Size: 7.1 KiB |
BIN
backend/dist/favicon/web-app-manifest-512x512.png
vendored
|
Before Width: | Height: | Size: 22 KiB |
39
backend/dist/index.html
vendored
@@ -1,39 +0,0 @@
|
||||
<!doctype html>
|
||||
<html lang="en">
|
||||
<head>
|
||||
<meta charset="UTF-8" />
|
||||
<link rel="icon" type="image/png" href="/favicon/favicon-96x96.png" sizes="96x96" />
|
||||
<link rel="icon" type="image/svg+xml" href="/favicon/favicon.svg" />
|
||||
<link rel="shortcut icon" href="/favicon/favicon.ico" />
|
||||
<link rel="apple-touch-icon" sizes="180x180" href="/favicon/apple-touch-icon.png" />
|
||||
<meta name="apple-mobile-web-app-title" content="tnyr.me" />
|
||||
<link rel="manifest" href="/favicon/site.webmanifest" />
|
||||
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
|
||||
<meta name="description" content="An easy to use end-to-end encrypted URL shortener. tnyr.me is privacy friendly and does not track you or store cookies" />
|
||||
<link rel="canonical" href="https://tnyr.me" />
|
||||
|
||||
<meta property="og:title" content="TNYR - Privacy friendly URL shortener" />
|
||||
<meta property="og:site_name" content="tnyr.me">
|
||||
<meta property="og:url" content="https://tnyr.me">
|
||||
<meta property="og:description" content="A self encrypted url shortener that puts your privacy first, while being easy to use. tnyr.me doesn't track users, store cookies or log your requests.">
|
||||
<meta property="og:type" content="website">
|
||||
<meta property="og:image" content="https://tnyr.me/meta/logo.png">
|
||||
|
||||
<script type="application/ld+json">
|
||||
{
|
||||
"@context": "https://schema.org",
|
||||
"@type": "WebSite",
|
||||
"name": "TNYR",
|
||||
"url": "https://tnyr.me",
|
||||
"description": "A self encrypted url shortener that puts your privacy first, while being easy to use. tnyr.me doesn't track users, store cookies or log your requests."
|
||||
}
|
||||
</script>
|
||||
|
||||
<title>tnyr.me - Privacy friendly URL shortener</title>
|
||||
<script type="module" crossorigin src="/assets/index-BPyHGJ-Z.js"></script>
|
||||
<link rel="stylesheet" crossorigin href="/assets/index-Dlj26pVa.css">
|
||||
</head>
|
||||
<body>
|
||||
<div id="root"></div>
|
||||
</body>
|
||||
</html>
|
||||
BIN
backend/dist/meta/logo.png
vendored
|
Before Width: | Height: | Size: 62 KiB |
4
backend/dist/meta/robots.txt
vendored
@@ -1,4 +0,0 @@
|
||||
User-agent: *
|
||||
Disallow: /cgi-bin/
|
||||
Disallow: /admin/
|
||||
Sitemap: https://tnyr.me/sitemap.xml
|
||||
13
backend/dist/meta/sitemap.xml
vendored
@@ -1,13 +0,0 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<urlset
|
||||
xmlns="http://www.sitemaps.org/schemas/sitemap/0.9"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://www.sitemaps.org/schemas/sitemap/0.9
|
||||
http://www.sitemaps.org/schemas/sitemap/0.9/sitemap.xsd">
|
||||
|
||||
<url>
|
||||
<loc>https://tnyr.me/</loc>
|
||||
</url>
|
||||
|
||||
|
||||
</urlset>
|
||||
@@ -1,4 +1,12 @@
|
||||
import sys
|
||||
from secrets import token_hex
|
||||
|
||||
print("SALT1:", token_hex(16))
|
||||
print("SALT2:", token_hex(16))
|
||||
salt1 = token_hex(16)
|
||||
salt2 = token_hex(16)
|
||||
|
||||
if "--env" in sys.argv:
|
||||
print(f"export TNYR_SALT1_HEX={salt1}")
|
||||
print(f"export TNYR_SALT2_HEX={salt2}")
|
||||
else:
|
||||
print("SALT1:", salt1)
|
||||
print("SALT2:", salt2)
|
||||
197
backend/main.py
@@ -1,5 +1,4 @@
|
||||
import os
|
||||
import json
|
||||
import sqlite3
|
||||
import secrets
|
||||
from flask import Flask, request, jsonify, redirect
|
||||
@@ -12,30 +11,94 @@ from hashlib import scrypt as hashlib_scrypt
|
||||
|
||||
# --- Determine absolute path for file access ---
|
||||
APP_DIR = os.path.dirname(os.path.abspath(__file__))
|
||||
CONFIG_PATH = os.path.join(APP_DIR, 'config.json')
|
||||
|
||||
# Load configuration
|
||||
with open(CONFIG_PATH) as f:
|
||||
config = json.load(f)
|
||||
def _env_int(name: str, default: int) -> int:
|
||||
val = os.getenv(name)
|
||||
if val is None or val == "":
|
||||
return default
|
||||
return int(val)
|
||||
|
||||
# --- Make database path absolute ---
|
||||
if not os.path.isabs(config['database']['path']):
|
||||
config['database']['path'] = os.path.join(APP_DIR, config['database']['path'])
|
||||
def _default_db_path() -> str:
|
||||
# Good Docker default, but keep local-dev workable too
|
||||
if os.path.isdir("/data"):
|
||||
return "/data/urls.db"
|
||||
return "urls.db"
|
||||
|
||||
def _load_config_from_env() -> dict:
|
||||
salt1_hex = os.getenv("TNYR_SALT1_HEX", "").strip()
|
||||
salt2_hex = os.getenv("TNYR_SALT2_HEX", "").strip()
|
||||
# Salts are only required for the legacy server-side mode (/shorten-server and old /<id> links).
|
||||
# For the default client-side mode (/shorten + /get-encrypted-url + /#... links), no server secrets are required.
|
||||
|
||||
db_path = os.getenv("TNYR_DB_PATH", _default_db_path()).strip()
|
||||
|
||||
public_url = os.getenv("TNYR_PUBLIC_URL", "").strip()
|
||||
if not public_url:
|
||||
raise RuntimeError("Missing required environment variable: TNYR_PUBLIC_URL (https://example.com or http://1.2.3.4:5502)")
|
||||
try:
|
||||
from urllib.parse import urlparse
|
||||
parsed = urlparse(public_url)
|
||||
if parsed.scheme not in ("http", "https") or not parsed.netloc:
|
||||
raise ValueError("Invalid TNYR_PUBLIC_URL")
|
||||
domain = parsed.netloc
|
||||
normalized_public_url = f"{parsed.scheme}://{parsed.netloc}"
|
||||
except Exception as e:
|
||||
raise RuntimeError("Invalid TNYR_PUBLIC_URL (must be http(s)://host[:port])") from e
|
||||
|
||||
cfg = {
|
||||
"salts": {
|
||||
"salt1_var": salt1_hex,
|
||||
"salt2_var": salt2_hex,
|
||||
},
|
||||
"argon2": {
|
||||
"time_cost": _env_int("TNYR_ARGON2_TIME_COST", 3),
|
||||
"memory_cost": _env_int("TNYR_ARGON2_MEMORY_COST", 65536),
|
||||
"parallelism": _env_int("TNYR_ARGON2_PARALLELISM", 1),
|
||||
"hash_length": _env_int("TNYR_ARGON2_HASH_LENGTH", 32),
|
||||
},
|
||||
"database": {
|
||||
"path": db_path,
|
||||
},
|
||||
"id_generation": {
|
||||
"length": _env_int("TNYR_ID_LENGTH", 10),
|
||||
"allowed_chars": os.getenv(
|
||||
"TNYR_ID_ALLOWED_CHARS",
|
||||
"abcdefghijkmnopqrstuvwxyzABCDEFGHJKLMNPQRSTUVWXYZ123456789",
|
||||
),
|
||||
},
|
||||
"domain": {
|
||||
# Not required for core functionality; used only in the abuse page and SEO files
|
||||
"name": domain,
|
||||
"api_base_url": os.getenv("TNYR_API_BASE_URL", "").strip() or normalized_public_url,
|
||||
},
|
||||
"deletion_token": os.getenv("TNYR_DELETION_TOKEN", "").strip(),
|
||||
}
|
||||
|
||||
# --- Make database path absolute (except SQLite special :memory:) ---
|
||||
if cfg["database"]["path"] != ":memory:" and not os.path.isabs(cfg["database"]["path"]):
|
||||
cfg["database"]["path"] = os.path.join(APP_DIR, cfg["database"]["path"])
|
||||
|
||||
return cfg
|
||||
|
||||
config = _load_config_from_env()
|
||||
|
||||
app = Flask(__name__, static_folder='dist', static_url_path='/static')
|
||||
|
||||
# Validate and load salts
|
||||
salt1_hex = config['salts']['salt1_var']
|
||||
salt2_hex = config['salts']['salt2_var']
|
||||
# Legacy server-side mode (optional)
|
||||
salt1_hex = (config.get('salts', {}) or {}).get('salt1_var', '').strip()
|
||||
salt2_hex = (config.get('salts', {}) or {}).get('salt2_var', '').strip()
|
||||
LEGACY_SERVER_SIDE_ENABLED = bool(salt1_hex and salt2_hex)
|
||||
|
||||
# Validate and convert salts
|
||||
try:
|
||||
SALT1 = bytes.fromhex(salt1_hex)
|
||||
SALT2 = bytes.fromhex(salt2_hex)
|
||||
if len(SALT1) != 16 or len(SALT2) != 16:
|
||||
raise ValueError("Salts must decode to 16 bytes")
|
||||
except ValueError as e:
|
||||
raise ValueError("Invalid salt format") from e
|
||||
SALT1 = None
|
||||
SALT2 = None
|
||||
if LEGACY_SERVER_SIDE_ENABLED:
|
||||
try:
|
||||
SALT1 = bytes.fromhex(salt1_hex)
|
||||
SALT2 = bytes.fromhex(salt2_hex)
|
||||
if len(SALT1) != 16 or len(SALT2) != 16:
|
||||
raise ValueError("Salts must decode to 16 bytes")
|
||||
except ValueError as e:
|
||||
raise ValueError("Invalid salt format") from e
|
||||
|
||||
# Database setup
|
||||
def get_db():
|
||||
@@ -61,6 +124,8 @@ def generate_id():
|
||||
# Argon2 configuration
|
||||
def derive_key(id_bytes, salt):
|
||||
"""Derive cryptographic key using Argon2 with config parameters"""
|
||||
if not LEGACY_SERVER_SIDE_ENABLED:
|
||||
raise RuntimeError("Legacy server-side mode is disabled (set TNYR_SALT1_HEX and TNYR_SALT2_HEX to enable)")
|
||||
return hash_secret_raw(
|
||||
secret=id_bytes,
|
||||
salt=salt,
|
||||
@@ -184,26 +249,27 @@ def delete_url():
|
||||
updated = False
|
||||
|
||||
with get_db() as conn:
|
||||
# Try to update old server-side encrypted URLs table
|
||||
id_bytes = link_id.encode()
|
||||
lookup_hash_old = derive_key(id_bytes, SALT1).hex()
|
||||
|
||||
# Check if it exists in old table
|
||||
cur = conn.execute(
|
||||
"SELECT 1 FROM urls WHERE lookup_hash = ?",
|
||||
(lookup_hash_old,)
|
||||
)
|
||||
if cur.fetchone():
|
||||
# Re-encrypt the abuse marker using server-side method
|
||||
encryption_key = derive_key(id_bytes, SALT2)
|
||||
iv, encrypted_warning = encrypt_url(encryption_key, ABUSE_WARNING_MARKER)
|
||||
# Try to update old server-side encrypted URLs table (legacy mode)
|
||||
if LEGACY_SERVER_SIDE_ENABLED:
|
||||
id_bytes = link_id.encode()
|
||||
lookup_hash_old = derive_key(id_bytes, SALT1).hex()
|
||||
|
||||
conn.execute(
|
||||
"UPDATE urls SET iv = ?, encrypted_url = ? WHERE lookup_hash = ?",
|
||||
(iv, encrypted_warning, lookup_hash_old)
|
||||
# Check if it exists in old table
|
||||
cur = conn.execute(
|
||||
"SELECT 1 FROM urls WHERE lookup_hash = ?",
|
||||
(lookup_hash_old,)
|
||||
)
|
||||
conn.commit()
|
||||
updated = True
|
||||
if cur.fetchone():
|
||||
# Re-encrypt the abuse marker using server-side method
|
||||
encryption_key = derive_key(id_bytes, SALT2)
|
||||
iv, encrypted_warning = encrypt_url(encryption_key, ABUSE_WARNING_MARKER)
|
||||
|
||||
conn.execute(
|
||||
"UPDATE urls SET iv = ?, encrypted_url = ? WHERE lookup_hash = ?",
|
||||
(iv, encrypted_warning, lookup_hash_old)
|
||||
)
|
||||
conn.commit()
|
||||
updated = True
|
||||
|
||||
# If not found, try to update new client-side encrypted URLs table
|
||||
if not updated:
|
||||
@@ -235,50 +301,11 @@ def delete_url():
|
||||
|
||||
@app.route('/shorten-server', methods=['POST'])
|
||||
def shorten_url_server():
|
||||
data = request.get_json()
|
||||
|
||||
if not data or 'url' not in data:
|
||||
return jsonify({"error": "Missing URL"}), 400
|
||||
|
||||
id = None
|
||||
url = data['url']
|
||||
|
||||
if not (url.startswith('https://') or url.startswith('http://') or url.startswith('magnet:')):
|
||||
url = 'http://' + url
|
||||
|
||||
with get_db() as conn:
|
||||
# Generate unique ID and hash
|
||||
for _ in range(100): # Retry limit
|
||||
id = generate_id()
|
||||
id_bytes = id.encode()
|
||||
|
||||
# Derive lookup hash
|
||||
lookup_hash = derive_key(id_bytes, SALT1).hex()
|
||||
|
||||
# Check for collision
|
||||
cur = conn.execute(
|
||||
"SELECT 1 FROM urls WHERE lookup_hash = ?",
|
||||
(lookup_hash,)
|
||||
)
|
||||
if not cur.fetchone():
|
||||
break
|
||||
else:
|
||||
return jsonify({"error": "Failed to generate unique ID"}), 500
|
||||
|
||||
# Derive encryption key
|
||||
encryption_key = derive_key(id_bytes, SALT2)
|
||||
|
||||
# Encrypt URL
|
||||
iv, encrypted_url = encrypt_url(encryption_key, url)
|
||||
|
||||
# Store in database
|
||||
conn.execute(
|
||||
"INSERT INTO urls (lookup_hash, iv, encrypted_url) VALUES (?, ?, ?)",
|
||||
(lookup_hash, iv, encrypted_url)
|
||||
)
|
||||
conn.commit()
|
||||
|
||||
return jsonify({"id": id}), 201
|
||||
# Legacy shortening is intentionally disabled.
|
||||
# Old /<id> links can still be resolved if legacy salts are configured.
|
||||
return jsonify({
|
||||
"error": "Legacy shortening is disabled. Use the default client-side mode."
|
||||
}), 410
|
||||
|
||||
@app.route('/shorten', methods=['POST'])
|
||||
def shorten_url_client():
|
||||
@@ -339,6 +366,8 @@ def get_encrypted_url():
|
||||
|
||||
@app.route('/<id>') # Still needed for old links
|
||||
def redirect_url(id):
|
||||
if not LEGACY_SERVER_SIDE_ENABLED:
|
||||
return jsonify({"error": "Link not found"}), 404
|
||||
id_bytes = id.encode()
|
||||
|
||||
# Derive lookup hash
|
||||
@@ -367,7 +396,8 @@ def redirect_url(id):
|
||||
# Check if this is an abuse warning
|
||||
if url == ABUSE_WARNING_MARKER:
|
||||
# Redirect to home page with abuse warning hash
|
||||
domain = config.get('domain', {}).get('name', 'tnyr.me')
|
||||
domain = (config.get("domain", {}) or {}).get("name", "") or request.host.split(":")[0] or "tnyr.me"
|
||||
domain = domain.strip()
|
||||
abuse_html = f"""<!DOCTYPE html>
|
||||
<html lang="en">
|
||||
<head>
|
||||
@@ -524,4 +554,5 @@ if __name__ == '__main__':
|
||||
return response
|
||||
|
||||
init_db()
|
||||
app.run(host='0.0.0.0', port=5000)
|
||||
port = int(os.getenv("TNYR_PORT", "5502"))
|
||||
app.run(host='0.0.0.0', port=port)
|
||||
@@ -1,25 +0,0 @@
|
||||
[tool:pytest]
|
||||
testpaths = tests
|
||||
python_files = test_*.py
|
||||
python_classes = Test*
|
||||
python_functions = test_*
|
||||
addopts =
|
||||
-v
|
||||
--tb=short
|
||||
--strict-markers
|
||||
--disable-warnings
|
||||
--cov=main
|
||||
--cov-report=html:htmlcov
|
||||
--cov-report=term-missing
|
||||
--cov-fail-under=80
|
||||
|
||||
markers =
|
||||
slow: marks tests as slow (deselect with '-m "not slow"')
|
||||
integration: marks tests as integration tests
|
||||
crypto: marks tests as cryptographic tests
|
||||
api: marks tests as API tests
|
||||
database: marks tests as database tests
|
||||
|
||||
filterwarnings =
|
||||
ignore::DeprecationWarning
|
||||
ignore::PendingDeprecationWarning
|
||||
@@ -3,10 +3,4 @@ argon2-cffi-bindings==21.2.0
|
||||
cryptography==36.0.2
|
||||
Flask==2.0.3
|
||||
Werkzeug==2.2.2
|
||||
gunicorn
|
||||
|
||||
# Testing dependencies
|
||||
pytest==7.4.3
|
||||
pytest-flask==1.3.0
|
||||
pytest-mock==3.12.0
|
||||
pytest-cov==4.1.0
|
||||
gunicorn
|
||||
@@ -1,58 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Test runner script for the URL shortener backend
|
||||
"""
|
||||
|
||||
import sys
|
||||
import subprocess
|
||||
import os
|
||||
|
||||
def run_tests():
|
||||
"""Run the test suite with different options"""
|
||||
|
||||
print("🧪 Running URL Shortener Backend Tests\n")
|
||||
|
||||
# Basic test commands
|
||||
commands = {
|
||||
"all": ["pytest"],
|
||||
"crypto": ["pytest", "-m", "crypto", "tests/test_crypto.py"],
|
||||
"api": ["pytest", "-m", "api", "tests/test_api.py"],
|
||||
"database": ["pytest", "-m", "database", "tests/test_database.py"],
|
||||
"utils": ["pytest", "tests/test_utils.py"],
|
||||
"coverage": ["pytest", "--cov=main", "--cov-report=html", "--cov-report=term"],
|
||||
"fast": ["pytest", "-x", "--tb=line"], # Stop on first failure, short traceback
|
||||
"verbose": ["pytest", "-v", "-s"],
|
||||
}
|
||||
|
||||
# Check if specific test type was requested
|
||||
if len(sys.argv) > 1:
|
||||
test_type = sys.argv[1].lower()
|
||||
if test_type in commands:
|
||||
cmd = commands[test_type]
|
||||
print(f"Running {test_type} tests...")
|
||||
result = subprocess.run(cmd)
|
||||
return result.returncode
|
||||
elif test_type == "help":
|
||||
print("Available test options:")
|
||||
for key, cmd in commands.items():
|
||||
print(f" {key}: {' '.join(cmd)}")
|
||||
return 0
|
||||
else:
|
||||
print(f"Unknown test type: {test_type}")
|
||||
print("Use 'help' to see available options")
|
||||
return 1
|
||||
|
||||
# Run all tests by default
|
||||
print("Running all tests...")
|
||||
result = subprocess.run(commands["all"])
|
||||
return result.returncode
|
||||
|
||||
if __name__ == "__main__":
|
||||
exit_code = run_tests()
|
||||
|
||||
if exit_code == 0:
|
||||
print("\n✅ All tests passed!")
|
||||
else:
|
||||
print(f"\n❌ Tests failed with exit code {exit_code}")
|
||||
|
||||
sys.exit(exit_code)
|
||||
@@ -1,319 +0,0 @@
|
||||
import pytest
|
||||
import json
|
||||
from unittest.mock import patch, Mock
|
||||
from main import app
|
||||
|
||||
|
||||
class TestShortenServerEndpoint:
|
||||
"""Test the /shorten-server endpoint"""
|
||||
|
||||
def test_shorten_server_basic(self, client, sample_urls):
|
||||
"""Test basic URL shortening functionality"""
|
||||
for url in sample_urls:
|
||||
response = client.post('/shorten-server',
|
||||
json={'url': url},
|
||||
content_type='application/json')
|
||||
|
||||
assert response.status_code == 201
|
||||
data = json.loads(response.data)
|
||||
assert 'id' in data
|
||||
assert isinstance(data['id'], str)
|
||||
assert len(data['id']) == 10 # From test config
|
||||
|
||||
def test_shorten_server_missing_url(self, client):
|
||||
"""Test error handling when URL is missing"""
|
||||
response = client.post('/shorten-server',
|
||||
json={},
|
||||
content_type='application/json')
|
||||
|
||||
assert response.status_code == 400
|
||||
data = json.loads(response.data)
|
||||
assert 'error' in data
|
||||
assert 'Missing URL' in data['error']
|
||||
|
||||
def test_shorten_server_no_json_body(self, client):
|
||||
"""Test error handling when no JSON body is provided"""
|
||||
response = client.post('/shorten-server')
|
||||
|
||||
assert response.status_code == 400
|
||||
# Flask returns HTML error page when Content-Type is not application/json
|
||||
# So we just check the status code for this case
|
||||
|
||||
def test_shorten_server_url_prefix_handling(self, client):
|
||||
"""Test that URLs get proper prefixes"""
|
||||
test_cases = [
|
||||
('google.com', 'http://google.com'),
|
||||
('https://example.com', 'https://example.com'),
|
||||
('http://test.org', 'http://test.org'),
|
||||
('magnet:?xt=urn:btih:example', 'magnet:?xt=urn:btih:example'),
|
||||
]
|
||||
|
||||
for input_url, expected_prefix in test_cases:
|
||||
response = client.post('/shorten-server',
|
||||
json={'url': input_url},
|
||||
content_type='application/json')
|
||||
|
||||
assert response.status_code == 201
|
||||
# The URL gets stored encrypted, so we can't directly verify the prefix
|
||||
# But we can verify the request was processed successfully
|
||||
|
||||
def test_shorten_server_duplicate_handling(self, client):
|
||||
"""Test that duplicate URLs get unique IDs"""
|
||||
url = "https://example.com"
|
||||
|
||||
# Create multiple shortened URLs for the same target
|
||||
ids = []
|
||||
for _ in range(5):
|
||||
response = client.post('/shorten-server',
|
||||
json={'url': url},
|
||||
content_type='application/json')
|
||||
assert response.status_code == 201
|
||||
data = json.loads(response.data)
|
||||
ids.append(data['id'])
|
||||
|
||||
# All IDs should be unique
|
||||
assert len(set(ids)) == len(ids)
|
||||
|
||||
@patch('main.generate_id')
|
||||
def test_shorten_server_id_collision_retry(self, mock_generate_id, client):
|
||||
"""Test retry logic when ID collisions occur"""
|
||||
# Mock generate_id to return the same ID twice, then a unique one
|
||||
mock_generate_id.side_effect = ['duplicate', 'duplicate', 'unique123']
|
||||
|
||||
# First request should succeed with 'duplicate' ID
|
||||
response1 = client.post('/shorten-server',
|
||||
json={'url': 'https://first.com'},
|
||||
content_type='application/json')
|
||||
assert response1.status_code == 201
|
||||
|
||||
# Second request should retry and get 'unique123'
|
||||
response2 = client.post('/shorten-server',
|
||||
json={'url': 'https://second.com'},
|
||||
content_type='application/json')
|
||||
assert response2.status_code == 201
|
||||
data = json.loads(response2.data)
|
||||
assert data['id'] == 'unique123'
|
||||
|
||||
@patch('main.generate_id')
|
||||
def test_shorten_server_max_retries_exceeded(self, mock_generate_id, client):
|
||||
"""Test error when max retries for unique ID is exceeded"""
|
||||
# Mock generate_id to always return the same ID
|
||||
mock_generate_id.return_value = 'always_same'
|
||||
|
||||
# First request should succeed
|
||||
response1 = client.post('/shorten-server',
|
||||
json={'url': 'https://first.com'},
|
||||
content_type='application/json')
|
||||
assert response1.status_code == 201
|
||||
|
||||
# Second request should fail after max retries
|
||||
response2 = client.post('/shorten-server',
|
||||
json={'url': 'https://second.com'},
|
||||
content_type='application/json')
|
||||
assert response2.status_code == 500
|
||||
data = json.loads(response2.data)
|
||||
assert 'Failed to generate unique ID' in data['error']
|
||||
|
||||
|
||||
class TestShortenClientEndpoint:
|
||||
"""Test the /shorten (client-side) endpoint"""
|
||||
|
||||
def test_shorten_client_basic(self, client, sample_encrypted_data):
|
||||
"""Test basic client-side URL shortening"""
|
||||
response = client.post('/shorten',
|
||||
json=sample_encrypted_data,
|
||||
content_type='application/json')
|
||||
|
||||
assert response.status_code == 201
|
||||
data = json.loads(response.data)
|
||||
assert 'message' in data
|
||||
assert 'successfully' in data['message']
|
||||
|
||||
def test_shorten_client_missing_fields(self, client):
|
||||
"""Test error handling when required fields are missing"""
|
||||
incomplete_data = {
|
||||
'LOOKUP_HASH': 'abcd1234',
|
||||
'IV': '22222222222222222222222222222222',
|
||||
# Missing ENCRYTION_SALT and ENCRYPTED_URL
|
||||
}
|
||||
|
||||
response = client.post('/shorten',
|
||||
json=incomplete_data,
|
||||
content_type='application/json')
|
||||
|
||||
assert response.status_code == 400
|
||||
data = json.loads(response.data)
|
||||
assert 'Missing fields' in data['error']
|
||||
|
||||
def test_shorten_client_invalid_hex(self, client):
|
||||
"""Test error handling with invalid hex data"""
|
||||
invalid_data = {
|
||||
'LOOKUP_HASH': 'abcd1234',
|
||||
'ENCRYTION_SALT': 'invalid_hex_string',
|
||||
'IV': '22222222222222222222222222222222',
|
||||
'ENCRYPTED_URL': '33333333333333333333333333333333'
|
||||
}
|
||||
|
||||
response = client.post('/shorten',
|
||||
json=invalid_data,
|
||||
content_type='application/json')
|
||||
|
||||
assert response.status_code == 400
|
||||
data = json.loads(response.data)
|
||||
assert 'Invalid hex format' in data['error']
|
||||
|
||||
def test_shorten_client_duplicate_hash(self, client, sample_encrypted_data):
|
||||
"""Test error handling when lookup hash already exists"""
|
||||
# First request should succeed
|
||||
response1 = client.post('/shorten',
|
||||
json=sample_encrypted_data,
|
||||
content_type='application/json')
|
||||
assert response1.status_code == 201
|
||||
|
||||
# Second request with same lookup hash should fail
|
||||
response2 = client.post('/shorten',
|
||||
json=sample_encrypted_data,
|
||||
content_type='application/json')
|
||||
assert response2.status_code == 409
|
||||
data = json.loads(response2.data)
|
||||
assert 'already exists' in data['error']
|
||||
|
||||
|
||||
class TestGetEncryptedUrlEndpoint:
|
||||
"""Test the /get-encrypted-url endpoint"""
|
||||
|
||||
def test_get_encrypted_url_basic(self, client, sample_encrypted_data):
|
||||
"""Test basic encrypted URL retrieval"""
|
||||
# First, store a URL
|
||||
client.post('/shorten',
|
||||
json=sample_encrypted_data,
|
||||
content_type='application/json')
|
||||
|
||||
# Then retrieve it
|
||||
response = client.get(f'/get-encrypted-url?lookup_hash={sample_encrypted_data["LOOKUP_HASH"]}')
|
||||
|
||||
assert response.status_code == 200
|
||||
data = json.loads(response.data)
|
||||
assert 'ENCRYTION_SALT' in data
|
||||
assert 'IV' in data
|
||||
assert 'ENCRYPTED_URL' in data
|
||||
assert data['ENCRYTION_SALT'] == sample_encrypted_data['ENCRYTION_SALT']
|
||||
assert data['IV'] == sample_encrypted_data['IV']
|
||||
assert data['ENCRYPTED_URL'] == sample_encrypted_data['ENCRYPTED_URL']
|
||||
|
||||
def test_get_encrypted_url_missing_parameter(self, client):
|
||||
"""Test error handling when lookup_hash parameter is missing"""
|
||||
response = client.get('/get-encrypted-url')
|
||||
|
||||
assert response.status_code == 400
|
||||
data = json.loads(response.data)
|
||||
assert 'Missing lookup_hash parameter' in data['error']
|
||||
|
||||
def test_get_encrypted_url_not_found(self, client):
|
||||
"""Test error handling when lookup hash doesn't exist"""
|
||||
response = client.get('/get-encrypted-url?lookup_hash=nonexistent')
|
||||
|
||||
assert response.status_code == 404
|
||||
data = json.loads(response.data)
|
||||
assert 'Link not found' in data['error']
|
||||
|
||||
|
||||
class TestRedirectEndpoint:
|
||||
"""Test the /<id> redirect endpoint"""
|
||||
|
||||
def test_redirect_basic(self, client):
|
||||
"""Test basic URL redirection"""
|
||||
# First, create a shortened URL using server-side endpoint
|
||||
original_url = "https://example.com"
|
||||
response = client.post('/shorten-server',
|
||||
json={'url': original_url},
|
||||
content_type='application/json')
|
||||
assert response.status_code == 201
|
||||
|
||||
data = json.loads(response.data)
|
||||
short_id = data['id']
|
||||
|
||||
# Then test redirection
|
||||
response = client.get(f'/{short_id}')
|
||||
assert response.status_code == 302
|
||||
assert response.location == original_url
|
||||
|
||||
def test_redirect_not_found(self, client):
|
||||
"""Test error handling when short ID doesn't exist"""
|
||||
response = client.get('/nonexistent123')
|
||||
|
||||
assert response.status_code == 404
|
||||
data = json.loads(response.data)
|
||||
assert 'Link not found' in data['error']
|
||||
|
||||
def test_redirect_various_urls(self, client, sample_urls):
|
||||
"""Test redirection with various URL types"""
|
||||
for original_url in sample_urls:
|
||||
# Create shortened URL
|
||||
response = client.post('/shorten-server',
|
||||
json={'url': original_url},
|
||||
content_type='application/json')
|
||||
assert response.status_code == 201
|
||||
|
||||
data = json.loads(response.data)
|
||||
short_id = data['id']
|
||||
|
||||
# Test redirection
|
||||
response = client.get(f'/{short_id}')
|
||||
assert response.status_code == 302
|
||||
|
||||
# For URLs that get prefixed, check the prefixed version
|
||||
expected_url = original_url
|
||||
if not (original_url.startswith('https://') or
|
||||
original_url.startswith('http://') or
|
||||
original_url.startswith('magnet:')):
|
||||
expected_url = 'http://' + original_url
|
||||
|
||||
assert response.location == expected_url
|
||||
|
||||
|
||||
class TestStaticFileEndpoints:
|
||||
"""Test static file serving endpoints"""
|
||||
|
||||
def test_serve_react_app(self, client):
|
||||
"""Test serving the React app"""
|
||||
response = client.get('/')
|
||||
# We can't test the actual file content without the dist folder
|
||||
# But we can test that the route exists and doesn't error
|
||||
assert response.status_code in [200, 404] # 404 if dist/index.html doesn't exist
|
||||
|
||||
def test_serve_robots_txt(self, client):
|
||||
"""Test serving robots.txt"""
|
||||
response = client.get('/robots.txt')
|
||||
assert response.status_code in [200, 404] # 404 if file doesn't exist
|
||||
|
||||
def test_serve_sitemap_xml(self, client):
|
||||
"""Test serving sitemap.xml"""
|
||||
response = client.get('/sitemap.xml')
|
||||
assert response.status_code in [200, 404] # 404 if file doesn't exist
|
||||
|
||||
|
||||
class TestErrorHandling:
|
||||
"""Test general error handling"""
|
||||
|
||||
def test_invalid_json(self, client):
|
||||
"""Test handling of invalid JSON in requests"""
|
||||
response = client.post('/shorten-server',
|
||||
data='invalid json',
|
||||
content_type='application/json')
|
||||
|
||||
# Flask should handle this and return 400
|
||||
assert response.status_code == 400
|
||||
|
||||
def test_unsupported_http_methods(self, client):
|
||||
"""Test that unsupported HTTP methods are rejected"""
|
||||
# GET on POST-only endpoints - Flask returns 404 for unmatched routes
|
||||
response = client.get('/shorten-server')
|
||||
assert response.status_code in [404, 405] # Flask may return 404 or 405
|
||||
|
||||
response = client.get('/shorten')
|
||||
assert response.status_code in [404, 405]
|
||||
|
||||
# POST on GET-only endpoints
|
||||
response = client.post('/get-encrypted-url')
|
||||
assert response.status_code in [404, 405]
|
||||
@@ -1,111 +0,0 @@
|
||||
import os
|
||||
import tempfile
|
||||
import pytest
|
||||
import sqlite3
|
||||
import sys
|
||||
from unittest.mock import patch
|
||||
import json
|
||||
|
||||
# Add parent directory to path so we can import main
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
|
||||
# Test configuration to avoid using production config
|
||||
TEST_CONFIG = {
|
||||
"salts": {
|
||||
"salt1_var": "11111111111111111111111111111111",
|
||||
"salt2_var": "22222222222222222222222222222222"
|
||||
},
|
||||
"argon2": {
|
||||
"time_cost": 1, # Reduced for faster tests
|
||||
"memory_cost": 1024, # Reduced for faster tests
|
||||
"parallelism": 1,
|
||||
"hash_length": 32
|
||||
},
|
||||
"database": {
|
||||
"path": ":memory:" # In-memory database for tests
|
||||
},
|
||||
"id_generation": {
|
||||
"length": 10,
|
||||
"allowed_chars": "abcdefghijkmnopqrstuvwxyzABCDEFGHJKLMNPQRSTUVWXYZ123456789"
|
||||
}
|
||||
}
|
||||
|
||||
@pytest.fixture(scope='function')
|
||||
def test_config():
|
||||
"""Provide test configuration"""
|
||||
return TEST_CONFIG.copy()
|
||||
|
||||
@pytest.fixture(scope='function')
|
||||
def mock_config(test_config):
|
||||
"""Mock the config loading in main.py"""
|
||||
with patch('main.config', test_config):
|
||||
with patch('main.SALT1', bytes.fromhex(test_config['salts']['salt1_var'])):
|
||||
with patch('main.SALT2', bytes.fromhex(test_config['salts']['salt2_var'])):
|
||||
yield test_config
|
||||
|
||||
@pytest.fixture(scope='function')
|
||||
def app(mock_config):
|
||||
"""Create and configure a test Flask app"""
|
||||
# Import after mocking config
|
||||
from main import app, init_db
|
||||
|
||||
app.config.update({
|
||||
"TESTING": True,
|
||||
})
|
||||
|
||||
# Mock get_db to use in-memory database
|
||||
with patch('main.get_db') as mock_get_db:
|
||||
conn = sqlite3.connect(':memory:')
|
||||
conn.row_factory = sqlite3.Row
|
||||
|
||||
# Create schema using absolute path
|
||||
schema_path = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'schema.sql')
|
||||
with open(schema_path, 'r') as f:
|
||||
conn.executescript(f.read())
|
||||
|
||||
mock_get_db.return_value = conn
|
||||
|
||||
with app.app_context():
|
||||
yield app
|
||||
|
||||
conn.close()
|
||||
|
||||
@pytest.fixture(scope='function')
|
||||
def client(app):
|
||||
"""Create a test client for the Flask app"""
|
||||
return app.test_client()
|
||||
|
||||
@pytest.fixture(scope='function')
|
||||
def test_db_connection(mock_config):
|
||||
"""Provide a test database connection"""
|
||||
conn = sqlite3.connect(':memory:')
|
||||
conn.row_factory = sqlite3.Row
|
||||
|
||||
# Create schema using absolute path
|
||||
schema_path = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'schema.sql')
|
||||
with open(schema_path, 'r') as f:
|
||||
conn.executescript(f.read())
|
||||
|
||||
yield conn
|
||||
conn.close()
|
||||
|
||||
@pytest.fixture
|
||||
def sample_urls():
|
||||
"""Provide sample URLs for testing"""
|
||||
return [
|
||||
"https://google.com",
|
||||
"http://example.com",
|
||||
"https://github.com/user/repo",
|
||||
"magnet:?xt=urn:btih:example",
|
||||
"domain.com", # Should get http:// prefix
|
||||
]
|
||||
|
||||
@pytest.fixture
|
||||
def sample_encrypted_data():
|
||||
"""Provide sample encrypted data for testing"""
|
||||
return {
|
||||
"LOOKUP_HASH": "abcd1234efgh5678",
|
||||
"ENCRYTION_SALT": "11111111111111111111111111111111",
|
||||
"IV": "22222222222222222222222222222222",
|
||||
"ENCRYPTED_URL": "33333333333333333333333333333333"
|
||||
}
|
||||
@@ -1,324 +0,0 @@
|
||||
import pytest
|
||||
import json
|
||||
import sys
|
||||
import os
|
||||
from unittest.mock import patch, Mock
|
||||
|
||||
# Add parent directory to path so we can import main
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
from main import app
|
||||
|
||||
|
||||
class TestShortenServerEndpoint:
|
||||
"""Test the /shorten-server endpoint"""
|
||||
|
||||
def test_shorten_server_basic(self, client, sample_urls):
|
||||
"""Test basic URL shortening functionality"""
|
||||
for url in sample_urls:
|
||||
response = client.post('/shorten-server',
|
||||
json={'url': url},
|
||||
content_type='application/json')
|
||||
|
||||
assert response.status_code == 201
|
||||
data = json.loads(response.data)
|
||||
assert 'id' in data
|
||||
assert isinstance(data['id'], str)
|
||||
assert len(data['id']) == 10 # From test config
|
||||
|
||||
def test_shorten_server_missing_url(self, client):
|
||||
"""Test error handling when URL is missing"""
|
||||
response = client.post('/shorten-server',
|
||||
json={},
|
||||
content_type='application/json')
|
||||
|
||||
assert response.status_code == 400
|
||||
data = json.loads(response.data)
|
||||
assert 'error' in data
|
||||
assert 'Missing URL' in data['error']
|
||||
|
||||
def test_shorten_server_no_json_body(self, client):
|
||||
"""Test error handling when no JSON body is provided"""
|
||||
response = client.post('/shorten-server')
|
||||
|
||||
assert response.status_code == 400
|
||||
# Flask returns HTML error page when Content-Type is not application/json
|
||||
# So we just check the status code for this case
|
||||
|
||||
def test_shorten_server_url_prefix_handling(self, client):
|
||||
"""Test that URLs get proper prefixes"""
|
||||
test_cases = [
|
||||
('google.com', 'http://google.com'),
|
||||
('https://example.com', 'https://example.com'),
|
||||
('http://test.org', 'http://test.org'),
|
||||
('magnet:?xt=urn:btih:example', 'magnet:?xt=urn:btih:example'),
|
||||
]
|
||||
|
||||
for input_url, expected_prefix in test_cases:
|
||||
response = client.post('/shorten-server',
|
||||
json={'url': input_url},
|
||||
content_type='application/json')
|
||||
|
||||
assert response.status_code == 201
|
||||
# The URL gets stored encrypted, so we can't directly verify the prefix
|
||||
# But we can verify the request was processed successfully
|
||||
|
||||
def test_shorten_server_duplicate_handling(self, client):
|
||||
"""Test that duplicate URLs get unique IDs"""
|
||||
url = "https://example.com"
|
||||
|
||||
# Create multiple shortened URLs for the same target
|
||||
ids = []
|
||||
for _ in range(5):
|
||||
response = client.post('/shorten-server',
|
||||
json={'url': url},
|
||||
content_type='application/json')
|
||||
assert response.status_code == 201
|
||||
data = json.loads(response.data)
|
||||
ids.append(data['id'])
|
||||
|
||||
# All IDs should be unique
|
||||
assert len(set(ids)) == len(ids)
|
||||
|
||||
@patch('main.generate_id')
|
||||
def test_shorten_server_id_collision_retry(self, mock_generate_id, client):
|
||||
"""Test retry logic when ID collisions occur"""
|
||||
# Mock generate_id to return the same ID twice, then a unique one
|
||||
mock_generate_id.side_effect = ['duplicate', 'duplicate', 'unique123']
|
||||
|
||||
# First request should succeed with 'duplicate' ID
|
||||
response1 = client.post('/shorten-server',
|
||||
json={'url': 'https://first.com'},
|
||||
content_type='application/json')
|
||||
assert response1.status_code == 201
|
||||
|
||||
# Second request should retry and get 'unique123'
|
||||
response2 = client.post('/shorten-server',
|
||||
json={'url': 'https://second.com'},
|
||||
content_type='application/json')
|
||||
assert response2.status_code == 201
|
||||
data = json.loads(response2.data)
|
||||
assert data['id'] == 'unique123'
|
||||
|
||||
@patch('main.generate_id')
|
||||
def test_shorten_server_max_retries_exceeded(self, mock_generate_id, client):
|
||||
"""Test error when max retries for unique ID is exceeded"""
|
||||
# Mock generate_id to always return the same ID
|
||||
mock_generate_id.return_value = 'always_same'
|
||||
|
||||
# First request should succeed
|
||||
response1 = client.post('/shorten-server',
|
||||
json={'url': 'https://first.com'},
|
||||
content_type='application/json')
|
||||
assert response1.status_code == 201
|
||||
|
||||
# Second request should fail after max retries
|
||||
response2 = client.post('/shorten-server',
|
||||
json={'url': 'https://second.com'},
|
||||
content_type='application/json')
|
||||
assert response2.status_code == 500
|
||||
data = json.loads(response2.data)
|
||||
assert 'Failed to generate unique ID' in data['error']
|
||||
|
||||
|
||||
class TestShortenClientEndpoint:
|
||||
"""Test the /shorten (client-side) endpoint"""
|
||||
|
||||
def test_shorten_client_basic(self, client, sample_encrypted_data):
|
||||
"""Test basic client-side URL shortening"""
|
||||
response = client.post('/shorten',
|
||||
json=sample_encrypted_data,
|
||||
content_type='application/json')
|
||||
|
||||
assert response.status_code == 201
|
||||
data = json.loads(response.data)
|
||||
assert 'message' in data
|
||||
assert 'successfully' in data['message']
|
||||
|
||||
def test_shorten_client_missing_fields(self, client):
|
||||
"""Test error handling when required fields are missing"""
|
||||
incomplete_data = {
|
||||
'LOOKUP_HASH': 'abcd1234',
|
||||
'IV': '22222222222222222222222222222222',
|
||||
# Missing ENCRYTION_SALT and ENCRYPTED_URL
|
||||
}
|
||||
|
||||
response = client.post('/shorten',
|
||||
json=incomplete_data,
|
||||
content_type='application/json')
|
||||
|
||||
assert response.status_code == 400
|
||||
data = json.loads(response.data)
|
||||
assert 'Missing fields' in data['error']
|
||||
|
||||
def test_shorten_client_invalid_hex(self, client):
|
||||
"""Test error handling with invalid hex data"""
|
||||
invalid_data = {
|
||||
'LOOKUP_HASH': 'abcd1234',
|
||||
'ENCRYTION_SALT': 'invalid_hex_string',
|
||||
'IV': '22222222222222222222222222222222',
|
||||
'ENCRYPTED_URL': '33333333333333333333333333333333'
|
||||
}
|
||||
|
||||
response = client.post('/shorten',
|
||||
json=invalid_data,
|
||||
content_type='application/json')
|
||||
|
||||
assert response.status_code == 400
|
||||
data = json.loads(response.data)
|
||||
assert 'Invalid hex format' in data['error']
|
||||
|
||||
def test_shorten_client_duplicate_hash(self, client, sample_encrypted_data):
|
||||
"""Test error handling when lookup hash already exists"""
|
||||
# First request should succeed
|
||||
response1 = client.post('/shorten',
|
||||
json=sample_encrypted_data,
|
||||
content_type='application/json')
|
||||
assert response1.status_code == 201
|
||||
|
||||
# Second request with same lookup hash should fail
|
||||
response2 = client.post('/shorten',
|
||||
json=sample_encrypted_data,
|
||||
content_type='application/json')
|
||||
assert response2.status_code == 409
|
||||
data = json.loads(response2.data)
|
||||
assert 'already exists' in data['error']
|
||||
|
||||
|
||||
class TestGetEncryptedUrlEndpoint:
|
||||
"""Test the /get-encrypted-url endpoint"""
|
||||
|
||||
def test_get_encrypted_url_basic(self, client, sample_encrypted_data):
|
||||
"""Test basic encrypted URL retrieval"""
|
||||
# First, store a URL
|
||||
client.post('/shorten',
|
||||
json=sample_encrypted_data,
|
||||
content_type='application/json')
|
||||
|
||||
# Then retrieve it
|
||||
response = client.get(f'/get-encrypted-url?lookup_hash={sample_encrypted_data["LOOKUP_HASH"]}')
|
||||
|
||||
assert response.status_code == 200
|
||||
data = json.loads(response.data)
|
||||
assert 'ENCRYTION_SALT' in data
|
||||
assert 'IV' in data
|
||||
assert 'ENCRYPTED_URL' in data
|
||||
assert data['ENCRYTION_SALT'] == sample_encrypted_data['ENCRYTION_SALT']
|
||||
assert data['IV'] == sample_encrypted_data['IV']
|
||||
assert data['ENCRYPTED_URL'] == sample_encrypted_data['ENCRYPTED_URL']
|
||||
|
||||
def test_get_encrypted_url_missing_parameter(self, client):
|
||||
"""Test error handling when lookup_hash parameter is missing"""
|
||||
response = client.get('/get-encrypted-url')
|
||||
|
||||
assert response.status_code == 400
|
||||
data = json.loads(response.data)
|
||||
assert 'Missing lookup_hash parameter' in data['error']
|
||||
|
||||
def test_get_encrypted_url_not_found(self, client):
|
||||
"""Test error handling when lookup hash doesn't exist"""
|
||||
response = client.get('/get-encrypted-url?lookup_hash=nonexistent')
|
||||
|
||||
assert response.status_code == 404
|
||||
data = json.loads(response.data)
|
||||
assert 'Link not found' in data['error']
|
||||
|
||||
|
||||
class TestRedirectEndpoint:
|
||||
"""Test the /<id> redirect endpoint"""
|
||||
|
||||
def test_redirect_basic(self, client):
|
||||
"""Test basic URL redirection"""
|
||||
# First, create a shortened URL using server-side endpoint
|
||||
original_url = "https://example.com"
|
||||
response = client.post('/shorten-server',
|
||||
json={'url': original_url},
|
||||
content_type='application/json')
|
||||
assert response.status_code == 201
|
||||
|
||||
data = json.loads(response.data)
|
||||
short_id = data['id']
|
||||
|
||||
# Then test redirection
|
||||
response = client.get(f'/{short_id}')
|
||||
assert response.status_code == 302
|
||||
assert response.location == original_url
|
||||
|
||||
def test_redirect_not_found(self, client):
|
||||
"""Test error handling when short ID doesn't exist"""
|
||||
response = client.get('/nonexistent123')
|
||||
|
||||
assert response.status_code == 404
|
||||
data = json.loads(response.data)
|
||||
assert 'Link not found' in data['error']
|
||||
|
||||
def test_redirect_various_urls(self, client, sample_urls):
|
||||
"""Test redirection with various URL types"""
|
||||
for original_url in sample_urls:
|
||||
# Create shortened URL
|
||||
response = client.post('/shorten-server',
|
||||
json={'url': original_url},
|
||||
content_type='application/json')
|
||||
assert response.status_code == 201
|
||||
|
||||
data = json.loads(response.data)
|
||||
short_id = data['id']
|
||||
|
||||
# Test redirection
|
||||
response = client.get(f'/{short_id}')
|
||||
assert response.status_code == 302
|
||||
|
||||
# For URLs that get prefixed, check the prefixed version
|
||||
expected_url = original_url
|
||||
if not (original_url.startswith('https://') or
|
||||
original_url.startswith('http://') or
|
||||
original_url.startswith('magnet:')):
|
||||
expected_url = 'http://' + original_url
|
||||
|
||||
assert response.location == expected_url
|
||||
|
||||
|
||||
class TestStaticFileEndpoints:
|
||||
"""Test static file serving endpoints"""
|
||||
|
||||
def test_serve_react_app(self, client):
|
||||
"""Test serving the React app"""
|
||||
response = client.get('/')
|
||||
# We can't test the actual file content without the dist folder
|
||||
# But we can test that the route exists and doesn't error
|
||||
assert response.status_code in [200, 404] # 404 if dist/index.html doesn't exist
|
||||
|
||||
def test_serve_robots_txt(self, client):
|
||||
"""Test serving robots.txt"""
|
||||
response = client.get('/robots.txt')
|
||||
assert response.status_code in [200, 404] # 404 if file doesn't exist
|
||||
|
||||
def test_serve_sitemap_xml(self, client):
|
||||
"""Test serving sitemap.xml"""
|
||||
response = client.get('/sitemap.xml')
|
||||
assert response.status_code in [200, 404] # 404 if file doesn't exist
|
||||
|
||||
|
||||
class TestErrorHandling:
|
||||
"""Test general error handling"""
|
||||
|
||||
def test_invalid_json(self, client):
|
||||
"""Test handling of invalid JSON in requests"""
|
||||
response = client.post('/shorten-server',
|
||||
data='invalid json',
|
||||
content_type='application/json')
|
||||
|
||||
# Flask should handle this and return 400
|
||||
assert response.status_code == 400
|
||||
|
||||
def test_unsupported_http_methods(self, client):
|
||||
"""Test that unsupported HTTP methods are rejected"""
|
||||
# GET on POST-only endpoints - Flask returns 404 for unmatched routes
|
||||
response = client.get('/shorten-server')
|
||||
assert response.status_code in [404, 405] # Flask may return 404 or 405
|
||||
|
||||
response = client.get('/shorten')
|
||||
assert response.status_code in [404, 405]
|
||||
|
||||
# POST on GET-only endpoints
|
||||
response = client.post('/get-encrypted-url')
|
||||
assert response.status_code in [404, 405]
|
||||
@@ -1,207 +0,0 @@
|
||||
import pytest
|
||||
import os
|
||||
import sys
|
||||
from unittest.mock import patch
|
||||
|
||||
# Add parent directory to path so we can import main
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
from main import encrypt_url, decrypt_url, derive_key
|
||||
from argon2.low_level import Type
|
||||
|
||||
|
||||
class TestCryptographicFunctions:
|
||||
"""Test all cryptographic functions for security and correctness"""
|
||||
|
||||
def test_derive_key_basic(self, mock_config):
|
||||
"""Test basic key derivation functionality"""
|
||||
id_bytes = b"test_id"
|
||||
salt = bytes.fromhex("11111111111111111111111111111111")
|
||||
|
||||
key = derive_key(id_bytes, salt)
|
||||
|
||||
assert len(key) == 32 # Should be 32 bytes for AES-256
|
||||
assert isinstance(key, bytes)
|
||||
|
||||
def test_derive_key_deterministic(self, mock_config):
|
||||
"""Test that key derivation is deterministic"""
|
||||
id_bytes = b"test_id"
|
||||
salt = bytes.fromhex("11111111111111111111111111111111")
|
||||
|
||||
key1 = derive_key(id_bytes, salt)
|
||||
key2 = derive_key(id_bytes, salt)
|
||||
|
||||
assert key1 == key2
|
||||
|
||||
def test_derive_key_different_inputs_different_outputs(self, mock_config):
|
||||
"""Test that different inputs produce different keys"""
|
||||
salt = bytes.fromhex("11111111111111111111111111111111")
|
||||
|
||||
key1 = derive_key(b"id1", salt)
|
||||
key2 = derive_key(b"id2", salt)
|
||||
key3 = derive_key(b"id1", bytes.fromhex("22222222222222222222222222222222"))
|
||||
|
||||
assert key1 != key2
|
||||
assert key1 != key3
|
||||
assert key2 != key3
|
||||
|
||||
def test_encrypt_url_basic(self):
|
||||
"""Test basic URL encryption"""
|
||||
key = os.urandom(32)
|
||||
plaintext = "https://example.com"
|
||||
|
||||
iv, ciphertext = encrypt_url(key, plaintext)
|
||||
|
||||
assert len(iv) == 16 # AES block size
|
||||
assert len(ciphertext) > 0
|
||||
assert isinstance(iv, bytes)
|
||||
assert isinstance(ciphertext, bytes)
|
||||
|
||||
def test_encrypt_url_invalid_key_length(self):
|
||||
"""Test encryption with invalid key length"""
|
||||
invalid_key = os.urandom(16) # Wrong length
|
||||
plaintext = "https://example.com"
|
||||
|
||||
with pytest.raises(ValueError, match="Invalid key length"):
|
||||
encrypt_url(invalid_key, plaintext)
|
||||
|
||||
def test_decrypt_url_basic(self):
|
||||
"""Test basic URL decryption"""
|
||||
key = os.urandom(32)
|
||||
original_url = "https://example.com"
|
||||
|
||||
iv, ciphertext = encrypt_url(key, original_url)
|
||||
decrypted_url = decrypt_url(key, iv, ciphertext)
|
||||
|
||||
assert decrypted_url == original_url
|
||||
|
||||
def test_decrypt_url_invalid_key_length(self):
|
||||
"""Test decryption with invalid key length"""
|
||||
key = os.urandom(32)
|
||||
original_url = "https://example.com"
|
||||
|
||||
iv, ciphertext = encrypt_url(key, original_url)
|
||||
invalid_key = os.urandom(16) # Wrong length
|
||||
|
||||
with pytest.raises(ValueError, match="Invalid key length"):
|
||||
decrypt_url(invalid_key, iv, ciphertext)
|
||||
|
||||
def test_encrypt_decrypt_roundtrip(self):
|
||||
"""Test complete encryption/decryption roundtrip"""
|
||||
key = os.urandom(32)
|
||||
test_urls = [
|
||||
"https://google.com",
|
||||
"http://example.com/path?param=value",
|
||||
"https://github.com/user/repo/issues/123",
|
||||
"magnet:?xt=urn:btih:example123456789",
|
||||
"https://very-long-domain-name.example.com/very/long/path/with/many/segments?lots=of&query=parameters&more=stuff"
|
||||
]
|
||||
|
||||
for url in test_urls:
|
||||
iv, ciphertext = encrypt_url(key, url)
|
||||
decrypted_url = decrypt_url(key, iv, ciphertext)
|
||||
assert decrypted_url == url
|
||||
|
||||
def test_encrypt_same_url_different_iv(self):
|
||||
"""Test that encrypting the same URL twice produces different ciphertexts (due to random IV)"""
|
||||
key = os.urandom(32)
|
||||
url = "https://example.com"
|
||||
|
||||
iv1, ciphertext1 = encrypt_url(key, url)
|
||||
iv2, ciphertext2 = encrypt_url(key, url)
|
||||
|
||||
assert iv1 != iv2
|
||||
assert ciphertext1 != ciphertext2
|
||||
|
||||
# But both should decrypt to the same URL
|
||||
assert decrypt_url(key, iv1, ciphertext1) == url
|
||||
assert decrypt_url(key, iv2, ciphertext2) == url
|
||||
|
||||
def test_decrypt_wrong_key(self):
|
||||
"""Test decryption with wrong key fails appropriately"""
|
||||
key1 = os.urandom(32)
|
||||
key2 = os.urandom(32)
|
||||
url = "https://example.com"
|
||||
|
||||
iv, ciphertext = encrypt_url(key1, url)
|
||||
|
||||
with pytest.raises(Exception): # Could be various crypto exceptions
|
||||
decrypt_url(key2, iv, ciphertext)
|
||||
|
||||
def test_decrypt_corrupted_iv(self):
|
||||
"""Test decryption with corrupted IV fails"""
|
||||
key = os.urandom(32)
|
||||
url = "https://example.com"
|
||||
|
||||
iv, ciphertext = encrypt_url(key, url)
|
||||
corrupted_iv = os.urandom(16) # Different IV
|
||||
|
||||
with pytest.raises(Exception):
|
||||
decrypt_url(key, corrupted_iv, ciphertext)
|
||||
|
||||
def test_decrypt_corrupted_ciphertext(self):
|
||||
"""Test decryption with corrupted ciphertext fails"""
|
||||
key = os.urandom(32)
|
||||
url = "https://example.com"
|
||||
|
||||
iv, ciphertext = encrypt_url(key, url)
|
||||
corrupted_ciphertext = os.urandom(len(ciphertext))
|
||||
|
||||
with pytest.raises(Exception):
|
||||
decrypt_url(key, iv, corrupted_ciphertext)
|
||||
|
||||
def test_unicode_url_handling(self):
|
||||
"""Test handling of Unicode characters in URLs"""
|
||||
key = os.urandom(32)
|
||||
unicode_url = "https://example.com/测试/ñoño?查询=参数"
|
||||
|
||||
iv, ciphertext = encrypt_url(key, unicode_url)
|
||||
decrypted_url = decrypt_url(key, iv, ciphertext)
|
||||
|
||||
assert decrypted_url == unicode_url
|
||||
|
||||
def test_empty_url_handling(self):
|
||||
"""Test handling of edge case inputs"""
|
||||
key = os.urandom(32)
|
||||
|
||||
# Empty string
|
||||
iv, ciphertext = encrypt_url(key, "")
|
||||
decrypted_url = decrypt_url(key, iv, ciphertext)
|
||||
assert decrypted_url == ""
|
||||
|
||||
# Very short URL
|
||||
short_url = "a"
|
||||
iv, ciphertext = encrypt_url(key, short_url)
|
||||
decrypted_url = decrypt_url(key, iv, ciphertext)
|
||||
assert decrypted_url == short_url
|
||||
|
||||
def test_key_derivation_with_production_config(self):
|
||||
"""Test key derivation matches expected behavior with actual config values"""
|
||||
# This test uses more realistic Argon2 parameters
|
||||
from argon2.low_level import hash_secret_raw, Type
|
||||
|
||||
id_bytes = b"testid1234"
|
||||
salt = bytes.fromhex("85ddce7d130a6f1beba59fc8ba2a715d")
|
||||
|
||||
key = hash_secret_raw(
|
||||
secret=id_bytes,
|
||||
salt=salt,
|
||||
time_cost=3,
|
||||
memory_cost=65536,
|
||||
parallelism=1,
|
||||
hash_len=32,
|
||||
type=Type.ID
|
||||
)
|
||||
|
||||
assert len(key) == 32
|
||||
assert isinstance(key, bytes)
|
||||
# Key should be deterministic
|
||||
key2 = hash_secret_raw(
|
||||
secret=id_bytes,
|
||||
salt=salt,
|
||||
time_cost=3,
|
||||
memory_cost=65536,
|
||||
parallelism=1,
|
||||
hash_len=32,
|
||||
type=Type.ID
|
||||
)
|
||||
assert key == key2
|
||||
@@ -1,307 +0,0 @@
|
||||
import pytest
|
||||
import sqlite3
|
||||
import os
|
||||
import sys
|
||||
from contextlib import closing
|
||||
from unittest.mock import patch
|
||||
|
||||
# Add parent directory to path so we can import main
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
from main import get_db, init_db
|
||||
|
||||
|
||||
class TestDatabaseOperations:
|
||||
"""Test database operations and schema"""
|
||||
|
||||
def test_database_schema_creation(self, test_db_connection):
|
||||
"""Test that the database schema is created correctly"""
|
||||
conn = test_db_connection
|
||||
|
||||
# Check that both tables exist
|
||||
cursor = conn.execute("""
|
||||
SELECT name FROM sqlite_master
|
||||
WHERE type='table' AND name IN ('urls', 'client_side_urls')
|
||||
""")
|
||||
tables = [row[0] for row in cursor.fetchall()]
|
||||
|
||||
assert 'urls' in tables
|
||||
assert 'client_side_urls' in tables
|
||||
|
||||
def test_urls_table_structure(self, test_db_connection):
|
||||
"""Test the structure of the urls table"""
|
||||
conn = test_db_connection
|
||||
|
||||
cursor = conn.execute("PRAGMA table_info(urls)")
|
||||
columns = {row[1]: row[2] for row in cursor.fetchall()}
|
||||
|
||||
assert 'lookup_hash' in columns
|
||||
assert 'iv' in columns
|
||||
assert 'encrypted_url' in columns
|
||||
assert columns['lookup_hash'] == 'TEXT'
|
||||
assert columns['iv'] == 'BLOB'
|
||||
assert columns['encrypted_url'] == 'BLOB'
|
||||
|
||||
def test_client_side_urls_table_structure(self, test_db_connection):
|
||||
"""Test the structure of the client_side_urls table"""
|
||||
conn = test_db_connection
|
||||
|
||||
cursor = conn.execute("PRAGMA table_info(client_side_urls)")
|
||||
columns = {row[1]: row[2] for row in cursor.fetchall()}
|
||||
|
||||
assert 'lookup_hash' in columns
|
||||
assert 'encryption_salt' in columns
|
||||
assert 'iv' in columns
|
||||
assert 'encrypted_url' in columns
|
||||
assert columns['lookup_hash'] == 'TEXT'
|
||||
assert columns['encryption_salt'] == 'BLOB'
|
||||
assert columns['iv'] == 'BLOB'
|
||||
assert columns['encrypted_url'] == 'BLOB'
|
||||
|
||||
def test_primary_key_constraints(self, test_db_connection):
|
||||
"""Test that primary key constraints work correctly"""
|
||||
conn = test_db_connection
|
||||
|
||||
# Test urls table primary key
|
||||
conn.execute("""
|
||||
INSERT INTO urls (lookup_hash, iv, encrypted_url)
|
||||
VALUES ('test_hash', ?, ?)
|
||||
""", (b'\x12\x34\x56\x78\x90\xab\xcd\xef', b'\xfe\xdc\xba\x09\x87\x65\x43\x21'))
|
||||
|
||||
# Attempting to insert duplicate lookup_hash should fail
|
||||
with pytest.raises(sqlite3.IntegrityError):
|
||||
conn.execute("""
|
||||
INSERT INTO urls (lookup_hash, iv, encrypted_url)
|
||||
VALUES ('test_hash', ?, ?)
|
||||
""", (b'\xab\xcd', b'\xef\xab'))
|
||||
|
||||
# Test client_side_urls table primary key
|
||||
conn.execute("""
|
||||
INSERT INTO client_side_urls (lookup_hash, encryption_salt, iv, encrypted_url)
|
||||
VALUES ('client_hash', ?, ?, ?)
|
||||
""", (b'salt123', b'iv123', b'url123'))
|
||||
|
||||
# Attempting to insert duplicate lookup_hash should fail
|
||||
with pytest.raises(sqlite3.IntegrityError):
|
||||
conn.execute("""
|
||||
INSERT INTO client_side_urls (lookup_hash, encryption_salt, iv, encrypted_url)
|
||||
VALUES ('client_hash', ?, ?, ?)
|
||||
""", (b'salt456', b'iv456', b'url456'))
|
||||
|
||||
def test_not_null_constraints(self, test_db_connection):
|
||||
"""Test that NOT NULL constraints are enforced"""
|
||||
conn = test_db_connection
|
||||
|
||||
# Test urls table NOT NULL constraints
|
||||
with pytest.raises(sqlite3.IntegrityError):
|
||||
conn.execute("""
|
||||
INSERT INTO urls (lookup_hash, iv, encrypted_url)
|
||||
VALUES ('test', NULL, ?)
|
||||
""", (b'encrypted',))
|
||||
|
||||
with pytest.raises(sqlite3.IntegrityError):
|
||||
conn.execute("""
|
||||
INSERT INTO urls (lookup_hash, iv, encrypted_url)
|
||||
VALUES ('test', ?, NULL)
|
||||
""", (b'iv',))
|
||||
|
||||
# Test client_side_urls table NOT NULL constraints
|
||||
with pytest.raises(sqlite3.IntegrityError):
|
||||
conn.execute("""
|
||||
INSERT INTO client_side_urls (lookup_hash, encryption_salt, iv, encrypted_url)
|
||||
VALUES ('test', NULL, ?, ?)
|
||||
""", (b'iv', b'url'))
|
||||
|
||||
def test_data_insertion_and_retrieval(self, test_db_connection):
|
||||
"""Test basic data insertion and retrieval"""
|
||||
conn = test_db_connection
|
||||
|
||||
# Test urls table
|
||||
test_data = {
|
||||
'lookup_hash': 'test_lookup_hash',
|
||||
'iv': b'\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f\x10',
|
||||
'encrypted_url': b'\xff\xfe\xfd\xfc\xfb\xfa\xf9\xf8'
|
||||
}
|
||||
|
||||
conn.execute("""
|
||||
INSERT INTO urls (lookup_hash, iv, encrypted_url)
|
||||
VALUES (?, ?, ?)
|
||||
""", (test_data['lookup_hash'], test_data['iv'], test_data['encrypted_url']))
|
||||
|
||||
cursor = conn.execute("""
|
||||
SELECT lookup_hash, iv, encrypted_url FROM urls
|
||||
WHERE lookup_hash = ?
|
||||
""", (test_data['lookup_hash'],))
|
||||
|
||||
row = cursor.fetchone()
|
||||
assert row is not None
|
||||
assert row['lookup_hash'] == test_data['lookup_hash']
|
||||
assert row['iv'] == test_data['iv']
|
||||
assert row['encrypted_url'] == test_data['encrypted_url']
|
||||
|
||||
def test_blob_data_integrity(self, test_db_connection):
|
||||
"""Test that BLOB data maintains integrity"""
|
||||
conn = test_db_connection
|
||||
|
||||
# Test with various byte patterns
|
||||
test_cases = [
|
||||
b'\x00' * 16, # All zeros
|
||||
b'\xff' * 16, # All ones
|
||||
b'\x00\xff\x00\xff' * 4, # Alternating pattern
|
||||
bytes(range(16)), # Sequential bytes
|
||||
b'\x80\x00\x01\x7f' * 4, # Edge values
|
||||
]
|
||||
|
||||
for i, test_bytes in enumerate(test_cases):
|
||||
lookup_hash = f'test_blob_{i}'
|
||||
conn.execute("""
|
||||
INSERT INTO urls (lookup_hash, iv, encrypted_url)
|
||||
VALUES (?, ?, ?)
|
||||
""", (lookup_hash, test_bytes, test_bytes))
|
||||
|
||||
cursor = conn.execute("""
|
||||
SELECT iv, encrypted_url FROM urls WHERE lookup_hash = ?
|
||||
""", (lookup_hash,))
|
||||
|
||||
row = cursor.fetchone()
|
||||
assert row['iv'] == test_bytes
|
||||
assert row['encrypted_url'] == test_bytes
|
||||
|
||||
def test_concurrent_access(self, test_db_connection):
|
||||
"""Test handling of concurrent database access patterns"""
|
||||
conn = test_db_connection
|
||||
|
||||
# Simulate concurrent inserts (sequential in test, but tests the pattern)
|
||||
for i in range(10):
|
||||
lookup_hash = f'concurrent_test_{i}'
|
||||
conn.execute("""
|
||||
INSERT INTO urls (lookup_hash, iv, encrypted_url)
|
||||
VALUES (?, ?, ?)
|
||||
""", (lookup_hash, b'test_iv', b'test_url'))
|
||||
|
||||
# Verify all records were inserted
|
||||
cursor = conn.execute("""
|
||||
SELECT COUNT(*) FROM urls WHERE lookup_hash LIKE 'concurrent_test_%'
|
||||
""")
|
||||
count = cursor.fetchone()[0]
|
||||
assert count == 10
|
||||
|
||||
@patch('main.config')
|
||||
def test_get_db_with_file_database(self, mock_config):
|
||||
"""Test get_db function with file-based database"""
|
||||
# Create a temporary database file
|
||||
import tempfile
|
||||
with tempfile.NamedTemporaryFile(suffix='.db', delete=False) as tmp_file:
|
||||
db_path = tmp_file.name
|
||||
|
||||
try:
|
||||
mock_config.return_value = {'database': {'path': db_path}}
|
||||
|
||||
# Mock the config in main module
|
||||
with patch('main.config', {'database': {'path': db_path}}):
|
||||
conn = get_db()
|
||||
assert conn is not None
|
||||
|
||||
# Test that row_factory is set correctly
|
||||
assert conn.row_factory == sqlite3.Row
|
||||
|
||||
conn.close()
|
||||
|
||||
finally:
|
||||
# Clean up
|
||||
if os.path.exists(db_path):
|
||||
os.unlink(db_path)
|
||||
|
||||
def test_init_db_functionality(self, mock_config):
|
||||
"""Test that init_db creates the schema correctly"""
|
||||
# Create a temporary database file
|
||||
import tempfile
|
||||
with tempfile.NamedTemporaryFile(suffix='.db', delete=False) as tmp_file:
|
||||
db_path = tmp_file.name
|
||||
|
||||
try:
|
||||
# Mock config to use our temporary database
|
||||
with patch('main.config', {'database': {'path': db_path}}):
|
||||
init_db()
|
||||
|
||||
# Verify the database was created with correct schema
|
||||
conn = sqlite3.connect(db_path)
|
||||
conn.row_factory = sqlite3.Row
|
||||
|
||||
# Check tables exist
|
||||
cursor = conn.execute("""
|
||||
SELECT name FROM sqlite_master
|
||||
WHERE type='table' AND name IN ('urls', 'client_side_urls')
|
||||
""")
|
||||
tables = [row[0] for row in cursor.fetchall()]
|
||||
|
||||
assert 'urls' in tables
|
||||
assert 'client_side_urls' in tables
|
||||
|
||||
conn.close()
|
||||
|
||||
finally:
|
||||
# Clean up
|
||||
if os.path.exists(db_path):
|
||||
os.unlink(db_path)
|
||||
|
||||
def test_database_performance_basic(self, test_db_connection):
|
||||
"""Test basic database performance characteristics"""
|
||||
import time
|
||||
conn = test_db_connection
|
||||
|
||||
# Test insertion performance
|
||||
start_time = time.time()
|
||||
for i in range(100):
|
||||
conn.execute("""
|
||||
INSERT INTO urls (lookup_hash, iv, encrypted_url)
|
||||
VALUES (?, ?, ?)
|
||||
""", (f'perf_test_{i}', b'test_iv' * 2, b'test_url' * 4))
|
||||
insertion_time = time.time() - start_time
|
||||
|
||||
# Test query performance
|
||||
start_time = time.time()
|
||||
for i in range(100):
|
||||
cursor = conn.execute("""
|
||||
SELECT iv, encrypted_url FROM urls WHERE lookup_hash = ?
|
||||
""", (f'perf_test_{i}',))
|
||||
cursor.fetchone()
|
||||
query_time = time.time() - start_time
|
||||
|
||||
# Basic performance assertions (should be very fast for 100 operations)
|
||||
assert insertion_time < 1.0 # Should complete in under 1 second
|
||||
assert query_time < 1.0 # Should complete in under 1 second
|
||||
|
||||
def test_data_types_validation(self, test_db_connection):
|
||||
"""Test that correct data types are handled properly"""
|
||||
conn = test_db_connection
|
||||
|
||||
# Test TEXT data
|
||||
long_text = 'a' * 1000 # 1KB text
|
||||
conn.execute("""
|
||||
INSERT INTO urls (lookup_hash, iv, encrypted_url)
|
||||
VALUES (?, ?, ?)
|
||||
""", (long_text, b'test_iv', b'test_url'))
|
||||
|
||||
cursor = conn.execute("""
|
||||
SELECT lookup_hash FROM urls WHERE lookup_hash = ?
|
||||
""", (long_text,))
|
||||
row = cursor.fetchone()
|
||||
assert row['lookup_hash'] == long_text
|
||||
|
||||
# Test BLOB data of various sizes
|
||||
blob_sizes = [1, 16, 256, 1024, 4096] # Various sizes
|
||||
for size in blob_sizes:
|
||||
test_blob = b'x' * size
|
||||
lookup_hash = f'blob_test_{size}'
|
||||
|
||||
conn.execute("""
|
||||
INSERT INTO client_side_urls (lookup_hash, encryption_salt, iv, encrypted_url)
|
||||
VALUES (?, ?, ?, ?)
|
||||
""", (lookup_hash, test_blob, b'iv', b'url'))
|
||||
|
||||
cursor = conn.execute("""
|
||||
SELECT encryption_salt FROM client_side_urls WHERE lookup_hash = ?
|
||||
""", (lookup_hash,))
|
||||
row = cursor.fetchone()
|
||||
assert len(row['encryption_salt']) == size
|
||||
assert row['encryption_salt'] == test_blob
|
||||
@@ -1,155 +0,0 @@
|
||||
import pytest
|
||||
import re
|
||||
import sys
|
||||
import os
|
||||
from unittest.mock import patch
|
||||
|
||||
# Add parent directory to path so we can import main
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
from main import generate_id
|
||||
|
||||
|
||||
class TestIDGeneration:
|
||||
"""Test ID generation functionality"""
|
||||
|
||||
def test_generate_id_basic(self, mock_config):
|
||||
"""Test basic ID generation"""
|
||||
id_str = generate_id()
|
||||
|
||||
assert isinstance(id_str, str)
|
||||
assert len(id_str) == 10 # From test config
|
||||
|
||||
def test_generate_id_allowed_chars(self, mock_config):
|
||||
"""Test that generated IDs only contain allowed characters"""
|
||||
allowed_chars = "abcdefghijkmnopqrstuvwxyzABCDEFGHJKLMNPQRSTUVWXYZ123456789"
|
||||
|
||||
for _ in range(100): # Test multiple generations
|
||||
id_str = generate_id()
|
||||
for char in id_str:
|
||||
assert char in allowed_chars
|
||||
|
||||
def test_generate_id_uniqueness(self, mock_config):
|
||||
"""Test that generated IDs are likely unique"""
|
||||
ids = set()
|
||||
|
||||
# Generate many IDs and check for uniqueness
|
||||
for _ in range(1000):
|
||||
id_str = generate_id()
|
||||
ids.add(id_str)
|
||||
|
||||
# With 10 characters from 58-character alphabet, collisions should be extremely rare
|
||||
# We expect very high uniqueness
|
||||
assert len(ids) > 990 # Allow for very rare collisions
|
||||
|
||||
def test_generate_id_format(self, mock_config):
|
||||
"""Test that generated IDs match expected format"""
|
||||
id_str = generate_id()
|
||||
|
||||
# Should be alphanumeric but exclude confusing characters like 0, O, I, l
|
||||
pattern = r'^[abcdefghijkmnopqrstuvwxyzABCDEFGHJKLMNPQRSTUVWXYZ123456789]{10}$'
|
||||
assert re.match(pattern, id_str)
|
||||
|
||||
def test_generate_id_no_confusing_chars(self, mock_config):
|
||||
"""Test that IDs don't contain visually confusing characters"""
|
||||
confusing_chars = ['0', 'O', 'I', 'l']
|
||||
|
||||
for _ in range(100):
|
||||
id_str = generate_id()
|
||||
for char in confusing_chars:
|
||||
assert char not in id_str
|
||||
|
||||
def test_generate_id_length_consistency(self, mock_config):
|
||||
"""Test that all generated IDs have consistent length"""
|
||||
expected_length = 10
|
||||
|
||||
for _ in range(50):
|
||||
id_str = generate_id()
|
||||
assert len(id_str) == expected_length
|
||||
|
||||
def test_generate_id_different_config_length(self, test_config):
|
||||
"""Test ID generation with different configured length"""
|
||||
test_config['id_generation']['length'] = 8
|
||||
|
||||
with patch('main.config', test_config):
|
||||
id_str = generate_id()
|
||||
assert len(id_str) == 8
|
||||
|
||||
def test_generate_id_different_allowed_chars(self, test_config):
|
||||
"""Test ID generation with different allowed characters"""
|
||||
test_config['id_generation']['allowed_chars'] = 'abc123'
|
||||
|
||||
with patch('main.config', test_config):
|
||||
id_str = generate_id()
|
||||
assert len(id_str) == test_config['id_generation']['length'] # Use config length
|
||||
for char in id_str:
|
||||
assert char in 'abc123'
|
||||
|
||||
|
||||
class TestConfigValidation:
|
||||
"""Test configuration validation and salt handling"""
|
||||
|
||||
def test_salt_validation_correct_length(self):
|
||||
"""Test that valid salts are accepted"""
|
||||
salt_hex = "85ddce7d130a6f1beba59fc8ba2a715d" # 32 hex chars = 16 bytes
|
||||
salt_bytes = bytes.fromhex(salt_hex)
|
||||
|
||||
assert len(salt_bytes) == 16
|
||||
|
||||
def test_salt_validation_incorrect_length(self):
|
||||
"""Test that invalid salt lengths are rejected"""
|
||||
# Too short
|
||||
short_salt = "85ddce7d130a6f1b" # 16 hex chars = 8 bytes
|
||||
salt_bytes = bytes.fromhex(short_salt)
|
||||
assert len(salt_bytes) != 16
|
||||
|
||||
# Too long
|
||||
long_salt = "85ddce7d130a6f1beba59fc8ba2a715d85ddce7d130a6f1beba59fc8ba2a715d" # 64 hex chars = 32 bytes
|
||||
salt_bytes = bytes.fromhex(long_salt)
|
||||
assert len(salt_bytes) != 16
|
||||
|
||||
def test_invalid_hex_salt(self):
|
||||
"""Test that invalid hex strings are rejected"""
|
||||
invalid_salts = [
|
||||
"invalid_hex_string",
|
||||
"85ddce7d130a6f1beba59fc8ba2a715g", # Invalid hex char 'g'
|
||||
"85ddce7d130a6f1beba59fc8ba2a715", # Odd number of chars
|
||||
]
|
||||
|
||||
for invalid_salt in invalid_salts:
|
||||
with pytest.raises(ValueError):
|
||||
bytes.fromhex(invalid_salt)
|
||||
|
||||
|
||||
class TestURLNormalization:
|
||||
"""Test URL preprocessing and normalization"""
|
||||
|
||||
def test_url_prefix_addition(self):
|
||||
"""Test that URLs get proper prefixes added"""
|
||||
test_cases = [
|
||||
("google.com", "http://google.com"),
|
||||
("example.org", "http://example.org"),
|
||||
("domain.co.uk", "http://domain.co.uk"),
|
||||
("subdomain.example.com", "http://subdomain.example.com"),
|
||||
]
|
||||
|
||||
for input_url, expected_output in test_cases:
|
||||
# Simulate the URL preprocessing logic from main.py
|
||||
url = input_url
|
||||
if not (url.startswith('https://') or url.startswith('http://') or url.startswith('magnet:')):
|
||||
url = 'http://' + url
|
||||
assert url == expected_output
|
||||
|
||||
def test_url_prefix_preservation(self):
|
||||
"""Test that existing prefixes are preserved"""
|
||||
test_urls = [
|
||||
"https://google.com",
|
||||
"http://example.com",
|
||||
"magnet:?xt=urn:btih:example",
|
||||
]
|
||||
|
||||
for url in test_urls:
|
||||
# Simulate the URL preprocessing logic from main.py
|
||||
processed_url = url
|
||||
if not (url.startswith('https://') or url.startswith('http://') or url.startswith('magnet:')):
|
||||
processed_url = 'http://' + url
|
||||
assert processed_url == url # Should remain unchanged
|
||||
40
build.sh
@@ -1,40 +0,0 @@
|
||||
#!/bin/bash
|
||||
|
||||
# Simple build script that uses the current config.json settings
|
||||
# Use this after you've manually configured backend/config.json
|
||||
|
||||
set -e
|
||||
|
||||
echo "📦 Building frontend using backend/config.json settings..."
|
||||
|
||||
# Check if config exists
|
||||
if [ ! -f "backend/config.json" ]; then
|
||||
echo "❌ backend/config.json not found. Please copy from config_template.json and configure it."
|
||||
exit 1
|
||||
fi
|
||||
|
||||
# Extract domain from config for the placeholder replacement
|
||||
DOMAIN=$(cat backend/config.json | grep -o '"name": "[^"]*"' | sed 's/"name": "\([^"]*\)"/\1/')
|
||||
if [ -z "$DOMAIN" ]; then
|
||||
echo "❌ Could not find domain.name in backend/config.json"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
echo "🔧 Using domain: $DOMAIN"
|
||||
|
||||
# Build the frontend
|
||||
cd frontend
|
||||
npm install
|
||||
npm run build
|
||||
cd ..
|
||||
|
||||
# Replace any remaining placeholders in the built files
|
||||
echo "🔧 Replacing domain placeholders in static files..."
|
||||
find backend/dist -type f \( -name "*.html" -o -name "*.xml" -o -name "*.json" -o -name "*.webmanifest" \) -exec sed -i.bak "s/%VITE_DOMAIN%/$DOMAIN/g" {} \;
|
||||
find backend/dist -name "*.bak" -delete
|
||||
|
||||
echo "✅ Build completed successfully!"
|
||||
echo "📁 Built files are in backend/dist/"
|
||||
echo ""
|
||||
echo "To run the backend:"
|
||||
echo " cd backend && python main.py"
|
||||
74
deploy.sh
@@ -1,74 +0,0 @@
|
||||
#!/bin/bash
|
||||
|
||||
# Deploy script for self-hosted tnyr.me
|
||||
# This script builds the frontend with custom domain configuration
|
||||
|
||||
set -e
|
||||
|
||||
# Check if domain is provided
|
||||
if [ -z "$1" ]; then
|
||||
echo "Usage: ./deploy.sh your-domain.com"
|
||||
echo "Example: ./deploy.sh myshortener.com"
|
||||
echo "Or configure backend/config.json and run: ./deploy.sh"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
DOMAIN=$1
|
||||
# Determine default API URL scheme based on domain (use http for localhost/ports)
|
||||
if [[ "$DOMAIN" =~ ^(localhost|127\.0\.0\.1)(:[0-9]+)?$ ]]; then
|
||||
DEFAULT_API_URL="http://$DOMAIN"
|
||||
else
|
||||
DEFAULT_API_URL="https://$DOMAIN"
|
||||
fi
|
||||
|
||||
API_URL=${2:-"$DEFAULT_API_URL"}
|
||||
|
||||
echo "🔧 Configuring deployment for domain: $DOMAIN"
|
||||
echo "🔧 API URL: $API_URL"
|
||||
|
||||
# Update backend config.json with the new domain
|
||||
echo "📝 Updating backend/config.json..."
|
||||
if [ -f "backend/config.json" ]; then
|
||||
# Use jq if available, otherwise use sed (less reliable but more portable)
|
||||
if command -v jq >/dev/null 2>&1; then
|
||||
jq --arg domain "$DOMAIN" --arg api_url "$API_URL" \
|
||||
'.domain.name = $domain | .domain.api_base_url = $api_url' \
|
||||
backend/config.json > backend/config.json.tmp && \
|
||||
mv backend/config.json.tmp backend/config.json
|
||||
else
|
||||
# Fallback to sed (assumes the config has the domain section)
|
||||
sed -i.bak "s|\"name\": \"[^\"]*\"|\"name\": \"$DOMAIN\"|g" backend/config.json
|
||||
sed -i.bak "s|\"api_base_url\": \"[^\"]*\"|\"api_base_url\": \"$API_URL\"|g" backend/config.json
|
||||
rm -f backend/config.json.bak
|
||||
fi
|
||||
echo "✅ Updated backend/config.json"
|
||||
else
|
||||
echo "❌ backend/config.json not found. Please copy from config_template.json"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
# Build the frontend
|
||||
echo "📦 Building frontend..."
|
||||
cd frontend
|
||||
npm install
|
||||
npm run build
|
||||
cd ..
|
||||
|
||||
# Replace any remaining placeholders in the built files
|
||||
echo "🔧 Replacing domain placeholders in static files..."
|
||||
find backend/dist -type f \( -name "*.html" -o -name "*.xml" -o -name "*.json" -o -name "*.webmanifest" \) -exec sed -i.bak "s/%VITE_DOMAIN%/$DOMAIN/g" {} \;
|
||||
find backend/dist -name "*.bak" -delete
|
||||
|
||||
echo "✅ Deployment built successfully!"
|
||||
echo "📁 Built files are in backend/dist/"
|
||||
echo ""
|
||||
echo "Next steps:"
|
||||
echo "1. Update your backend/config.json with appropriate settings"
|
||||
echo "2. Configure your backend to run on the specified domain"
|
||||
echo "3. Update DNS records to point to your server"
|
||||
echo ""
|
||||
echo "For development:"
|
||||
echo " cd frontend && npm run dev"
|
||||
echo ""
|
||||
echo "For production backend:"
|
||||
echo " cd backend && python main.py"
|
||||
26
docker-compose.yml
Normal file
@@ -0,0 +1,26 @@
|
||||
services:
|
||||
tnyr:
|
||||
build: .
|
||||
ports:
|
||||
- "5502:5502"
|
||||
environment:
|
||||
# Required (https://example.com or http://1.2.3.4:5502)
|
||||
- TNYR_PUBLIC_URL=${TNYR_PUBLIC_URL:?set TNYR_PUBLIC_URL (https://example.com or http://1.2.3.4:5502)}
|
||||
# Persistence
|
||||
- TNYR_DB_PATH=/data/urls.db
|
||||
- TNYR_PORT=5502
|
||||
# Optional (enables /delete-url)
|
||||
- TNYR_DELETION_TOKEN=${TNYR_DELETION_TOKEN:-}
|
||||
# Optional: legacy server-side mode (/shorten-server + old /<id>)
|
||||
- TNYR_SALT1_HEX=${TNYR_SALT1_HEX:-}
|
||||
- TNYR_SALT2_HEX=${TNYR_SALT2_HEX:-}
|
||||
# Optional: Argon2 params (legacy link decryption only)
|
||||
- TNYR_ARGON2_TIME_COST=${TNYR_ARGON2_TIME_COST:-}
|
||||
- TNYR_ARGON2_MEMORY_COST=${TNYR_ARGON2_MEMORY_COST:-}
|
||||
- TNYR_ARGON2_PARALLELISM=${TNYR_ARGON2_PARALLELISM:-}
|
||||
- TNYR_ARGON2_HASH_LENGTH=${TNYR_ARGON2_HASH_LENGTH:-}
|
||||
volumes:
|
||||
- ./data:/data
|
||||
restart: unless-stopped
|
||||
|
||||
|
||||
56
docker/entrypoint.sh
Normal file
@@ -0,0 +1,56 @@
|
||||
#!/usr/bin/env sh
|
||||
set -eu
|
||||
|
||||
DIST_DIR="/app/backend/dist"
|
||||
|
||||
if [ -z "${TNYR_SALT1_HEX:-}" ] || [ -z "${TNYR_SALT2_HEX:-}" ]; then
|
||||
echo "Legacy salts not set (TNYR_SALT1_HEX/TNYR_SALT2_HEX). That's OK: default mode needs no secrets."
|
||||
echo "If you need legacy server-side links (/shorten-server and old /<id>), generate salts with: python3 backend/generate_salts.py --env"
|
||||
fi
|
||||
|
||||
PUBLIC_URL="${TNYR_PUBLIC_URL:-}"
|
||||
PORT="${TNYR_PORT:-5502}"
|
||||
DB_PATH="${TNYR_DB_PATH:-}"
|
||||
|
||||
if [ -z "$PUBLIC_URL" ]; then
|
||||
echo "ERROR: Missing required env var: TNYR_PUBLIC_URL (https://example.com or http://1.2.3.4:5502)"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
DOMAIN="$(python - <<'PY'
|
||||
import os
|
||||
from urllib.parse import urlparse
|
||||
u = os.environ.get("TNYR_PUBLIC_URL","").strip()
|
||||
p = urlparse(u)
|
||||
if p.scheme not in ("http","https") or not p.netloc:
|
||||
raise SystemExit("Invalid TNYR_PUBLIC_URL (must be http(s)://host[:port])")
|
||||
print(p.netloc)
|
||||
PY
|
||||
)"
|
||||
|
||||
if [ -n "${DB_PATH:-}" ]; then
|
||||
DB_DIR="$(dirname "$DB_PATH")"
|
||||
mkdir -p "$DB_DIR" || true
|
||||
fi
|
||||
|
||||
mkdir -p /data || true
|
||||
|
||||
if [ -d "$DIST_DIR" ]; then
|
||||
if [ -n "$DOMAIN" ]; then
|
||||
echo "Applying public URL + domain to static files: $PUBLIC_URL ($DOMAIN)"
|
||||
SED_PUBLIC_URL="$(printf '%s' "$PUBLIC_URL" | sed 's/[\/&]/\\&/g')"
|
||||
SED_DOMAIN="$(printf '%s' "$DOMAIN" | sed 's/[\/&]/\\&/g')"
|
||||
find "$DIST_DIR" -type f \( -name "*.html" -o -name "*.xml" -o -name "*.json" -o -name "*.webmanifest" \) -print0 \
|
||||
| xargs -0 -r sed -i \
|
||||
-e "s/%VITE_PUBLIC_URL%/$SED_PUBLIC_URL/g" \
|
||||
-e "s/%VITE_DOMAIN%/$SED_DOMAIN/g"
|
||||
else
|
||||
echo "Could not derive domain from TNYR_PUBLIC_URL; leaving placeholders as-is in static files."
|
||||
fi
|
||||
fi
|
||||
|
||||
cd /app/backend
|
||||
python -c "import main; main.init_db()"
|
||||
exec gunicorn --workers 2 --bind "0.0.0.0:${PORT}" wsgi:app
|
||||
|
||||
|
||||
@@ -10,25 +10,30 @@
|
||||
<link rel="manifest" href="/favicon/site.webmanifest" />
|
||||
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
|
||||
<meta name="description" content="An easy to use end-to-end encrypted URL shortener. tnyr.me is privacy friendly and does not track you or store cookies" />
|
||||
<link rel="canonical" href="https://%VITE_DOMAIN%" />
|
||||
<link rel="canonical" href="%VITE_PUBLIC_URL%" />
|
||||
|
||||
<meta property="og:title" content="TNYR - Privacy friendly URL shortener" />
|
||||
<meta property="og:site_name" content="%VITE_DOMAIN%">
|
||||
<meta property="og:url" content="https://%VITE_DOMAIN%">
|
||||
<meta property="og:url" content="%VITE_PUBLIC_URL%">
|
||||
<meta property="og:description" content="A self encrypted url shortener that puts your privacy first, while being easy to use. tnyr.me doesn't track users, store cookies or log your requests.">
|
||||
<meta property="og:type" content="website">
|
||||
<meta property="og:image" content="https://%VITE_DOMAIN%/meta/logo.png">
|
||||
<meta property="og:image" content="%VITE_PUBLIC_URL%/meta/logo.png">
|
||||
|
||||
<script type="application/ld+json">
|
||||
{
|
||||
"@context": "https://schema.org",
|
||||
"@type": "WebSite",
|
||||
"name": "TNYR",
|
||||
"url": "https://%VITE_DOMAIN%",
|
||||
"url": "%VITE_PUBLIC_URL%",
|
||||
"description": "A self encrypted url shortener that puts your privacy first, while being easy to use. tnyr.me doesn't track users, store cookies or log your requests."
|
||||
}
|
||||
</script>
|
||||
|
||||
<script>
|
||||
window.__TNYR_PUBLIC_URL__ = "%VITE_PUBLIC_URL%";
|
||||
window.__TNYR_DOMAIN__ = "%VITE_DOMAIN%";
|
||||
</script>
|
||||
|
||||
<title>tnyr.me - Privacy friendly URL shortener</title>
|
||||
</head>
|
||||
<body>
|
||||
|
||||
@@ -6,7 +6,7 @@
|
||||
http://www.sitemaps.org/schemas/sitemap/0.9/sitemap.xsd">
|
||||
|
||||
<url>
|
||||
<loc>https://%VITE_DOMAIN%/</loc>
|
||||
<loc>%VITE_PUBLIC_URL%/</loc>
|
||||
</url>
|
||||
|
||||
|
||||
|
||||
@@ -31,14 +31,53 @@ type UrlFormData = z.infer<typeof urlSchema>;
|
||||
|
||||
// Configuration constants
|
||||
const ALLOWED_CHARS = 'abcdefghijkmnopqrstuvwxyzABCDEFGHJKLMNPQRSTUVWXYZ123456789+*-';
|
||||
const DOMAIN = import.meta.env.VITE_DOMAIN || 'tnyr.me';
|
||||
const getDefaultApiBaseUrl = () => {
|
||||
const getConfiguredDomain = () => {
|
||||
if (typeof window === 'undefined') return '';
|
||||
const v = (window as any).__TNYR_DOMAIN__;
|
||||
if (typeof v === 'string' && v && v !== '%VITE_DOMAIN%') return v;
|
||||
return '';
|
||||
};
|
||||
|
||||
const getConfiguredPublicUrl = () => {
|
||||
if (typeof window === 'undefined') return '';
|
||||
const v = (window as any).__TNYR_PUBLIC_URL__;
|
||||
if (typeof v === 'string' && v && v !== '%VITE_PUBLIC_URL%') return v.replace(/\/+$/, '');
|
||||
return '';
|
||||
};
|
||||
|
||||
const getHostnameFromHost = (host: string) => {
|
||||
try {
|
||||
return new URL(`http://${host}`).hostname;
|
||||
} catch {
|
||||
return host.split(':')[0] || host;
|
||||
}
|
||||
};
|
||||
|
||||
const getSiteHost = () => {
|
||||
const configured = getConfiguredDomain();
|
||||
if (configured) return configured;
|
||||
if (typeof window !== 'undefined' && window.location && window.location.host) {
|
||||
return window.location.host; // includes port (useful for localhost)
|
||||
}
|
||||
return 'tnyr.me';
|
||||
};
|
||||
|
||||
const getSiteHostname = () => {
|
||||
const host = getSiteHost();
|
||||
return getHostnameFromHost(host);
|
||||
};
|
||||
|
||||
const getApiBaseUrl = () => {
|
||||
if (typeof window !== 'undefined' && window.location && window.location.origin) {
|
||||
return window.location.origin;
|
||||
}
|
||||
return `https://${DOMAIN}`;
|
||||
return 'https://tnyr.me';
|
||||
};
|
||||
const API_BASE_URL = import.meta.env.VITE_API_BASE_URL || getDefaultApiBaseUrl();
|
||||
|
||||
const SITE_HOST = getSiteHost();
|
||||
const SITE_HOSTNAME = getSiteHostname();
|
||||
const API_BASE_URL = getApiBaseUrl();
|
||||
const PUBLIC_URL = getConfiguredPublicUrl();
|
||||
// Shared salt for lookup hash (protects against rainbow tables)
|
||||
const LOOKUP_SALT = new Uint8Array([0x74, 0x6e, 0x79, 0x72, 0x2e, 0x6d, 0x65, 0x5f, 0x6c, 0x6f, 0x6f, 0x6b, 0x75, 0x70, 0x5f, 0x73]); // "tnyr.me_lookup_s"
|
||||
|
||||
@@ -79,39 +118,45 @@ const deriveEncryptionKey = (id: string, salt: Uint8Array) => {
|
||||
|
||||
const encryptUrl = async (key: Uint8Array, plaintext: string) => {
|
||||
const iv = generateRandomBytes(16);
|
||||
const keyBytes = new Uint8Array(Array.from(key));
|
||||
const ivBytes = new Uint8Array(Array.from(iv));
|
||||
const encoder = new TextEncoder();
|
||||
const data = encoder.encode(plaintext);
|
||||
const dataBytes = new Uint8Array(Array.from(data));
|
||||
|
||||
const cryptoKey = await crypto.subtle.importKey(
|
||||
'raw',
|
||||
key,
|
||||
keyBytes,
|
||||
{ name: 'AES-CBC' },
|
||||
false,
|
||||
['encrypt']
|
||||
);
|
||||
|
||||
const encrypted = await crypto.subtle.encrypt(
|
||||
{ name: 'AES-CBC', iv: iv },
|
||||
{ name: 'AES-CBC', iv: ivBytes },
|
||||
cryptoKey,
|
||||
data
|
||||
dataBytes
|
||||
);
|
||||
|
||||
return { iv, encrypted: new Uint8Array(encrypted) };
|
||||
return { iv: ivBytes, encrypted: new Uint8Array(encrypted) };
|
||||
};
|
||||
|
||||
const decryptUrl = async (key: Uint8Array, iv: Uint8Array, ciphertext: Uint8Array) => {
|
||||
const keyBytes = new Uint8Array(Array.from(key));
|
||||
const ivBytes = new Uint8Array(Array.from(iv));
|
||||
const ciphertextBytes = new Uint8Array(Array.from(ciphertext));
|
||||
const cryptoKey = await crypto.subtle.importKey(
|
||||
'raw',
|
||||
key,
|
||||
keyBytes,
|
||||
{ name: 'AES-CBC' },
|
||||
false,
|
||||
['decrypt']
|
||||
);
|
||||
|
||||
const decrypted = await crypto.subtle.decrypt(
|
||||
{ name: 'AES-CBC', iv: iv },
|
||||
{ name: 'AES-CBC', iv: ivBytes },
|
||||
cryptoKey,
|
||||
ciphertext
|
||||
ciphertextBytes
|
||||
);
|
||||
|
||||
const decoder = new TextDecoder();
|
||||
@@ -230,8 +275,7 @@ export default function App() {
|
||||
ENCRYPTED_URL: arrayToHex(encrypted)
|
||||
});
|
||||
|
||||
const protocol = typeof window !== 'undefined' && window.location ? window.location.protocol : 'https:';
|
||||
const shortUrl = `${protocol}//${DOMAIN}/#${linkId}`;
|
||||
const shortUrl = PUBLIC_URL ? `${PUBLIC_URL}/#${linkId}` : `https://${SITE_HOST}/#${linkId}`;
|
||||
setShortened(shortUrl);
|
||||
} catch (error) {
|
||||
console.error('Encryption error:', error);
|
||||
@@ -247,7 +291,7 @@ export default function App() {
|
||||
|
||||
// Show abuse warning if detected
|
||||
if (showAbuseWarning) {
|
||||
return <AbuseWarning domain={DOMAIN} />;
|
||||
return <AbuseWarning domain={SITE_HOSTNAME} />;
|
||||
}
|
||||
|
||||
// Decryption loading screen
|
||||
@@ -415,7 +459,7 @@ export default function App() {
|
||||
<div className="mt-6 p-4 bg-slate-700/30 rounded-lg border border-slate-700/50">
|
||||
<p className="text-sm text-slate-400">
|
||||
🔒 <span className="font-medium">Important:</span> Make sure to
|
||||
Bookmark your {DOMAIN} links safely - there's no way to recover
|
||||
Bookmark your {SITE_HOST} links safely - there's no way to recover
|
||||
lost IDs or access links without them.
|
||||
</p>
|
||||
</div>
|
||||
@@ -444,10 +488,10 @@ export default function App() {
|
||||
<div className="text-xs text-slate-500 text-center">
|
||||
Report abuse:{" "}
|
||||
<a
|
||||
href={`mailto:abuse@${DOMAIN}`}
|
||||
href={`mailto:abuse@${SITE_HOSTNAME}`}
|
||||
className="hover:text-slate-400 transition-colors underline"
|
||||
>
|
||||
abuse@{DOMAIN}
|
||||
abuse@{SITE_HOSTNAME}
|
||||
</a>
|
||||
</div>
|
||||
</footer>
|
||||
|
||||
@@ -1,55 +1,9 @@
|
||||
import path from "path"
|
||||
import react from "@vitejs/plugin-react"
|
||||
import { defineConfig } from "vite"
|
||||
import fs from "fs"
|
||||
|
||||
// Read domain configuration from backend config
|
||||
function getDomainConfig() {
|
||||
try {
|
||||
const configPath = path.resolve(__dirname, '../backend/config.json');
|
||||
const config = JSON.parse(fs.readFileSync(configPath, 'utf8'));
|
||||
return {
|
||||
domain: config.domain?.name || process.env.VITE_DOMAIN || 'tnyr.me',
|
||||
apiBaseUrl: config.domain?.api_base_url || process.env.VITE_API_BASE_URL || `https://${config.domain?.name || 'tnyr.me'}`
|
||||
};
|
||||
} catch (error) {
|
||||
console.warn('Could not read backend config, using environment variables or defaults');
|
||||
return {
|
||||
domain: process.env.VITE_DOMAIN || 'tnyr.me',
|
||||
apiBaseUrl: process.env.VITE_API_BASE_URL || `https://${process.env.VITE_DOMAIN || 'tnyr.me'}`
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
// Plugin to replace environment variables in HTML and other static files
|
||||
function replaceEnvVars() {
|
||||
return {
|
||||
name: 'replace-env-vars',
|
||||
generateBundle(_options: any, bundle: any) {
|
||||
const { domain } = getDomainConfig()
|
||||
|
||||
Object.keys(bundle).forEach(fileName => {
|
||||
const file = bundle[fileName]
|
||||
if (file.type === 'asset') {
|
||||
if (typeof file.source === 'string') {
|
||||
file.source = file.source.replace(/%VITE_DOMAIN%/g, domain)
|
||||
} else if (file.source instanceof Uint8Array) {
|
||||
const text = new TextDecoder().decode(file.source)
|
||||
if (text.includes('%VITE_DOMAIN%')) {
|
||||
const replaced = text.replace(/%VITE_DOMAIN%/g, domain)
|
||||
file.source = new TextEncoder().encode(replaced)
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const { domain, apiBaseUrl } = getDomainConfig();
|
||||
|
||||
export default defineConfig({
|
||||
plugins: [react(), replaceEnvVars()],
|
||||
plugins: [react()],
|
||||
resolve: {
|
||||
alias: {
|
||||
"@": path.resolve(__dirname, "./src"),
|
||||
@@ -59,8 +13,4 @@ export default defineConfig({
|
||||
outDir: path.resolve(__dirname, "../backend/dist"),
|
||||
emptyOutDir: true,
|
||||
},
|
||||
define: {
|
||||
'import.meta.env.VITE_DOMAIN': JSON.stringify(domain),
|
||||
'import.meta.env.VITE_API_BASE_URL': JSON.stringify(apiBaseUrl),
|
||||
},
|
||||
})
|
||||