
Codebase Remediation Implementation Plan

For agentic workers: REQUIRED: Use superpowers:subagent-driven-development (if subagents available) or superpowers:executing-plans to implement this plan. Steps use checkbox (- [ ]) syntax for tracking.

Goal: Fix all critical, high, and medium findings from the codebase audit — security, architecture, and performance — in priority order.

Architecture: Incremental fixes to the existing FastAPI monolith. No new services or major structural changes until Week 3-4. Each task is independently deployable and backwards-compatible.

Tech Stack: Python 3.12 / FastAPI / SQLAlchemy 2.0 / Alembic / Redis / Terraform / Docker


Chunk 1: Critical Security (Week "Now")

Task 1: Remove Plaintext API Key Storage and Fallback

Files:
  • Modify: backend/app/dependencies.py:101-127
  • Modify: backend/app/api/auth.py:189 and auth.py:517
  • Modify: backend/app/models/lender.py:17
  • Create: backend/alembic/versions/034_remove_plaintext_api_key.py
  • Test: backend/tests/test_api_key_security.py

  • [ ] Step 1: Write failing test — plaintext fallback must not authenticate
# backend/tests/test_api_key_security.py
import pytest
from unittest.mock import patch
from app.dependencies import _authenticate_api_key

@pytest.mark.asyncio
async def test_plaintext_api_key_rejected(db_session):
    """Plaintext API key stored in lender.api_key column must NOT authenticate."""
    from app.models.lender import Lender
    from app.services.auth import hash_password

    lender = Lender(
        name="Test Lender",
        email="plaintext-test@example.com",
        password_hash=hash_password("password123"),
        api_key="raw_key_plaintext_12345",
        api_key_hash=None,  # No hash — only plaintext
        api_key_version=1,
    )
    db_session.add(lender)
    await db_session.flush()

    result = await _authenticate_api_key(db_session, "raw_key_plaintext_12345")
    assert result is None, "Plaintext-only API keys must be rejected"
  • [ ] Step 2: Run test to verify it fails

Run: cd /Users/lordviswa/Desktop/new_project && source backend/venv/bin/activate && python -m pytest backend/tests/test_api_key_security.py -v
Expected: FAIL — the plaintext fallback currently returns the lender.

  • [ ] Step 3: Remove plaintext fallback from _authenticate_api_key

In backend/app/dependencies.py, delete lines 121-125 (Strategy 2 block):

# DELETE THESE LINES:
    # Strategy 2: Legacy plaintext match (for lenders not yet migrated to hashed keys)
    result = await db.execute(select(Lender).where(Lender.api_key == api_key))
    lender = result.scalar_one_or_none()
    if lender is not None and lender.is_active:
        return lender

The function should end with just return None after the bcrypt loop.

  • [ ] Step 4: Run test to verify it passes

Run: python -m pytest backend/tests/test_api_key_security.py -v
Expected: PASS

  • [ ] Step 5: Stop writing plaintext key on registration

In backend/app/api/auth.py line 189, change:

# Before:
        api_key=raw_api_key,
# After:
        api_key=None,

Do not use a shared sentinel string like "REDACTED" here: the column is unique, so the second registration would fail with a constraint violation. None is safe once the column is nullable (Step 7) and matches the registration code in Task 11.

  • [ ] Step 6: Stop writing plaintext key on rotation

In backend/app/api/auth.py line 517, delete:

    lender.api_key = new_raw_key  # DELETE THIS LINE

  • [ ] Step 7: Make api_key column nullable in the model

In backend/app/models/lender.py line 17, change:

# Before:
    api_key: Mapped[str] = mapped_column(String(256), unique=True, nullable=False)
# After:
    api_key: Mapped[Optional[str]] = mapped_column(String(256), nullable=True)

Add the Optional import from typing if not already present. Note that unique=True is dropped from the column to match the constraint removal in the Step 9 migration.

  • [ ] Step 8: Discover the unique constraint name

Run against local DB:

SELECT constraint_name FROM information_schema.table_constraints
WHERE table_name='lenders' AND constraint_type='UNIQUE';
The constraint on api_key is likely auto-generated as lenders_api_key_key (PostgreSQL convention). Use the actual name in the migration below.

  • [ ] Step 9: Create Alembic migration to NULL out plaintext keys
# backend/alembic/versions/034_remove_plaintext_api_key.py
"""Remove plaintext API key storage.

Revision ID: 034_remove_plaintext
Revises: 033_plaid_links
Create Date: 2026-03-11
"""
from alembic import op
import sqlalchemy as sa

revision = "034_remove_plaintext"
down_revision = "033_plaid_links"
branch_labels = None
depends_on = None

def upgrade() -> None:
    # NULL out plaintext keys for all lenders that have a bcrypt hash
    op.execute(
        "UPDATE lenders SET api_key = NULL WHERE api_key_hash IS NOT NULL"
    )
    # Make column nullable
    op.alter_column("lenders", "api_key", existing_type=sa.String(256), nullable=True)
    # Drop the unique constraint since NULLs will dominate
    # NOTE: Replace constraint name below with actual name from Step 8
    op.drop_constraint("lenders_api_key_key", "lenders", type_="unique")

def downgrade() -> None:
    # NOTE: restoring nullable=False will fail if any rows still have api_key NULL — backfill first
    op.create_unique_constraint("lenders_api_key_key", "lenders", ["api_key"])
    op.alter_column("lenders", "api_key", existing_type=sa.String(256), nullable=False)
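
To confirm the migration applies cleanly before committing (assumes the standard Alembic setup under backend/):

Run: cd backend && alembic upgrade head
Expected: 034_remove_plaintext applies without errors against the local DB.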
  • [ ] Step 9b: Grep for all reads of lender.api_key to ensure nothing breaks

Run: grep -rn "lender\.api_key\b" backend/app/ --include="*.py" | grep -v "api_key_hash\|api_key_secondary\|api_key_version\|api_key_last_used\|api_key_prefix"

Verify each remaining reference either writes None or is already handled by this task.

  • [ ] Step 10: Run full test suite

Run: python -m pytest backend/tests/ -v --tb=short
Expected: All 310+ tests pass. Fix any tests that relied on plaintext api_key being set.

  • [ ] Step 11: Commit
git add backend/app/dependencies.py backend/app/api/auth.py backend/app/models/lender.py backend/alembic/versions/034_remove_plaintext_api_key.py backend/tests/test_api_key_security.py
git commit -m "security: remove plaintext API key storage and legacy fallback auth

Removes Strategy 2 plaintext API key authentication from dependencies.py.
Stops writing raw API keys to lender.api_key on registration and rotation.
Migration 034 NULLs existing plaintext keys where bcrypt hash exists."

Task 2: Remove Ed25519 Private Key from README.md

Files:
  • Modify: README.md (line with private key hex)

  • [ ] Step 1: Find and remove the private key from README.md

Search for the hex string 6a416f05f80a204459d2f181d8c2f1e0794ac0e2c9e0361a7309a56f3cf3a4b5 in README.md. Replace the actual key value with a placeholder:

ISSUER_PRIVATE_KEY_HEX=<generate-a-new-64-char-hex-key>

Also search for any other .env example blocks in README.md that contain real secret values and replace them with placeholders.

  • [ ] Step 2: Verify no other tracked files contain the private key

Run: grep -r "6a416f05f80a204459d2f181d8c2f1e0794ac0e2c9e0361a7309a56f3cf3a4b5" --include="*.py" --include="*.md" --include="*.yml" --include="*.json" .
Expected: No matches (the .env file is gitignored).

  • [ ] Step 3: Commit
git add README.md
git commit -m "security: remove Ed25519 issuer private key from README

Replace hardcoded issuer private key with placeholder. The production
key must be rotated — this key has been in version control history."
  • [ ] Step 4: Note for manual follow-up

The key is still in git history. A full rotation requires:

  1. Generate a new Ed25519 keypair (a sketch follows below)
  2. Update production .env / Secrets Manager with the new key
  3. Re-issue all credentials signed with the old key (or accept them as legacy)
  4. Use git filter-repo to purge the old key from history (optional but recommended)
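
A minimal sketch for step 1 using the cryptography package (assuming that is what the issuer code already uses for Ed25519):

# generate_issuer_key.py — one-off script to mint a new issuer keypair
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
from cryptography.hazmat.primitives import serialization

key = Ed25519PrivateKey.generate()
private_hex = key.private_bytes(
    encoding=serialization.Encoding.Raw,
    format=serialization.PrivateFormat.Raw,
    encryption_algorithm=serialization.NoEncryption(),
).hex()
public_hex = key.public_key().public_bytes(
    encoding=serialization.Encoding.Raw,
    format=serialization.PublicFormat.Raw,
).hex()
print(f"ISSUER_PRIVATE_KEY_HEX={private_hex}")  # 64 hex chars (32 bytes)
print(f"ISSUER_PUBLIC_KEY_HEX={public_hex}")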


Task 3: Note — Rotate SendGrid API Key (Manual)

This requires logging into the SendGrid dashboard and rotating the key. Not automatable via code. The new key must be set in:

  • backend/.env locally
  • AWS Secrets Manager / SSM Parameter Store for production
  • GitHub Actions secrets if used in CI


Chunk 2: Week 1 — Low-Risk Infrastructure Fixes

Task 4: Redis Singleton Connection Pool

Files:
  • Modify: backend/app/services/cache.py (full rewrite of connection pattern)
  • Modify: backend/app/main.py:77-123 (add pool init/teardown to lifespan)
  • Test: backend/tests/test_cache.py

  • [ ] Step 1: Write failing test — pool is reused across calls
# backend/tests/test_cache.py
import pytest
from unittest.mock import patch, AsyncMock
from app.services import cache

@pytest.mark.asyncio
async def test_cache_uses_singleton_pool():
    """Cache operations must reuse a single Redis pool, not create new connections."""
    with patch.object(cache, "_pool", new=AsyncMock()) as mock_pool:
        mock_pool.get = AsyncMock(return_value=None)
        mock_pool.setex = AsyncMock()
        mock_pool.delete = AsyncMock()

        await cache.get_cached_trust_entry("did:key:z123")
        await cache.set_cached_trust_entry("did:key:z123", {"name": "Test"})
        await cache.invalidate_trust_cache("did:key:z123")

        # Pool should NOT be closed between operations
        mock_pool.aclose.assert_not_called()
  • [ ] Step 2: Run test to verify it fails

Run: python -m pytest backend/tests/test_cache.py::test_cache_uses_singleton_pool -v
Expected: FAIL — current code creates/closes connections per call.

  • [ ] Step 3: Refactor cache.py to use singleton pool

Replace _get_redis() pattern with a module-level pool:

# backend/app/services/cache.py — new connection management

import redis.asyncio as aioredis
from typing import Optional
from app.config import get_settings

# The module's existing logger, json import, and PREFIX_* key constants are unchanged and still used below.

settings = get_settings()

# Module-level singleton — initialized via init_cache_pool(), closed via close_cache_pool()
_pool: Optional[aioredis.Redis] = None


def init_cache_pool() -> None:
    """Initialize the Redis connection pool. Call once at app startup."""
    global _pool
    _pool = aioredis.from_url(settings.redis_url, decode_responses=True)


async def close_cache_pool() -> None:
    """Close the Redis connection pool. Call at app shutdown."""
    global _pool
    if _pool:
        await _pool.aclose()
        _pool = None


def _get_redis() -> aioredis.Redis:
    """Return the shared Redis pool. Falls back to creating one if not initialized."""
    global _pool
    if _pool is None:
        logger.warning("Redis pool not initialized via init_cache_pool() — creating fallback")
        _pool = aioredis.from_url(settings.redis_url, decode_responses=True)
    return _pool

Then remove every await r.aclose() call from every function in the file. Each function should use r = _get_redis() and operate directly — no close.

Example — get_cached_trust_entry becomes:

async def get_cached_trust_entry(did: str) -> Optional[dict]:
    try:
        r = _get_redis()
        data = await r.get(f"{PREFIX_TRUST_ENTRY}{did}")
        if data:
            return json.loads(data)
    except Exception as e:
        logger.debug("cache_miss_trust_entry", did=did, error=str(e))
    return None

Apply this pattern to all 10 functions: remove the await r.aclose() line from each.

  • [ ] Step 4: Wire pool init/teardown into main.py lifespan

In backend/app/main.py, inside lifespan():

After _apply_migrations() (line 81), add:

    from app.services.cache import init_cache_pool, close_cache_pool
    init_cache_pool()

In the shutdown section (before logger.info("shutdown_complete")), replace the existing Redis close block (lines 116-122) with:

    from app.services.cache import close_cache_pool
    await close_cache_pool()
    logger.info("redis_pool_closed")

  • [ ] Step 5: Run test to verify it passes

Run: python -m pytest backend/tests/test_cache.py -v
Expected: PASS

  • [ ] Step 6: Run full test suite

Run: python -m pytest backend/tests/ -v --tb=short
Expected: All tests pass.

  • [ ] Step 7: Commit
git add backend/app/services/cache.py backend/app/main.py backend/tests/test_cache.py
git commit -m "perf: use singleton Redis pool for cache operations

Replaces per-call Redis connection creation/teardown with a module-level
singleton pool initialized at app startup. Eliminates TCP connection
overhead on every cache hit."

Task 5: Move Webhook Dispatch Off Verification Hot Path

Files:
  • Modify: backend/app/api/verification.py:162-174
  • Test: existing verification tests should still pass

  • [ ] Step 1: Change inline await to background task

In backend/app/api/verification.py, replace lines 162-174:

    # Before (blocking):
    await dispatch_event(db, event_type, lender.id, { ... })

    # After (background):
    webhook_payload = {
        "event": event_type,
        "member_did": result.get("member_did", ""),
        "claims_verified": result.get("claims_verified", {}),
        "all_verified": all_verified,
        "verification_id": result.get("verification_id", ""),
        "failed_step": result.get("failed_step"),
        "claim_results": result.get("claim_results", []),
        "risk_score": risk_data["score"] if risk_data else None,
        "risk_level": risk_data["level"] if risk_data else None,
    }
    background_tasks.add_task(dispatch_event, db, event_type, lender.id, webhook_payload)

Note: background_tasks: BackgroundTasks is already a parameter on the verify_proof function (line 88), and BackgroundTasks is already imported (line 6). One caveat: on FastAPI 0.106+, dependencies with yield (such as get_db) run their cleanup before background tasks execute, so the request-scoped db session may already be closed when dispatch_event runs. If that applies here, have the task open its own session — a sketch follows.
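
A minimal sketch of such a wrapper, assuming a session factory named async_session_factory exists in app.database (adjust to the project's actual name):

async def _dispatch_event_bg(event_type: str, lender_id, payload: dict) -> None:
    """Hypothetical wrapper: opens a fresh session so the task doesn't reuse the request's db."""
    from app.database import async_session_factory  # assumed name

    async with async_session_factory() as session:
        await dispatch_event(session, event_type, lender_id, payload)
        await session.commit()  # only if dispatch_event doesn't commit itself

# then: background_tasks.add_task(_dispatch_event_bg, event_type, lender.id, webhook_payload)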

  • [ ] Step 2: Run verification tests

Run: python -m pytest backend/tests/test_verification*.py backend/tests/test_e2e*.py -v --tb=short
Expected: All pass.

  • [ ] Step 3: Commit
git add backend/app/api/verification.py
git commit -m "perf: move webhook dispatch to background task in verify_proof

Webhook delivery no longer blocks the verification response to lenders.
Uses FastAPI BackgroundTasks which was already imported and available."

Task 6: Delete _apply_migrations() from main.py

Files:
  • Modify: backend/app/main.py:40-83
  • Verify: backend/alembic/versions/014_multi_cu_members.py already exists and covers the same columns

  • [ ] Step 1: Verify Alembic migration 014 exists and covers the same columns

Run: cat backend/alembic/versions/014_multi_cu_members.py

Confirm it adds encrypted_private_key to trust_registry and issuer_id to members. If it does, the raw SQL in _apply_migrations() is fully redundant.

  • [ ] Step 2: Remove _apply_migrations() and its call

In backend/app/main.py:

  • Delete the entire _apply_migrations() function (lines 40-74)
  • Delete the try/except block that calls it (lines 80-83):

    # DELETE:
    try:
        await _apply_migrations()
    except Exception as e:
        logger.warning("startup_migration_skipped: %s", str(e))
  • Remove the now-unused import of text from sqlalchemy (if it's only used in _apply_migrations)

  • [ ] Step 3: Run full test suite

Run: python -m pytest backend/tests/ -v --tb=short
Expected: All pass. Alembic handles schema at deploy time via alembic upgrade head in the Dockerfile CMD.
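
To confirm the deploy-time migration step actually exists before relying on it, check the image entrypoint (filename from Task 7):

Run: grep -n "alembic upgrade head" Dockerfile.backend
Expected: At least one match in the CMD/entrypoint.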

  • [ ] Step 4: Commit
git add backend/app/main.py
git commit -m "fix: remove raw SQL startup migration from main.py

_apply_migrations() duplicated Alembic migration 014 and ran on every
pod startup. Alembic is the single authority for schema state via
'alembic upgrade head' in the Docker entrypoint."

Task 7: Enable RDS TLS Certificate Verification

Files:
  • Modify: Dockerfile.backend:19-25 (add CA cert download)
  • Modify: backend/app/database.py:11-18 and 36-40

  • [ ] Step 1: Add RDS CA cert to Dockerfile

In Dockerfile.backend, in Stage 2 (runtime), after the RUN useradd line (line 27), add:

# Download AWS RDS CA certificate bundle for TLS verification
ADD https://truststore.pki.rds.amazonaws.com/us-east-1/us-east-1-bundle.pem /etc/ssl/certs/rds-ca-bundle.pem
RUN chmod 644 /etc/ssl/certs/rds-ca-bundle.pem
  • [ ] Step 2: Enable TLS verification in database.py

Replace lines 11-18 in backend/app/database.py:

# Before:
_connect_args = {}
if ".rds.amazonaws.com" in settings.database_url:
    _ctx = _ssl.create_default_context()
    _ctx.check_hostname = False
    _ctx.verify_mode = _ssl.CERT_NONE
    _connect_args["ssl"] = _ctx

# After:
_connect_args = {}
if ".rds.amazonaws.com" in settings.database_url:
    _ctx = _ssl.create_default_context(cafile="/etc/ssl/certs/rds-ca-bundle.pem")
    # check_hostname and CERT_REQUIRED are defaults for create_default_context
    _connect_args["ssl"] = _ctx

Apply the same fix for the read-replica block (lines 35-40):

# Before:
    if ".rds.amazonaws.com" in settings.database_read_url:
        _read_ctx = _ssl.create_default_context()
        _read_ctx.check_hostname = False
        _read_ctx.verify_mode = _ssl.CERT_NONE
        _read_connect_args["ssl"] = _read_ctx

# After:
    if ".rds.amazonaws.com" in settings.database_read_url:
        _read_ctx = _ssl.create_default_context(cafile="/etc/ssl/certs/rds-ca-bundle.pem")
        _read_connect_args["ssl"] = _read_ctx
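
As a quick sanity check that the CA bundle is present and parseable in the built image (hypothetical one-off command, not part of the test suite; <backend-image> is a placeholder):

Run: docker run --rm --entrypoint python <backend-image> -c "import ssl; ssl.create_default_context(cafile='/etc/ssl/certs/rds-ca-bundle.pem'); print('CA bundle OK')"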
  • [ ] Step 3: Run tests locally (uses local Postgres, not RDS — SSL path won't trigger)

Run: python -m pytest backend/tests/ -v --tb=short
Expected: All pass (the RDS check only triggers when the URL contains .rds.amazonaws.com).

  • [ ] Step 4: Commit
git add Dockerfile.backend backend/app/database.py
git commit -m "security: enable RDS TLS certificate verification

Bundle AWS RDS us-east-1 CA cert in Docker image. Replace CERT_NONE
with verified TLS for both primary and read-replica connections.
Prevents MITM attacks on database connections within the VPC."

Task 8: Terraform — Make block_all_traffic a Variable

Files:
  • Modify: terraform/main.tf:83
  • Modify: terraform/variables.tf (add variable)

  • [ ] Step 1: Add variable to variables.tf

Append to terraform/variables.tf:

variable "block_all_traffic" {
  description = "Block all traffic via WAF (emergency kill switch)"
  type        = bool
  default     = false
}
  • [ ] Step 2: Reference variable in main.tf

In terraform/main.tf line 83, change:

# Before:
  block_all_traffic     = true
# After:
  block_all_traffic     = var.block_all_traffic
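
For reference, the kill switch can then be flipped without touching code:

Run: terraform apply -var="block_all_traffic=true"
Rerun with the variable set to false (or omitted, using the default) to restore traffic.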

  • [ ] Step 3: Commit
git add terraform/main.tf terraform/variables.tf
git commit -m "infra: make WAF block_all_traffic a Terraform variable

Replaces hardcoded true with var.block_all_traffic (default false).
Traffic can be blocked/unblocked via terraform apply without editing code."

Chunk 3: Week 2 — Auth & Data Integrity Fixes

Task 9: Token Refresh — Validate User Exists and Is Active

Files:
  • Modify: backend/app/api/auth.py:340-356
  • Test: backend/tests/test_auth_refresh.py

  • [ ] Step 1: Write failing test — refresh for deleted user must fail
# backend/tests/test_auth_refresh.py
import pytest
from httpx import AsyncClient

@pytest.mark.asyncio
async def test_refresh_token_rejected_for_deleted_user(client: AsyncClient, db_session):
    """Refresh token for a deleted/inactive user must return 401."""
    from app.models.member import Member
    from app.services.auth import hash_password, create_refresh_token

    member = Member(
        email="deleted-refresh@example.com",
        password_hash=hash_password("password123"),
        encrypted_name="Deleted User",
        is_active=False,  # Deactivated
    )
    db_session.add(member)
    await db_session.flush()

    refresh = create_refresh_token(str(member.id), "member")
    resp = await client.post("/api/v1/auth/refresh", json={"refresh_token": refresh})
    assert resp.status_code == 401, f"Expected 401 for inactive user, got {resp.status_code}"
  • [ ] Step 2: Run test to verify it fails

Run: python -m pytest backend/tests/test_auth_refresh.py::test_refresh_token_rejected_for_deleted_user -v
Expected: FAIL — current refresh endpoint returns 200 without checking DB.

  • [ ] Step 3: Add user validation to refresh endpoint

In backend/app/api/auth.py, replace lines 340-356:

@router.post(
    "/refresh",
    response_model=TokenResponse,
    responses={401: {"description": "Invalid or expired refresh token"}, 429: {"description": "Rate limit exceeded"}},
)
@limiter.limit("5/minute")
async def refresh_token(request: Request, req: RefreshRequest, db: AsyncSession = Depends(get_db)):
    """Exchange a refresh token for a new access/refresh token pair."""
    try:
        payload = decode_token(req.refresh_token)
    except Exception:
        raise HTTPException(status_code=401, detail="Invalid refresh token")

    if not payload.get("refresh"):
        raise HTTPException(status_code=401, detail="Not a refresh token")
    user_id = payload.get("sub")
    user_type = payload.get("type")
    if not user_id or user_type not in ("member", "lender"):
        raise HTTPException(status_code=401, detail="Invalid refresh token")

    # Verify user still exists and is active
    model_cls = Member if user_type == "member" else Lender
    result = await db.execute(select(model_cls).where(model_cls.id == user_id))
    user = result.scalar_one_or_none()
    if user is None or not user.is_active:
        raise HTTPException(status_code=401, detail="Account not found or deactivated")

    AUTH_OPERATIONS.labels("refresh", user_type, "success").inc()
    return TokenResponse(
        access_token=create_access_token(user_id, user_type),
        refresh_token=create_refresh_token(user_id, user_type),
        user_type=user_type,
    )

Note that the rewritten endpoint adds db: AsyncSession = Depends(get_db) as a new parameter, raises the specific "Not a refresh token" error outside the try block so the broad except doesn't swallow it, and rejects unknown user_type values instead of silently defaulting to Lender. It also needs imports for Member and Lender (check whether they are already imported at the top of auth.py).

  • [ ] Step 4: Run test to verify it passes

Run: python -m pytest backend/tests/test_auth_refresh.py -v
Expected: PASS

  • [ ] Step 5: Run full test suite

Run: python -m pytest backend/tests/ -v --tb=short
Expected: All pass.

  • [ ] Step 6: Commit
git add backend/app/api/auth.py backend/tests/test_auth_refresh.py
git commit -m "security: validate user exists and is active on token refresh

The /auth/refresh endpoint now performs a DB lookup to verify the user
still exists and is_active before issuing new tokens. Prevents banned
or deleted users from refreshing indefinitely."

Task 10: Fix IP Forwarding for CloudFront Proxy

Files:
  • Modify: backend/app/main.py (add ProxyHeadersMiddleware in create_app)

  • [ ] Step 1: Add ProxyHeadersMiddleware

In backend/app/main.py, inside create_app(), after the CORS middleware block (line 195), add:

    # Trust X-Forwarded-For from load balancer / CloudFront
    # In production behind WAF+ALB, only trust private network ranges
    from uvicorn.middleware.proxy_headers import ProxyHeadersMiddleware
    app.add_middleware(ProxyHeadersMiddleware, trusted_hosts=["10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16"])

Note: These CIDRs cover VPC-internal traffic from ALB/CloudFront. If the ALB uses public IPs, expand to include the specific ALB subnets or use "*" behind WAF only. Also verify the installed uvicorn version accepts CIDR ranges in trusted_hosts — support for IP networks was added in newer uvicorn releases; older ones accept only literal IPs or "*".

  • [ ] Step 2: Run full test suite

Run: python -m pytest backend/tests/ -v --tb=short
Expected: All pass.

  • [ ] Step 3: Commit
git add backend/app/main.py
git commit -m "security: add ProxyHeadersMiddleware for real client IP

Adds uvicorn ProxyHeadersMiddleware so request.client.host returns
the real client IP from X-Forwarded-For instead of the proxy IP.
Fixes rate limiting and audit logging behind CloudFront."

Task 11: API Key Prefix Index for O(1) Lookup

Files:
  • Modify: backend/app/models/lender.py (add api_key_prefix column)
  • Create: backend/alembic/versions/035_api_key_prefix.py
  • Modify: backend/app/dependencies.py:101-127 (use prefix lookup)
  • Modify: backend/app/api/auth.py (set prefix on registration and rotation)
  • Modify: backend/app/services/auth.py (update generate_api_key to include prefix)
  • Test: backend/tests/test_api_key_prefix.py

  • [ ] Step 1: Write failing test — prefix-based lookup
# backend/tests/test_api_key_prefix.py
import pytest
from app.dependencies import _authenticate_api_key

@pytest.mark.asyncio
async def test_api_key_prefix_lookup(db_session):
    """API key auth should use prefix-based O(1) lookup, not scan all lenders."""
    from app.models.lender import Lender
    from app.services.auth import hash_password, generate_api_key, hash_api_key

    raw_key = generate_api_key()
    prefix = raw_key[:12]

    lender = Lender(
        name="Prefix Test",
        email="prefix-test@example.com",
        password_hash=hash_password("password123"),
        api_key=None,
        api_key_hash=hash_api_key(raw_key),
        api_key_prefix=prefix,
        api_key_version=1,
    )
    db_session.add(lender)
    await db_session.flush()

    result = await _authenticate_api_key(db_session, raw_key)
    assert result is not None
    assert result.id == lender.id
  • [ ] Step 2: Run test to verify it fails

Run: python -m pytest backend/tests/test_api_key_prefix.py -v
Expected: FAIL — api_key_prefix column doesn't exist yet.

  • [ ] Step 3: Add api_key_prefix to Lender model

In backend/app/models/lender.py, after line 22 (api_key_version), add:

    api_key_prefix: Mapped[Optional[str]] = mapped_column(String(12), nullable=True, index=True)
  • [ ] Step 4: Create Alembic migration
# backend/alembic/versions/035_api_key_prefix.py
"""Add api_key_prefix column for O(1) API key lookup.

Revision ID: 035_api_key_prefix
Revises: 034_remove_plaintext
Create Date: 2026-03-11
"""
from alembic import op
import sqlalchemy as sa

revision = "035_api_key_prefix"
down_revision = "034_remove_plaintext"
branch_labels = None
depends_on = None

def upgrade() -> None:
    op.add_column("lenders", sa.Column("api_key_prefix", sa.String(12), nullable=True))
    op.create_index("ix_lenders_api_key_prefix", "lenders", ["api_key_prefix"])

def downgrade() -> None:
    op.drop_index("ix_lenders_api_key_prefix", table_name="lenders")
    op.drop_column("lenders", "api_key_prefix")
  • [ ] Step 5: Update _authenticate_api_key to use prefix lookup

Replace the Strategy 1 block in backend/app/dependencies.py:

async def _authenticate_api_key(db: AsyncSession, api_key: str):
    """Authenticate an API key using prefix-indexed lookup + bcrypt verify."""
    from app.services.auth import verify_api_key

    prefix = api_key[:12]

    # O(1) lookup by prefix, then bcrypt verify
    result = await db.execute(
        select(Lender).where(
            Lender.is_active == True,  # noqa: E712
            Lender.api_key_prefix == prefix,
        )
    )
    for lender in result.scalars().all():
        if lender.api_key_hash and verify_api_key(api_key, lender.api_key_hash):
            return lender
        if lender.api_key_secondary_hash and verify_api_key(api_key, lender.api_key_secondary_hash):
            return lender

    return None
  • [ ] Step 6: Set prefix on registration and rotation

In backend/app/api/auth.py, lender_register (around line 185):

    raw_api_key = generate_api_key()
    lender = Lender(
        name=req.name,
        email=req.email,
        password_hash=hash_password(req.password),
        api_key=None,
        api_key_hash=hash_api_key(raw_api_key),
        api_key_prefix=raw_api_key[:12],
        api_key_version=1,
    )

In lender_rotate_key (around line 516):

    new_raw_key = generate_api_key()
    lender.api_key_hash = hash_api_key(new_raw_key)
    lender.api_key_prefix = new_raw_key[:12]
    lender.api_key_version += 1

  • [ ] Step 7: Run test to verify it passes

Run: python -m pytest backend/tests/test_api_key_prefix.py -v
Expected: PASS

  • [ ] Step 8: Run full test suite

Run: python -m pytest backend/tests/ -v --tb=short
Expected: All pass.

  • [ ] Step 9: Commit
git add backend/app/models/lender.py backend/app/dependencies.py backend/app/api/auth.py backend/alembic/versions/035_api_key_prefix.py backend/tests/test_api_key_prefix.py
git commit -m "perf: O(1) API key auth via prefix index lookup

Adds api_key_prefix column (first 12 chars) with index. API key auth
now queries by prefix (typically 1 row) then bcrypt-verifies, instead
of scanning all active lenders. Scales from O(n) to O(1)."

Task 12: Replace datetime.utcnow() with datetime.now(timezone.utc)

Files:
  • Modify: All Python files in backend/app/ containing datetime.utcnow()
  • Modify: All Python files in backend/tests/ containing datetime.utcnow()

  • [ ] Step 1: Find all occurrences

Run: grep -rn "datetime.utcnow()" backend/app/ backend/tests/ | wc -l
Note the count (drop the | wc -l to see the full file list).

  • [ ] Step 2: Mechanical replacement — standard calls

Run:

find backend/app backend/tests -name "*.py" -exec sed -i '' 's/datetime\.utcnow()/datetime.now(timezone.utc)/g' {} +

(BSD/macOS sed syntax, matching the development paths used elsewhere in this plan; with GNU sed, use -i with no empty-string argument.)

  • [ ] Step 2b: Fix aliased imports

Some files import datetime with aliases (e.g., from datetime import datetime as _dt). Find and fix those:

grep -rn "_dt\.utcnow()\|_datetime\.utcnow()" backend/app/ backend/tests/ --include="*.py"
Replace each match manually. Known occurrence: backend/app/dependencies.py line 145 uses _dt.utcnow().

  • [ ] Step 2c: Fix callable defaults (no parentheses)

Some model columns use default=datetime.utcnow (a callable reference, no parens). Replace each with:

default=lambda: datetime.now(timezone.utc)

Find them: grep -rn "default=datetime.utcnow\b" backend/app/ --include="*.py"
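
For illustration, a typical column after the change (column name hypothetical):

    created_at: Mapped[datetime] = mapped_column(default=lambda: datetime.now(timezone.utc))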

  • [ ] Step 3: Add timezone import where needed

For every file that was modified, ensure it imports timezone:

from datetime import datetime, timezone

Run: grep -rn "datetime.now(timezone.utc)" backend/app/ | cut -d: -f1 | sort -u

For each file, check if timezone is imported. Add it where missing.
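
One quick heuristic to surface files that use the new call but lack the import (may miss aliased import styles):

Run: for f in $(grep -rl "datetime.now(timezone.utc)" backend/app backend/tests --include="*.py"); do grep -q "from datetime import.*timezone" "$f" || echo "$f"; done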

  • [ ] Step 3b: Verify zero remaining occurrences

Run: grep -rn "utcnow" backend/app/ backend/tests/ --include="*.py"
Expected: Zero matches.

  • [ ] Step 4: Run full test suite

Run: python -m pytest backend/tests/ -v --tb=short
Expected: All pass.

  • [ ] Step 5: Run linter

Run: cd backend && ruff check app/ tests/ --fix
Expected: No errors.

  • [ ] Step 6: Commit
git add backend/
git commit -m "fix: replace deprecated datetime.utcnow() with timezone-aware alternative

Replaces all datetime.utcnow() calls with datetime.now(timezone.utc)
across the codebase. Prevents naive/aware datetime comparison bugs
and removes Python 3.12 deprecation warnings."

Chunk 4: Week 3-4 — Architectural Improvements

Task 13: Extract Workers to Separate Entrypoint

Files:
  • Create: backend/app/worker_main.py (standalone worker runner)
  • Modify: backend/app/main.py:85-107 (remove worker startup from lifespan)
  • Modify: Dockerfile.backend (document worker entrypoint)

  • [ ] Step 1: Create standalone worker runner
# backend/app/worker_main.py
"""Standalone entrypoint for background workers.

Run with: python -m app.worker_main
In production: separate Kubernetes Deployment with this as CMD.
"""
import asyncio
import signal
import logging

from app.logging_config import setup_logging
from app.config import get_settings
from app.services.cache import init_cache_pool, close_cache_pool

settings = get_settings()
setup_logging(settings.debug)
logger = logging.getLogger("worker_main")

_shutdown = asyncio.Event()


def _handle_signal():
    logger.info("Shutdown signal received")
    _shutdown.set()


async def main():
    init_cache_pool()

    from app.workers.credential_worker import run_worker_loop as credential_loop
    from app.workers.webhook_retry_worker import run_worker_loop as webhook_loop
    from app.workers.fraud_sentinel_worker import run_worker_loop as fraud_loop
    from app.workers.network_intelligence_worker import run_worker_loop as network_loop

    loop = asyncio.get_running_loop()
    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(sig, _handle_signal)

    tasks = [
        asyncio.create_task(credential_loop()),
        asyncio.create_task(webhook_loop()),
        asyncio.create_task(fraud_loop()),
        asyncio.create_task(network_loop()),
    ]

    logger.info("All workers started (%d tasks)", len(tasks))

    await _shutdown.wait()

    logger.info("Cancelling workers...")
    for t in tasks:
        t.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)

    await close_cache_pool()
    from app.database import engine, read_engine
    await engine.dispose()
    if read_engine is not engine:
        await read_engine.dispose()

    logger.info("Worker shutdown complete")


if __name__ == "__main__":
    asyncio.run(main())
  • [ ] Step 2: Remove worker startup from main.py lifespan

In backend/app/main.py, remove lines 85-107 (the worker creation and cancellation blocks). The lifespan becomes:

@asynccontextmanager
async def lifespan(app: FastAPI):
    logger.info("startup", app_name=settings.app_name, debug=settings.debug)

    from app.services.cache import init_cache_pool, close_cache_pool
    init_cache_pool()

    yield

    # Graceful shutdown
    logger.info("shutdown_started")
    from app.database import engine, read_engine
    await engine.dispose()
    if read_engine is not engine:
        await read_engine.dispose()
    logger.info("db_engines_disposed")

    await close_cache_pool()
    logger.info("redis_pool_closed")
    logger.info("shutdown_complete")
  • [ ] Step 3: Run full test suite

Run: python -m pytest backend/tests/ -v --tb=short
Expected: All pass. Tests don't depend on workers running.

  • [ ] Step 4: Verify worker_main runs standalone

Run: cd /Users/lordviswa/Desktop/new_project/backend && source venv/bin/activate && python -c "import asyncio; from app.worker_main import main; print('Import OK')"
Expected: Import succeeds without errors.

  • [ ] Step 5: DEPLOYMENT GATE — Do NOT deploy until K8s manifest is ready

WARNING: After this change, deploying the API server without a separate worker Deployment will silently stop all background processing (credential jobs, webhook retries, fraud detection, network intelligence). Before deploying:

  1. Create a Kubernetes Deployment for workers with CMD: python -m app.worker_main
  2. Verify workers run in staging before promoting to production
  3. Monitor credential job queue depth and webhook delivery lag after deploy

Until the K8s manifest is ready, this change is safe to merge but must NOT be deployed to production.

  • [ ] Step 6: Commit
git add backend/app/worker_main.py backend/app/main.py
git commit -m "arch: extract background workers to standalone entrypoint

Workers (credential, webhook, fraud, network) now run via
'python -m app.worker_main' as a separate process. The API server
no longer spawns workers in-process. Enables independent scaling
via separate Kubernetes Deployments."

Task 14: Decompose verify_token_proofs into Pipeline

Files:
  • Create: backend/app/services/verification_pipeline.py
  • Modify: backend/app/services/verification.py (delegate to pipeline)
  • Test: backend/tests/test_verification_pipeline.py

  • [ ] Step 1: Create pipeline module with step methods
# backend/app/services/verification_pipeline.py
"""Verification pipeline: decomposed 7-step flow."""
# NOTE: Do NOT use `from __future__ import annotations` here — project convention
# avoids it due to SQLAlchemy compatibility issues (see MEMORY.md).

import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import List, Optional, Dict

from sqlalchemy.ext.asyncio import AsyncSession


@dataclass
class StepResult:
    name: str
    label: str
    passed: bool
    detail: str


@dataclass
class PipelineContext:
    """Mutable state passed through pipeline steps."""
    db: AsyncSession
    token: str
    lender_id: uuid.UUID
    verifier_type: str = "lender"
    product_id: Optional[uuid.UUID] = None
    steps: List[StepResult] = field(default_factory=list)
    qr_token: Optional[object] = None
    credentials: List = field(default_factory=list)
    member: Optional[object] = None
    member_did: str = ""
    claims_verified: Dict = field(default_factory=dict)
    all_verified: bool = False
    verification_id: str = ""
    failed_step: Optional[str] = None
    error: Optional[str] = None
    issuer_info: Optional[Dict] = None
    auto_fill: List = field(default_factory=list)
    claim_results: List = field(default_factory=list)
    risk: Optional[Dict] = None

    def add_step(self, name: str, label: str, passed: bool, detail: str) -> StepResult:
        step = StepResult(name=name, label=label, passed=passed, detail=detail)
        self.steps.append(step)
        return step

    def fail(self, step_name: str, error: str) -> None:
        self.failed_step = step_name
        self.error = error
  • [ ] Step 2: Extract each step from verification.py into pipeline module

Move the logic for DECODE, FRESHNESS, ISSUER_AUTH, ZKP_VERIFY, REVOCATION, HOLDER_BINDING, and RESULT into separate async def step_decode(ctx), async def step_freshness(ctx), etc. functions. Each returns True to continue or False to short-circuit.

This is a large refactor — extract one step at a time, running tests between each extraction.
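
To make the target shape concrete, here is a minimal sketch of one extracted step — the QRToken model name and its token column are assumptions; mirror the actual lookup logic currently in verify_token_proofs:

async def step_decode(ctx: PipelineContext) -> bool:
    """DECODE: resolve the presented token to a QR token row."""
    from sqlalchemy import select
    from app.models.qr_token import QRToken  # assumed model/location

    result = await ctx.db.execute(select(QRToken).where(QRToken.token == ctx.token))
    ctx.qr_token = result.scalar_one_or_none()
    if ctx.qr_token is None:
        ctx.add_step("DECODE", "Decode token", False, "Token not found")
        ctx.fail("DECODE", "Token not found")
        return False
    ctx.add_step("DECODE", "Decode token", True, "Token resolved")
    return True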

  • [ ] Step 3: Create orchestrator function
async def run_pipeline(
    db: AsyncSession,
    token: str,
    lender_id: uuid.UUID,
    verifier_type: str = "lender",
    product_id: Optional[uuid.UUID] = None,
) -> dict:
    """Execute the 7-step verification pipeline."""
    ctx = PipelineContext(
        db=db, token=token, lender_id=lender_id,
        verifier_type=verifier_type, product_id=product_id,
    )

    for step_fn in [step_decode, step_freshness, step_issuer_auth,
                    step_zkp_verify, step_revocation, step_holder_binding,
                    step_result]:
        proceed = await step_fn(ctx)
        if not proceed:
            break

    return _build_response(ctx)
  • [ ] Step 4: Update verification.py to delegate
# In backend/app/services/verification.py, replace verify_token_proofs body:
async def verify_token_proofs(db, token, lender_id, verifier_type="lender", product_id=None):
    from app.services.verification_pipeline import run_pipeline
    return await run_pipeline(db, token, lender_id, verifier_type, product_id)
  • [ ] Step 5: Write pipeline unit tests
# backend/tests/test_verification_pipeline.py
import uuid
import pytest
from app.services.verification_pipeline import PipelineContext, step_decode

@pytest.mark.asyncio
async def test_step_decode_invalid_token(db_session):
    """DECODE step should fail for nonexistent token."""
    ctx = PipelineContext(
        db=db_session, token="nonexistent_token",
        lender_id=uuid.uuid4(), verifier_type="lender",
    )
    result = await step_decode(ctx)
    assert result is False
    assert ctx.failed_step == "DECODE"
    assert len(ctx.steps) == 1
    assert ctx.steps[0].passed is False
  • [ ] Step 6: Run full test suite

Run: python -m pytest backend/tests/ -v --tb=short
Expected: All 310+ tests pass.

  • [ ] Step 7: Commit
git add backend/app/services/verification_pipeline.py backend/app/services/verification.py backend/tests/test_verification_pipeline.py
git commit -m "refactor: decompose verify_token_proofs into VerificationPipeline

Extracts the 483-line monolithic function into 7 discrete step functions
(step_decode through step_result) with a PipelineContext dataclass.
Each step is independently testable. verify_token_proofs delegates
to run_pipeline() for backwards compatibility."

Task 15: Split auth.py into Member and Lender Auth Modules

Files:
  • Create: backend/app/api/member_auth.py
  • Create: backend/app/api/lender_auth.py
  • Modify: backend/app/api/auth.py (keep shared endpoints, import sub-routers)
  • Modify: backend/app/main.py (update router registration if needed)

  • [ ] Step 1: Identify which routes go where

  • member_auth.py: /member/register, /member/login, /member/send-verification-email, /member/verify-email
  • lender_auth.py: /lender/register, /lender/login, /lender/rotate-key
  • auth.py (shared): /refresh, /logout, /logout-all, /password-reset/*, /test/auto-verify

  • [ ] Step 2: Extract member auth routes to member_auth.py

Create backend/app/api/member_auth.py with the member-specific route handlers.

IMPORTANT: Do NOT add prefix="/auth" to the sub-router — the parent auth.py router already has prefix="/auth". Adding it again would double the prefix to /auth/auth/.

# backend/app/api/member_auth.py
from fastapi import APIRouter

router = APIRouter(tags=["auth"])  # NO prefix — parent provides /auth

Move the imports each handler needs. Routes keep their decorators as-is (e.g., @router.post("/member/register")).

  • [ ] Step 3: Extract lender auth routes to lender_auth.py

Same pattern — no prefix on the sub-router.

# backend/app/api/lender_auth.py
from fastapi import APIRouter

router = APIRouter(tags=["auth"])  # NO prefix
  • [ ] Step 4: Update auth.py to include sub-routers
from app.api.member_auth import router as member_auth_router
from app.api.lender_auth import router as lender_auth_router

router.include_router(member_auth_router)
router.include_router(lender_auth_router)
  • [ ] Step 5: Run full test suite

Run: python -m pytest backend/tests/ -v --tb=short
Expected: All pass. Routes are at the same paths.
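
As an extra sanity check against a doubled /auth/auth/ prefix, you can dump the mounted auth routes in a scratch script (create_app is the factory referenced in Task 10):

from app.main import create_app

app = create_app()
for route in app.routes:
    path = getattr(route, "path", "")
    if "/auth/" in path:
        print(path)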

  • [ ] Step 6: Commit
git add backend/app/api/auth.py backend/app/api/member_auth.py backend/app/api/lender_auth.py
git commit -m "refactor: split auth.py (562 lines) into member_auth + lender_auth

Extracts member and lender registration/login routes into dedicated
modules. Shared endpoints (refresh, logout, password reset) stay in
auth.py. Reduces file size and improves code ownership boundaries."

Deferred Items (Tracked)

These are logged for future sprints but not part of this plan:

  • RBAC model: Replace is_admin boolean with role/permission tables
  • localStorage → HTTP-only cookies: Move JWT storage to server-managed cookies
  • Frontend refactors: Split api-client.ts, extract form validation, useProtectedRoute hook
  • TOTP nonce tracking: Redis-backed set to prevent code reuse within window
  • Redis blacklist fail-closed: Evaluate fail-closed strategy on Redis outage
  • Recovery code hashing: Upgrade from SHA-256 to bcrypt
  • Trust registry auth: Require authentication on POST /trust-registry/submit
  • SNS webhook SSRF: Allowlist SubscribeURL hostname
  • Audit log scoping: Restrict /audit/logs to admin-only or scope by lender