diff --git a/src/landppt/services/progress_tracker.py b/src/landppt/services/progress_tracker.py new file mode 100644 index 0000000..ef54cf8 --- /dev/null +++ b/src/landppt/services/progress_tracker.py @@ -0,0 +1,189 @@ +""" +Progress Tracker for Speech Script Generation +""" + +import time +import threading +from typing import Dict, Any, Optional +from dataclasses import dataclass, asdict + + +@dataclass +class ProgressInfo: + """Progress information for speech script generation""" + task_id: str + project_id: str + total_slides: int + completed_slides: int + failed_slides: int + skipped_slides: int + current_slide: Optional[int] = None + current_slide_title: Optional[str] = None + status: str = "running" # running, completed, failed + message: str = "" + start_time: float = 0 + last_update: float = 0 + error_details: list = None + + def __post_init__(self): + if self.start_time == 0: + self.start_time = time.time() + self.last_update = time.time() + if self.error_details is None: + self.error_details = [] + + def to_dict(self) -> Dict[str, Any]: + """Convert to dictionary for JSON serialization""" + data = asdict(self) + # Add computed properties + data['progress_percentage'] = self.progress_percentage + data['elapsed_time'] = self.elapsed_time + return data + + @property + def progress_percentage(self) -> float: + """Calculate progress percentage""" + if self.total_slides == 0: + return 0 + return (self.completed_slides / self.total_slides) * 100 + + @property + def elapsed_time(self) -> float: + """Get elapsed time in seconds""" + return time.time() - self.start_time + + +class ProgressTracker: + """Thread-safe progress tracker for speech script generation""" + + def __init__(self): + self._progress_data: Dict[str, ProgressInfo] = {} + self._lock = threading.Lock() + + def create_task(self, task_id: str, project_id: str, total_slides: int) -> ProgressInfo: + """Create a new progress tracking task""" + with self._lock: + progress = ProgressInfo( + task_id=task_id, + project_id=project_id, + total_slides=total_slides, + completed_slides=0, + failed_slides=0, + skipped_slides=0, + status="running", + message="开始生成演讲稿..." + ) + self._progress_data[task_id] = progress + return progress + + def update_progress(self, task_id: str, **kwargs) -> Optional[ProgressInfo]: + """Update progress for a task""" + with self._lock: + if task_id not in self._progress_data: + return None + + progress = self._progress_data[task_id] + + # Update fields + for key, value in kwargs.items(): + if hasattr(progress, key): + setattr(progress, key, value) + + progress.last_update = time.time() + return progress + + def get_progress(self, task_id: str) -> Optional[ProgressInfo]: + """Get progress for a task""" + with self._lock: + return self._progress_data.get(task_id) + + def complete_task(self, task_id: str, message: str = "生成完成") -> Optional[ProgressInfo]: + """Mark task as completed""" + return self.update_progress( + task_id, + status="completed", + message=message + ) + + def fail_task(self, task_id: str, error_message: str) -> Optional[ProgressInfo]: + """Mark task as failed""" + return self.update_progress( + task_id, + status="failed", + message=f"生成失败: {error_message}" + ) + + def add_slide_completed(self, task_id: str, slide_index: int, slide_title: str) -> Optional[ProgressInfo]: + """Mark a slide as completed""" + with self._lock: + if task_id not in self._progress_data: + return None + + progress = self._progress_data[task_id] + progress.completed_slides += 1 + progress.current_slide = slide_index + progress.current_slide_title = slide_title + progress.message = f"已完成第{slide_index + 1}页: {slide_title}" + progress.last_update = time.time() + + return progress + + def add_slide_failed(self, task_id: str, slide_index: int, slide_title: str, error: str) -> Optional[ProgressInfo]: + """Mark a slide as failed""" + with self._lock: + if task_id not in self._progress_data: + return None + + progress = self._progress_data[task_id] + progress.failed_slides += 1 + progress.current_slide = slide_index + progress.current_slide_title = slide_title + progress.message = f"第{slide_index + 1}页生成失败: {slide_title}" + progress.error_details.append({ + 'slide_index': slide_index, + 'slide_title': slide_title, + 'error': error + }) + progress.last_update = time.time() + + return progress + + def add_slide_skipped(self, task_id: str, slide_index: int, slide_title: str, reason: str) -> Optional[ProgressInfo]: + """Mark a slide as skipped""" + with self._lock: + if task_id not in self._progress_data: + return None + + progress = self._progress_data[task_id] + progress.skipped_slides += 1 + progress.current_slide = slide_index + progress.current_slide_title = slide_title + progress.message = f"第{slide_index + 1}页已跳过: {slide_title}" + progress.last_update = time.time() + + return progress + + def cleanup_old_tasks(self, max_age_seconds: int = 3600): + """Clean up old completed/failed tasks""" + current_time = time.time() + with self._lock: + to_remove = [] + for task_id, progress in self._progress_data.items(): + if (progress.status in ["completed", "failed"] and + current_time - progress.last_update > max_age_seconds): + to_remove.append(task_id) + + for task_id in to_remove: + del self._progress_data[task_id] + + def remove_task(self, task_id: str) -> bool: + """Remove a specific task""" + with self._lock: + if task_id in self._progress_data: + del self._progress_data[task_id] + return True + return False + + +# Global progress tracker instance +progress_tracker = ProgressTracker()