package com.xly.tts.service; import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; import com.xly.constant.BusinessCode; import com.xly.constant.ReturnTypeCode; import com.xly.entity.AiResponseAccumulator; import com.xly.entity.AiResponseDTO; import com.xly.service.UserSceneSessionService; import com.xly.service.XlyErpService; import com.xly.tts.bean.*; import com.xly.util.AdvancedSymbolRemover; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.core.io.InputStreamResource; import org.springframework.http.*; import org.springframework.stereotype.Service; import org.springframework.web.client.RestTemplate; import reactor.core.publisher.Flux; import javax.annotation.PostConstruct; import java.io.*; import java.time.Duration; import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.io.InputStream; @Slf4j @Service @RequiredArgsConstructor public class PythonTtsProxyService { private final RestTemplate restTemplate; @Value("${tts.python.url:http://localhost:8000}") private String pythonServiceUrl; @Value("${tts.python.timeout:30000}") private int timeout; private ExecutorService executorService; private final XlyErpService xlyErpService; private final UserSceneSessionService userSceneSessionService; @PostConstruct public void init() { executorService = Executors.newFixedThreadPool(5); log.info("Python TTS代理服务初始化完成,Python服务地址: {}", pythonServiceUrl); } public ResponseEntity initTool(TTSRequestDTO request) { TTSResponseDTO ttsResponse = TTSResponseDTO.builder() .code(200) .message("success") .build(); return ResponseEntity.ok(ttsResponse); } /** * 流式合成语音 - 代理到Python服务 */ public ResponseEntity synthesizeStream(TTSRequestDTO request) { return getVoiceResult(request); } /** * @Author 钱豹 * @Date 8:54 2026/4/7 * @Param [request] * @return org.springframework.http.ResponseEntity * @Description 换一换 **/ public ResponseEntity change(TTSRequestDTO request) { String sUserId = request.getUserid(); String sUserName = request.getUsername(); String sBrandsId = request.getBrandsid(); String sSubsidiaryId = request.getSubsidiaryid(); String sUserType = request.getUsertype(); String authorization = request.getAuthorization(); Integer iPage = request.getPage(); Integer pageCount = request.getPageCount(); AiResponseDTO voiceText = xlyErpService.change(sUserId, sUserName , sBrandsId , sSubsidiaryId, sUserType, authorization,iPage, pageCount); return synthesizeStreamAi(request, voiceText); } /*** * @Author 钱豹 * @Date 14:35 2026/4/7 * @Param [request] * @return org.springframework.http.ResponseEntity * @Description 清空条件 **/ public ResponseEntity removeCondition(TTSRequestDTO request) { String sUserId = request.getUserid(); String sUserName = request.getUsername(); String sBrandsId = request.getBrandsid(); String sSubsidiaryId = request.getSubsidiaryid(); String sUserType = request.getUsertype(); String authorization = request.getAuthorization(); String removeCondition = request.getRemoveCondition(); AiResponseDTO voiceText = xlyErpService.removeCondition(sUserId, sUserName, sBrandsId, sSubsidiaryId, sUserType, authorization,removeCondition); return synthesizeStreamAi(request, voiceText); } /** * 【保持原有返回类型】AI对话 + 流式TTS */ public ResponseEntity synthesizeStreamAi(TTSRequestDTO request) { String userInput = request.getText(); String sUserId = request.getUserid(); String sUserName = request.getUsername(); String sBrandsId = request.getBrandsid(); String sSubsidiaryId = request.getSubsidiaryid(); String sUserType = request.getUsertype(); String authorization = request.getAuthorization(); AiResponseDTO voiceText = xlyErpService.erpUserInput(userInput, sUserId, sUserName, sBrandsId, sSubsidiaryId, sUserType, authorization); return synthesizeStreamAi(request, voiceText); } /** * 流式ERP + 流式TTS合成 */ public Flux synthesizeStreamAiStream(TTSRequestDTO request) { String userInput = request.getText(); String sUserId = request.getUserid(); String sUserName = request.getUsername(); String sBrandsId = request.getBrandsid(); String sSubsidiaryId = request.getSubsidiaryid(); String sUserType = request.getUsertype(); String authorization = request.getAuthorization(); String requestId = UUID.randomUUID().toString(); log.info("开始流式处理: requestId={}, userId={}", requestId, sUserId); // 1. 处理ERP流,将AiResponseDTO转换为TTSResponseDTO Flux erpStream = xlyErpService.erpUserInputStream( userInput, sUserId, sUserName, sBrandsId, sSubsidiaryId, sUserType, authorization ) .map(aiResponse -> { // 将AiResponseDTO转换为TTSResponseDTO TTSResponseDTO dto = TTSResponseDTO.builder() .code(200) .message("ERP_CHUNK") .requestId(requestId) .processedText(aiResponse.getAiText()) // 使用 aiText 字段传递文本 .systemText(aiResponse.getSystemText()) .sSceneName(aiResponse.getSSceneName()) .sMethodName(aiResponse.getSMethodName()) .sReturnType(aiResponse.getSReturnType()) .timestamp(System.currentTimeMillis()) .build(); log.debug("发送ERP片段: requestId={}, text长度={}", requestId, aiResponse.getAiText() != null ? aiResponse.getAiText().length() : 0); return dto; }); // 2. 收集完整的ERP响应(用于TTS) List textChunks = new ArrayList<>(); List systemTextChunks = new ArrayList<>(); Flux erpWithCollect = erpStream .doOnNext(dto -> { // 收集文本片段 if (dto.getProcessedText() != null) { textChunks.add(dto.getProcessedText()); } if (dto.getSystemText() != null) { systemTextChunks.add(dto.getSystemText()); } }); // 3. ERP完成后,发送完成标记并开始TTS return erpWithCollect .concatWith(Flux.defer(() -> { // 合并所有文本片段 String fullText = String.join("", textChunks); String fullSystemText = String.join("", systemTextChunks); if (StrUtil.isBlank(fullText)) { log.warn("ERP返回空文本: requestId={}", requestId); return Flux.just(TTSResponseDTO.error(requestId, 500, "ERP返回空文本")); } log.info("ERP流式处理完成,开始TTS合成: requestId={}, 文本长度={}", requestId, fullText.length()); // 发送ERP完成消息 TTSResponseDTO completeDto = TTSResponseDTO.builder() .code(200) .message("ERP_COMPLETE") .requestId(requestId) // .processedText(fullText) // .systemText(fullSystemText) .timestamp(System.currentTimeMillis()) .build(); // AiResponseDTO aiResponseDTO = AiResponseDTO.builder().aiText(fullText).systemText(fullSystemText); // 调用TTS合成 // synthesizeStreamAi(request, aiResponseDTO); // 先发送完成消息,再发送TTS流 return Flux.just(completeDto); //Flux.concat(Flux.just(completeDto), ttsStream); })) .timeout(Duration.ofSeconds(120)) .onErrorResume(e -> { log.error("流式处理失败: requestId={}", requestId, e); return Flux.just(TTSResponseDTO.error(requestId, 500, e.getMessage())); }) .doOnComplete(() -> log.info("流式处理完成: requestId={}", requestId)) .doOnCancel(() -> log.warn("流式处理取消: requestId={}", requestId)); } // /** // * 调用TTS服务进行语音合成(流式返回) // */ // private Flux synthesizeTtsStream(TTSRequestDTO originalRequest, // String text, // String requestId) { // // 构建TTS请求 // TTSRequestDTO ttsRequest = TTSRequestDTO.builder() // .userid(originalRequest.getUserid()) // .text(text) // .voice(originalRequest.getVoice()) // .rate(originalRequest.getRate()) // .volume(originalRequest.getVolume()) // .voiceless(originalRequest.getVoiceless()) // .build(); // // // 调用Python TTS服务 // return webClient.post() // .uri("http://python-service:5000/api/tts/synthesize") // .contentType(MediaType.APPLICATION_JSON) // .bodyValue(ttsRequest) // .retrieve() // .bodyToFlux(TTSResponseDTO.class) // .doOnNext(ttsResponse -> { // ttsResponse.setRequestId(requestId); // ttsResponse.setMessage("TTS_SEGMENT"); // log.debug("发送TTS片段: requestId={}, audioSize={}", // requestId, // ttsResponse.getAudio() != null ? ttsResponse.getAudio().length() : 0); // }); // } public ResponseEntity cleanMemory(TTSRequestDTO request) { String sUserId = request.getUserid(); String sUserName = request.getUsername(); String sBrandsId = request.getBrandsid(); String sSubsidiaryId = request.getSubsidiaryid(); String sUserType = request.getUsertype(); String authorization = request.getAuthorization(); AiResponseDTO aiResponseDTO = xlyErpService.cleanMemory(sUserId, sUserName, sBrandsId, sSubsidiaryId, sUserType, authorization); return ResponseEntity.ok(TTSResponseDTO.builder() .code(200) .message("success") .originalText(request.getText()) .processedText(aiResponseDTO.getAiText()) .systemText(aiResponseDTO.getSystemText()) .voice(request.getVoice()) .sSceneName(aiResponseDTO.getSSceneName()) .sMethodName(aiResponseDTO.getSMethodName()) .sReturnType(aiResponseDTO.getSReturnType()) .timestamp(System.currentTimeMillis()) .textLength(request.getText().length()) .build()); } /** * 【保持原有返回类型】不动!!! */ public ResponseEntity init(TTSRequestDTO request) { String sUserId = request.getUserid(); String sUserName = request.getUsername(); String sBrandsId = request.getBrandsid(); String sSubsidiaryId = request.getSubsidiaryid(); String sUserType = request.getUsertype(); String authorization = request.getAuthorization(); // 清空记忆 userSceneSessionService.cleanUserSession(sUserId); AiResponseDTO voiceText = xlyErpService.initSceneGuide(StrUtil.EMPTY, sUserId, sUserName, sBrandsId, sSubsidiaryId, sUserType, authorization); voiceText.setSReturnType(ReturnTypeCode.HTML.getCode()); return synthesizeStreamAi(request, voiceText); } /** * 内部流式请求Python */ public ResponseEntity synthesizeStreamAi(TTSRequestDTO request, AiResponseDTO aiResponseDTO) { String aiText = aiResponseDTO.getAiText(); String systemText = aiResponseDTO.getSystemText(); if (ObjectUtil.isEmpty(systemText)) { systemText = StrUtil.EMPTY; } String voiceTextNew = AdvancedSymbolRemover.removePunctuationHtml(aiText); // ============================ // 【绝对唯一】不会重复、不会覆盖 // ============================ String cacheKey = request.getUserid() + "_" + System.nanoTime(); TTSResponseDTO dto = TTSResponseDTO.builder() .code(200) .message("success") .cacheKey(cacheKey) // 前端靠这个取自己的分段 .originalText(request.getText()) .processedText(aiText) .audioText(voiceTextNew) .systemText(systemText) .voice(request.getVoice()) .sSceneName(aiResponseDTO.getSSceneName()) .sMethodName(aiResponseDTO.getSMethodName()) .sReturnType(aiResponseDTO.getSReturnType()) .dbType(aiResponseDTO.getDbType()) .dbCach(aiResponseDTO.getDbCach()) .sCopyTo(aiResponseDTO.getSCopyTo()) .sCopyToSrcId(aiResponseDTO.getSCopyToSrcId()) .sCommonts(BusinessCode.COMMONTS.getMessage()) .timestamp(System.currentTimeMillis()) .textLength((aiText + systemText).length()) .build(); boolean voiceless = Boolean.TRUE.equals(request.getVoiceless()); if (!voiceless || ObjectUtil.isEmpty(voiceTextNew)) { return ResponseEntity.ok(dto); } // 平均分割文字 List textParts = splitTextSmart(voiceTextNew, 30); dto.setAudioSize(textParts.size()); // 异步分段合成 CompletableFuture.runAsync(() -> { for (int i = 0; i < textParts.size(); i++) { String part = textParts.get(i); if (ObjectUtil.isEmpty(part)) continue; try { Map params = new HashMap<>(); params.put("text", part); params.put("voice", request.getVoice()); params.put("rate", request.getRate() != null ? request.getRate() : "+10%"); params.put("volume", request.getVolume() != null ? request.getVolume() : "+0%"); HttpHeaders headers = new HttpHeaders(); headers.setContentType(MediaType.APPLICATION_JSON); headers.setAccept(Collections.singletonList(MediaType.APPLICATION_OCTET_STREAM)); HttpEntity> entity = new HttpEntity<>(params, headers); ResponseEntity response = restTemplate.exchange( pythonServiceUrl + "/stream-synthesize", HttpMethod.POST, entity, byte[].class ); if (response.getStatusCode().is2xxSuccessful() && response.getBody() != null) { String base64 = Base64.getEncoder().encodeToString(response.getBody()); // ============================ // 【关键】带序号存储!前端靠序号知道顺序! // ============================ LocalAudioCache.addPiece(cacheKey, i, part, base64); } } catch (Exception e) { log.warn("分段合成失败: {}", e.getMessage()); } } }, executorService); return ResponseEntity.ok(dto); } /** * 内部流式请求Python */ public Flux synthesizeStreamAiNew(TTSRequestDTO request, AiResponseDTO aiResponseDTO) { String aiText = aiResponseDTO.getAiText(); String systemText = aiResponseDTO.getSystemText(); if (ObjectUtil.isEmpty(systemText)) { systemText = StrUtil.EMPTY; } String voiceTextNew = AdvancedSymbolRemover.removePunctuationHtml(aiText); // ============================ // 【绝对唯一】不会重复、不会覆盖 // ============================ String cacheKey = request.getUserid() + "_" + System.nanoTime(); TTSResponseDTO dto = TTSResponseDTO.builder() .code(200) .message("success") .cacheKey(cacheKey) // 前端靠这个取自己的分段 .originalText(request.getText()) .processedText(aiText) .audioText(voiceTextNew) .systemText(systemText) .voice(request.getVoice()) .sSceneName(aiResponseDTO.getSSceneName()) .sMethodName(aiResponseDTO.getSMethodName()) .sReturnType(aiResponseDTO.getSReturnType()) .sCommonts(BusinessCode.COMMONTS.getMessage()) .timestamp(System.currentTimeMillis()) .textLength((aiText + systemText).length()) .build(); boolean voiceless = Boolean.TRUE.equals(request.getVoiceless()); if (!voiceless || ObjectUtil.isEmpty(voiceTextNew)) { return Flux.just(dto); } // 平均分割文字 List textParts = splitTextSmart(voiceTextNew, 30); dto.setAudioSize(textParts.size()); // 异步分段合成 CompletableFuture.runAsync(() -> { for (int i = 0; i < textParts.size(); i++) { String part = textParts.get(i); if (ObjectUtil.isEmpty(part)) continue; try { Map params = new HashMap<>(); params.put("text", part); params.put("voice", request.getVoice()); params.put("rate", request.getRate() != null ? request.getRate() : "+10%"); params.put("volume", request.getVolume() != null ? request.getVolume() : "+0%"); HttpHeaders headers = new HttpHeaders(); headers.setContentType(MediaType.APPLICATION_JSON); headers.setAccept(Collections.singletonList(MediaType.APPLICATION_OCTET_STREAM)); HttpEntity> entity = new HttpEntity<>(params, headers); ResponseEntity response = restTemplate.exchange( pythonServiceUrl + "/stream-synthesize", HttpMethod.POST, entity, byte[].class ); if (response.getStatusCode().is2xxSuccessful() && response.getBody() != null) { String base64 = Base64.getEncoder().encodeToString(response.getBody()); // ============================ // 【关键】带序号存储!前端靠序号知道顺序! // ============================ LocalAudioCache.addPiece(cacheKey, i, part, base64); } } catch (Exception e) { log.warn("分段合成失败: {}", e.getMessage()); } } }, executorService); return Flux.just(dto); } // ============================================== // 智能分段:优先按 。!?; , 空格 断开 // 不会把一句话生硬切断,更自然 // ============================================== private List splitTextSmart(String text, int maxLength) { List parts = new ArrayList<>(); if (text == null || text.isEmpty()) return parts; int len = text.length(); int start = 0; while (start < len) { int end = Math.min(start + maxLength, len); // 如果不是最后一段,寻找最近的断句点 if (end < len) { // 优先按 。!?; 断句 int splitPos = lastIndexOfAny(text, start, end, '。', '!', '?', ';'); if (splitPos == -1) { // 其次按 , 逗号 splitPos = lastIndexOfAny(text, start, end, ','); } if (splitPos == -1) { // 最后按空格 splitPos = lastIndexOfAny(text, start, end, ' '); } if (splitPos != -1) { end = splitPos + 1; } } String part = text.substring(start, end).trim(); if (!part.isEmpty()) { parts.add(part); } start = end; } return parts; } // 工具:查找最后出现的符号 private int lastIndexOfAny(String text, int start, int end, char... chars) { for (int i = end - 1; i >= start; i--) { for (char c : chars) { if (text.charAt(i) == c) { return i; } } } return -1; } public ResponseEntity getVoiceResult(TTSRequestDTO request) { try { String voiceText = AdvancedSymbolRemover.removePunctuationHtml(request.getText()); Map pythonRequest = new HashMap<>(); pythonRequest.put("text", voiceText); pythonRequest.put("voice", request.getVoice()); pythonRequest.put("rate", request.getRate() != null ? request.getRate() : "+0%"); pythonRequest.put("volume", request.getVolume() != null ? request.getVolume() : "+0%"); HttpHeaders headers = new HttpHeaders(); headers.setContentType(MediaType.APPLICATION_JSON); headers.setAccept(Arrays.asList(MediaType.APPLICATION_OCTET_STREAM, MediaType.ALL)); HttpEntity> entity = new HttpEntity<>(pythonRequest, headers); ResponseEntity response = restTemplate.exchange( pythonServiceUrl + "/stream-synthesize", HttpMethod.POST, entity, byte[].class ); if (response.getStatusCode() == HttpStatus.OK && response.getBody() != null) { InputStream inputStream = new ByteArrayInputStream(response.getBody()); InputStreamResource resource = new InputStreamResource(inputStream); HttpHeaders responseHeaders = new HttpHeaders(); responseHeaders.setContentType(MediaType.parseMediaType("audio/mpeg")); responseHeaders.setContentLength(response.getBody().length); responseHeaders.set("Content-Disposition", "inline; filename=\"speech.mp3\""); return ResponseEntity.ok().headers(responseHeaders).body(resource); } } catch (Exception e) { return fallbackResponse(request); } return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build(); } public ResponseEntity quickSynthesize(String text, String voice) { TTSRequestDTO request = new TTSRequestDTO(); request.setText(text); request.setVoice(voice); return synthesizeStream(request); } public CompletableFuture> synthesizeStreamAsync(TTSRequestDTO request) { return CompletableFuture.supplyAsync(() -> synthesizeStream(request), executorService); } public List getAvailableVoices() { try { ResponseEntity response = restTemplate.getForEntity(pythonServiceUrl + "/voices", Map.class); if (response.getStatusCode() == HttpStatus.OK && response.getBody() != null) { List> voicesData = (List>) response.getBody().get("voices"); List voices = new ArrayList<>(); for (Map vd : voicesData) { VoiceInfoDTO vo = new VoiceInfoDTO(); vo.setName(vd.get("name")); vo.setLocale(vd.get("locale")); vo.setGender(vd.get("gender")); vo.setDisplayName(vd.get("displayName")); voices.add(vo); } return voices; } } catch (Exception e) { log.error("获取语音列表失败", e); } return getDefaultVoices(); } public VoiceInfoDTO getVoiceDetail(String name) { return getAvailableVoices().stream().filter(v -> v.getName().equals(name)).findFirst().orElse(null); } public boolean healthCheck() { try { ResponseEntity res = restTemplate.getForEntity(pythonServiceUrl + "/health", Map.class); return res.getStatusCode() == HttpStatus.OK && "healthy".equals(res.getBody().get("status")); } catch (Exception e) { return false; } } public List> batchSynthesize(List requests) { List> list = new ArrayList<>(); for (TTSRequestDTO req : requests) list.add(synthesizeStream(req)); return list; } public ResponseEntity synthesizeDirect(TTSRequestDTO request) { return synthesizeStream(request); } public void shutdown() { if (executorService != null) executorService.shutdown(); log.info("Python TTS服务已关闭"); } private ResponseEntity fallbackResponse(TTSRequestDTO request) { try { TTSRequestDTO req = new TTSRequestDTO(); req.setText("服务暂时不可用"); req.setVoice("zh-CN-XiaoxiaoNeural"); return synthesizeStream(req); } catch (Exception e) { return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE).body(null); } } private List getDefaultVoices() { return Arrays.asList( createVoice("zh-CN-XiaoxiaoNeural", "zh-CN", "Female", "晓晓"), createVoice("zh-CN-YunyangNeural", "zh-CN", "Male", "云扬") ); } private VoiceInfoDTO createVoice(String name, String locale, String gender, String displayName) { VoiceInfoDTO v = new VoiceInfoDTO(); v.setName(name); v.setLocale(locale); v.setGender(gender); v.setDisplayName(displayName); return v; } }