mirror of
https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-04-08 03:00:28 -04:00
Merge branch 'swiftyos/secrt-1709-store-provider-names-and-env-vars-in-db' of github.com:Significant-Gravitas/AutoGPT into swiftyos/secrt-1709-store-provider-names-and-env-vars-in-db
This commit is contained in:
@@ -290,7 +290,18 @@ class UserCreditBase(ABC):
|
||||
WITH user_balance_lock AS (
|
||||
SELECT
|
||||
$2::text as userId,
|
||||
COALESCE((SELECT balance FROM {schema_prefix}"UserBalance" WHERE "userId" = $2 FOR UPDATE), 0) as balance
|
||||
COALESCE(
|
||||
(SELECT balance FROM {schema_prefix}"UserBalance" WHERE "userId" = $2 FOR UPDATE),
|
||||
-- Fallback: compute balance from transaction history if UserBalance doesn't exist
|
||||
(SELECT COALESCE(ct."runningBalance", 0)
|
||||
FROM {schema_prefix}"CreditTransaction" ct
|
||||
WHERE ct."userId" = $2
|
||||
AND ct."isActive" = true
|
||||
AND ct."runningBalance" IS NOT NULL
|
||||
ORDER BY ct."createdAt" DESC
|
||||
LIMIT 1),
|
||||
0
|
||||
) as balance
|
||||
),
|
||||
transaction_check AS (
|
||||
SELECT * FROM {schema_prefix}"CreditTransaction"
|
||||
@@ -411,76 +422,110 @@ class UserCreditBase(ABC):
|
||||
)
|
||||
|
||||
# Single unified atomic operation for all transaction types using UserBalance
|
||||
result = await query_raw_with_schema(
|
||||
"""
|
||||
WITH user_balance_lock AS (
|
||||
SELECT
|
||||
$1::text as userId,
|
||||
-- CRITICAL: FOR UPDATE lock prevents concurrent modifications to the same user's balance
|
||||
-- This ensures atomic read-modify-write operations and prevents race conditions
|
||||
COALESCE((SELECT balance FROM {schema_prefix}"UserBalance" WHERE "userId" = $1 FOR UPDATE), 0) as balance
|
||||
),
|
||||
balance_update AS (
|
||||
INSERT INTO {schema_prefix}"UserBalance" ("userId", "balance", "updatedAt")
|
||||
SELECT
|
||||
$1::text,
|
||||
CASE
|
||||
-- For inactive transactions: Don't update balance
|
||||
WHEN $5::boolean = false THEN user_balance_lock.balance
|
||||
-- For ceiling balance (amount > 0): Apply ceiling
|
||||
WHEN $2 > 0 AND $7::int IS NOT NULL AND user_balance_lock.balance > $7::int - $2 THEN $7::int
|
||||
-- For regular operations: Apply with overflow/underflow protection
|
||||
WHEN user_balance_lock.balance + $2 > $6::int THEN $6::int
|
||||
WHEN user_balance_lock.balance + $2 < $10::int THEN $10::int
|
||||
ELSE user_balance_lock.balance + $2
|
||||
END,
|
||||
CURRENT_TIMESTAMP
|
||||
FROM user_balance_lock
|
||||
WHERE (
|
||||
$5::boolean = false OR -- Allow inactive transactions
|
||||
$2 >= 0 OR -- Allow positive amounts (top-ups, grants)
|
||||
$8::boolean = false OR -- Allow when insufficient balance check is disabled
|
||||
user_balance_lock.balance + $2 >= 0 -- Allow spending only when sufficient balance
|
||||
try:
|
||||
result = await query_raw_with_schema(
|
||||
"""
|
||||
WITH user_balance_lock AS (
|
||||
SELECT
|
||||
$1::text as userId,
|
||||
-- CRITICAL: FOR UPDATE lock prevents concurrent modifications to the same user's balance
|
||||
-- This ensures atomic read-modify-write operations and prevents race conditions
|
||||
COALESCE(
|
||||
(SELECT balance FROM {schema_prefix}"UserBalance" WHERE "userId" = $1 FOR UPDATE),
|
||||
-- Fallback: compute balance from transaction history if UserBalance doesn't exist
|
||||
(SELECT COALESCE(ct."runningBalance", 0)
|
||||
FROM {schema_prefix}"CreditTransaction" ct
|
||||
WHERE ct."userId" = $1
|
||||
AND ct."isActive" = true
|
||||
AND ct."runningBalance" IS NOT NULL
|
||||
ORDER BY ct."createdAt" DESC
|
||||
LIMIT 1),
|
||||
0
|
||||
) as balance
|
||||
),
|
||||
balance_update AS (
|
||||
INSERT INTO {schema_prefix}"UserBalance" ("userId", "balance", "updatedAt")
|
||||
SELECT
|
||||
$1::text,
|
||||
CASE
|
||||
-- For inactive transactions: Don't update balance
|
||||
WHEN $5::boolean = false THEN user_balance_lock.balance
|
||||
-- For ceiling balance (amount > 0): Apply ceiling
|
||||
WHEN $2 > 0 AND $7::int IS NOT NULL AND user_balance_lock.balance > $7::int - $2 THEN $7::int
|
||||
-- For regular operations: Apply with overflow/underflow protection
|
||||
WHEN user_balance_lock.balance + $2 > $6::int THEN $6::int
|
||||
WHEN user_balance_lock.balance + $2 < $10::int THEN $10::int
|
||||
ELSE user_balance_lock.balance + $2
|
||||
END,
|
||||
CURRENT_TIMESTAMP
|
||||
FROM user_balance_lock
|
||||
WHERE (
|
||||
$5::boolean = false OR -- Allow inactive transactions
|
||||
$2 >= 0 OR -- Allow positive amounts (top-ups, grants)
|
||||
$8::boolean = false OR -- Allow when insufficient balance check is disabled
|
||||
user_balance_lock.balance + $2 >= 0 -- Allow spending only when sufficient balance
|
||||
)
|
||||
ON CONFLICT ("userId") DO UPDATE SET
|
||||
"balance" = EXCLUDED."balance",
|
||||
"updatedAt" = EXCLUDED."updatedAt"
|
||||
RETURNING "balance", "updatedAt"
|
||||
),
|
||||
transaction_insert AS (
|
||||
INSERT INTO {schema_prefix}"CreditTransaction" (
|
||||
"userId", "amount", "type", "runningBalance",
|
||||
"metadata", "isActive", "createdAt", "transactionKey"
|
||||
)
|
||||
SELECT
|
||||
$1::text,
|
||||
$2::int,
|
||||
$3::text::{schema_prefix}"CreditTransactionType",
|
||||
CASE
|
||||
-- For inactive transactions: Set runningBalance to original balance (don't apply the change yet)
|
||||
WHEN $5::boolean = false THEN user_balance_lock.balance
|
||||
ELSE COALESCE(balance_update.balance, user_balance_lock.balance)
|
||||
END,
|
||||
$4::jsonb,
|
||||
$5::boolean,
|
||||
COALESCE(balance_update."updatedAt", CURRENT_TIMESTAMP),
|
||||
COALESCE($9, gen_random_uuid()::text)
|
||||
FROM user_balance_lock
|
||||
LEFT JOIN balance_update ON true
|
||||
WHERE (
|
||||
$5::boolean = false OR -- Allow inactive transactions
|
||||
$2 >= 0 OR -- Allow positive amounts (top-ups, grants)
|
||||
$8::boolean = false OR -- Allow when insufficient balance check is disabled
|
||||
user_balance_lock.balance + $2 >= 0 -- Allow spending only when sufficient balance
|
||||
)
|
||||
RETURNING "runningBalance", "transactionKey"
|
||||
)
|
||||
ON CONFLICT ("userId") DO UPDATE SET
|
||||
"balance" = EXCLUDED."balance",
|
||||
"updatedAt" = EXCLUDED."updatedAt"
|
||||
RETURNING "balance", "updatedAt"
|
||||
),
|
||||
transaction_insert AS (
|
||||
INSERT INTO {schema_prefix}"CreditTransaction" (
|
||||
"userId", "amount", "type", "runningBalance",
|
||||
"metadata", "isActive", "createdAt", "transactionKey"
|
||||
)
|
||||
SELECT
|
||||
$1::text,
|
||||
$2::int,
|
||||
$3::text::{schema_prefix}"CreditTransactionType",
|
||||
CASE
|
||||
-- For inactive transactions: Set runningBalance to original balance (don't apply the change yet)
|
||||
WHEN $5::boolean = false THEN user_balance_lock.balance
|
||||
ELSE balance_update.balance
|
||||
END,
|
||||
$4::jsonb,
|
||||
$5::boolean,
|
||||
balance_update."updatedAt",
|
||||
COALESCE($9, gen_random_uuid()::text)
|
||||
FROM balance_update, user_balance_lock
|
||||
RETURNING "runningBalance", "transactionKey"
|
||||
SELECT "runningBalance" as balance, "transactionKey" FROM transaction_insert;
|
||||
""",
|
||||
user_id, # $1
|
||||
amount, # $2
|
||||
transaction_type.value, # $3
|
||||
dumps(metadata.data), # $4 - use pre-serialized JSON string for JSONB
|
||||
is_active, # $5
|
||||
POSTGRES_INT_MAX, # $6 - overflow protection
|
||||
ceiling_balance, # $7 - ceiling balance (nullable)
|
||||
fail_insufficient_credits, # $8 - check balance for spending
|
||||
transaction_key, # $9 - transaction key (nullable)
|
||||
POSTGRES_INT_MIN, # $10 - underflow protection
|
||||
)
|
||||
SELECT "runningBalance" as balance, "transactionKey" FROM transaction_insert;
|
||||
""",
|
||||
user_id, # $1
|
||||
amount, # $2
|
||||
transaction_type.value, # $3
|
||||
dumps(metadata.data), # $4 - use pre-serialized JSON string for JSONB
|
||||
is_active, # $5
|
||||
POSTGRES_INT_MAX, # $6 - overflow protection
|
||||
ceiling_balance, # $7 - ceiling balance (nullable)
|
||||
fail_insufficient_credits, # $8 - check balance for spending
|
||||
transaction_key, # $9 - transaction key (nullable)
|
||||
POSTGRES_INT_MIN, # $10 - underflow protection
|
||||
)
|
||||
except Exception as e:
|
||||
# Convert raw SQL unique constraint violations to UniqueViolationError
|
||||
# for consistent exception handling throughout the codebase
|
||||
error_str = str(e).lower()
|
||||
if (
|
||||
"already exists" in error_str
|
||||
or "duplicate key" in error_str
|
||||
or "unique constraint" in error_str
|
||||
):
|
||||
# Extract table and constraint info for better error messages
|
||||
# Re-raise as a UniqueViolationError but with proper format
|
||||
# Create a minimal data structure that the error constructor expects
|
||||
raise UniqueViolationError({"error": str(e), "user_facing_error": {}})
|
||||
# For any other error, re-raise as-is
|
||||
raise
|
||||
|
||||
if result:
|
||||
new_balance, tx_key = result[0]["balance"], result[0]["transactionKey"]
|
||||
@@ -494,10 +539,7 @@ class UserCreditBase(ABC):
|
||||
|
||||
# Must be insufficient balance for spending operation
|
||||
if amount < 0 and fail_insufficient_credits:
|
||||
user_balance_record = await UserBalance.prisma().find_unique(
|
||||
where={"userId": user_id}
|
||||
)
|
||||
current_balance = user_balance_record.balance if user_balance_record else 0
|
||||
current_balance, _ = await self._get_credits(user_id)
|
||||
raise InsufficientBalanceError(
|
||||
message=f"Insufficient balance of ${current_balance/100}, where this will cost ${abs(amount)/100}",
|
||||
user_id=user_id,
|
||||
@@ -582,18 +624,9 @@ class UserCredit(UserCreditBase):
|
||||
),
|
||||
)
|
||||
return True
|
||||
except Exception as e:
|
||||
# Handle both Prisma UniqueViolationError and raw SQL unique constraint violations
|
||||
# Raw SQL raises different exception types than Prisma ORM
|
||||
error_str = str(e).lower()
|
||||
if (
|
||||
isinstance(e, UniqueViolationError)
|
||||
or "already exists" in error_str
|
||||
or "duplicate key" in error_str
|
||||
):
|
||||
return False
|
||||
# For any other error, re-raise
|
||||
raise
|
||||
except UniqueViolationError:
|
||||
# User already received this reward
|
||||
return False
|
||||
|
||||
async def top_up_refund(
|
||||
self, user_id: str, transaction_key: str, metadata: dict[str, str]
|
||||
|
||||
@@ -1548,11 +1548,12 @@ class ExecutionManager(AppProcess):
|
||||
logger.warning(
|
||||
f"[{self.service_name}] Graph {graph_exec_id} already running on pod {current_owner}"
|
||||
)
|
||||
_ack_message(reject=True, requeue=False)
|
||||
else:
|
||||
logger.warning(
|
||||
f"[{self.service_name}] Could not acquire lock for {graph_exec_id} - Redis unavailable"
|
||||
)
|
||||
_ack_message(reject=True, requeue=True)
|
||||
_ack_message(reject=True, requeue=True)
|
||||
return
|
||||
self._execution_locks[graph_exec_id] = cluster_lock
|
||||
|
||||
|
||||
@@ -14,19 +14,49 @@ def configured_snapshot(snapshot: Snapshot) -> Snapshot:
|
||||
@pytest.fixture
|
||||
def test_user_id() -> str:
|
||||
"""Test user ID fixture."""
|
||||
return "test-user-id"
|
||||
return "3e53486c-cf57-477e-ba2a-cb02dc828e1a"
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def admin_user_id() -> str:
|
||||
"""Admin user ID fixture."""
|
||||
return "admin-user-id"
|
||||
return "4e53486c-cf57-477e-ba2a-cb02dc828e1b"
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def target_user_id() -> str:
|
||||
"""Target user ID fixture."""
|
||||
return "target-user-id"
|
||||
return "5e53486c-cf57-477e-ba2a-cb02dc828e1c"
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
async def setup_test_user(test_user_id):
|
||||
"""Create test user in database before tests."""
|
||||
from backend.data.user import get_or_create_user
|
||||
|
||||
# Create the test user in the database using JWT token format
|
||||
user_data = {
|
||||
"sub": test_user_id,
|
||||
"email": "test@example.com",
|
||||
"user_metadata": {"name": "Test User"},
|
||||
}
|
||||
await get_or_create_user(user_data)
|
||||
return test_user_id
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
async def setup_admin_user(admin_user_id):
|
||||
"""Create admin user in database before tests."""
|
||||
from backend.data.user import get_or_create_user
|
||||
|
||||
# Create the admin user in the database using JWT token format
|
||||
user_data = {
|
||||
"sub": admin_user_id,
|
||||
"email": "test-admin@example.com",
|
||||
"user_metadata": {"name": "Test Admin"},
|
||||
}
|
||||
await get_or_create_user(user_data)
|
||||
return admin_user_id
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
|
||||
@@ -23,10 +23,13 @@ client = fastapi.testclient.TestClient(app)
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def setup_app_auth(mock_jwt_user):
|
||||
def setup_app_auth(mock_jwt_user, setup_test_user):
|
||||
"""Setup auth overrides for all tests in this module"""
|
||||
from autogpt_libs.auth.jwt_utils import get_jwt_payload
|
||||
|
||||
# setup_test_user fixture already executed and user is created in database
|
||||
# It returns the user_id which we don't need to await
|
||||
|
||||
app.dependency_overrides[get_jwt_payload] = mock_jwt_user["get_jwt_payload"]
|
||||
yield
|
||||
app.dependency_overrides.clear()
|
||||
@@ -274,12 +277,22 @@ def test_configure_auto_top_up(
|
||||
snapshot: Snapshot,
|
||||
) -> None:
|
||||
"""Test configure auto top-up endpoint - this test would have caught the enum casting bug"""
|
||||
# Mock the set_auto_top_up function to avoid database calls
|
||||
mock_set_auto_top_up = mocker.patch(
|
||||
# Mock the set_auto_top_up function to avoid database operations
|
||||
mocker.patch(
|
||||
"backend.server.routers.v1.set_auto_top_up",
|
||||
return_value=None,
|
||||
)
|
||||
|
||||
# Mock credit model to avoid Stripe API calls
|
||||
mock_credit_model = mocker.AsyncMock()
|
||||
mock_credit_model.get_credits.return_value = 50 # Current balance below threshold
|
||||
mock_credit_model.top_up_credits.return_value = None
|
||||
|
||||
mocker.patch(
|
||||
"backend.server.routers.v1.get_user_credit_model",
|
||||
return_value=mock_credit_model,
|
||||
)
|
||||
|
||||
# Test data
|
||||
request_data = {
|
||||
"threshold": 100,
|
||||
@@ -292,27 +305,24 @@ def test_configure_auto_top_up(
|
||||
assert response.status_code == 200
|
||||
assert response.json() == "Auto top-up settings updated"
|
||||
|
||||
# Verify the function was called with correct parameters
|
||||
mock_set_auto_top_up.assert_called_once()
|
||||
call_args = mock_set_auto_top_up.call_args
|
||||
|
||||
# Check user_id (from mock auth)
|
||||
assert call_args[0][0] == "test-user-id"
|
||||
|
||||
# Check AutoTopUpConfig object
|
||||
config_arg = call_args[0][1]
|
||||
assert isinstance(config_arg, AutoTopUpConfig)
|
||||
assert config_arg.threshold == 100
|
||||
assert config_arg.amount == 500
|
||||
|
||||
|
||||
def test_configure_auto_top_up_validation_errors(
|
||||
mocker: pytest_mock.MockFixture,
|
||||
) -> None:
|
||||
"""Test configure auto top-up endpoint validation"""
|
||||
# Mock to avoid database calls
|
||||
# Mock set_auto_top_up to avoid database operations for successful case
|
||||
mocker.patch("backend.server.routers.v1.set_auto_top_up")
|
||||
|
||||
# Mock credit model to avoid Stripe API calls for the successful case
|
||||
mock_credit_model = mocker.AsyncMock()
|
||||
mock_credit_model.get_credits.return_value = 50
|
||||
mock_credit_model.top_up_credits.return_value = None
|
||||
|
||||
mocker.patch(
|
||||
"backend.server.routers.v1.get_user_credit_model",
|
||||
return_value=mock_credit_model,
|
||||
)
|
||||
|
||||
# Test negative threshold
|
||||
response = client.post(
|
||||
"/credits/auto-top-up", json={"threshold": -1, "amount": 500}
|
||||
|
||||
@@ -1,38 +1,16 @@
|
||||
-- Create UserBalance table for atomic credit operations
|
||||
-- This replaces the need for User.balance column and provides better separation of concerns
|
||||
-- UserBalance records are automatically created by the application when users interact with the credit system
|
||||
|
||||
-- CreateTable
|
||||
CREATE TABLE "UserBalance" (
|
||||
-- CreateTable (only if it doesn't exist)
|
||||
CREATE TABLE IF NOT EXISTS "UserBalance" (
|
||||
"userId" TEXT NOT NULL,
|
||||
"balance" INTEGER NOT NULL DEFAULT 0,
|
||||
"updatedAt" TIMESTAMP(3) NOT NULL,
|
||||
|
||||
CONSTRAINT "UserBalance_pkey" PRIMARY KEY ("userId")
|
||||
CONSTRAINT "UserBalance_pkey" PRIMARY KEY ("userId"),
|
||||
CONSTRAINT "UserBalance_userId_fkey" FOREIGN KEY ("userId") REFERENCES "User"("id") ON DELETE CASCADE ON UPDATE CASCADE
|
||||
);
|
||||
|
||||
-- CreateIndex
|
||||
CREATE INDEX "UserBalance_userId_idx" ON "UserBalance"("userId");
|
||||
|
||||
-- AddForeignKey
|
||||
ALTER TABLE "UserBalance" ADD CONSTRAINT "UserBalance_userId_fkey" FOREIGN KEY ("userId") REFERENCES "User"("id") ON DELETE CASCADE ON UPDATE CASCADE;
|
||||
|
||||
|
||||
-- Migrate existing user balances from transaction history
|
||||
-- Users with transactions: use their latest runningBalance
|
||||
-- Users without transactions: create with balance 0
|
||||
INSERT INTO "UserBalance" ("userId", "balance", "updatedAt")
|
||||
SELECT
|
||||
u.id as "userId",
|
||||
COALESCE(latest_balances.latest_running_balance, 0) as balance,
|
||||
COALESCE(latest_balances.last_transaction_time, u."updatedAt") as "updatedAt"
|
||||
FROM "User" u
|
||||
LEFT JOIN (
|
||||
SELECT DISTINCT ON (ct."userId")
|
||||
ct."userId" as user_id,
|
||||
ct."runningBalance" as latest_running_balance,
|
||||
ct."createdAt" as last_transaction_time
|
||||
FROM "CreditTransaction" ct
|
||||
WHERE ct."isActive" = true
|
||||
AND ct."runningBalance" IS NOT NULL
|
||||
ORDER BY ct."userId", ct."createdAt" DESC
|
||||
) latest_balances ON u.id = latest_balances.user_id;
|
||||
-- CreateIndex (only if it doesn't exist)
|
||||
CREATE INDEX IF NOT EXISTS "UserBalance_userId_idx" ON "UserBalance"("userId");
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
{
|
||||
"email": "test@example.com",
|
||||
"id": "test-user-id",
|
||||
"id": "3e53486c-cf57-477e-ba2a-cb02dc828e1a",
|
||||
"name": "Test User"
|
||||
}
|
||||
@@ -28,6 +28,6 @@
|
||||
"recommended_schedule_cron": null,
|
||||
"sub_graphs": [],
|
||||
"trigger_setup_info": null,
|
||||
"user_id": "test-user-id",
|
||||
"user_id": "3e53486c-cf57-477e-ba2a-cb02dc828e1a",
|
||||
"version": 1
|
||||
}
|
||||
@@ -26,7 +26,7 @@
|
||||
"recommended_schedule_cron": null,
|
||||
"sub_graphs": [],
|
||||
"trigger_setup_info": null,
|
||||
"user_id": "test-user-id",
|
||||
"user_id": "3e53486c-cf57-477e-ba2a-cb02dc828e1a",
|
||||
"version": 1
|
||||
}
|
||||
]
|
||||
Reference in New Issue
Block a user