stream_server.py.bak
2.93 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# stream_server.py
import asyncio
import json
from fastapi import FastAPI, HTTPException, Response
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import edge_tts
import io
import logging
# Configure root logging at INFO level and create this module's logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# The ASGI application object that the route decorators below attach to.
app = FastAPI(title="Edge TTS Stream API")

# CORS configuration: every origin, method and header is accepted, and
# credentialed requests are allowed.
# NOTE(review): the FastAPI CORS documentation advises against combining
# wildcard origins with allow_credentials=True - confirm whether credentialed
# requests are actually needed before tightening this.
app.add_middleware(
    CORSMiddleware,
    allow_headers=["*"],
    allow_methods=["*"],
    allow_credentials=True,
    allow_origins=["*"],
)
class TTSRequest(BaseModel):
    """Request body accepted by the ``/stream-synthesize`` endpoint.

    Only ``text`` is required; the remaining fields fall back to the
    defaults declared below. The synthesis endpoint forwards all four
    values to ``edge_tts.Communicate``.
    """

    # Text to be converted to speech (required).
    text: str
    # Voice identifier; defaults to a zh-CN neural voice.
    voice: str = "zh-CN-XiaoxiaoNeural"
    # Rate adjustment string; the default "+0%" requests no change.
    rate: str = "+0%"
    # Volume adjustment string; the default "+0%" requests no change.
    volume: str = "+0%"
@app.post("/stream-synthesize")
async def stream_synthesize(request: TTSRequest):
    """Synthesize speech for ``request.text`` and return it as MP3 audio.

    The audio chunks produced by ``edge_tts.Communicate.stream`` are
    collected in memory and returned in a single ``Response`` so that an
    exact ``Content-Length`` header can be sent.

    Args:
        request: Text to speak plus the voice, rate and volume settings.

    Returns:
        A ``Response`` with media type ``audio/mpeg`` whose body is the
        concatenated audio data, with caching disabled via headers.

    Raises:
        HTTPException: 400 when the text is empty or whitespace-only;
            500 when synthesis raises or produces no audio data.
    """
    # Reject blank input up front instead of letting it surface later as an
    # opaque synthesis failure (or an empty audio body).
    if not request.text.strip():
        raise HTTPException(status_code=400, detail="text must not be empty")

    logger.info(f"开始合成语音: voice={request.voice}, text_length={len(request.text)}")

    try:
        communicate = edge_tts.Communicate(
            request.text,
            request.voice,
            rate=request.rate,
            volume=request.volume,
        )
        # The stream interleaves audio chunks with other chunk types; only
        # the chunks tagged "audio" carry payload bytes.
        audio_chunks = [
            chunk["data"]
            async for chunk in communicate.stream()
            if chunk["type"] == "audio"
        ]
    except Exception as e:
        # logger.exception keeps the traceback that logger.error would drop.
        logger.exception(f"语音合成失败: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e

    audio_data = b"".join(audio_chunks)
    if not audio_data:
        # A successful status with a zero-byte MP3 would hide the failure
        # from the client, so report it explicitly.
        detail = "no audio data was received from the TTS service"
        logger.error("语音合成失败: %s", detail)
        raise HTTPException(status_code=500, detail=detail)

    logger.info(f"语音合成完成,大小: {len(audio_data)} bytes")

    return Response(
        content=audio_data,
        media_type="audio/mpeg",
        headers={
            "Content-Disposition": "inline; filename=speech.mp3",
            "Content-Length": str(len(audio_data)),
            "Cache-Control": "no-cache, no-store, must-revalidate",
            "Pragma": "no-cache",
            "Expires": "0",
            "X-TTS-Status": "success",
            "X-TTS-Voice": request.voice,
        },
    )
@app.get("/voices")
async def get_voices():
    """Return the voices reported by ``edge_tts.list_voices``.

    Returns:
        A dict ``{"voices": [...]}`` where each entry has the keys
        ``name``, ``locale``, ``gender`` and ``displayName``.

    Raises:
        HTTPException: 500 when the voice list cannot be fetched or
            converted.
    """
    try:
        voices = await edge_tts.list_voices()
        # Any key missing from a voice entry falls back to an empty string.
        voice_list = [
            {
                "name": voice.get("ShortName", ""),
                "locale": voice.get("Locale", ""),
                "gender": voice.get("Gender", ""),
                "displayName": voice.get("FriendlyName", ""),
            }
            for voice in voices
        ]
    except Exception as e:
        # logger.exception keeps the traceback that logger.error would drop.
        logger.exception(f"获取语音列表失败: {e}")
        raise HTTPException(status_code=500, detail=str(e)) from e
    return {"voices": voice_list}
@app.get("/health")
async def health_check():
    """Health check: return a fixed status payload naming this service."""
    payload = {
        "status": "healthy",
        "service": "edge-tts-stream",
    }
    return payload
if __name__ == "__main__":
    # Script entry point: serve the app with uvicorn when this file is run
    # directly. uvicorn is imported here so that merely importing this
    # module does not require it.
    import uvicorn

    # host "0.0.0.0" listens on all network interfaces.
    uvicorn.run(
        app,
        log_level="info",
        port=8000,
        host="0.0.0.0",
    )