# Codebase Remediation Implementation Plan
For agentic workers: REQUIRED: Use superpowers:subagent-driven-development (if subagents available) or superpowers:executing-plans to implement this plan. Steps use checkbox (`- [ ]`) syntax for tracking.
Goal: Fix all critical, high, and medium findings from the codebase audit — security, architecture, and performance — in priority order.
Architecture: Incremental fixes to the existing FastAPI monolith. No new services or major structural changes until Week 3-4. Each task is independently deployable and backwards-compatible.
Tech Stack: Python 3.12 / FastAPI / SQLAlchemy 2.0 / Alembic / Redis / Terraform / Docker
## Chunk 1: Critical Security (Week "Now")
### Task 1: Remove Plaintext API Key Storage and Fallback
Files:
- Modify: backend/app/dependencies.py:101-127
- Modify: backend/app/api/auth.py:189 and auth.py:517
- Modify: backend/app/models/lender.py:17
- Create: backend/alembic/versions/034_remove_plaintext_api_key.py
- Test: backend/tests/test_api_key_security.py
- [ ] Step 1: Write failing test — plaintext fallback must not authenticate
# backend/tests/test_api_key_security.py
import pytest
from unittest.mock import patch
from app.dependencies import _authenticate_api_key
@pytest.mark.asyncio
async def test_plaintext_api_key_rejected(db_session):
"""Plaintext API key stored in lender.api_key column must NOT authenticate."""
from app.models.lender import Lender
from app.services.auth import hash_password
lender = Lender(
name="Test Lender",
email="plaintext-test@example.com",
password_hash=hash_password("password123"),
api_key="raw_key_plaintext_12345",
api_key_hash=None, # No hash — only plaintext
api_key_version=1,
)
db_session.add(lender)
await db_session.flush()
result = await _authenticate_api_key(db_session, "raw_key_plaintext_12345")
assert result is None, "Plaintext-only API keys must be rejected"
- [ ] Step 2: Run test to verify it fails
Run: cd /Users/lordviswa/Desktop/new_project && source backend/venv/bin/activate && python -m pytest backend/tests/test_api_key_security.py -v
Expected: FAIL — the plaintext fallback currently returns the lender.
- [ ] Step 3: Remove plaintext fallback from _authenticate_api_key
In backend/app/dependencies.py, delete lines 121-125 (Strategy 2 block):
# DELETE THESE LINES:
# Strategy 2: Legacy plaintext match (for lenders not yet migrated to hashed keys)
result = await db.execute(select(Lender).where(Lender.api_key == api_key))
lender = result.scalar_one_or_none()
if lender is not None and lender.is_active:
return lender
The function should end with just return None after the bcrypt loop.
- [ ] Step 4: Run test to verify it passes
Run: python -m pytest backend/tests/test_api_key_security.py -v
Expected: PASS
- [ ] Step 5: Stop writing plaintext key on registration
In backend/app/api/auth.py line 189, change the api_key value passed to the Lender constructor from the raw generated key to None (the hashed value in api_key_hash is already written alongside it).
- [ ] Step 6: Stop writing plaintext key on rotation
In backend/app/api/auth.py line 517, delete the assignment of the new raw key to lender.api_key (set it to None instead, so any stale plaintext value is cleared).
- [ ] Step 7: Make api_key column nullable in the model
In backend/app/models/lender.py line 17, change:
# Before:
api_key: Mapped[str] = mapped_column(String(256), unique=True, nullable=False)
# After:
api_key: Mapped[Optional[str]] = mapped_column(String(256), unique=True, nullable=True)
Add Optional import if not already present.
- [ ] Step 8: Discover the unique constraint name
Run against local DB:
SELECT constraint_name FROM information_schema.table_constraints
WHERE table_name='lenders' AND constraint_type='UNIQUE';
The unique constraint on api_key is likely auto-generated as lenders_api_key_key (PostgreSQL's default naming). Use the actual name in the migration below.
- [ ] Step 9: Create Alembic migration to NULL out plaintext keys
# backend/alembic/versions/034_remove_plaintext_api_key.py
"""Remove plaintext API key storage.
Revision ID: 034_remove_plaintext
Revises: 033_plaid_links
Create Date: 2026-03-11
"""
from alembic import op
import sqlalchemy as sa
revision = "034_remove_plaintext"
down_revision = "033_plaid_links"
branch_labels = None
depends_on = None
def upgrade() -> None:
# NULL out plaintext keys for all lenders that have a bcrypt hash
op.execute(
"UPDATE lenders SET api_key = NULL WHERE api_key_hash IS NOT NULL"
)
# Make column nullable
op.alter_column("lenders", "api_key", existing_type=sa.String(256), nullable=True)
# Drop the unique constraint since NULLs will dominate
# NOTE: Replace constraint name below with actual name from Step 8
op.drop_constraint("lenders_api_key_key", "lenders", type_="unique")
def downgrade() -> None:
op.create_unique_constraint("lenders_api_key_key", "lenders", ["api_key"])
op.alter_column("lenders", "api_key", existing_type=sa.String(256), nullable=False)
- [ ] Step 9b: Grep for all reads of lender.api_key to ensure nothing breaks
Run: grep -rn "lender\.api_key\b" backend/app/ --include="*.py" | grep -v "api_key_hash\|api_key_secondary\|api_key_version\|api_key_last_used\|api_key_prefix"
Verify each remaining reference either writes None or is already handled by this task.
- [ ] Step 10: Run full test suite
Run: python -m pytest backend/tests/ -v --tb=short
Expected: All 310+ tests pass. Fix any tests that relied on plaintext api_key being set.
- [ ] Step 11: Commit
git add backend/app/dependencies.py backend/app/api/auth.py backend/app/models/lender.py backend/alembic/versions/034_remove_plaintext_api_key.py backend/tests/test_api_key_security.py
git commit -m "security: remove plaintext API key storage and legacy fallback auth
Removes Strategy 2 plaintext API key authentication from dependencies.py.
Stops writing raw API keys to lender.api_key on registration and rotation.
Migration 034 NULLs existing plaintext keys where bcrypt hash exists."
### Task 2: Remove Ed25519 Private Key from README.md
Files:
- Modify: README.md (line with private key hex)
- [ ] Step 1: Find and remove the private key from README.md
Search for the hex string 6a416f05f80a204459d2f181d8c2f1e0794ac0e2c9e0361a7309a56f3cf3a4b5 in README.md.
Replace the actual key value with a placeholder (for example, <your-ed25519-private-key-hex>) so the docs show the expected format without exposing a real secret.
Also search for any other .env example blocks in README.md that contain real secret values and replace them with placeholders.
- [ ] Step 2: Verify no other tracked files contain the private key
Run: grep -r "6a416f05f80a204459d2f181d8c2f1e0794ac0e2c9e0361a7309a56f3cf3a4b5" --include="*.py" --include="*.md" --include="*.yml" --include="*.json" .
Expected: No matches (the .env file is gitignored).
- [ ] Step 3: Commit
git add README.md
git commit -m "security: remove Ed25519 issuer private key from README
Replace hardcoded issuer private key with placeholder. The production
key must be rotated — this key has been in version control history."
- [ ] Step 4: Note for manual follow-up
The key is still in git history. A full rotation requires:
1. Generate new Ed25519 keypair
2. Update production .env / Secrets Manager with new key
3. Re-issue all credentials signed with the old key (or accept them as legacy)
4. Use git filter-repo to purge the old key from history (optional but recommended)
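Step 1 of the follow-up can be scripted. This sketch assumes the third-party cryptography package and the raw-hex key format the README leaked; confirm against how the app actually loads its issuer key before using it:

```python
# Generate a fresh Ed25519 keypair and print hex-encoded raw bytes.
# Assumption: the app stores the private key as 64 hex chars (32 raw bytes).
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey

private_key = Ed25519PrivateKey.generate()
private_hex = private_key.private_bytes(
    encoding=serialization.Encoding.Raw,
    format=serialization.PrivateFormat.Raw,
    encryption_algorithm=serialization.NoEncryption(),
).hex()
public_hex = private_key.public_key().public_bytes(
    encoding=serialization.Encoding.Raw,
    format=serialization.PublicFormat.Raw,
).hex()

print(len(private_hex), len(public_hex))  # 64 64 — 32 raw bytes each
```

Store private_hex in Secrets Manager only; never commit it.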
### Task 3: Note — Rotate SendGrid API Key (Manual)
This requires logging into the SendGrid dashboard and rotating the key. Not automatable via code. The new key must be set in:
- backend/.env locally
- AWS Secrets Manager / SSM Parameter Store for production
- GitHub Actions secrets if used in CI
## Chunk 2: Week 1 — Low-Risk Infrastructure Fixes
### Task 4: Redis Singleton Connection Pool
Files:
- Modify: backend/app/services/cache.py (full rewrite of connection pattern)
- Modify: backend/app/main.py:77-123 (add pool init/teardown to lifespan)
- Test: backend/tests/test_cache.py
- [ ] Step 1: Write failing test — pool is reused across calls
# backend/tests/test_cache.py
import pytest
from unittest.mock import patch, AsyncMock
from app.services import cache
@pytest.mark.asyncio
async def test_cache_uses_singleton_pool():
"""Cache operations must reuse a single Redis pool, not create new connections."""
with patch.object(cache, "_pool", new=AsyncMock()) as mock_pool:
mock_pool.get = AsyncMock(return_value=None)
mock_pool.setex = AsyncMock()
mock_pool.delete = AsyncMock()
await cache.get_cached_trust_entry("did:key:z123")
await cache.set_cached_trust_entry("did:key:z123", {"name": "Test"})
await cache.invalidate_trust_cache("did:key:z123")
# Pool should NOT be closed between operations
mock_pool.aclose.assert_not_called()
- [ ] Step 2: Run test to verify it fails
Run: python -m pytest backend/tests/test_cache.py::test_cache_uses_singleton_pool -v
Expected: FAIL — current code creates/closes connections per call.
- [ ] Step 3: Refactor cache.py to use singleton pool
Replace _get_redis() pattern with a module-level pool:
# backend/app/services/cache.py — new connection management
import redis.asyncio as aioredis
from typing import Optional
from app.config import get_settings
settings = get_settings()
# Module-level singleton — initialized via init_cache_pool(), closed via close_cache_pool()
_pool: Optional[aioredis.Redis] = None
def init_cache_pool() -> None:
"""Initialize the Redis connection pool. Call once at app startup."""
global _pool
_pool = aioredis.from_url(settings.redis_url, decode_responses=True)
async def close_cache_pool() -> None:
"""Close the Redis connection pool. Call at app shutdown."""
global _pool
if _pool:
await _pool.aclose()
_pool = None
def _get_redis() -> aioredis.Redis:
"""Return the shared Redis pool. Falls back to creating one if not initialized."""
global _pool
if _pool is None:
logger.warning("Redis pool not initialized via init_cache_pool() — creating fallback")
_pool = aioredis.from_url(settings.redis_url, decode_responses=True)
return _pool
Then remove every await r.aclose() call from every function in the file.
Each function should use r = _get_redis() and operate directly — no close.
Example — get_cached_trust_entry becomes:
async def get_cached_trust_entry(did: str) -> Optional[dict]:
try:
r = _get_redis()
data = await r.get(f"{PREFIX_TRUST_ENTRY}{did}")
if data:
return json.loads(data)
except Exception as e:
logger.debug("cache_miss_trust_entry", did=did, error=str(e))
return None
Apply this pattern to all 10 functions: remove the await r.aclose() line from each.
- [ ] Step 4: Wire pool init/teardown into main.py lifespan
In backend/app/main.py, inside lifespan():
After _apply_migrations() (line 81), add:
from app.services.cache import init_cache_pool
init_cache_pool()
In the shutdown section (before logger.info("shutdown_complete")), replace the existing Redis close block (lines 116-122) with:
from app.services.cache import close_cache_pool
await close_cache_pool()
logger.info("redis_pool_closed")
- [ ] Step 5: Run test to verify it passes
Run: python -m pytest backend/tests/test_cache.py -v
Expected: PASS
- [ ] Step 6: Run full test suite
Run: python -m pytest backend/tests/ -v --tb=short
Expected: All tests pass.
- [ ] Step 7: Commit
git add backend/app/services/cache.py backend/app/main.py backend/tests/test_cache.py
git commit -m "perf: use singleton Redis pool for cache operations
Replaces per-call Redis connection creation/teardown with a module-level
singleton pool initialized at app startup. Eliminates TCP connection
overhead on every cache hit."
### Task 5: Move Webhook Dispatch Off Verification Hot Path
Files:
- Modify: backend/app/api/verification.py:162-174
- Test: existing verification tests should still pass
- [ ] Step 1: Change inline await to background task
In backend/app/api/verification.py, replace lines 162-174:
# Before (blocking):
await dispatch_event(db, event_type, lender.id, { ... })
# After (background):
webhook_payload = {
"event": event_type,
"member_did": result.get("member_did", ""),
"claims_verified": result.get("claims_verified", {}),
"all_verified": all_verified,
"verification_id": result.get("verification_id", ""),
"failed_step": result.get("failed_step"),
"claim_results": result.get("claim_results", []),
"risk_score": risk_data["score"] if risk_data else None,
"risk_level": risk_data["level"] if risk_data else None,
}
background_tasks.add_task(dispatch_event, db, event_type, lender.id, webhook_payload)
Note: background_tasks: BackgroundTasks is already a parameter on the verify_proof function (line 88), and BackgroundTasks is already imported (line 6). Caution: background tasks run after the response is sent, so confirm dispatch_event does not depend on the request-scoped db session still being open; if it does, have it open its own session.
- [ ] Step 2: Run verification tests
Run: python -m pytest backend/tests/test_verification*.py backend/tests/test_e2e*.py -v --tb=short
Expected: All pass.
- [ ] Step 3: Commit
git add backend/app/api/verification.py
git commit -m "perf: move webhook dispatch to background task in verify_proof
Webhook delivery no longer blocks the verification response to lenders.
Uses FastAPI BackgroundTasks which was already imported and available."
### Task 6: Delete _apply_migrations() from main.py
Files:
- Modify: backend/app/main.py:40-83
- Verify: backend/alembic/versions/014_multi_cu_members.py already exists and covers the same columns
- [ ] Step 1: Verify Alembic migration 014 exists and covers the same columns
Run: cat backend/alembic/versions/014_multi_cu_members.py
Confirm it adds encrypted_private_key to trust_registry and issuer_id to members. If it does, the raw SQL in _apply_migrations() is fully redundant.
- [ ] Step 2: Remove _apply_migrations() and its call
In backend/app/main.py:
- Delete the entire _apply_migrations() function (lines 40-74)
- Delete the try/except block that calls it (lines 80-83):
# DELETE:
try:
await _apply_migrations()
except Exception as e:
logger.warning("startup_migration_skipped: %s", str(e))
- Delete the text import from sqlalchemy (if it's only used in _apply_migrations)
- [ ] Step 3: Run full test suite
Run: python -m pytest backend/tests/ -v --tb=short
Expected: All pass. Alembic handles schema at deploy time via alembic upgrade head in the Dockerfile CMD.
- [ ] Step 4: Commit
git add backend/app/main.py
git commit -m "fix: remove raw SQL startup migration from main.py
_apply_migrations() duplicated Alembic migration 014 and ran on every
pod startup. Alembic is the single authority for schema state via
'alembic upgrade head' in the Docker entrypoint."
### Task 7: Enable RDS TLS Certificate Verification
Files:
- Modify: Dockerfile.backend:19-25 (add CA cert download)
- Modify: backend/app/database.py:11-18 and 36-40
- [ ] Step 1: Add RDS CA cert to Dockerfile
In Dockerfile.backend, in Stage 2 (runtime), after the RUN useradd line (line 27), add:
# Download AWS RDS CA certificate bundle for TLS verification
ADD https://truststore.pki.rds.amazonaws.com/us-east-1/us-east-1-bundle.pem /etc/ssl/certs/rds-ca-bundle.pem
RUN chmod 644 /etc/ssl/certs/rds-ca-bundle.pem
- [ ] Step 2: Enable TLS verification in database.py
Replace lines 11-18 in backend/app/database.py:
# Before:
_connect_args = {}
if ".rds.amazonaws.com" in settings.database_url:
_ctx = _ssl.create_default_context()
_ctx.check_hostname = False
_ctx.verify_mode = _ssl.CERT_NONE
_connect_args["ssl"] = _ctx
# After:
_connect_args = {}
if ".rds.amazonaws.com" in settings.database_url:
_ctx = _ssl.create_default_context(cafile="/etc/ssl/certs/rds-ca-bundle.pem")
# check_hostname and CERT_REQUIRED are defaults for create_default_context
_connect_args["ssl"] = _ctx
Apply the same fix for the read-replica block (lines 35-40):
# Before:
if ".rds.amazonaws.com" in settings.database_read_url:
_read_ctx = _ssl.create_default_context()
_read_ctx.check_hostname = False
_read_ctx.verify_mode = _ssl.CERT_NONE
_read_connect_args["ssl"] = _read_ctx
# After:
if ".rds.amazonaws.com" in settings.database_read_url:
_read_ctx = _ssl.create_default_context(cafile="/etc/ssl/certs/rds-ca-bundle.pem")
_read_connect_args["ssl"] = _read_ctx
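The comment about defaults can be sanity-checked quickly: create_default_context() already enables hostname checking and required certificate verification, and the cafile argument only changes which CAs are trusted. A stdlib-only check:

```python
import ssl

# Defaults of create_default_context(): hostname checking on, certs required.
# Passing cafile= adds the RDS bundle to the trust store without weakening these.
ctx = ssl.create_default_context()
print(ctx.check_hostname, ctx.verify_mode == ssl.CERT_REQUIRED)  # True True
```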
- [ ] Step 3: Run tests locally (uses local Postgres, not RDS — SSL path won't trigger)
Run: python -m pytest backend/tests/ -v --tb=short
Expected: All pass (the RDS check only triggers when the URL contains .rds.amazonaws.com).
- [ ] Step 4: Commit
git add Dockerfile.backend backend/app/database.py
git commit -m "security: enable RDS TLS certificate verification
Bundle AWS RDS us-east-1 CA cert in Docker image. Replace CERT_NONE
with verified TLS for both primary and read-replica connections.
Prevents MITM attacks on database connections within the VPC."
### Task 8: Terraform — Make block_all_traffic a Variable
Files:
- Modify: terraform/main.tf:83
- Modify: terraform/variables.tf (add variable)
- [ ] Step 1: Add variable to variables.tf
Append to terraform/variables.tf:
variable "block_all_traffic" {
description = "Block all traffic via WAF (emergency kill switch)"
type = bool
default = false
}
- [ ] Step 2: Reference variable in main.tf
In terraform/main.tf line 83, change the hardcoded true to var.block_all_traffic.
- [ ] Step 3: Commit
git add terraform/main.tf terraform/variables.tf
git commit -m "infra: make WAF block_all_traffic a Terraform variable
Replaces hardcoded true with var.block_all_traffic (default false).
Traffic can be blocked/unblocked via terraform apply without editing code."
## Chunk 3: Week 2 — Auth & Data Integrity Fixes
### Task 9: Token Refresh — Validate User Exists and Is Active
Files:
- Modify: backend/app/api/auth.py:340-356
- Test: backend/tests/test_auth_refresh.py
- [ ] Step 1: Write failing test — refresh for deleted user must fail
# backend/tests/test_auth_refresh.py
import pytest
from httpx import AsyncClient
@pytest.mark.asyncio
async def test_refresh_token_rejected_for_deleted_user(client: AsyncClient, db_session):
"""Refresh token for a deleted/inactive user must return 401."""
from app.models.member import Member
from app.services.auth import hash_password, create_refresh_token
member = Member(
email="deleted-refresh@example.com",
password_hash=hash_password("password123"),
encrypted_name="Deleted User",
is_active=False, # Deactivated
)
db_session.add(member)
await db_session.flush()
refresh = create_refresh_token(str(member.id), "member")
resp = await client.post("/api/v1/auth/refresh", json={"refresh_token": refresh})
assert resp.status_code == 401, f"Expected 401 for inactive user, got {resp.status_code}"
- [ ] Step 2: Run test to verify it fails
Run: python -m pytest backend/tests/test_auth_refresh.py::test_refresh_token_rejected_for_deleted_user -v
Expected: FAIL — current refresh endpoint returns 200 without checking DB.
- [ ] Step 3: Add user validation to refresh endpoint
In backend/app/api/auth.py, replace lines 340-356:
@router.post(
"/refresh",
response_model=TokenResponse,
responses={401: {"description": "Invalid or expired refresh token"}, 429: {"description": "Rate limit exceeded"}},
)
@limiter.limit("5/minute")
async def refresh_token(request: Request, req: RefreshRequest, db: AsyncSession = Depends(get_db)):
"""Exchange a refresh token for a new access/refresh token pair."""
try:
payload = decode_token(req.refresh_token)
if not payload.get("refresh"):
raise HTTPException(status_code=401, detail="Not a refresh token")
user_id = payload["sub"]
user_type = payload["type"]
except Exception:
raise HTTPException(status_code=401, detail="Invalid refresh token")
# Verify user still exists and is active
model_cls = Member if user_type == "member" else Lender
result = await db.execute(select(model_cls).where(model_cls.id == user_id))
user = result.scalar_one_or_none()
if user is None or not user.is_active:
raise HTTPException(status_code=401, detail="Account not found or deactivated")
AUTH_OPERATIONS.labels("refresh", user_type, "success").inc()
return TokenResponse(
access_token=create_access_token(user_id, user_type),
refresh_token=create_refresh_token(user_id, user_type),
user_type=user_type,
)
This requires db: AsyncSession = Depends(get_db) as a new parameter on the function, and imports for Member and Lender (check if already imported at top of auth.py).
- [ ] Step 4: Run test to verify it passes
Run: python -m pytest backend/tests/test_auth_refresh.py -v
Expected: PASS
- [ ] Step 5: Run full test suite
Run: python -m pytest backend/tests/ -v --tb=short
Expected: All pass.
- [ ] Step 6: Commit
git add backend/app/api/auth.py backend/tests/test_auth_refresh.py
git commit -m "security: validate user exists and is active on token refresh
The /auth/refresh endpoint now performs a DB lookup to verify the user
still exists and is_active before issuing new tokens. Prevents banned
or deleted users from refreshing indefinitely."
### Task 10: Fix IP Forwarding for CloudFront Proxy
Files:
- Modify: backend/app/main.py (add ProxyHeadersMiddleware in create_app)
- [ ] Step 1: Add ProxyHeadersMiddleware
In backend/app/main.py, inside create_app(), after the CORS middleware block (line 195), add:
# Trust X-Forwarded-For from load balancer / CloudFront
# In production behind WAF+ALB, only trust private network ranges
from uvicorn.middleware.proxy_headers import ProxyHeadersMiddleware
app.add_middleware(ProxyHeadersMiddleware, trusted_hosts=["10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16"])
Note: These CIDRs cover VPC-internal traffic from ALB/CloudFront. If the ALB uses
public IPs, expand to include the specific ALB subnets or use "*" behind WAF only.
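To sanity-check which source addresses those ranges cover, a stand-alone sketch using the stdlib ipaddress module (this mirrors the intent of the trusted-hosts list, not the middleware's internal matching):

```python
import ipaddress

# The three RFC 1918 private ranges from the middleware config above.
TRUSTED_RANGES = [
    ipaddress.ip_network(cidr)
    for cidr in ("10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16")
]

def is_trusted_proxy(ip: str) -> bool:
    """True if the peer address falls inside a trusted private range."""
    addr = ipaddress.ip_address(ip)
    return any(addr in net for net in TRUSTED_RANGES)

print(is_trusted_proxy("10.3.1.7"), is_trusted_proxy("203.0.113.9"))  # True False
```

If the ALB's actual peer addresses fall outside these ranges, X-Forwarded-For will be ignored and rate limiting will still key on the proxy IP.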
- [ ] Step 2: Run full test suite
Run: python -m pytest backend/tests/ -v --tb=short
Expected: All pass.
- [ ] Step 3: Commit
git add backend/app/main.py
git commit -m "security: add ProxyHeadersMiddleware for real client IP
Adds uvicorn ProxyHeadersMiddleware so request.client.host returns
the real client IP from X-Forwarded-For instead of the proxy IP.
Fixes rate limiting and audit logging behind CloudFront."
### Task 11: API Key Prefix Index for O(1) Lookup
Files:
- Modify: backend/app/models/lender.py (add api_key_prefix column)
- Create: backend/alembic/versions/035_api_key_prefix.py
- Modify: backend/app/dependencies.py:101-127 (use prefix lookup)
- Modify: backend/app/api/auth.py (set prefix on registration and rotation)
- Modify: backend/app/services/auth.py (update generate_api_key to include prefix)
- Test: backend/tests/test_api_key_prefix.py
- [ ] Step 1: Write failing test — prefix-based lookup
# backend/tests/test_api_key_prefix.py
import pytest
from app.dependencies import _authenticate_api_key
@pytest.mark.asyncio
async def test_api_key_prefix_lookup(db_session):
"""API key auth should use prefix-based O(1) lookup, not scan all lenders."""
from app.models.lender import Lender
from app.services.auth import hash_password, generate_api_key, hash_api_key
raw_key = generate_api_key()
prefix = raw_key[:12]
lender = Lender(
name="Prefix Test",
email="prefix-test@example.com",
password_hash=hash_password("password123"),
api_key=None,
api_key_hash=hash_api_key(raw_key),
api_key_prefix=prefix,
api_key_version=1,
)
db_session.add(lender)
await db_session.flush()
result = await _authenticate_api_key(db_session, raw_key)
assert result is not None
assert result.id == lender.id
- [ ] Step 2: Run test to verify it fails
Run: python -m pytest backend/tests/test_api_key_prefix.py -v
Expected: FAIL — api_key_prefix column doesn't exist yet.
- [ ] Step 3: Add api_key_prefix to Lender model
In backend/app/models/lender.py, after line 22 (api_key_version), add:
api_key_prefix: Mapped[Optional[str]] = mapped_column(String(12), nullable=True, index=True)
- [ ] Step 4: Create Alembic migration
# backend/alembic/versions/035_api_key_prefix.py
"""Add api_key_prefix column for O(1) API key lookup.
Revision ID: 035_api_key_prefix
Revises: 034_remove_plaintext
Create Date: 2026-03-11
"""
from alembic import op
import sqlalchemy as sa
revision = "035_api_key_prefix"
down_revision = "034_remove_plaintext"
branch_labels = None
depends_on = None
def upgrade() -> None:
op.add_column("lenders", sa.Column("api_key_prefix", sa.String(12), nullable=True))
op.create_index("ix_lenders_api_key_prefix", "lenders", ["api_key_prefix"])
def downgrade() -> None:
op.drop_index("ix_lenders_api_key_prefix", table_name="lenders")
op.drop_column("lenders", "api_key_prefix")
- [ ] Step 5: Update _authenticate_api_key to use prefix lookup
Replace the Strategy 1 block in backend/app/dependencies.py:
async def _authenticate_api_key(db: AsyncSession, api_key: str):
"""Authenticate an API key using prefix-indexed lookup + bcrypt verify."""
from app.services.auth import verify_api_key
prefix = api_key[:12]
# O(1) lookup by prefix, then bcrypt verify
result = await db.execute(
select(Lender).where(
Lender.is_active == True, # noqa: E712
Lender.api_key_prefix == prefix,
)
)
for lender in result.scalars().all():
if lender.api_key_hash and verify_api_key(api_key, lender.api_key_hash):
return lender
if lender.api_key_secondary_hash and verify_api_key(api_key, lender.api_key_secondary_hash):
return lender
return None
- [ ] Step 6: Set prefix on registration and rotation
In backend/app/api/auth.py, lender_register (around line 185):
raw_api_key = generate_api_key()
lender = Lender(
name=req.name,
email=req.email,
password_hash=hash_password(req.password),
api_key=None,
api_key_hash=hash_api_key(raw_api_key),
api_key_prefix=raw_api_key[:12],
api_key_version=1,
)
In lender_rotate_key (around line 516):
new_raw_key = generate_api_key()
lender.api_key_hash = hash_api_key(new_raw_key)
lender.api_key_prefix = new_raw_key[:12]
lender.api_key_version += 1
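The Files list also mentions updating generate_api_key so keys carry a recognizable prefix. A minimal sketch — the cuk_ marker and key length here are assumptions, not the project's actual format:

```python
import secrets

def generate_api_key() -> str:
    # Hypothetical format: fixed marker + URL-safe random material.
    # The first 12 chars double as the stable, indexable api_key_prefix.
    return "cuk_" + secrets.token_urlsafe(32)

key = generate_api_key()
prefix = key[:12]
print(len(prefix))  # 12 — the slice stored in the api_key_prefix column
```

A fixed marker makes leaked keys greppable and keeps the 12-char prefix slice well-defined regardless of key length.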
- [ ] Step 7: Run test to verify it passes
Run: python -m pytest backend/tests/test_api_key_prefix.py -v
Expected: PASS
- [ ] Step 8: Run full test suite
Run: python -m pytest backend/tests/ -v --tb=short
Expected: All pass.
- [ ] Step 9: Commit
git add backend/app/models/lender.py backend/app/dependencies.py backend/app/api/auth.py backend/alembic/versions/035_api_key_prefix.py backend/tests/test_api_key_prefix.py
git commit -m "perf: O(1) API key auth via prefix index lookup
Adds api_key_prefix column (first 12 chars) with index. API key auth
now queries by prefix (typically 1 row) then bcrypt-verifies, instead
of scanning all active lenders. Scales from O(n) to O(1)."
### Task 12: Replace datetime.utcnow() with datetime.now(timezone.utc)
Files:
- Modify: All Python files in backend/app/ containing datetime.utcnow()
- Modify: All Python files in backend/tests/ containing datetime.utcnow()
- [ ] Step 1: Find all occurrences
Run: grep -rn "datetime.utcnow()" backend/app/ backend/tests/ | wc -l
Note the count and file list.
- [ ] Step 2: Mechanical replacement — standard calls
Run:
find backend/app backend/tests -name "*.py" -exec sed -i '' 's/datetime\.utcnow()/datetime.now(timezone.utc)/g' {} +
- [ ] Step 2b: Fix aliased imports
Some files import datetime with aliases (e.g., from datetime import datetime as _dt), which the sed above will not catch.
Find and fix those manually; for example, backend/app/dependencies.py line 145 uses _dt.utcnow().
- [ ] Step 2c: Fix callable defaults (no parentheses)
Some model columns use default=datetime.utcnow (a callable reference, no parens). Find them:
grep -rn "default=datetime.utcnow\b" backend/app/ --include="*.py"
Replace each with a zero-argument callable: default=lambda: datetime.now(timezone.utc).
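The replacement for a callable default must itself stay a callable, since SQLAlchemy evaluates it per row at insert time:

```python
from datetime import datetime, timezone

# Before: created_at = mapped_column(DateTime, default=datetime.utcnow)
# After: pass a zero-argument callable evaluated at each insert
def utc_now() -> datetime:
    return datetime.now(timezone.utc)

# e.g. created_at = mapped_column(DateTime, default=utc_now)
assert utc_now().tzinfo is timezone.utc  # aware, not naive
```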
- [ ] Step 3: Add timezone import where needed
For every file that was modified, ensure it imports timezone:
Run: grep -rn "datetime.now(timezone.utc)" backend/app/ | cut -d: -f1 | sort -u
For each file, check if timezone is imported. Add it where missing.
- [ ] Step 3b: Verify zero remaining occurrences
Run: grep -rn "utcnow" backend/app/ backend/tests/ --include="*.py"
Expected: Zero matches.
- [ ] Step 4: Run full test suite
Run: python -m pytest backend/tests/ -v --tb=short
Expected: All pass.
- [ ] Step 5: Run linter
Run: cd backend && ruff check app/ tests/ --fix
Expected: No errors.
- [ ] Step 6: Commit
git add backend/
git commit -m "fix: replace deprecated datetime.utcnow() with timezone-aware alternative
Replaces all datetime.utcnow() calls with datetime.now(timezone.utc)
across the codebase. Prevents naive/aware datetime comparison bugs
and removes Python 3.12 deprecation warnings."
## Chunk 4: Week 3-4 — Architectural Improvements
### Task 13: Extract Workers to Separate Entrypoint
Files:
- Create: backend/app/worker_main.py (standalone worker runner)
- Modify: backend/app/main.py:85-107 (remove worker startup from lifespan)
- Modify: Dockerfile.backend (document worker entrypoint)
- [ ] Step 1: Create standalone worker runner
# backend/app/worker_main.py
"""Standalone entrypoint for background workers.
Run with: python -m app.worker_main
In production: separate Kubernetes Deployment with this as CMD.
"""
import asyncio
import signal
import logging
from app.logging_config import setup_logging
from app.config import get_settings
from app.services.cache import init_cache_pool, close_cache_pool
settings = get_settings()
setup_logging(settings.debug)
logger = logging.getLogger("worker_main")
_shutdown = asyncio.Event()
def _handle_signal():
logger.info("Shutdown signal received")
_shutdown.set()
async def main():
init_cache_pool()
from app.workers.credential_worker import run_worker_loop as credential_loop
from app.workers.webhook_retry_worker import run_worker_loop as webhook_loop
from app.workers.fraud_sentinel_worker import run_worker_loop as fraud_loop
from app.workers.network_intelligence_worker import run_worker_loop as network_loop
loop = asyncio.get_running_loop()
for sig in (signal.SIGTERM, signal.SIGINT):
loop.add_signal_handler(sig, _handle_signal)
tasks = [
asyncio.create_task(credential_loop()),
asyncio.create_task(webhook_loop()),
asyncio.create_task(fraud_loop()),
asyncio.create_task(network_loop()),
]
logger.info("All workers started (%d tasks)", len(tasks))
await _shutdown.wait()
logger.info("Cancelling workers...")
for t in tasks:
t.cancel()
await asyncio.gather(*tasks, return_exceptions=True)
await close_cache_pool()
from app.database import engine, read_engine
await engine.dispose()
if read_engine is not engine:
await read_engine.dispose()
logger.info("Worker shutdown complete")
if __name__ == "__main__":
asyncio.run(main())
- [ ] Step 2: Remove worker startup from main.py lifespan
In backend/app/main.py, remove lines 85-107 (the worker creation and cancellation blocks). The lifespan becomes:
@asynccontextmanager
async def lifespan(app: FastAPI):
logger.info("startup", app_name=settings.app_name, debug=settings.debug)
from app.services.cache import init_cache_pool, close_cache_pool
init_cache_pool()
yield
# Graceful shutdown
logger.info("shutdown_started")
from app.database import engine, read_engine
await engine.dispose()
if read_engine is not engine:
await read_engine.dispose()
logger.info("db_engines_disposed")
await close_cache_pool()
logger.info("redis_pool_closed")
logger.info("shutdown_complete")
- [ ] Step 3: Run full test suite
Run: python -m pytest backend/tests/ -v --tb=short
Expected: All pass. Tests don't depend on workers running.
- [ ] Step 4: Verify worker_main runs standalone
Run: cd /Users/lordviswa/Desktop/new_project/backend && source venv/bin/activate && python -c "import asyncio; from app.worker_main import main; print('Import OK')"
Expected: Import succeeds without errors.
- [ ] Step 5: DEPLOYMENT GATE — Do NOT deploy until K8s manifest is ready
WARNING: After this change, deploying the API server without a separate worker Deployment will silently stop all background processing (credential jobs, webhook retries, fraud detection, network intelligence). Before deploying:
- Create a Kubernetes Deployment for workers with CMD: python -m app.worker_main
- Verify workers run in staging before promoting to production
- Monitor credential job queue depth and webhook delivery lag after deploy
Until the K8s manifest is ready, this change is safe to merge but must NOT be deployed to production.
- [ ] Step 6: Commit
git add backend/app/worker_main.py backend/app/main.py
git commit -m "arch: extract background workers to standalone entrypoint
Workers (credential, webhook, fraud, network) now run via
'python -m app.worker_main' as a separate process. The API server
no longer spawns workers in-process. Enables independent scaling
via separate Kubernetes Deployments."
### Task 14: Decompose verify_token_proofs into Pipeline
Files:
- Create: backend/app/services/verification_pipeline.py
- Modify: backend/app/services/verification.py (delegate to pipeline)
- Test: backend/tests/test_verification_pipeline.py
- [ ] Step 1: Create pipeline module with step methods
# backend/app/services/verification_pipeline.py
"""Verification pipeline: decomposed 7-step flow."""
# NOTE: Do NOT use `from __future__ import annotations` here — project convention
# avoids it due to SQLAlchemy compatibility issues (see MEMORY.md).
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import List, Optional, Dict

from sqlalchemy.ext.asyncio import AsyncSession


@dataclass
class StepResult:
    name: str
    label: str
    passed: bool
    detail: str


@dataclass
class PipelineContext:
    """Mutable state passed through pipeline steps."""
    db: AsyncSession
    token: str
    lender_id: uuid.UUID
    verifier_type: str = "lender"
    product_id: Optional[uuid.UUID] = None
    steps: List[StepResult] = field(default_factory=list)
    qr_token: object = None
    credentials: List = field(default_factory=list)
    member: object = None
    member_did: str = ""
    claims_verified: Dict = field(default_factory=dict)
    all_verified: bool = False
    verification_id: str = ""
    failed_step: Optional[str] = None
    error: Optional[str] = None
    issuer_info: Optional[Dict] = None
    auto_fill: List = field(default_factory=list)
    claim_results: List = field(default_factory=list)
    risk: Optional[Dict] = None

    def add_step(self, name: str, label: str, passed: bool, detail: str) -> StepResult:
        step = StepResult(name=name, label=label, passed=passed, detail=detail)
        self.steps.append(step)
        return step

    def fail(self, step_name: str, error: str) -> None:
        self.failed_step = step_name
        self.error = error
- [ ] Step 2: Extract each step from verification.py into pipeline module
Move the logic for DECODE, FRESHNESS, ISSUER_AUTH, ZKP_VERIFY, REVOCATION, HOLDER_BINDING, and RESULT into separate functions (async def step_decode(ctx), async def step_freshness(ctx), and so on). Each returns True to continue or False to short-circuit the pipeline.
This is a large refactor — extract one step at a time, running tests between each extraction.
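The short-circuit contract each step must follow can be exercised in isolation. This sketch uses stdlib-only stand-ins (the `Ctx` class here is a minimal stand-in for `PipelineContext`, not the real one, and the step bodies are invented):

```python
import asyncio
from dataclasses import dataclass, field
from typing import List, Optional

@dataclass
class Ctx:
    """Minimal stand-in for PipelineContext (illustrative only)."""
    token: str
    steps: List[str] = field(default_factory=list)
    failed_step: Optional[str] = None
    error: Optional[str] = None

async def step_decode(ctx: Ctx) -> bool:
    ctx.steps.append("DECODE")
    if ctx.token != "valid":
        ctx.failed_step, ctx.error = "DECODE", "token not found"
        return False  # short-circuit: later steps never run
    return True

async def step_freshness(ctx: Ctx) -> bool:
    ctx.steps.append("FRESHNESS")
    return True

async def run(ctx: Ctx) -> Ctx:
    for step_fn in (step_decode, step_freshness):
        if not await step_fn(ctx):
            break
    return ctx

ok = asyncio.run(run(Ctx(token="valid")))
bad = asyncio.run(run(Ctx(token="bogus")))
print(ok.steps)         # ['DECODE', 'FRESHNESS']
print(bad.failed_step)  # DECODE — FRESHNESS was never reached
```

The key property to preserve during extraction: a failing step records its own failure on the context and returns False, so the orchestrator needs no per-step error handling.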
- [ ] Step 3: Create orchestrator function
async def run_pipeline(
    db: AsyncSession,
    token: str,
    lender_id: uuid.UUID,
    verifier_type: str = "lender",
    product_id: Optional[uuid.UUID] = None,
) -> dict:
    """Execute the 7-step verification pipeline."""
    ctx = PipelineContext(
        db=db, token=token, lender_id=lender_id,
        verifier_type=verifier_type, product_id=product_id,
    )
    for step_fn in [step_decode, step_freshness, step_issuer_auth,
                    step_zkp_verify, step_revocation, step_holder_binding,
                    step_result]:
        proceed = await step_fn(ctx)
        if not proceed:
            break
    return _build_response(ctx)
- [ ] Step 4: Update verification.py to delegate
# In backend/app/services/verification.py, replace verify_token_proofs body:
async def verify_token_proofs(db, token, lender_id, verifier_type="lender", product_id=None):
    from app.services.verification_pipeline import run_pipeline
    return await run_pipeline(db, token, lender_id, verifier_type, product_id)
- [ ] Step 5: Write pipeline unit tests
# backend/tests/test_verification_pipeline.py
import uuid

import pytest

from app.services.verification_pipeline import PipelineContext, step_decode


@pytest.mark.asyncio
async def test_step_decode_invalid_token(db_session):
    """DECODE step should fail for nonexistent token."""
    ctx = PipelineContext(
        db=db_session, token="nonexistent_token",
        lender_id=uuid.uuid4(), verifier_type="lender",
    )
    result = await step_decode(ctx)
    assert result is False
    assert ctx.failed_step == "DECODE"
    assert len(ctx.steps) == 1
    assert ctx.steps[0].passed is False
- [ ] Step 6: Run full test suite
Run: python -m pytest backend/tests/ -v --tb=short
Expected: All 310+ tests pass.
- [ ] Step 7: Commit
git add backend/app/services/verification_pipeline.py backend/app/services/verification.py backend/tests/test_verification_pipeline.py
git commit -m "refactor: decompose verify_token_proofs into VerificationPipeline
Extracts the 483-line monolithic function into 7 discrete step functions
(step_decode through step_result) with a PipelineContext dataclass.
Each step is independently testable. verify_token_proofs delegates
to run_pipeline() for backwards compatibility."
Task 15: Split auth.py into Member and Lender Auth Modules
Files:
- Create: backend/app/api/member_auth.py
- Create: backend/app/api/lender_auth.py
- Modify: backend/app/api/auth.py (keep shared endpoints, import sub-routers)
- Modify: backend/app/main.py (update router registration if needed)
- [ ] Step 1: Identify which routes go where
- member_auth.py: /member/register, /member/login, /member/send-verification-email, /member/verify-email
- lender_auth.py: /lender/register, /lender/login, /lender/rotate-key
- auth.py (shared): /refresh, /logout, /logout-all, /password-reset/*, /test/auto-verify
- [ ] Step 2: Extract member auth routes to member_auth.py
Create backend/app/api/member_auth.py with the member-specific route handlers.
IMPORTANT: Do NOT add prefix="/auth" to the sub-router — the parent auth.py
router already has prefix="/auth". Adding it again would double the prefix to /auth/auth/.
# backend/app/api/member_auth.py
from fastapi import APIRouter

router = APIRouter(tags=["auth"])  # NO prefix — parent provides /auth
Move the imports each handler needs. Routes keep their decorators as-is (e.g., @router.post("/member/register")).
- [ ] Step 3: Extract lender auth routes to lender_auth.py
Same pattern — no prefix on the sub-router.
- [ ] Step 4: Update auth.py to include sub-routers
from app.api.member_auth import router as member_auth_router
from app.api.lender_auth import router as lender_auth_router
router.include_router(member_auth_router)
router.include_router(lender_auth_router)
- [ ] Step 5: Run full test suite
Run: python -m pytest backend/tests/ -v --tb=short
Expected: All pass. Routes are at the same paths.
- [ ] Step 6: Commit
git add backend/app/api/auth.py backend/app/api/member_auth.py backend/app/api/lender_auth.py
git commit -m "refactor: split auth.py (562 lines) into member_auth + lender_auth
Extracts member and lender registration/login routes into dedicated
modules. Shared endpoints (refresh, logout, password reset) stay in
auth.py. Reduces file size and improves code ownership boundaries."
Deferred Items (Tracked)
These are logged for future sprints but not part of this plan:
- RBAC model: Replace the is_admin boolean with role/permission tables
- localStorage → HTTP-only cookies: Move JWT storage to server-managed cookies
- Frontend refactors: Split api-client.ts, extract form validation, useProtectedRoute hook
- TOTP nonce tracking: Redis-backed set to prevent code reuse within the window
- Redis blacklist fail-closed: Evaluate a fail-closed strategy on Redis outage
- Recovery code hashing: Upgrade from SHA-256 to bcrypt
- Trust registry auth: Require authentication on POST /trust-registry/submit
- SNS webhook SSRF: Allowlist the SubscribeURL hostname
- Audit log scoping: Restrict /audit/logs to admin-only or scope by lender