Files
endurain/backend/app/alembic/versions/v0_16_4_migration.py
João Vitória Silva 0797435d4e Add CASCADE ondelete to relevant foreign keys
Updated foreign key constraints in migration and model files to include ondelete="CASCADE" for idp_id, user_id, and token_family_id. This ensures related records are properly deleted when parent records are removed, improving data integrity and cleanup.
2025-12-22 22:08:56 +00:00

345 lines
10 KiB
Python

"""v0.16.4 migration
Revision ID: ed5f1c867943
Revises: 2af2c0629b37
Create Date: 2025-12-18 12:02:47.808747
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = "ed5f1c867943"
down_revision: Union[str, None] = "2af2c0629b37"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
# Create oauth_states table
op.create_table(
"oauth_states",
sa.Column(
"id",
sa.String(length=64),
nullable=False,
comment="State parameter itself (secrets.token_urlsafe(32))",
),
sa.Column(
"idp_id", sa.Integer(), nullable=False, comment="Identity provider ID"
),
sa.Column(
"user_id", sa.Integer(), nullable=True, comment="User ID (for link mode)"
),
sa.Column(
"code_challenge",
sa.String(length=128),
nullable=True,
comment="Base64url-encoded SHA256(code_verifier)",
),
sa.Column(
"code_challenge_method",
sa.String(length=10),
nullable=True,
comment="PKCE method (only S256 supported)",
),
sa.Column(
"nonce",
sa.String(length=64),
nullable=False,
comment="OIDC nonce for ID token validation",
),
sa.Column(
"redirect_path",
sa.String(length=500),
nullable=True,
comment="Frontend path after login",
),
sa.Column(
"client_type",
sa.String(length=10),
nullable=False,
comment="Client type: web or mobile",
),
sa.Column(
"ip_address",
sa.String(length=45),
nullable=True,
comment="Client IP address (IPv6 max length)",
),
sa.Column(
"created_at",
sa.DateTime(),
server_default=sa.text("now()"),
nullable=False,
comment="OAuth state creation timestamp",
),
sa.Column(
"expires_at",
sa.DateTime(),
nullable=False,
comment="Hard expiry at 10 minutes (cleanup marker)",
),
sa.Column(
"used",
sa.Boolean(),
nullable=False,
comment="True when state is consumed (prevents replay)",
),
sa.ForeignKeyConstraint(
["idp_id"],
["identity_providers.id"],
ondelete="CASCADE",
),
sa.ForeignKeyConstraint(
["user_id"],
["users.id"],
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("id"),
)
op.create_index(
op.f("ix_oauth_states_expires_at"), "oauth_states", ["expires_at"], unique=False
)
op.create_index(op.f("ix_oauth_states_id"), "oauth_states", ["id"], unique=False)
op.create_index(
op.f("ix_oauth_states_idp_id"), "oauth_states", ["idp_id"], unique=False
)
op.create_index(
op.f("ix_oauth_states_used"), "oauth_states", ["used"], unique=False
)
op.create_index(
op.f("ix_oauth_states_user_id"), "oauth_states", ["user_id"], unique=False
)
# Delete all existing sessions before altering user_sessions table
op.execute("DELETE FROM users_sessions")
# Add new columns to users_sessions
op.add_column(
"users_sessions",
sa.Column(
"last_activity_at",
sa.DateTime(),
nullable=False,
comment="Last activity timestamp for idle timeout",
),
)
op.add_column(
"users_sessions",
sa.Column(
"oauth_state_id",
sa.String(length=64),
nullable=True,
comment="Link to OAuth state for PKCE validation",
),
)
op.add_column(
"users_sessions",
sa.Column(
"tokens_exchanged",
sa.Boolean(),
nullable=False,
comment="Prevents duplicate token exchange for mobile",
),
)
op.add_column(
"users_sessions",
sa.Column(
"token_family_id",
sa.String(length=36),
nullable=False,
comment="UUID identifying token family for reuse detection",
),
)
op.add_column(
"users_sessions",
sa.Column(
"rotation_count",
sa.Integer(),
nullable=False,
comment="Number of times refresh token has been rotated",
),
)
op.add_column(
"users_sessions",
sa.Column(
"last_rotation_at",
sa.DateTime(),
nullable=True,
comment="Timestamp of last token rotation",
),
)
op.add_column(
"users_sessions",
sa.Column(
"csrf_token_hash",
sa.String(length=255),
nullable=True,
comment="Hashed CSRF token for refresh validation",
),
)
op.create_index(
op.f("ix_users_sessions_oauth_state_id"),
"users_sessions",
["oauth_state_id"],
unique=False,
)
op.create_index(
op.f("ix_users_sessions_token_family_id"),
"users_sessions",
["token_family_id"],
unique=True,
)
op.create_foreign_key(
"users_sessions_oauth_state_id_fkey",
"users_sessions",
"oauth_states",
["oauth_state_id"],
["id"],
ondelete="SET NULL",
)
# Create rotated_refresh_tokens table
op.create_table(
"rotated_refresh_tokens",
sa.Column("id", sa.Integer(), autoincrement=True, nullable=False),
sa.Column(
"token_family_id",
sa.String(length=36),
nullable=False,
comment="UUID of the token family",
),
sa.Column(
"hashed_token",
sa.String(length=255),
nullable=False,
comment="Hashed old refresh token",
),
sa.Column(
"rotation_count",
sa.Integer(),
nullable=False,
comment="Which rotation this token belonged to",
),
sa.Column(
"rotated_at",
sa.DateTime(),
nullable=False,
comment="When this token was rotated",
),
sa.Column(
"expires_at",
sa.DateTime(),
nullable=False,
comment="Cleanup marker (rotated_at + 60 seconds)",
),
sa.ForeignKeyConstraint(
["token_family_id"],
["users_sessions.token_family_id"],
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("id"),
sa.UniqueConstraint("hashed_token"),
)
op.create_index(
op.f("ix_rotated_refresh_tokens_token_family_id"),
"rotated_refresh_tokens",
["token_family_id"],
unique=False,
)
# Create mfa_backup_codes table
op.create_table(
"mfa_backup_codes",
sa.Column("id", sa.Integer(), autoincrement=True, nullable=False),
sa.Column(
"user_id",
sa.Integer(),
nullable=False,
comment="User who owns this backup code",
),
sa.Column(
"code_hash",
sa.String(length=255),
nullable=False,
comment="Argon2 hash of the backup code",
),
sa.Column(
"used",
sa.Boolean(),
nullable=False,
comment="Whether this code has been consumed",
),
sa.Column(
"used_at", sa.DateTime(), nullable=True, comment="When this code was used"
),
sa.Column(
"created_at",
sa.DateTime(),
nullable=False,
comment="When this code was generated",
),
sa.Column(
"expires_at",
sa.DateTime(),
nullable=True,
comment="Optional expiry for code rotation policy",
),
sa.ForeignKeyConstraint(["user_id"], ["users.id"], ondelete="CASCADE"),
sa.PrimaryKeyConstraint("id"),
sa.UniqueConstraint("code_hash"),
)
op.create_index(
"idx_user_unused_codes", "mfa_backup_codes", ["user_id", "used"], unique=False
)
op.create_index(
op.f("ix_mfa_backup_codes_used"), "mfa_backup_codes", ["used"], unique=False
)
op.create_index(
op.f("ix_mfa_backup_codes_user_id"),
"mfa_backup_codes",
["user_id"],
unique=False,
)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index(op.f("ix_mfa_backup_codes_user_id"), table_name="mfa_backup_codes")
op.drop_index(op.f("ix_mfa_backup_codes_used"), table_name="mfa_backup_codes")
op.drop_index("idx_user_unused_codes", table_name="mfa_backup_codes")
op.drop_table("mfa_backup_codes")
op.drop_constraint(
"users_sessions_oauth_state_id_fkey", "users_sessions", type_="foreignkey"
)
op.drop_index(
op.f("ix_users_sessions_token_family_id"), table_name="users_sessions"
)
op.drop_index(op.f("ix_users_sessions_oauth_state_id"), table_name="users_sessions")
op.drop_column("users_sessions", "csrf_token_hash")
op.drop_column("users_sessions", "last_rotation_at")
op.drop_column("users_sessions", "rotation_count")
op.drop_column("users_sessions", "token_family_id")
op.drop_column("users_sessions", "tokens_exchanged")
op.drop_column("users_sessions", "oauth_state_id")
op.drop_column("users_sessions", "last_activity_at")
op.drop_index(
op.f("ix_rotated_refresh_tokens_token_family_id"),
table_name="rotated_refresh_tokens",
)
op.drop_table("rotated_refresh_tokens")
op.drop_index(op.f("ix_oauth_states_user_id"), table_name="oauth_states")
op.drop_index(op.f("ix_oauth_states_used"), table_name="oauth_states")
op.drop_index(op.f("ix_oauth_states_idp_id"), table_name="oauth_states")
op.drop_index(op.f("ix_oauth_states_id"), table_name="oauth_states")
op.drop_index(op.f("ix_oauth_states_expires_at"), table_name="oauth_states")
op.drop_table("oauth_states")
# ### end Alembic commands ###