This commit is contained in:
2025-11-07 09:05:28 +08:00
parent 4f8ca5a08d
commit 349b1696a6

View File

@@ -0,0 +1,655 @@
"""
Database migration utilities for LandPPT
"""
import os
import time
import logging
from typing import List, Dict, Any
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
from .database import AsyncSessionLocal, async_engine
from .models import Base
logger = logging.getLogger(__name__)
class DatabaseMigration:
"""Database migration manager"""
def __init__(self):
self.migrations = []
self._register_migrations()
def _register_migrations(self):
"""Register all available migrations"""
# Migration 001: Initial schema
self.migrations.append({
"version": "001",
"name": "initial_schema",
"description": "Create initial database schema",
"up": self._migration_001_up,
"down": self._migration_001_down
})
# Migration 002: Add indexes
self.migrations.append({
"version": "002",
"name": "add_indexes",
"description": "Add performance indexes",
"up": self._migration_002_up,
"down": self._migration_002_down
})
# Migration 003: Add project_id to todo_stages
self.migrations.append({
"version": "003",
"name": "add_project_id_to_todo_stages",
"description": "Add project_id column to todo_stages table for better indexing",
"up": self._migration_003_up,
"down": self._migration_003_down
})
# Migration 004: Add PPT templates table
self.migrations.append({
"version": "004",
"name": "add_ppt_templates_table",
"description": "Add PPT templates table and update slide_data table",
"up": self._migration_004_up,
"down": self._migration_004_down
})
# Migration 005: Add project_metadata column to projects table
self.migrations.append({
"version": "005",
"name": "add_project_metadata_to_projects",
"description": "Add project_metadata column to projects table for storing template selection and other metadata",
"up": self._migration_005_up,
"down": self._migration_005_down
})
# Migration 006: Add is_user_edited field to slide_data table
self.migrations.append({
"version": "006",
"name": "add_is_user_edited_to_slide_data",
"description": "Add is_user_edited field to slide_data table to track user manual edits",
"up": self._migration_006_up,
"down": self._migration_006_down
})
async def _migration_001_up(self, session: AsyncSession):
"""Create initial schema"""
logger.info("Running migration 001: Creating initial schema")
# Create all tables
async with async_engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
logger.info("Migration 001 completed successfully")
async def _migration_001_down(self, session: AsyncSession):
"""Drop initial schema"""
logger.info("Rolling back migration 001: Dropping all tables")
async with async_engine.begin() as conn:
await conn.run_sync(Base.metadata.drop_all)
logger.info("Migration 001 rollback completed")
async def _migration_002_up(self, session: AsyncSession):
"""Add performance indexes"""
logger.info("Running migration 002: Adding performance indexes")
indexes = [
"CREATE INDEX IF NOT EXISTS idx_projects_status ON projects(status)",
"CREATE INDEX IF NOT EXISTS idx_projects_scenario ON projects(scenario)",
"CREATE INDEX IF NOT EXISTS idx_projects_created_at ON projects(created_at)",
"CREATE INDEX IF NOT EXISTS idx_todo_stages_status ON todo_stages(status)",
"CREATE INDEX IF NOT EXISTS idx_slide_data_slide_index ON slide_data(slide_index)",
"CREATE INDEX IF NOT EXISTS idx_project_versions_timestamp ON project_versions(timestamp)"
]
for index_sql in indexes:
await session.execute(text(index_sql))
await session.commit()
logger.info("Migration 002 completed successfully")
async def _migration_002_down(self, session: AsyncSession):
"""Remove performance indexes"""
logger.info("Rolling back migration 002: Removing performance indexes")
indexes = [
"DROP INDEX IF EXISTS idx_projects_status",
"DROP INDEX IF EXISTS idx_projects_scenario",
"DROP INDEX IF EXISTS idx_projects_created_at",
"DROP INDEX IF EXISTS idx_todo_stages_status",
"DROP INDEX IF EXISTS idx_slide_data_slide_index",
"DROP INDEX IF EXISTS idx_project_versions_timestamp"
]
for index_sql in indexes:
await session.execute(text(index_sql))
await session.commit()
logger.info("Migration 002 rollback completed")
async def _migration_003_up(self, session: AsyncSession):
"""Add project_id column to todo_stages table"""
logger.info("Running migration 003: Adding project_id to todo_stages")
try:
# Check if project_id column already exists
result = await session.execute(text("""
PRAGMA table_info(todo_stages)
"""))
columns = result.fetchall()
column_names = [col[1] for col in columns]
if 'project_id' not in column_names:
# Add project_id column to todo_stages table
await session.execute(text("""
ALTER TABLE todo_stages
ADD COLUMN project_id VARCHAR(36)
"""))
logger.info("Added project_id column to todo_stages")
else:
logger.info("project_id column already exists in todo_stages")
# Create index on project_id
await session.execute(text("""
CREATE INDEX IF NOT EXISTS idx_todo_stages_project_id
ON todo_stages(project_id)
"""))
# Create index on stage_id for better performance
await session.execute(text("""
CREATE INDEX IF NOT EXISTS idx_todo_stages_stage_id
ON todo_stages(stage_id)
"""))
# Populate project_id for existing records
await session.execute(text("""
UPDATE todo_stages
SET project_id = (
SELECT tb.project_id
FROM todo_boards tb
WHERE tb.id = todo_stages.todo_board_id
)
WHERE project_id IS NULL
"""))
await session.commit()
logger.info("Migration 003 completed successfully")
except Exception as e:
await session.rollback()
logger.error(f"Migration 003 failed: {e}")
raise
async def _migration_003_down(self, session: AsyncSession):
"""Remove project_id column from todo_stages table"""
logger.info("Rolling back migration 003: Removing project_id from todo_stages")
try:
# Drop indexes first
await session.execute(text("DROP INDEX IF EXISTS idx_todo_stages_project_id"))
await session.execute(text("DROP INDEX IF EXISTS idx_todo_stages_stage_id"))
# Remove project_id column (SQLite doesn't support DROP COLUMN directly)
# We need to recreate the table without the column
await session.execute(text("""
CREATE TABLE todo_stages_backup AS
SELECT id, todo_board_id, stage_id, stage_index, title, description,
status, progress, result, created_at, updated_at
FROM todo_stages
"""))
await session.execute(text("DROP TABLE todo_stages"))
await session.execute(text("""
CREATE TABLE todo_stages (
id INTEGER PRIMARY KEY,
todo_board_id INTEGER NOT NULL,
stage_id VARCHAR(100) NOT NULL,
stage_index INTEGER NOT NULL,
title VARCHAR(255) NOT NULL,
description TEXT NOT NULL,
status VARCHAR(50) DEFAULT 'pending',
progress FLOAT DEFAULT 0.0,
result JSON,
created_at FLOAT,
updated_at FLOAT,
FOREIGN KEY (todo_board_id) REFERENCES todo_boards(id)
)
"""))
await session.execute(text("""
INSERT INTO todo_stages
SELECT * FROM todo_stages_backup
"""))
await session.execute(text("DROP TABLE todo_stages_backup"))
await session.commit()
logger.info("Migration 003 rollback completed")
except Exception as e:
await session.rollback()
logger.error(f"Migration 003 rollback failed: {e}")
raise
async def _migration_004_up(self, session: AsyncSession):
"""Migration 004: Add PPT templates table and update slide_data table"""
try:
logger.info("Running migration 004: Adding PPT templates table")
# Create ppt_templates table
create_templates_table_sql = """
CREATE TABLE IF NOT EXISTS ppt_templates (
id INTEGER PRIMARY KEY AUTOINCREMENT,
project_id VARCHAR(36) NOT NULL,
template_type VARCHAR(50) NOT NULL,
template_name VARCHAR(255) NOT NULL,
description TEXT,
html_template TEXT NOT NULL,
applicable_scenarios JSON,
style_config JSON,
usage_count INTEGER DEFAULT 0,
created_at FLOAT NOT NULL,
updated_at FLOAT NOT NULL,
FOREIGN KEY (project_id) REFERENCES projects (project_id)
)
"""
await session.execute(text(create_templates_table_sql))
# Create indexes for ppt_templates
await session.execute(text("""
CREATE INDEX IF NOT EXISTS idx_ppt_templates_project_id
ON ppt_templates (project_id)
"""))
await session.execute(text("""
CREATE INDEX IF NOT EXISTS idx_ppt_templates_type
ON ppt_templates (template_type)
"""))
# Add template_id column to slide_data table
try:
await session.execute(text("""
ALTER TABLE slide_data
ADD COLUMN template_id INTEGER REFERENCES ppt_templates(id)
"""))
except Exception as e:
# Column might already exist, check if it's a duplicate column error
if "duplicate column name" not in str(e).lower():
raise
logger.info("template_id column already exists in slide_data table")
await session.commit()
logger.info("Migration 004 completed successfully")
except Exception as e:
await session.rollback()
logger.error(f"Migration 004 failed: {e}")
raise
async def _migration_004_down(self, session: AsyncSession):
"""Migration 004 rollback: Remove PPT templates table and template_id column"""
try:
logger.info("Rolling back migration 004")
# Drop ppt_templates table
await session.execute(text("DROP TABLE IF EXISTS ppt_templates"))
# Remove template_id column from slide_data table
# SQLite doesn't support DROP COLUMN directly, so we need to recreate the table
await session.execute(text("""
CREATE TABLE slide_data_backup AS
SELECT id, project_id, slide_index, slide_id, title, content_type,
html_content, slide_metadata, created_at, updated_at
FROM slide_data
"""))
await session.execute(text("DROP TABLE slide_data"))
await session.execute(text("""
CREATE TABLE slide_data (
id INTEGER PRIMARY KEY AUTOINCREMENT,
project_id VARCHAR(36) NOT NULL,
slide_index INTEGER NOT NULL,
slide_id VARCHAR(100) NOT NULL,
title VARCHAR(255) NOT NULL,
content_type VARCHAR(50) NOT NULL,
html_content TEXT NOT NULL,
slide_metadata JSON,
created_at FLOAT NOT NULL,
updated_at FLOAT NOT NULL,
FOREIGN KEY (project_id) REFERENCES projects (project_id)
)
"""))
await session.execute(text("""
INSERT INTO slide_data
SELECT * FROM slide_data_backup
"""))
await session.execute(text("DROP TABLE slide_data_backup"))
await session.commit()
logger.info("Migration 004 rollback completed")
except Exception as e:
await session.rollback()
logger.error(f"Migration 004 rollback failed: {e}")
raise
async def _migration_005_up(self, session: AsyncSession):
"""Migration 005: Add project_metadata column to projects table"""
try:
logger.info("Running migration 005: Adding project_metadata column to projects table")
# Check if project_metadata column already exists
result = await session.execute(text("""
PRAGMA table_info(projects)
"""))
columns = result.fetchall()
column_names = [col[1] for col in columns]
if 'project_metadata' not in column_names:
# Add project_metadata column to projects table
await session.execute(text("""
ALTER TABLE projects
ADD COLUMN project_metadata JSON
"""))
logger.info("Added project_metadata column to projects table")
else:
logger.info("project_metadata column already exists in projects table")
await session.commit()
logger.info("Migration 005 completed successfully")
except Exception as e:
await session.rollback()
logger.error(f"Migration 005 failed: {e}")
raise
async def _migration_005_down(self, session: AsyncSession):
"""Migration 005 rollback: Remove project_metadata column from projects table"""
try:
logger.info("Rolling back migration 005: Removing project_metadata column from projects table")
# SQLite doesn't support DROP COLUMN directly, so we need to recreate the table
await session.execute(text("""
CREATE TABLE projects_backup AS
SELECT id, project_id, title, scenario, topic, requirements, status,
outline, slides_html, slides_data, confirmed_requirements,
version, created_at, updated_at
FROM projects
"""))
await session.execute(text("DROP TABLE projects"))
await session.execute(text("""
CREATE TABLE projects (
id INTEGER PRIMARY KEY AUTOINCREMENT,
project_id VARCHAR(36) UNIQUE NOT NULL,
title VARCHAR(255) NOT NULL,
scenario VARCHAR(100) NOT NULL,
topic VARCHAR(255) NOT NULL,
requirements TEXT,
status VARCHAR(50) DEFAULT 'draft',
outline JSON,
slides_html TEXT,
slides_data JSON,
confirmed_requirements JSON,
version INTEGER DEFAULT 1,
created_at FLOAT NOT NULL,
updated_at FLOAT NOT NULL
)
"""))
await session.execute(text("""
INSERT INTO projects
SELECT * FROM projects_backup
"""))
await session.execute(text("DROP TABLE projects_backup"))
# Recreate indexes
await session.execute(text("CREATE INDEX IF NOT EXISTS idx_projects_status ON projects(status)"))
await session.execute(text("CREATE INDEX IF NOT EXISTS idx_projects_scenario ON projects(scenario)"))
await session.execute(text("CREATE INDEX IF NOT EXISTS idx_projects_created_at ON projects(created_at)"))
await session.commit()
logger.info("Migration 005 rollback completed")
except Exception as e:
await session.rollback()
logger.error(f"Migration 005 rollback failed: {e}")
raise
async def _migration_006_up(self, session: AsyncSession):
"""Migration 006: Add is_user_edited field to slide_data table"""
try:
logger.info("Running migration 006: Adding is_user_edited field to slide_data table")
# Check if is_user_edited column already exists
result = await session.execute(text("""
PRAGMA table_info(slide_data)
"""))
columns = result.fetchall()
column_names = [col[1] for col in columns]
if 'is_user_edited' not in column_names:
# Add is_user_edited column to slide_data table
await session.execute(text("""
ALTER TABLE slide_data
ADD COLUMN is_user_edited BOOLEAN DEFAULT 0 NOT NULL
"""))
logger.info("Added is_user_edited column to slide_data table")
else:
logger.info("is_user_edited column already exists in slide_data table")
await session.commit()
logger.info("Migration 006 completed successfully")
except Exception as e:
await session.rollback()
logger.error(f"Migration 006 failed: {e}")
raise
async def _migration_006_down(self, session: AsyncSession):
"""Migration 006 rollback: Remove is_user_edited field from slide_data table"""
try:
logger.info("Rolling back migration 006: Removing is_user_edited field from slide_data table")
# SQLite doesn't support DROP COLUMN directly, so we need to recreate the table
await session.execute(text("""
CREATE TABLE slide_data_backup AS
SELECT id, project_id, slide_index, slide_id, title, content_type,
html_content, slide_metadata, template_id, created_at, updated_at
FROM slide_data
"""))
await session.execute(text("DROP TABLE slide_data"))
await session.execute(text("""
CREATE TABLE slide_data (
id INTEGER PRIMARY KEY AUTOINCREMENT,
project_id VARCHAR(36) NOT NULL,
slide_index INTEGER NOT NULL,
slide_id VARCHAR(100) NOT NULL,
title VARCHAR(255) NOT NULL,
content_type VARCHAR(50) NOT NULL,
html_content TEXT NOT NULL,
slide_metadata JSON,
template_id INTEGER REFERENCES ppt_templates(id),
created_at FLOAT NOT NULL,
updated_at FLOAT NOT NULL,
FOREIGN KEY (project_id) REFERENCES projects (project_id)
)
"""))
await session.execute(text("""
INSERT INTO slide_data
SELECT * FROM slide_data_backup
"""))
await session.execute(text("DROP TABLE slide_data_backup"))
await session.commit()
logger.info("Migration 006 rollback completed")
except Exception as e:
await session.rollback()
logger.error(f"Migration 006 rollback failed: {e}")
raise
async def _create_migration_table(self, session: AsyncSession):
"""Create migration tracking table"""
create_table_sql = """
CREATE TABLE IF NOT EXISTS schema_migrations (
version VARCHAR(10) PRIMARY KEY,
name VARCHAR(100) NOT NULL,
description TEXT,
applied_at FLOAT NOT NULL,
rollback_sql TEXT
)
"""
await session.execute(text(create_table_sql))
await session.commit()
async def _get_applied_migrations(self, session: AsyncSession) -> List[str]:
"""Get list of applied migration versions"""
try:
result = await session.execute(text("SELECT version FROM schema_migrations ORDER BY version"))
return [row[0] for row in result.fetchall()]
except Exception:
# Table doesn't exist yet
return []
async def _record_migration(self, session: AsyncSession, migration: Dict[str, Any]):
"""Record a migration as applied"""
insert_sql = """
INSERT INTO schema_migrations (version, name, description, applied_at)
VALUES (:version, :name, :description, :applied_at)
"""
await session.execute(text(insert_sql), {
"version": migration["version"],
"name": migration["name"],
"description": migration["description"],
"applied_at": time.time()
})
await session.commit()
async def _remove_migration_record(self, session: AsyncSession, version: str):
"""Remove migration record"""
delete_sql = "DELETE FROM schema_migrations WHERE version = :version"
await session.execute(text(delete_sql), {"version": version})
await session.commit()
async def migrate_up(self, target_version: str = None) -> bool:
"""Run migrations up to target version"""
try:
async with AsyncSessionLocal() as session:
# Create migration table if it doesn't exist
await self._create_migration_table(session)
# Get applied migrations
applied = await self._get_applied_migrations(session)
# Find migrations to apply
to_apply = []
for migration in self.migrations:
if migration["version"] not in applied:
to_apply.append(migration)
if target_version and migration["version"] == target_version:
break
if not to_apply:
logger.info("No migrations to apply")
return True
# Apply migrations
for migration in to_apply:
logger.info(f"Applying migration {migration['version']}: {migration['name']}")
try:
await migration["up"](session)
await self._record_migration(session, migration)
logger.info(f"Migration {migration['version']} applied successfully")
except Exception as e:
logger.error(f"Failed to apply migration {migration['version']}: {e}")
raise
logger.info("All migrations applied successfully")
return True
except Exception as e:
logger.error(f"Migration failed: {e}")
return False
async def migrate_down(self, target_version: str) -> bool:
"""Rollback migrations down to target version"""
try:
async with AsyncSessionLocal() as session:
# Get applied migrations
applied = await self._get_applied_migrations(session)
# Find migrations to rollback
to_rollback = []
for migration in reversed(self.migrations):
if migration["version"] in applied and migration["version"] > target_version:
to_rollback.append(migration)
if not to_rollback:
logger.info("No migrations to rollback")
return True
# Rollback migrations
for migration in to_rollback:
logger.info(f"Rolling back migration {migration['version']}: {migration['name']}")
try:
await migration["down"](session)
await self._remove_migration_record(session, migration["version"])
logger.info(f"Migration {migration['version']} rolled back successfully")
except Exception as e:
logger.error(f"Failed to rollback migration {migration['version']}: {e}")
raise
logger.info("Migrations rolled back successfully")
return True
except Exception as e:
logger.error(f"Migration rollback failed: {e}")
return False
async def get_migration_status(self) -> Dict[str, Any]:
"""Get current migration status"""
try:
async with AsyncSessionLocal() as session:
await self._create_migration_table(session)
applied = await self._get_applied_migrations(session)
status = {
"current_version": applied[-1] if applied else None,
"applied_migrations": applied,
"available_migrations": [m["version"] for m in self.migrations],
"pending_migrations": [m["version"] for m in self.migrations if m["version"] not in applied]
}
return status
except Exception as e:
logger.error(f"Failed to get migration status: {e}")
return {"error": str(e)}
# Global migration manager instance
migration_manager = DatabaseMigration()