This commit is contained in:
2025-11-07 09:05:21 +08:00
parent 11d4d33e0b
commit 603e0e4701

View File

@@ -0,0 +1,792 @@
"""
Speech Script Generation Service
Provides AI-powered speech script generation for PPT presentations
"""
import logging
import time
import uuid
from typing import Dict, Any, List, Optional, Tuple
from dataclasses import dataclass
from enum import Enum
from ..ai.base import AIMessage, MessageRole
from ..ai.providers import get_ai_provider, get_role_provider
from ..core.config import ai_config
from ..api.models import PPTProject
from .progress_tracker import progress_tracker
logger = logging.getLogger(__name__)
class SpeechTone(str, Enum):
"""Speech tone options"""
FORMAL = "formal"
CASUAL = "casual"
PERSUASIVE = "persuasive"
EDUCATIONAL = "educational"
CONVERSATIONAL = "conversational"
AUTHORITATIVE = "authoritative"
STORYTELLING = "storytelling"
class TargetAudience(str, Enum):
"""Target audience options"""
EXECUTIVES = "executives"
STUDENTS = "students"
GENERAL_PUBLIC = "general_public"
TECHNICAL_EXPERTS = "technical_experts"
COLLEAGUES = "colleagues"
CLIENTS = "clients"
INVESTORS = "investors"
class LanguageComplexity(str, Enum):
"""Language complexity levels"""
SIMPLE = "simple"
MODERATE = "moderate"
ADVANCED = "advanced"
@dataclass
class SpeechScriptCustomization:
"""Speech script customization options"""
tone: SpeechTone = SpeechTone.CONVERSATIONAL
target_audience: TargetAudience = TargetAudience.GENERAL_PUBLIC
language_complexity: LanguageComplexity = LanguageComplexity.MODERATE
custom_style_prompt: Optional[str] = None
include_transitions: bool = True
include_timing_notes: bool = False
speaking_pace: str = "normal" # slow, normal, fast
@dataclass
class SlideScriptData:
"""Data for a single slide's speech script"""
slide_index: int
slide_title: str
script_content: str
estimated_duration: Optional[str] = None
speaker_notes: Optional[str] = None
@dataclass
class SpeechScriptResult:
"""Result of speech script generation"""
success: bool
scripts: List[SlideScriptData]
total_estimated_duration: Optional[str] = None
generation_metadata: Optional[Dict[str, Any]] = None
error_message: Optional[str] = None
class SpeechScriptService:
"""Service for generating AI-powered speech scripts for presentations"""
def __init__(self):
self.ai_provider = None
self.provider_settings: Optional[Dict[str, Optional[str]]] = None
self._initialize_ai_provider()
def _initialize_ai_provider(self):
"""Initialize AI provider"""
try:
provider, settings = get_role_provider("speech_script")
self.ai_provider = provider
self.provider_settings = settings
except Exception as e:
logger.warning(f"Failed to load provider for speech script role, fallback to default provider: {e}")
try:
self.ai_provider = get_ai_provider()
self.provider_settings = {
"provider": ai_config.default_ai_provider,
"model": ai_config.get_provider_config().get("model")
}
except Exception as fallback_error:
logger.error(f"Failed to initialize AI provider: {fallback_error}")
self.ai_provider = None
self.provider_settings = None
async def generate_single_slide_script(
self,
project: PPTProject,
slide_index: int,
customization: SpeechScriptCustomization
) -> SpeechScriptResult:
"""Generate speech script for a single slide"""
try:
if not self.ai_provider:
return SpeechScriptResult(
success=False,
scripts=[],
error_message="AI provider not available"
)
if not project.slides_data or slide_index >= len(project.slides_data):
return SpeechScriptResult(
success=False,
scripts=[],
error_message="Invalid slide index"
)
slide = project.slides_data[slide_index]
# Get context from previous slide if available
previous_slide_context = ""
if slide_index > 0:
prev_slide = project.slides_data[slide_index - 1]
previous_slide_context = self._extract_slide_context(prev_slide)
# Generate script
script_content = await self._generate_script_for_slide(
slide, slide_index, len(project.slides_data),
project, previous_slide_context, customization
)
# Estimate duration
estimated_duration = self._estimate_speaking_duration(script_content)
slide_script = SlideScriptData(
slide_index=slide_index,
slide_title=slide.get('title', f'{slide_index + 1}'),
script_content=script_content,
estimated_duration=estimated_duration
)
return SpeechScriptResult(
success=True,
scripts=[slide_script],
total_estimated_duration=estimated_duration,
generation_metadata={
"generation_time": time.time(),
"customization": customization.__dict__
}
)
except Exception as e:
logger.error(f"Error generating single slide script: {e}")
return SpeechScriptResult(
success=False,
scripts=[],
error_message=str(e)
)
async def generate_multi_slide_scripts(
self,
project: PPTProject,
slide_indices: List[int],
customization: SpeechScriptCustomization
) -> SpeechScriptResult:
"""Generate speech scripts for multiple slides"""
try:
if not self.ai_provider:
return SpeechScriptResult(
success=False,
scripts=[],
error_message="AI provider not available"
)
if not project.slides_data:
return SpeechScriptResult(
success=False,
scripts=[],
error_message="No slides data available"
)
scripts = []
total_duration_seconds = 0
for i, slide_index in enumerate(slide_indices):
if slide_index >= len(project.slides_data):
continue
slide = project.slides_data[slide_index]
# Get context from previous slide in the sequence
previous_slide_context = ""
if i > 0:
prev_index = slide_indices[i - 1]
if prev_index < len(project.slides_data):
prev_slide = project.slides_data[prev_index]
previous_slide_context = self._extract_slide_context(prev_slide)
elif slide_index > 0:
# Use actual previous slide if this is the first in selection
prev_slide = project.slides_data[slide_index - 1]
previous_slide_context = self._extract_slide_context(prev_slide)
# Generate script
script_content = await self._generate_script_for_slide(
slide, slide_index, len(project.slides_data),
project, previous_slide_context, customization
)
# Estimate duration
estimated_duration = self._estimate_speaking_duration(script_content)
duration_seconds = self._parse_duration_to_seconds(estimated_duration)
total_duration_seconds += duration_seconds
slide_script = SlideScriptData(
slide_index=slide_index,
slide_title=slide.get('title', f'{slide_index + 1}'),
script_content=script_content,
estimated_duration=estimated_duration
)
scripts.append(slide_script)
total_duration = self._format_duration_from_seconds(total_duration_seconds)
return SpeechScriptResult(
success=True,
scripts=scripts,
total_estimated_duration=total_duration,
generation_metadata={
"generation_time": time.time(),
"customization": customization.__dict__,
"slide_count": len(scripts)
}
)
except Exception as e:
logger.error(f"Error generating multi-slide scripts: {e}")
return SpeechScriptResult(
success=False,
scripts=[],
error_message=str(e)
)
async def generate_full_presentation_scripts(
self,
project: PPTProject,
customization: SpeechScriptCustomization,
progress_callback=None,
task_id: str = None
) -> SpeechScriptResult:
"""Generate speech scripts for the entire presentation with retry mechanism"""
try:
if not project.slides_data:
return SpeechScriptResult(
success=False,
scripts=[],
error_message="No slides data available"
)
# Generate scripts for all slides with retry mechanism
slide_indices = list(range(len(project.slides_data)))
result = await self.generate_multi_slide_scripts_with_retry(
project, slide_indices, customization, progress_callback, task_id=task_id
)
# 不再自动添加开场白和结束语,完全按照选择的页面生成演讲稿
# Opening and closing remarks are no longer automatically added
return result
except Exception as e:
logger.error(f"Error generating full presentation scripts: {e}")
return SpeechScriptResult(
success=False,
scripts=[],
error_message=str(e)
)
async def generate_multi_slide_scripts_with_retry(
self,
project: PPTProject,
slide_indices: List[int],
customization: SpeechScriptCustomization,
progress_callback=None,
max_retries: int = 5,
task_id: str = None
) -> SpeechScriptResult:
"""Generate speech scripts for multiple slides with retry mechanism"""
try:
if not project.slides_data:
return SpeechScriptResult(
success=False,
scripts=[],
error_message="No slides data available"
)
total_slides = len(slide_indices)
successful_scripts = []
failed_slides = []
skipped_slides = []
# Create progress tracking task
if not task_id:
task_id = str(uuid.uuid4())
progress_info = progress_tracker.create_task(
task_id=task_id,
project_id=project.project_id,
total_slides=total_slides
)
# Track progress
completed_count = 0
for i, slide_index in enumerate(slide_indices):
if slide_index >= len(project.slides_data):
failed_slides.append({
'slide_index': slide_index,
'error': 'Slide index out of range'
})
continue
slide = project.slides_data[slide_index]
slide_title = slide.get('title', f'{slide_index + 1}')
# Update progress
progress_tracker.update_progress(
task_id,
current_slide=slide_index,
current_slide_title=slide_title,
message=f'正在生成第{slide_index + 1}页演讲稿...'
)
if progress_callback:
progress_callback({
'type': 'progress',
'current_slide': slide_index + 1,
'total_slides': total_slides,
'completed': completed_count,
'failed': len(failed_slides),
'skipped': len(skipped_slides),
'message': f'正在生成第{slide_index + 1}页演讲稿...'
})
# Try to generate script with retries
script_generated = False
last_error = None
for retry_count in range(max_retries):
try:
# Get previous slide context for better coherence
previous_slide_context = ""
if slide_index > 0 and successful_scripts:
# Find the most recent successful script before this slide
for prev_script in reversed(successful_scripts):
if prev_script.slide_index < slide_index:
prev_content = prev_script.script_content
if len(prev_content) > 200:
previous_slide_context = prev_content[-200:]
else:
previous_slide_context = prev_content
break
# Generate script
script_content = await self._generate_script_for_slide(
slide, slide_index, len(project.slides_data),
project, previous_slide_context, customization
)
# Estimate duration
estimated_duration = self._estimate_speaking_duration(script_content)
slide_script = SlideScriptData(
slide_index=slide_index,
slide_title=slide_title,
script_content=script_content,
estimated_duration=estimated_duration
)
successful_scripts.append(slide_script)
script_generated = True
completed_count += 1
# Update progress tracker
progress_tracker.add_slide_completed(task_id, slide_index, slide_title)
# Update progress callback
if progress_callback:
progress_callback({
'type': 'slide_completed',
'slide_index': slide_index,
'slide_title': slide_title,
'completed': completed_count,
'total_slides': total_slides
})
break # Success, exit retry loop
except Exception as e:
last_error = str(e)
logger.warning(f"Retry {retry_count + 1}/{max_retries} failed for slide {slide_index + 1}: {e}")
# Update progress with retry info
if progress_callback:
progress_callback({
'type': 'retry',
'slide_index': slide_index,
'retry_count': retry_count + 1,
'max_retries': max_retries,
'error': str(e)
})
# Wait a bit before retrying
import asyncio
await asyncio.sleep(1)
# If all retries failed, mark as failed or skipped
if not script_generated:
if max_retries > 0:
failed_slides.append({
'slide_index': slide_index,
'slide_title': slide_title,
'error': last_error or 'Unknown error'
})
# Update progress tracker
progress_tracker.add_slide_failed(task_id, slide_index, slide_title, last_error or 'Unknown error')
# Update progress callback
if progress_callback:
progress_callback({
'type': 'slide_failed',
'slide_index': slide_index,
'slide_title': slide_title,
'error': last_error
})
else:
skipped_slides.append({
'slide_index': slide_index,
'slide_title': slide_title,
'reason': 'Max retries exceeded'
})
# Update progress tracker
progress_tracker.add_slide_skipped(task_id, slide_index, slide_title, 'Max retries exceeded')
# Calculate total duration
total_duration = self._calculate_total_duration([s.estimated_duration for s in successful_scripts])
# Determine overall success
success = len(successful_scripts) > 0
error_message = None
if len(failed_slides) > 0 or len(skipped_slides) > 0:
error_parts = []
if failed_slides:
error_parts.append(f"{len(failed_slides)}页生成失败")
if skipped_slides:
error_parts.append(f"{len(skipped_slides)}页被跳过")
error_message = "部分页面生成失败: " + ", ".join(error_parts)
# Final progress update - DO NOT mark as completed here
# The task will be marked as completed in routes.py after database save
if not success:
progress_tracker.fail_task(task_id, error_message or "生成失败")
if progress_callback:
progress_callback({
'type': 'completed',
'successful': len(successful_scripts),
'failed': len(failed_slides),
'skipped': len(skipped_slides),
'total': total_slides
})
return SpeechScriptResult(
success=success,
scripts=successful_scripts,
total_estimated_duration=total_duration,
error_message=error_message,
generation_metadata={
"generation_time": time.time(),
"customization": customization.__dict__,
"successful_slides": len(successful_scripts),
"failed_slides": len(failed_slides),
"skipped_slides": len(skipped_slides),
"failed_details": failed_slides,
"skipped_details": skipped_slides
}
)
except Exception as e:
logger.error(f"Error in generate_multi_slide_scripts_with_retry: {e}")
return SpeechScriptResult(
success=False,
scripts=[],
error_message=str(e)
)
async def _generate_script_for_slide(
self,
slide: Dict[str, Any],
slide_index: int,
total_slides: int,
project: PPTProject,
previous_slide_context: str,
customization: SpeechScriptCustomization
) -> str:
"""Generate speech script for a single slide using AI"""
# Create the prompt for speech script generation
prompt = self._create_speech_script_prompt(
slide, slide_index, total_slides, project,
previous_slide_context, customization
)
# Generate using AI
response = await self.ai_provider.text_completion(
prompt=prompt,
**self._build_request_kwargs(
max_tokens=ai_config.max_tokens,
temperature=0.7
)
)
return response.content.strip()
def _create_speech_script_prompt(
self,
slide: Dict[str, Any],
slide_index: int,
total_slides: int,
project: PPTProject,
previous_slide_context: str,
customization: SpeechScriptCustomization
) -> str:
"""Create AI prompt for speech script generation"""
from .prompts.speech_script_prompts import SpeechScriptPrompts
project_info = {
'topic': project.topic,
'scenario': project.scenario
}
customization_dict = {
'tone': customization.tone.value,
'target_audience': customization.target_audience.value,
'language_complexity': customization.language_complexity.value,
'custom_style_prompt': customization.custom_style_prompt,
'include_transitions': customization.include_transitions,
'speaking_pace': customization.speaking_pace
}
return SpeechScriptPrompts.get_single_slide_script_prompt(
slide, slide_index, total_slides, project_info,
previous_slide_context, customization_dict
)
def _get_tone_description(self, tone: SpeechTone) -> str:
"""Get description for speech tone"""
descriptions = {
SpeechTone.FORMAL: "正式、严谨、专业的商务语调",
SpeechTone.CASUAL: "轻松、自然、亲切的日常语调",
SpeechTone.PERSUASIVE: "有说服力、激励性的语调",
SpeechTone.EDUCATIONAL: "教学式、解释性的语调",
SpeechTone.CONVERSATIONAL: "对话式、互动性的语调",
SpeechTone.AUTHORITATIVE: "权威、自信、专家式的语调",
SpeechTone.STORYTELLING: "叙事性、生动有趣的语调"
}
return descriptions.get(tone, "自然流畅的语调")
def _get_audience_description(self, audience: TargetAudience) -> str:
"""Get description for target audience"""
descriptions = {
TargetAudience.EXECUTIVES: "企业高管和决策者,注重效率和结果",
TargetAudience.STUDENTS: "学生群体,需要清晰的解释和引导",
TargetAudience.GENERAL_PUBLIC: "普通大众,使用通俗易懂的语言",
TargetAudience.TECHNICAL_EXPERTS: "技术专家,可以使用专业术语",
TargetAudience.COLLEAGUES: "同事和合作伙伴,平等交流的语调",
TargetAudience.CLIENTS: "客户群体,注重价值和利益",
TargetAudience.INVESTORS: "投资者,关注商业价值和回报"
}
return descriptions.get(audience, "一般听众")
def _get_complexity_description(self, complexity: LanguageComplexity) -> str:
"""Get description for language complexity"""
descriptions = {
LanguageComplexity.SIMPLE: "简单易懂,避免复杂词汇和长句",
LanguageComplexity.MODERATE: "适中复杂度,平衡专业性和可理解性",
LanguageComplexity.ADVANCED: "较高复杂度,可以使用专业术语和复杂概念"
}
return descriptions.get(complexity, "适中复杂度")
def _extract_slide_context(self, slide: Dict[str, Any]) -> str:
"""Extract context summary from a slide"""
title = slide.get('title', '')
content = slide.get('html_content', '')
# Extract text content from HTML
import re
text_content = re.sub(r'<[^>]+>', '', content)
text_content = re.sub(r'\s+', ' ', text_content).strip()
# Create a brief summary
if len(text_content) > 200:
text_content = text_content[:200] + "..."
return f"{title}: {text_content}"
def _estimate_speaking_duration(self, script_content: str) -> str:
"""Estimate speaking duration based on script length"""
# Average speaking rate: 150-160 words per minute in Chinese
# For Chinese text, we estimate by character count
char_count = len(script_content)
# Rough estimation: 300-400 characters per minute for Chinese
minutes = char_count / 350
if minutes < 1:
seconds = int(minutes * 60)
return f"{seconds}"
else:
return f"{minutes:.1f}分钟"
def _calculate_total_duration(self, durations: List[str]) -> str:
"""Calculate total duration from a list of duration strings"""
total_seconds = 0
for duration in durations:
if duration:
total_seconds += self._parse_duration_to_seconds(duration)
# Convert back to readable format
if total_seconds < 60:
return f"{total_seconds}"
else:
minutes = total_seconds / 60
return f"{minutes:.1f}分钟"
def _parse_duration_to_seconds(self, duration_str: str) -> int:
"""Parse duration string to seconds"""
import re
# Extract numbers and units
minutes_match = re.search(r'(\d+(?:\.\d+)?)分钟', duration_str)
seconds_match = re.search(r'(\d+)秒', duration_str)
total_seconds = 0
if minutes_match:
minutes = float(minutes_match.group(1))
total_seconds += int(minutes * 60)
if seconds_match:
seconds = int(seconds_match.group(1))
total_seconds += seconds
return total_seconds
def _format_duration_from_seconds(self, total_seconds: int) -> str:
"""Format duration from seconds to readable string"""
if total_seconds < 60:
return f"{total_seconds}"
else:
minutes = total_seconds // 60
seconds = total_seconds % 60
if seconds > 0:
return f"{minutes}分钟{seconds}"
else:
return f"{minutes}分钟"
async def _add_presentation_bookends(
self,
scripts: List[SlideScriptData],
project: PPTProject,
customization: SpeechScriptCustomization
) -> List[SlideScriptData]:
"""Add opening and closing remarks for full presentation"""
try:
# Generate opening remarks
opening_script = await self._generate_opening_remarks(project, customization)
opening_slide = SlideScriptData(
slide_index=-1, # Special index for opening
slide_title="开场白",
script_content=opening_script,
estimated_duration=self._estimate_speaking_duration(opening_script)
)
# Generate closing remarks
closing_script = await self._generate_closing_remarks(project, customization)
closing_slide = SlideScriptData(
slide_index=len(scripts), # Special index for closing
slide_title="结束语",
script_content=closing_script,
estimated_duration=self._estimate_speaking_duration(closing_script)
)
# Insert opening at the beginning and closing at the end
return [opening_slide] + scripts + [closing_slide]
except Exception as e:
logger.error(f"Error adding presentation bookends: {e}")
return scripts
async def _generate_opening_remarks(
self,
project: PPTProject,
customization: SpeechScriptCustomization
) -> str:
"""Generate opening remarks for the presentation"""
from .prompts.speech_script_prompts import SpeechScriptPrompts
project_info = {
'topic': project.topic,
'scenario': project.scenario
}
customization_dict = {
'tone': customization.tone.value,
'target_audience': customization.target_audience.value,
'language_complexity': customization.language_complexity.value
}
prompt = SpeechScriptPrompts.get_opening_remarks_prompt(
project_info, customization_dict
)
response = await self.ai_provider.text_completion(
prompt=prompt,
**self._build_request_kwargs(
max_tokens=ai_config.max_tokens // 2,
temperature=0.7
)
)
return response.content.strip()
async def _generate_closing_remarks(
self,
project: PPTProject,
customization: SpeechScriptCustomization
) -> str:
"""Generate closing remarks for the presentation"""
from .prompts.speech_script_prompts import SpeechScriptPrompts
project_info = {
'topic': project.topic,
'scenario': project.scenario
}
customization_dict = {
'tone': customization.tone.value,
'target_audience': customization.target_audience.value,
'language_complexity': customization.language_complexity.value
}
prompt = SpeechScriptPrompts.get_closing_remarks_prompt(
project_info, customization_dict
)
response = await self.ai_provider.text_completion(
prompt=prompt,
**self._build_request_kwargs(
max_tokens=ai_config.max_tokens // 2,
temperature=0.7
)
)
def _build_request_kwargs(self, **kwargs) -> Dict[str, Any]:
"""Merge base kwargs with role-specific model override if configured."""
if self.provider_settings and self.provider_settings.get("model"):
kwargs.setdefault("model", self.provider_settings["model"])
return kwargs
return response.content.strip()