From 349b1696a637d6d0ba510c5ac95b0d64b07b268f Mon Sep 17 00:00:00 2001 From: 13315423919 <13315423919@qq.com> Date: Fri, 7 Nov 2025 09:05:28 +0800 Subject: [PATCH] Add File --- src/landppt/database/migrations.py | 655 +++++++++++++++++++++++++++++ 1 file changed, 655 insertions(+) create mode 100644 src/landppt/database/migrations.py diff --git a/src/landppt/database/migrations.py b/src/landppt/database/migrations.py new file mode 100644 index 0000000..a5dee3c --- /dev/null +++ b/src/landppt/database/migrations.py @@ -0,0 +1,655 @@ +""" +Database migration utilities for LandPPT +""" + +import os +import time +import logging +from typing import List, Dict, Any +from sqlalchemy import text +from sqlalchemy.ext.asyncio import AsyncSession + +from .database import AsyncSessionLocal, async_engine +from .models import Base + +logger = logging.getLogger(__name__) + + +class DatabaseMigration: + """Database migration manager""" + + def __init__(self): + self.migrations = [] + self._register_migrations() + + def _register_migrations(self): + """Register all available migrations""" + # Migration 001: Initial schema + self.migrations.append({ + "version": "001", + "name": "initial_schema", + "description": "Create initial database schema", + "up": self._migration_001_up, + "down": self._migration_001_down + }) + + # Migration 002: Add indexes + self.migrations.append({ + "version": "002", + "name": "add_indexes", + "description": "Add performance indexes", + "up": self._migration_002_up, + "down": self._migration_002_down + }) + + # Migration 003: Add project_id to todo_stages + self.migrations.append({ + "version": "003", + "name": "add_project_id_to_todo_stages", + "description": "Add project_id column to todo_stages table for better indexing", + "up": self._migration_003_up, + "down": self._migration_003_down + }) + + # Migration 004: Add PPT templates table + self.migrations.append({ + "version": "004", + "name": "add_ppt_templates_table", + "description": "Add PPT templates table and update slide_data table", + "up": self._migration_004_up, + "down": self._migration_004_down + }) + + # Migration 005: Add project_metadata column to projects table + self.migrations.append({ + "version": "005", + "name": "add_project_metadata_to_projects", + "description": "Add project_metadata column to projects table for storing template selection and other metadata", + "up": self._migration_005_up, + "down": self._migration_005_down + }) + + # Migration 006: Add is_user_edited field to slide_data table + self.migrations.append({ + "version": "006", + "name": "add_is_user_edited_to_slide_data", + "description": "Add is_user_edited field to slide_data table to track user manual edits", + "up": self._migration_006_up, + "down": self._migration_006_down + }) + + async def _migration_001_up(self, session: AsyncSession): + """Create initial schema""" + logger.info("Running migration 001: Creating initial schema") + + # Create all tables + async with async_engine.begin() as conn: + await conn.run_sync(Base.metadata.create_all) + + logger.info("Migration 001 completed successfully") + + async def _migration_001_down(self, session: AsyncSession): + """Drop initial schema""" + logger.info("Rolling back migration 001: Dropping all tables") + + async with async_engine.begin() as conn: + await conn.run_sync(Base.metadata.drop_all) + + logger.info("Migration 001 rollback completed") + + async def _migration_002_up(self, session: AsyncSession): + """Add performance indexes""" + logger.info("Running migration 002: Adding performance indexes") + + indexes = [ + "CREATE INDEX IF NOT EXISTS idx_projects_status ON projects(status)", + "CREATE INDEX IF NOT EXISTS idx_projects_scenario ON projects(scenario)", + "CREATE INDEX IF NOT EXISTS idx_projects_created_at ON projects(created_at)", + "CREATE INDEX IF NOT EXISTS idx_todo_stages_status ON todo_stages(status)", + "CREATE INDEX IF NOT EXISTS idx_slide_data_slide_index ON slide_data(slide_index)", + "CREATE INDEX IF NOT EXISTS idx_project_versions_timestamp ON project_versions(timestamp)" + ] + + for index_sql in indexes: + await session.execute(text(index_sql)) + + await session.commit() + logger.info("Migration 002 completed successfully") + + async def _migration_002_down(self, session: AsyncSession): + """Remove performance indexes""" + logger.info("Rolling back migration 002: Removing performance indexes") + + indexes = [ + "DROP INDEX IF EXISTS idx_projects_status", + "DROP INDEX IF EXISTS idx_projects_scenario", + "DROP INDEX IF EXISTS idx_projects_created_at", + "DROP INDEX IF EXISTS idx_todo_stages_status", + "DROP INDEX IF EXISTS idx_slide_data_slide_index", + "DROP INDEX IF EXISTS idx_project_versions_timestamp" + ] + + for index_sql in indexes: + await session.execute(text(index_sql)) + + await session.commit() + logger.info("Migration 002 rollback completed") + + async def _migration_003_up(self, session: AsyncSession): + """Add project_id column to todo_stages table""" + logger.info("Running migration 003: Adding project_id to todo_stages") + + try: + # Check if project_id column already exists + result = await session.execute(text(""" + PRAGMA table_info(todo_stages) + """)) + columns = result.fetchall() + column_names = [col[1] for col in columns] + + if 'project_id' not in column_names: + # Add project_id column to todo_stages table + await session.execute(text(""" + ALTER TABLE todo_stages + ADD COLUMN project_id VARCHAR(36) + """)) + logger.info("Added project_id column to todo_stages") + else: + logger.info("project_id column already exists in todo_stages") + + # Create index on project_id + await session.execute(text(""" + CREATE INDEX IF NOT EXISTS idx_todo_stages_project_id + ON todo_stages(project_id) + """)) + + # Create index on stage_id for better performance + await session.execute(text(""" + CREATE INDEX IF NOT EXISTS idx_todo_stages_stage_id + ON todo_stages(stage_id) + """)) + + # Populate project_id for existing records + await session.execute(text(""" + UPDATE todo_stages + SET project_id = ( + SELECT tb.project_id + FROM todo_boards tb + WHERE tb.id = todo_stages.todo_board_id + ) + WHERE project_id IS NULL + """)) + + await session.commit() + logger.info("Migration 003 completed successfully") + + except Exception as e: + await session.rollback() + logger.error(f"Migration 003 failed: {e}") + raise + + async def _migration_003_down(self, session: AsyncSession): + """Remove project_id column from todo_stages table""" + logger.info("Rolling back migration 003: Removing project_id from todo_stages") + + try: + # Drop indexes first + await session.execute(text("DROP INDEX IF EXISTS idx_todo_stages_project_id")) + await session.execute(text("DROP INDEX IF EXISTS idx_todo_stages_stage_id")) + + # Remove project_id column (SQLite doesn't support DROP COLUMN directly) + # We need to recreate the table without the column + await session.execute(text(""" + CREATE TABLE todo_stages_backup AS + SELECT id, todo_board_id, stage_id, stage_index, title, description, + status, progress, result, created_at, updated_at + FROM todo_stages + """)) + + await session.execute(text("DROP TABLE todo_stages")) + + await session.execute(text(""" + CREATE TABLE todo_stages ( + id INTEGER PRIMARY KEY, + todo_board_id INTEGER NOT NULL, + stage_id VARCHAR(100) NOT NULL, + stage_index INTEGER NOT NULL, + title VARCHAR(255) NOT NULL, + description TEXT NOT NULL, + status VARCHAR(50) DEFAULT 'pending', + progress FLOAT DEFAULT 0.0, + result JSON, + created_at FLOAT, + updated_at FLOAT, + FOREIGN KEY (todo_board_id) REFERENCES todo_boards(id) + ) + """)) + + await session.execute(text(""" + INSERT INTO todo_stages + SELECT * FROM todo_stages_backup + """)) + + await session.execute(text("DROP TABLE todo_stages_backup")) + + await session.commit() + logger.info("Migration 003 rollback completed") + + except Exception as e: + await session.rollback() + logger.error(f"Migration 003 rollback failed: {e}") + raise + + async def _migration_004_up(self, session: AsyncSession): + """Migration 004: Add PPT templates table and update slide_data table""" + try: + logger.info("Running migration 004: Adding PPT templates table") + + # Create ppt_templates table + create_templates_table_sql = """ + CREATE TABLE IF NOT EXISTS ppt_templates ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + project_id VARCHAR(36) NOT NULL, + template_type VARCHAR(50) NOT NULL, + template_name VARCHAR(255) NOT NULL, + description TEXT, + html_template TEXT NOT NULL, + applicable_scenarios JSON, + style_config JSON, + usage_count INTEGER DEFAULT 0, + created_at FLOAT NOT NULL, + updated_at FLOAT NOT NULL, + FOREIGN KEY (project_id) REFERENCES projects (project_id) + ) + """ + + await session.execute(text(create_templates_table_sql)) + + # Create indexes for ppt_templates + await session.execute(text(""" + CREATE INDEX IF NOT EXISTS idx_ppt_templates_project_id + ON ppt_templates (project_id) + """)) + + await session.execute(text(""" + CREATE INDEX IF NOT EXISTS idx_ppt_templates_type + ON ppt_templates (template_type) + """)) + + # Add template_id column to slide_data table + try: + await session.execute(text(""" + ALTER TABLE slide_data + ADD COLUMN template_id INTEGER REFERENCES ppt_templates(id) + """)) + except Exception as e: + # Column might already exist, check if it's a duplicate column error + if "duplicate column name" not in str(e).lower(): + raise + logger.info("template_id column already exists in slide_data table") + + await session.commit() + logger.info("Migration 004 completed successfully") + + except Exception as e: + await session.rollback() + logger.error(f"Migration 004 failed: {e}") + raise + + async def _migration_004_down(self, session: AsyncSession): + """Migration 004 rollback: Remove PPT templates table and template_id column""" + try: + logger.info("Rolling back migration 004") + + # Drop ppt_templates table + await session.execute(text("DROP TABLE IF EXISTS ppt_templates")) + + # Remove template_id column from slide_data table + # SQLite doesn't support DROP COLUMN directly, so we need to recreate the table + await session.execute(text(""" + CREATE TABLE slide_data_backup AS + SELECT id, project_id, slide_index, slide_id, title, content_type, + html_content, slide_metadata, created_at, updated_at + FROM slide_data + """)) + + await session.execute(text("DROP TABLE slide_data")) + + await session.execute(text(""" + CREATE TABLE slide_data ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + project_id VARCHAR(36) NOT NULL, + slide_index INTEGER NOT NULL, + slide_id VARCHAR(100) NOT NULL, + title VARCHAR(255) NOT NULL, + content_type VARCHAR(50) NOT NULL, + html_content TEXT NOT NULL, + slide_metadata JSON, + created_at FLOAT NOT NULL, + updated_at FLOAT NOT NULL, + FOREIGN KEY (project_id) REFERENCES projects (project_id) + ) + """)) + + await session.execute(text(""" + INSERT INTO slide_data + SELECT * FROM slide_data_backup + """)) + + await session.execute(text("DROP TABLE slide_data_backup")) + + await session.commit() + logger.info("Migration 004 rollback completed") + + except Exception as e: + await session.rollback() + logger.error(f"Migration 004 rollback failed: {e}") + raise + + async def _migration_005_up(self, session: AsyncSession): + """Migration 005: Add project_metadata column to projects table""" + try: + logger.info("Running migration 005: Adding project_metadata column to projects table") + + # Check if project_metadata column already exists + result = await session.execute(text(""" + PRAGMA table_info(projects) + """)) + columns = result.fetchall() + column_names = [col[1] for col in columns] + + if 'project_metadata' not in column_names: + # Add project_metadata column to projects table + await session.execute(text(""" + ALTER TABLE projects + ADD COLUMN project_metadata JSON + """)) + logger.info("Added project_metadata column to projects table") + else: + logger.info("project_metadata column already exists in projects table") + + await session.commit() + logger.info("Migration 005 completed successfully") + + except Exception as e: + await session.rollback() + logger.error(f"Migration 005 failed: {e}") + raise + + async def _migration_005_down(self, session: AsyncSession): + """Migration 005 rollback: Remove project_metadata column from projects table""" + try: + logger.info("Rolling back migration 005: Removing project_metadata column from projects table") + + # SQLite doesn't support DROP COLUMN directly, so we need to recreate the table + await session.execute(text(""" + CREATE TABLE projects_backup AS + SELECT id, project_id, title, scenario, topic, requirements, status, + outline, slides_html, slides_data, confirmed_requirements, + version, created_at, updated_at + FROM projects + """)) + + await session.execute(text("DROP TABLE projects")) + + await session.execute(text(""" + CREATE TABLE projects ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + project_id VARCHAR(36) UNIQUE NOT NULL, + title VARCHAR(255) NOT NULL, + scenario VARCHAR(100) NOT NULL, + topic VARCHAR(255) NOT NULL, + requirements TEXT, + status VARCHAR(50) DEFAULT 'draft', + outline JSON, + slides_html TEXT, + slides_data JSON, + confirmed_requirements JSON, + version INTEGER DEFAULT 1, + created_at FLOAT NOT NULL, + updated_at FLOAT NOT NULL + ) + """)) + + await session.execute(text(""" + INSERT INTO projects + SELECT * FROM projects_backup + """)) + + await session.execute(text("DROP TABLE projects_backup")) + + # Recreate indexes + await session.execute(text("CREATE INDEX IF NOT EXISTS idx_projects_status ON projects(status)")) + await session.execute(text("CREATE INDEX IF NOT EXISTS idx_projects_scenario ON projects(scenario)")) + await session.execute(text("CREATE INDEX IF NOT EXISTS idx_projects_created_at ON projects(created_at)")) + + await session.commit() + logger.info("Migration 005 rollback completed") + + except Exception as e: + await session.rollback() + logger.error(f"Migration 005 rollback failed: {e}") + raise + + async def _migration_006_up(self, session: AsyncSession): + """Migration 006: Add is_user_edited field to slide_data table""" + try: + logger.info("Running migration 006: Adding is_user_edited field to slide_data table") + + # Check if is_user_edited column already exists + result = await session.execute(text(""" + PRAGMA table_info(slide_data) + """)) + columns = result.fetchall() + column_names = [col[1] for col in columns] + + if 'is_user_edited' not in column_names: + # Add is_user_edited column to slide_data table + await session.execute(text(""" + ALTER TABLE slide_data + ADD COLUMN is_user_edited BOOLEAN DEFAULT 0 NOT NULL + """)) + logger.info("Added is_user_edited column to slide_data table") + else: + logger.info("is_user_edited column already exists in slide_data table") + + await session.commit() + logger.info("Migration 006 completed successfully") + + except Exception as e: + await session.rollback() + logger.error(f"Migration 006 failed: {e}") + raise + + async def _migration_006_down(self, session: AsyncSession): + """Migration 006 rollback: Remove is_user_edited field from slide_data table""" + try: + logger.info("Rolling back migration 006: Removing is_user_edited field from slide_data table") + + # SQLite doesn't support DROP COLUMN directly, so we need to recreate the table + await session.execute(text(""" + CREATE TABLE slide_data_backup AS + SELECT id, project_id, slide_index, slide_id, title, content_type, + html_content, slide_metadata, template_id, created_at, updated_at + FROM slide_data + """)) + + await session.execute(text("DROP TABLE slide_data")) + + await session.execute(text(""" + CREATE TABLE slide_data ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + project_id VARCHAR(36) NOT NULL, + slide_index INTEGER NOT NULL, + slide_id VARCHAR(100) NOT NULL, + title VARCHAR(255) NOT NULL, + content_type VARCHAR(50) NOT NULL, + html_content TEXT NOT NULL, + slide_metadata JSON, + template_id INTEGER REFERENCES ppt_templates(id), + created_at FLOAT NOT NULL, + updated_at FLOAT NOT NULL, + FOREIGN KEY (project_id) REFERENCES projects (project_id) + ) + """)) + + await session.execute(text(""" + INSERT INTO slide_data + SELECT * FROM slide_data_backup + """)) + + await session.execute(text("DROP TABLE slide_data_backup")) + + await session.commit() + logger.info("Migration 006 rollback completed") + + except Exception as e: + await session.rollback() + logger.error(f"Migration 006 rollback failed: {e}") + raise + + async def _create_migration_table(self, session: AsyncSession): + """Create migration tracking table""" + create_table_sql = """ + CREATE TABLE IF NOT EXISTS schema_migrations ( + version VARCHAR(10) PRIMARY KEY, + name VARCHAR(100) NOT NULL, + description TEXT, + applied_at FLOAT NOT NULL, + rollback_sql TEXT + ) + """ + + await session.execute(text(create_table_sql)) + await session.commit() + + async def _get_applied_migrations(self, session: AsyncSession) -> List[str]: + """Get list of applied migration versions""" + try: + result = await session.execute(text("SELECT version FROM schema_migrations ORDER BY version")) + return [row[0] for row in result.fetchall()] + except Exception: + # Table doesn't exist yet + return [] + + async def _record_migration(self, session: AsyncSession, migration: Dict[str, Any]): + """Record a migration as applied""" + insert_sql = """ + INSERT INTO schema_migrations (version, name, description, applied_at) + VALUES (:version, :name, :description, :applied_at) + """ + + await session.execute(text(insert_sql), { + "version": migration["version"], + "name": migration["name"], + "description": migration["description"], + "applied_at": time.time() + }) + await session.commit() + + async def _remove_migration_record(self, session: AsyncSession, version: str): + """Remove migration record""" + delete_sql = "DELETE FROM schema_migrations WHERE version = :version" + await session.execute(text(delete_sql), {"version": version}) + await session.commit() + + async def migrate_up(self, target_version: str = None) -> bool: + """Run migrations up to target version""" + try: + async with AsyncSessionLocal() as session: + # Create migration table if it doesn't exist + await self._create_migration_table(session) + + # Get applied migrations + applied = await self._get_applied_migrations(session) + + # Find migrations to apply + to_apply = [] + for migration in self.migrations: + if migration["version"] not in applied: + to_apply.append(migration) + if target_version and migration["version"] == target_version: + break + + if not to_apply: + logger.info("No migrations to apply") + return True + + # Apply migrations + for migration in to_apply: + logger.info(f"Applying migration {migration['version']}: {migration['name']}") + + try: + await migration["up"](session) + await self._record_migration(session, migration) + logger.info(f"Migration {migration['version']} applied successfully") + except Exception as e: + logger.error(f"Failed to apply migration {migration['version']}: {e}") + raise + + logger.info("All migrations applied successfully") + return True + + except Exception as e: + logger.error(f"Migration failed: {e}") + return False + + async def migrate_down(self, target_version: str) -> bool: + """Rollback migrations down to target version""" + try: + async with AsyncSessionLocal() as session: + # Get applied migrations + applied = await self._get_applied_migrations(session) + + # Find migrations to rollback + to_rollback = [] + for migration in reversed(self.migrations): + if migration["version"] in applied and migration["version"] > target_version: + to_rollback.append(migration) + + if not to_rollback: + logger.info("No migrations to rollback") + return True + + # Rollback migrations + for migration in to_rollback: + logger.info(f"Rolling back migration {migration['version']}: {migration['name']}") + + try: + await migration["down"](session) + await self._remove_migration_record(session, migration["version"]) + logger.info(f"Migration {migration['version']} rolled back successfully") + except Exception as e: + logger.error(f"Failed to rollback migration {migration['version']}: {e}") + raise + + logger.info("Migrations rolled back successfully") + return True + + except Exception as e: + logger.error(f"Migration rollback failed: {e}") + return False + + async def get_migration_status(self) -> Dict[str, Any]: + """Get current migration status""" + try: + async with AsyncSessionLocal() as session: + await self._create_migration_table(session) + applied = await self._get_applied_migrations(session) + + status = { + "current_version": applied[-1] if applied else None, + "applied_migrations": applied, + "available_migrations": [m["version"] for m in self.migrations], + "pending_migrations": [m["version"] for m in self.migrations if m["version"] not in applied] + } + + return status + + except Exception as e: + logger.error(f"Failed to get migration status: {e}") + return {"error": str(e)} + + +# Global migration manager instance +migration_manager = DatabaseMigration()