# edge_tts_server.py
# stream_server.py
"""Edge TTS HTTP service.

Exposes the ``edge_tts`` library through a small FastAPI application with
endpoints for speech synthesis, voice listing, a health check and a fixed
self-test synthesis.
"""
import asyncio
import io
import json
import logging
import re
from datetime import datetime
from functools import lru_cache
from typing import List, Optional

import edge_tts
from fastapi import FastAPI, HTTPException, Request, Response
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field, validator

# Logging configuration
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Shared defaults, kept in one place so the endpoints and the request model
# cannot drift apart.
DEFAULT_VOICE = "zh-CN-XiaoxiaoNeural"
MAX_TEXT_LENGTH = 5000
TEST_TEXT = "这是一条测试语音,用于验证服务是否正常工作。"
SERVER_HOST = "0.0.0.0"
SERVER_PORT = 8000

# Signed integer percentage such as "+10%" or "-20%". Used with fullmatch()
# so that a trailing newline is rejected (a "$" anchor would accept it).
_PERCENT_PATTERN = re.compile(r"[+-]\d+%")

app = FastAPI(
    title="Edge TTS API Service",
    description="用于Java服务代理的TTS服务",
    version="1.0.0"
)

# CORS configuration
# NOTE(review): wildcard origins together with allow_credentials=True is
# discouraged by the FastAPI CORS documentation. Left unchanged so existing
# clients keep working -- confirm whether credentials are actually needed.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


class TTSRequest(BaseModel):
    """Request body for ``POST /stream-synthesize``."""

    # Text to synthesize; 1..MAX_TEXT_LENGTH characters and not only whitespace.
    text: str = Field(..., min_length=1, max_length=MAX_TEXT_LENGTH)
    # Voice name handed to edge_tts unchanged.
    voice: str = Field(default=DEFAULT_VOICE)
    # Signed percentage strings, e.g. "+10%" / "-20%".
    rate: str = Field(default="+0%")
    volume: str = Field(default="+0%")

    @validator('rate', 'volume')
    def validate_percentage(cls, v):
        """Reject values that are not a signed integer percentage."""
        if not _PERCENT_PATTERN.fullmatch(v):
            raise ValueError('格式应为 +10% 或 -20% 等')
        return v

    @validator('text')
    def validate_text_length(cls, v):
        """Reject text longer than ``MAX_TEXT_LENGTH`` characters."""
        if len(v) > MAX_TEXT_LENGTH:
            raise ValueError('文本长度不能超过5000字符')
        return v

    @validator('text')
    def validate_text_not_blank(cls, v):
        """Reject whitespace-only text.

        Such text satisfies ``min_length=1`` but contains nothing to speak,
        so it is refused during validation instead of failing later as a
        server error.
        """
        if not v.strip():
            raise ValueError('文本不能为空')
        return v


class VoiceInfo(BaseModel):
    """One entry of the ``GET /voices`` response."""

    name: str
    locale: str
    gender: str
    displayName: Optional[str] = None


class HealthResponse(BaseModel):
    """Payload returned by ``GET /health``."""

    status: str
    service: str
    timestamp: str
    voices_count: Optional[int] = None


async def _synthesize_audio(text: str, voice: str, **options: str) -> bytes:
    """Run edge_tts for *text* and return the concatenated audio bytes.

    Args:
        text: Text to synthesize.
        voice: Voice name passed to ``edge_tts.Communicate``.
        **options: Extra keyword arguments forwarded unchanged to
            ``edge_tts.Communicate`` (the callers here pass ``rate`` and
            ``volume``).

    Returns:
        The data of every chunk whose type is ``"audio"``, joined in arrival
        order. Empty when no audio chunk was produced.
    """
    communicate = edge_tts.Communicate(text=text, voice=voice, **options)
    audio_chunks = [
        chunk["data"]
        async for chunk in communicate.stream()
        if chunk["type"] == "audio"
    ]
    return b"".join(audio_chunks)


# The explicit ``description`` on each route keeps the generated OpenAPI text
# identical to what the original (Chinese) handler docstrings produced.
@app.get("/", description="服务根目录")
async def root():
    """Return the service name, version and an endpoint overview."""
    return {
        "service": "Edge TTS API",
        "version": "1.0.0",
        "endpoints": {
            "synthesize": "POST /stream-synthesize",
            "voices": "GET /voices",
            "health": "GET /health"
        }
    }


@app.post("/stream-synthesize", description="流式合成语音 - 主接口")
async def stream_synthesize(request: TTSRequest):
    """Synthesize speech for the request and return it as ``audio/mpeg``.

    The audio is collected fully in memory before the response is built, so
    the response carries an exact ``Content-Length``.

    Raises:
        HTTPException: 500 when no audio was produced or synthesis failed.
    """
    try:
        logger.info(f"合成请求: voice={request.voice}, text_length={len(request.text)}")

        audio_data = await _synthesize_audio(
            request.text,
            request.voice,
            rate=request.rate,
            volume=request.volume,
        )
        if not audio_data:
            raise HTTPException(status_code=500, detail="生成音频为空")

        audio_size = str(len(audio_data))
        logger.info(f"合成完成: {audio_size} bytes")

        return Response(
            content=audio_data,
            media_type="audio/mpeg",
            headers={
                "Content-Disposition": "inline; filename=speech.mp3",
                "Content-Length": audio_size,
                "Cache-Control": "no-cache, no-store, must-revalidate",
                "Pragma": "no-cache",
                "Expires": "0",
                "X-TTS-Status": "success",
                "X-TTS-Voice": request.voice,
                "X-TTS-Size": audio_size
            }
        )
    except HTTPException:
        # Already a well-formed HTTP error (e.g. empty audio); do not re-wrap.
        raise
    except Exception as e:
        # logger.exception keeps the traceback that logger.error dropped.
        logger.exception(f"合成失败: {str(e)}")
        raise HTTPException(status_code=500, detail=f"语音合成失败: {str(e)}") from e


@app.get("/voices", description="获取语音列表")
async def get_voices():
    """Return the voices reported by ``edge_tts.list_voices()``.

    Each voice is reduced to the ``VoiceInfo`` fields; missing keys become
    empty strings. The response is sent with a one-hour public cache header.

    Raises:
        HTTPException: 500 when the voice list cannot be fetched.
    """
    try:
        voices = await edge_tts.list_voices()
        voice_list = [
            VoiceInfo(
                name=voice.get("ShortName", ""),
                locale=voice.get("Locale", ""),
                gender=voice.get("Gender", ""),
                displayName=voice.get("FriendlyName", "")
            ).dict()
            for voice in voices
        ]

        logger.info(f"返回 {len(voice_list)} 个语音")
        return JSONResponse(
            content={"voices": voice_list},
            headers={"Cache-Control": "public, max-age=3600"}
        )
    except Exception as e:
        logger.exception(f"获取语音列表失败: {str(e)}")
        raise HTTPException(status_code=500, detail=f"获取语音列表失败: {str(e)}") from e


@app.get("/health", description="健康检查")
async def health_check():
    """Report service health by fetching the voice list.

    Returns the ``HealthResponse`` fields with status ``"healthy"`` and the
    voice count on success, or a 503 JSON response with status
    ``"unhealthy"`` when the voice list call raises.
    """
    try:
        # Listing voices doubles as a probe of the upstream speech service.
        voices = await edge_tts.list_voices()
        healthy = HealthResponse(
            status="healthy",
            service="edge-tts",
            timestamp=datetime.now().isoformat(),
            voices_count=len(voices)
        )
        return healthy.dict()
    except Exception as e:
        logger.exception(f"健康检查失败: {str(e)}")
        unhealthy = HealthResponse(
            status="unhealthy",
            service="edge-tts",
            timestamp=datetime.now().isoformat()
        )
        return JSONResponse(
            content=unhealthy.dict(),
            status_code=503
        )


@app.get("/test", description="测试接口")
async def test_synthesis():
    """Synthesize a fixed sentence with the default voice as a smoke test.

    Raises:
        HTTPException: 500 when no audio was produced or synthesis failed.
    """
    try:
        audio_data = await _synthesize_audio(TEST_TEXT, DEFAULT_VOICE)
        # Same empty-audio guard as /stream-synthesize: an empty body with
        # status 200 would make a broken service look healthy.
        if not audio_data:
            raise HTTPException(status_code=500, detail="生成音频为空")

        return Response(
            content=audio_data,
            media_type="audio/mpeg",
            headers={
                "Content-Disposition": "inline; filename=test.mp3",
                "Content-Length": str(len(audio_data))
            }
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.exception(f"测试失败: {str(e)}")
        raise HTTPException(status_code=500, detail=f"测试失败: {str(e)}") from e


if __name__ == "__main__":
    import uvicorn

    logger.info("启动Edge TTS服务...")
    logger.info(f"服务地址: http://{SERVER_HOST}:{SERVER_PORT}")

    uvicorn.run(
        app,
        host=SERVER_HOST,
        port=SERVER_PORT,
        log_level="info",
        access_log=True
    )