From 11f61c6c7c007ca2267a119b1784a41cf5052d94 Mon Sep 17 00:00:00 2001 From: qianbao Date: Thu, 26 Mar 2026 10:17:31 +0800 Subject: [PATCH] 添加向量库 --- pom.xml | 8 ++++++++ src/main/java/com/xly/agent/ChatiAgent.java | 12 ++++++++++++ src/main/java/com/xly/constant/ReturnTypeCode.java | 1 + src/main/java/com/xly/entity/ToolMeta.java | 1 + src/main/java/com/xly/milvus/service/MilvusService.java | 2 +- src/main/java/com/xly/milvus/service/impl/MilvusServiceImpl.java | 122 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++----------------------- src/main/java/com/xly/milvus/util/MilvusTimeUtil.java | 73 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/main/java/com/xly/service/XlyErpService.java | 210 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++------------------------------------------ src/main/java/com/xly/tts/service/PythonTtsProxyService.java | 128 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---------------------------------------------------- src/main/java/com/xly/web/TTSStreamController.java | 16 +++++----------- src/main/resources/templates/chat.html | 192 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++------------------- 11 files changed, 617 insertions(+), 148 deletions(-) create mode 100644 src/main/java/com/xly/milvus/util/MilvusTimeUtil.java diff --git a/pom.xml b/pom.xml index 52b4ae8..b75e830 100644 --- a/pom.xml +++ b/pom.xml @@ -40,6 +40,7 @@ 1.17.2 2.6.15 + 5.18.0 @@ -106,6 +107,13 @@ + net.sourceforge.tess4j + tess4j + ${tess4j.version} + compile + + + org.springframework.boot spring-boot-starter-thymeleaf diff --git a/src/main/java/com/xly/agent/ChatiAgent.java b/src/main/java/com/xly/agent/ChatiAgent.java index 5ea9ef5..9c8ff9a 100644 --- a/src/main/java/com/xly/agent/ChatiAgent.java +++ b/src/main/java/com/xly/agent/ChatiAgent.java @@ -4,6 +4,7 @@ import dev.langchain4j.service.MemoryId; import dev.langchain4j.service.SystemMessage; import dev.langchain4j.service.UserMessage; import dev.langchain4j.service.V; +import reactor.core.publisher.Flux; public interface ChatiAgent { @SystemMessage(""" @@ -16,4 +17,15 @@ public interface ChatiAgent { """) @UserMessage("用户说:{{userInput}}") String chat(@MemoryId String userId, @V("userInput") String userInput); + + @SystemMessage(""" + 你是一个轻松自然的聊天伙伴,语气亲切口语化,像朋友一样闲聊。 + 要求:1. 不生硬、不说教,避免书面化表达; + 2. 主动接梗,适当延伸话题,不一问一答; + 3. 偶尔带点小幽默,保持轻松无压力的氛围; + 4. 回答简洁,符合日常聊天的语气,不啰嗦。 + 5. 首次沟通时发现称呼不是“小羚羊”时,请回复“我不是..,我是小羚羊”,语气俏皮。 + """) + @UserMessage("用户说:{{userInput}}") + Flux chatStream(@MemoryId String userId, @V("userInput") String userInput); } diff --git a/src/main/java/com/xly/constant/ReturnTypeCode.java b/src/main/java/com/xly/constant/ReturnTypeCode.java index feb3c7a..41663a5 100644 --- a/src/main/java/com/xly/constant/ReturnTypeCode.java +++ b/src/main/java/com/xly/constant/ReturnTypeCode.java @@ -14,6 +14,7 @@ public enum ReturnTypeCode { // 成功 HTML("html", "html"), + STREAM("stream", "stream"), MAKEDOWN("makedown", "makedown"); diff --git a/src/main/java/com/xly/entity/ToolMeta.java b/src/main/java/com/xly/entity/ToolMeta.java index 42e14cb..b2874c4 100644 --- a/src/main/java/com/xly/entity/ToolMeta.java +++ b/src/main/java/com/xly/entity/ToolMeta.java @@ -49,5 +49,6 @@ public class ToolMeta { private LocalDateTime tMakeDate; private String sVectorfiled; private String sVectorjson; + private String sVectorfiledAll; } diff --git a/src/main/java/com/xly/milvus/service/MilvusService.java b/src/main/java/com/xly/milvus/service/MilvusService.java index b0c800c..178052e 100644 --- a/src/main/java/com/xly/milvus/service/MilvusService.java +++ b/src/main/java/com/xly/milvus/service/MilvusService.java @@ -54,5 +54,5 @@ public interface MilvusService { * @return java.util.Map * @Description 获取配置 **/ - Map getMilvusFiled(String sVectorfiled); + Map getMilvusFiled(String sVectorfiled,String sVectorfiledAll); } \ No newline at end of file diff --git a/src/main/java/com/xly/milvus/service/impl/MilvusServiceImpl.java b/src/main/java/com/xly/milvus/service/impl/MilvusServiceImpl.java index ec2bbb9..b5abfee 100644 --- a/src/main/java/com/xly/milvus/service/impl/MilvusServiceImpl.java +++ b/src/main/java/com/xly/milvus/service/impl/MilvusServiceImpl.java @@ -11,6 +11,7 @@ import com.google.gson.JsonObject; import com.xly.milvus.service.MilvusService; import com.xly.milvus.service.VectorizationService; import com.xly.milvus.util.MapToJsonConverter; +import com.xly.milvus.util.MilvusTimeUtil; import com.xly.service.DynamicExeDbService; import com.xly.tts.bean.TTSResponseDTO; import io.milvus.response.SearchResultsWrapper; @@ -33,6 +34,10 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import java.math.BigDecimal; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; import java.util.*; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -49,7 +54,10 @@ public class MilvusServiceImpl implements MilvusService { private final MilvusClientV2 milvusClient; private final VectorizationService vectorizationService; private final DynamicExeDbService dynamicExeDbService; - + private static final long NULL_TIMESTAMP = -1L; + // 日期格式常量 + private static final DateTimeFormatter ISO_FORMATTER = + DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss"); // 或者从配置文件读取 @Value("${milvus.vector.dimension:384}") private int VECTOR_DIM; @@ -90,13 +98,12 @@ public class MilvusServiceImpl implements MilvusService { String sInputTabelName = reqMap.get("sInputTabelName").toString(); String sVectorfiled = reqMap.get("sVectorfiled").toString(); String sVectorjson = reqMap.get("sVectorjson").toString(); - + //创建集合 + createCollectionIfNotExists(sInputTabelName, sVectorfiled, sVectorjson,true); String tUpdateDate = DateUtil.now(); String tUpdateDateUp = getUpdateDateUp(sInputTabelName); //获取需要同步地数据 List> data = getAddData(sInputTabelName,tUpdateDate, tUpdateDateUp); - //创建集合 - createCollectionIfNotExists(sInputTabelName, sVectorfiled, sVectorjson,true); if(ObjectUtil.isNotEmpty(data)){ //插入数据 long num= addDataToCollection(sInputTabelName, sVectorfiled, sVectorjson,data); @@ -257,11 +264,12 @@ public class MilvusServiceImpl implements MilvusService { * @Description 返回组装动态内容 **/ @Override - public Map getMilvusFiled(String sVectorfiled){ - String[] sVectorfiledArray = sVectorfiled.split(","); + public Map getMilvusFiled(String sVectorfiled,String sVectorfiledAll){ List sFileds = new ArrayList<>(); List sFiledDescriptions = new ArrayList<>(); + List sFiledDescriptionsAll = new ArrayList<>(); List> titleList = new LinkedList<>(); + String[] sVectorfiledArray = sVectorfiled.split(","); for(String sVectorfiledOne : sVectorfiledArray){ Map title = new HashMap<>(); @@ -278,9 +286,21 @@ public class MilvusServiceImpl implements MilvusService { title.put("sTitle",sDescriptions); titleList.add(title); } + String[] sVectorfiledArrayAll = sVectorfiledAll.split(","); + for(String sVectorfiledOne : sVectorfiledArrayAll){ + String[] sVectorfiledOneArray = sVectorfiledOne.split(":"); + String sDescriptions = sVectorfiledOneArray[0]; + String sName = sVectorfiledOneArray[1]; + // 处理描述中可能包含的换行,保持缩进一致 +// String formattedDesc = sDescriptions.replace("\n", "\n "); +// sFiledDescriptions.add(String.format(" - %s: %s", sName, formattedDesc)); + String formattedDesc =String.format("%s: %s", sName, sDescriptions); + sFiledDescriptionsAll.add(formattedDesc); + } Map rMap = new HashMap<>(); rMap.put("sMilvusFiled", String.join(",", sFileds)); rMap.put("sMilvusFiledDescription", String.join(",", sFiledDescriptions)); + rMap.put("sMilvusFiledDescriptionAll", String.join(",", sFiledDescriptionsAll)); rMap.put("sFileds", sFileds); rMap.put("title", titleList); return rMap; @@ -430,17 +450,17 @@ public class MilvusServiceImpl implements MilvusService { for (List resultList : searchResults) { // 遍历每个搜索结果 for (SearchResp.SearchResult result : resultList) { - Map item = new HashMap<>(); + // 获取实体字段数据 + Map entity = result.getEntity(); + Map metadata = (Map) entity.get("metadata"); // 获取相似度分数 Float score = result.getScore(); if (score != null) { - item.put("score", score); + metadata.put("score", score); } - // 获取实体字段数据 - Map entity = result.getEntity(); // 将所有字段添加到结果中 - item.putAll(entity); - results.add(item); +// item.putAll(entity); + results.add(metadata); } } log.info("处理完成,共 {} 条搜索结果", results.size()); @@ -548,27 +568,75 @@ public class MilvusServiceImpl implements MilvusService { return; } // 基本类型直接添加 - if (value instanceof String) { + if(fieldName.startsWith("t")){ + if(ObjectUtil.isNotEmpty(value)){ + long date = MilvusTimeUtil.toTimestamp(value.toString()); + row.addProperty(fieldName,date); + }else{ + row.addProperty(fieldName,NULL_TIMESTAMP); + } + }else if (value instanceof String) { row.addProperty(fieldName, (String) value); } else if (value instanceof Number) { row.addProperty(fieldName, (Number) value); } else if (value instanceof Boolean) { row.addProperty(fieldName, (Boolean) value); - } - // List / 数组类型 - else if (value instanceof List) { + }else if (value instanceof List) { + // List / 数组类型 JsonArray jsonArray = new JsonArray(); for (Object item : (List) value) { addJsonElement(jsonArray, item); } row.add(fieldName, jsonArray); - } - // 其他对象转字符串 - else { + } else { + // 其他对象转字符串 row.addProperty(fieldName, value.toString()); } } /** + * 解析字符串为时间戳 + * 支持多种格式 + */ + private static long parseStringToTimestamp(String dateStr) { + if (dateStr == null || dateStr.trim().isEmpty()) { + return NULL_TIMESTAMP; + } + + try { + // 1. 尝试 ISO 8601 格式(如:2025-07-21T09:26:09) + if (dateStr.matches("\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}")) { + LocalDateTime ldt = LocalDateTime.parse(dateStr, ISO_FORMATTER); + return ldt.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(); + } + + // 2. 尝试 yyyy-MM-dd HH:mm:ss 格式 + if (dateStr.matches("\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}")) { + DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + LocalDateTime ldt = LocalDateTime.parse(dateStr, formatter); + return ldt.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(); + } + + // 3. 尝试 yyyy-MM-dd 格式(当天 00:00:00) + if (dateStr.matches("\\d{4}-\\d{2}-\\d{2}")) { + LocalDate ld = LocalDate.parse(dateStr); + return ld.atStartOfDay(ZoneId.systemDefault()).toInstant().toEpochMilli(); + } + + // 4. 尝试直接解析为时间戳数字(如果字符串全是数字) + if (dateStr.matches("\\d+")) { + return Long.parseLong(dateStr); + } + + // 5. 如果都不匹配,返回空值 + System.err.println("无法解析的日期格式: " + dateStr); + return NULL_TIMESTAMP; + + } catch (Exception e) { + System.err.println("日期解析失败: " + dateStr + ", 错误: " + e.getMessage()); + return NULL_TIMESTAMP; + } + } + /** * 递归处理 List 元素(支持无限层嵌套 List) */ private void addJsonElement(JsonArray jsonArray, Object item) { @@ -605,12 +673,11 @@ public class MilvusServiceImpl implements MilvusService { //是否删除集合 重新创建 if (bRset){ + this.delAiMilvusVectorRecord(collectionName); // 1. 删除旧集合 milvusClient.dropCollection(DropCollectionReq.builder() .collectionName(collectionName) .build()); - //删除对应的记录表 - delAiMilvusVectorRecord(collectionName); } // 检查集合是否存在 HasCollectionReq hasCollectionReq = HasCollectionReq.builder() @@ -777,9 +844,11 @@ public class MilvusServiceImpl implements MilvusService { if(sKey.startsWith("d")){ return DataType.Double; }else if(sKey.startsWith("i")){ - return DataType.Int64; + return DataType.Int32; }else if(sKey.startsWith("b")){ return DataType.Bool; + }else if(sKey.startsWith("t")){ + return DataType.Int64; }else{ return DataType.VarChar; } @@ -793,7 +862,14 @@ public class MilvusServiceImpl implements MilvusService { * @Description 索引类型 **/ public IndexParam.IndexType indexField(String sKey) { - return IndexParam.IndexType.TRIE; + if(sKey.startsWith("d") || sKey.startsWith("i")){ + return IndexParam.IndexType.STL_SORT; + }else if(sKey.startsWith("t")){ + return IndexParam.IndexType.STL_SORT; + }else{ + return IndexParam.IndexType.TRIE; + } + } diff --git a/src/main/java/com/xly/milvus/util/MilvusTimeUtil.java b/src/main/java/com/xly/milvus/util/MilvusTimeUtil.java new file mode 100644 index 0000000..5112f0c --- /dev/null +++ b/src/main/java/com/xly/milvus/util/MilvusTimeUtil.java @@ -0,0 +1,73 @@ +package com.xly.milvus.util; + +import java.time.*; +import java.time.format.DateTimeFormatter; +import java.time.format.DateTimeParseException; +import java.util.*; + +public class MilvusTimeUtil { + + public static final long NULL_TIMESTAMP = -1L; + + // 多种 ISO 格式 + private static final List ISO_FORMATTERS = Arrays.asList( + DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss"), // 完整格式 + DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm"), // 缺少秒 + DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH"), // 只有小时 + DateTimeFormatter.ISO_LOCAL_DATE_TIME, // 标准 ISO + DateTimeFormatter.ISO_OFFSET_DATE_TIME // 带时区 + ); + + /** + * 智能解析多种 ISO 格式 + */ + public static long toTimestamp(String isoString) { + if (isoString == null || isoString.trim().isEmpty()) { + return NULL_TIMESTAMP; + } + + // 尝试多种格式 + for (DateTimeFormatter formatter : ISO_FORMATTERS) { + try { + LocalDateTime localDateTime = LocalDateTime.parse(isoString, formatter); + return localDateTime.atZone(ZoneId.systemDefault()) + .toInstant() + .toEpochMilli(); + } catch (DateTimeParseException e) { + // 继续尝试下一个格式 + } + } + + // 特殊处理:如果只有日期 + try { + LocalDate localDate = LocalDate.parse(isoString); + return localDate.atStartOfDay(ZoneId.systemDefault()) + .toInstant() + .toEpochMilli(); + } catch (DateTimeParseException e) { + // 忽略 + } + + System.err.println("无法解析的日期格式: " + isoString); + return NULL_TIMESTAMP; + } + + /** + * 补全秒部分(将缺少秒的格式补全为 :00) + */ + public static String fillSeconds(String isoString) { + if (isoString == null) return null; + + // 匹配 yyyy-MM-dd'T'HH:mm 格式 + if (isoString.matches("\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}")) { + return isoString + ":00"; + } + + // 匹配 yyyy-MM-dd'T'HH 格式 + if (isoString.matches("\\d{4}-\\d{2}-\\d{2}T\\d{2}")) { + return isoString + ":00:00"; + } + + return isoString; + } +} \ No newline at end of file diff --git a/src/main/java/com/xly/service/XlyErpService.java b/src/main/java/com/xly/service/XlyErpService.java index 4fd724a..0d175f0 100644 --- a/src/main/java/com/xly/service/XlyErpService.java +++ b/src/main/java/com/xly/service/XlyErpService.java @@ -79,47 +79,137 @@ public class XlyErpService { /** - * 新的流式方法 - 返回 Flux - * 每个AiResponseDTO包含一个文本片段 - */ - /** - * 模拟的erpUserInputStream实现 - */ - public Flux erpUserInputStream(String userInput, String sUserId, - String sUserName, String sBrandsId, - String sSubsidiaryId, String sUserType, - String authorization) { - String requestId = UUID.randomUUID().toString(); - - // 按句子分割 - String[] sentences = userInput.split("(?<=[。!?.!?])"); - int totalChunks = sentences.length; - - return Flux.range(0, totalChunks) - .delayElements(Duration.ofMillis(200)) - .map(i -> { - String sentence = sentences[i].trim(); - if (sentence.isEmpty()) return null; - - return AiResponseDTO.builder() - .requestId(requestId) - .code(200) - .message("ERP_CHUNK") - .status("PROCESSING") - .textFragment(sentence) - .chunkIndex(i) - .totalChunks(totalChunks) - .isLastChunk(i == totalChunks - 1) - .progress((i + 1) * 100 / totalChunks) - .timestamp(System.currentTimeMillis()) - .sSceneName("客服咨询") - .sMethodName("chat") - .sReturnType("MARKDOWN") - .build(); - }) - .filter(Objects::nonNull); - } + * @Author 钱豹 + * @Date 19:18 2026/1/27 + * @Param [userInput, userId, sUserType] + * @return reactor.core.publisher.Flux + * @Description 问答(流式返回) + **/ + public Flux erpUserInputStream(String userInput, + String userId, + String sUserName, + String sBrandsId, + String sSubsidiaryId, + String sUserType, + String authorization) { + String sceneName = StrUtil.EMPTY; + String methodName = StrUtil.EMPTY; + UserSceneSession session=null; + try { + // 0. 预处理用户输入:去空格、转小写(方便匹配) + String input= InputPreprocessor.preprocessWithCommons(userInput); + // 1. 初始化用户场景会话(权限内场景) + session = userSceneSessionService.getUserSceneSession(userId,sUserName,sBrandsId,sSubsidiaryId,sUserType,authorization); + session.setAuthorization(authorization); + session.setSFunPrompts(null); + sceneName = ObjectUtil.isNotEmpty(session.getCurrentScene())?session.getCurrentScene().getSSceneName():StrUtil.EMPTY; + methodName = ObjectUtil.isNotEmpty(session.getCurrentTool())?session.getCurrentTool().getSMethodName():StrUtil.EMPTY; + // 2. 特殊指令:重置场景(无论是否已选,都可重置) + if (input.contains("重置") || input.contains("重新选择")) { + //清除记忆缓存 + reSet(userId ,sUserName, sBrandsId ,sSubsidiaryId,sUserType,authorization,session); + return Flux.just(AiResponseDTO.builder() + .aiText(resetUserScene(session.getUserId(), session)) + .sSceneName(sceneName) + .sMethodName(methodName) + .sReturnType(ReturnTypeCode.HTML.getCode()) + .build()); + } + //聊天只能体 + if (session.getCurrentScene() != null + && Objects.equals(session.getCurrentScene().getSSceneNo(), "ChatZone")) + { + return getChatiAgentStream(input, session); + } + // 3. 未选场景:先展示场景选择界面,处理用户序号选择 + if (!session.isSceneSelected() && ValiDataUtil.me().isPureNumber(input)){ + // 3.1 尝试处理场景选择(输入序号则匹配,否则展示选择提示) + AiResponseDTO aiResponseDTO = handleSceneSelect(userId, input, session); + return Flux.just(aiResponseDTO); + } + // 4. 构建Agent,执行业务交互,如果返回为null,说明大模型没有判段出场景,必判断出后才能继续 + ErpAiAgent aiAgent = createErpAiAgent(userId, input, session); + // 没有选择到场景,进闲聊模式 + if (aiAgent == null){ + return getChatiAgentStream (input,session); + } + String sResponMessage = StrUtil.EMPTY; + //用户输入添加方法(如果没有方法,动态SQL方法不需要) + if(!(ObjectUtil.isNotEmpty(session.getCurrentTool()) + && ObjectUtil.isNotEmpty(session.getCurrentTool().getSInputTabelName()) + && ObjectUtil.isNotEmpty(session.getCurrentTool().getSStructureMemo())) + ){ + sResponMessage = aiAgent.chat(userId, input); + } + if(ObjectUtil.isNotEmpty(session.getCurrentTool()) + && !ObjectUtil.isNotEmpty(session.getCurrentTool().getSInputTabelName()) + ){ + input = session.getCurrentTool().getSMethodName()+","+input; + } + //动态方法或返回需要提示的信息 + if(ObjectUtil.isNotEmpty(session.getSFunPrompts())){ + return Flux.just(AiResponseDTO.builder() + .aiText(session.getSFunPrompts()) + .sSceneName(sceneName) + .sMethodName(methodName) + .sReturnType(ReturnTypeCode.HTML.getCode()) + .build()); + } +// 1.找到方法并且本方法带表结构描述时,需要调用 自然语言转SQL智能体 + if((ObjectUtil.isNotEmpty(session.getCurrentTool()) + && ObjectUtil.isNotEmpty(session.getCurrentTool().getSInputTabelName()) + && ObjectUtil.isNotEmpty(session.getCurrentTool().getSStructureMemo())) + ){ + //查询是否走向量库 还是数据库查询 + Boolean isAggregation = aiAgent.routeQuery(session.getUserId(), input); + if(!isAggregation){ + //获取常量库内容 + sResponMessage = getMilvus(session, input, aiAgent); + }else { + sResponMessage = getDynamicTableSql(session, input, userId, userInput,0,StrUtil.EMPTY,StrUtil.EMPTY,"0",StrUtil.EMPTY, aiAgent); + } + return Flux.just(AiResponseDTO.builder() + .aiText(sResponMessage) + .sSceneName(sceneName) + .sMethodName(methodName) + .sReturnType(ReturnTypeCode.HTML.getCode()) + .build()); + } else if (ObjectUtil.isNotEmpty(session.getCurrentTool())) { + //2.处理工具参数采集结束后业务逻辑处理 + //调用方法,参数缺失部分提示,就直接使用方法返回的 + sResponMessage = dynamicToolProvider.doDynamicTool(session.getCurrentTool(),session); + return Flux.just(AiResponseDTO.builder() + .aiText(sResponMessage) + .sSceneName(sceneName) + .sMethodName(methodName) + .sReturnType(ReturnTypeCode.HTML.getCode()) + .build()); + }else if(session.getCurrentScene()== null ){ + return Flux.just(AiResponseDTO.builder() + .aiText("当前场景:没有选择 退回当前场景 请输入 "+ CommonConstant.RESET + sResponMessage) + .sSceneName(sceneName) + .sMethodName(methodName) + .sReturnType(ReturnTypeCode.HTML.getCode()) + .build()); + }else{ + return getChatiAgentStream (input, session); + } + } catch (Exception e) { + e.printStackTrace(); + return Flux.just(AiResponseDTO.builder() + .aiText("系统异常:" + e.getMessage() + ",请稍后重试!") + .sSceneName(sceneName) + .sMethodName(methodName) + .sReturnType(ReturnTypeCode.HTML.getCode()) + .build()); + }finally { + //5.执行工具方法后,清除记忆 + if(session !=null && session.getBCleanMemory()){ + doCleanUserMemory(session,userId); + } + } + } /*** * @Author 钱豹 @@ -278,15 +368,17 @@ public class XlyErpService { try{ String sVectorfiled = session.getCurrentTool().getSVectorfiled(); String sInputTabelName = session.getCurrentTool().getSInputTabelName(); - Map rMap = milvusService.getMilvusFiled(sVectorfiled); + String sVectorfiledAll = session.getCurrentTool().getSVectorfiledAll(); + Map rMap = milvusService.getMilvusFiled(sVectorfiled,sVectorfiledAll); String sMilvusFiled = rMap.get("sMilvusFiled").toString(); String sMilvusFiledDescription = rMap.get("sMilvusFiledDescription").toString(); + String sMilvusFiledDescriptionAll = rMap.get("sMilvusFiledDescriptionAll").toString(); List fields = (List) rMap.get("sFileds"); // List> title = (List>) rMap.get("title"); String milvusFilter = aiAgent.getMilvusFilter(session.getUserId(),userInput, sMilvusFiled, sMilvusFiledDescription); List> data = milvusService.getDataToCollection(sInputTabelName, milvusFilter,userInput,100,fields); //采用表格形式显示 - resultExplain = aiAgent.explainMilvusResult(session.getUserId(),userInput,sMilvusFiledDescription,JSONObject.toJSONString(data)); + resultExplain = aiAgent.explainMilvusResult(session.getUserId(),userInput,sMilvusFiledDescriptionAll,JSONObject.toJSONString(data)); //buildMarkdownTableWithStream(data, title); return resultExplain; }catch (Exception e){ @@ -778,6 +870,40 @@ public class XlyErpService { return sb.toString(); } + /** + * @Author 钱豹 + * @Date 13:32 2026/2/6 + * @Param [input, session] + * @return reactor.core.publisher.Flux + * @Description 获取智普通智能体(流式返回) + **/ + private Flux getChatiAgentStream(String input, UserSceneSession session) { + String sceneName = ObjectUtil.isNotEmpty(session.getCurrentScene()) + ? session.getCurrentScene().getSSceneName() : StrUtil.EMPTY; + String methodName = ObjectUtil.isNotEmpty(session.getCurrentTool()) + ? session.getCurrentTool().getSMethodName() : "随便聊聊"; + + // 从缓存获取或创建ChatiAgent + ChatiAgent chatiAgent = UserSceneSessionService.CHAT_AGENT_CACHE.get(session.getUserId()); + if (ObjectUtil.isEmpty(chatiAgent)) { + chatiAgent = AiServices.builder(ChatiAgent.class) + .chatLanguageModel(chatiModel) + .chatMemoryProvider(operableChatMemoryProvider) + .build(); + UserSceneSessionService.CHAT_AGENT_CACHE.put(session.getUserId(), chatiAgent); + } + + // 调用流式聊天方法 + return chatiAgent.chatStream(session.getUserId(), input) + .map(chunk -> AiResponseDTO.builder() + .sSceneName(sceneName) + .sMethodName(methodName) + .aiText(chunk) + .systemText(StrUtil.EMPTY) + .sReturnType(ReturnTypeCode.STREAM.getCode()) + .build()); + } + /*** * @Author 钱豹 * @Date 13:32 2026/2/6 diff --git a/src/main/java/com/xly/tts/service/PythonTtsProxyService.java b/src/main/java/com/xly/tts/service/PythonTtsProxyService.java index f30d1f3..d573917 100644 --- a/src/main/java/com/xly/tts/service/PythonTtsProxyService.java +++ b/src/main/java/com/xly/tts/service/PythonTtsProxyService.java @@ -84,8 +84,6 @@ public class PythonTtsProxyService { /** * 流式ERP + 流式TTS合成 - * 先流式输出ERP文本,完成后自动开始TTS合成 - * 使用现有TTSResponseDTO字段 */ public Flux synthesizeStreamAiStream(TTSRequestDTO request) { String userInput = request.getText(); @@ -95,95 +93,121 @@ public class PythonTtsProxyService { String sSubsidiaryId = request.getSubsidiaryid(); String sUserType = request.getUsertype(); String authorization = request.getAuthorization(); - String requestId = UUID.randomUUID().toString(); - log.info("开始流式处理: requestId={}, userId={}", requestId, sUserId); - // 创建累积器(用于累积完整的AiResponseDTO) - AiResponseAccumulator accumulator = new AiResponseAccumulator(requestId); + log.info("开始流式处理: requestId={}, userId={}", requestId, sUserId); // 1. 处理ERP流,将AiResponseDTO转换为TTSResponseDTO Flux erpStream = xlyErpService.erpUserInputStream( userInput, sUserId, sUserName, sBrandsId, sSubsidiaryId, sUserType, authorization ) - .doOnNext(aiResponse -> { - // 设置请求ID - aiResponse.setRequestId(requestId); - // 后台累积完整文本(为后续TTS做准备) - accumulator.accumulate(aiResponse); - log.debug("收到ERP片段: requestId={}, chunk={}/{}", - requestId, - aiResponse.getChunkIndex(), - aiResponse.getTotalChunks()); - }) .map(aiResponse -> { // 将AiResponseDTO转换为TTSResponseDTO - // 使用processedText字段传递AI文本片段 - // 使用systemText字段传递系统文本片段 - return TTSResponseDTO.builder() + TTSResponseDTO dto = TTSResponseDTO.builder() .code(200) - .message("ERP_CHUNK") // message字段标记为ERP文本块 + .message("ERP_CHUNK") .requestId(requestId) - .processedText(aiResponse.getTextFragment()) // 用processedText传递AI文本片段 - .systemText(aiResponse.getSystemTextFragment()) // 用systemText传递系统文本片段 + .processedText(aiResponse.getAiText()) // 使用 aiText 字段传递文本 + .systemText(aiResponse.getSystemText()) .sSceneName(aiResponse.getSSceneName()) .sMethodName(aiResponse.getSMethodName()) .sReturnType(aiResponse.getSReturnType()) .timestamp(System.currentTimeMillis()) .build(); + + log.debug("发送ERP片段: requestId={}, text长度={}", requestId, aiResponse.getAiText() != null ? aiResponse.getAiText().length() : 0); + return dto; + }); + + // 2. 收集完整的ERP响应(用于TTS) + List textChunks = new ArrayList<>(); + List systemTextChunks = new ArrayList<>(); + Flux erpWithCollect = erpStream + .doOnNext(dto -> { + // 收集文本片段 + if (dto.getProcessedText() != null) { + textChunks.add(dto.getProcessedText()); + } + if (dto.getSystemText() != null) { + systemTextChunks.add(dto.getSystemText()); + } }); - // 2. ERP完成后,发送完成标记,然后开始TTS合成 - return erpStream + // 3. ERP完成后,发送完成标记并开始TTS + return erpWithCollect .concatWith(Flux.defer(() -> { - // 获取完整的累积结果 - AiResponseDTO completeResponse = accumulator.getCompleteResponse(); + // 合并所有文本片段 + String fullText = String.join("", textChunks); + String fullSystemText = String.join("", systemTextChunks); - // 验证ERP结果 - if (StrUtil.isBlank(completeResponse.getAiText())) { + if (StrUtil.isBlank(fullText)) { log.warn("ERP返回空文本: requestId={}", requestId); - return Flux.error(new RuntimeException("ERP返回空文本")); + return Flux.just(TTSResponseDTO.error(requestId, 500, "ERP返回空文本")); } - log.info("ERP流式处理完成,开始TTS合成: requestId={}, aiText长度={}", - requestId, completeResponse.getAiText().length()); + log.info("ERP流式处理完成,开始TTS合成: requestId={}, 文本长度={}", + requestId, fullText.length()); - // 3. 发送ERP完成消息(使用完整文本) - TTSResponseDTO erpCompleteDto = TTSResponseDTO.builder() + // 发送ERP完成消息 + TTSResponseDTO completeDto = TTSResponseDTO.builder() .code(200) - .message("ERP_COMPLETE") // message标记完成 + .message("ERP_COMPLETE") .requestId(requestId) - .processedText(completeResponse.getAiText()) // 完整AI文本 - .systemText(completeResponse.getSystemText()) // 完整系统文本 - .sSceneName(completeResponse.getSSceneName()) - .sMethodName(completeResponse.getSMethodName()) - .sReturnType(completeResponse.getSReturnType()) +// .processedText(fullText) +// .systemText(fullSystemText) .timestamp(System.currentTimeMillis()) .build(); +// AiResponseDTO aiResponseDTO = AiResponseDTO.builder().aiText(fullText).systemText(fullSystemText); - // 4. 调用TTS合成(返回TTSResponseDTO流) - Flux ttsStream = synthesizeStreamAiNew(request, completeResponse) - .doOnNext(ttsResponse -> { - ttsResponse.setRequestId(requestId); - ttsResponse.setMessage("TTS_SEGMENT"); // message标记为TTS音频段 - }); - - // 先发送ERP完成消息,再发送TTS流 - return Flux.concat(Flux.just(erpCompleteDto), ttsStream); + // 调用TTS合成 +// synthesizeStreamAi(request, aiResponseDTO); + // 先发送完成消息,再发送TTS流 + return Flux.just(completeDto); + //Flux.concat(Flux.just(completeDto), ttsStream); })) - // 超时控制 .timeout(Duration.ofSeconds(120)) - // 错误处理 .onErrorResume(e -> { - log.error("流式处理失败: requestId={}, error={}", requestId, e.getMessage()); + log.error("流式处理失败: requestId={}", requestId, e); return Flux.just(TTSResponseDTO.error(requestId, 500, e.getMessage())); }) - // 日志记录 .doOnComplete(() -> log.info("流式处理完成: requestId={}", requestId)) .doOnCancel(() -> log.warn("流式处理取消: requestId={}", requestId)); } +// /** +// * 调用TTS服务进行语音合成(流式返回) +// */ +// private Flux synthesizeTtsStream(TTSRequestDTO originalRequest, +// String text, +// String requestId) { +// // 构建TTS请求 +// TTSRequestDTO ttsRequest = TTSRequestDTO.builder() +// .userid(originalRequest.getUserid()) +// .text(text) +// .voice(originalRequest.getVoice()) +// .rate(originalRequest.getRate()) +// .volume(originalRequest.getVolume()) +// .voiceless(originalRequest.getVoiceless()) +// .build(); +// +// // 调用Python TTS服务 +// return webClient.post() +// .uri("http://python-service:5000/api/tts/synthesize") +// .contentType(MediaType.APPLICATION_JSON) +// .bodyValue(ttsRequest) +// .retrieve() +// .bodyToFlux(TTSResponseDTO.class) +// .doOnNext(ttsResponse -> { +// ttsResponse.setRequestId(requestId); +// ttsResponse.setMessage("TTS_SEGMENT"); +// log.debug("发送TTS片段: requestId={}, audioSize={}", +// requestId, +// ttsResponse.getAudio() != null ? ttsResponse.getAudio().length() : 0); +// }); +// } + + diff --git a/src/main/java/com/xly/web/TTSStreamController.java b/src/main/java/com/xly/web/TTSStreamController.java index c72a9d1..58b3372 100644 --- a/src/main/java/com/xly/web/TTSStreamController.java +++ b/src/main/java/com/xly/web/TTSStreamController.java @@ -85,24 +85,18 @@ public class TTSStreamController { return pythonTtsProxyService.synthesizeStreamAi(request); } - /** - * 流式合成语音(代理到Python服务) - */ @PostMapping(value = "/stream/queryFlux", - consumes = {MediaType.APPLICATION_JSON_VALUE}, - produces = {MediaType.APPLICATION_OCTET_STREAM_VALUE, - MediaType.APPLICATION_JSON_VALUE}) - public Flux streamFlux(@Valid @RequestBody Mono requestMono) { - return requestMono.flatMapMany(request -> { - log.info("处理请求: requestId={}, text长度={}", request.getUserid(), request.getText().length()); - return pythonTtsProxyService.synthesizeStreamAiStream(request); - }) + consumes = MediaType.APPLICATION_JSON_VALUE, + produces = {MediaType.APPLICATION_NDJSON_VALUE, MediaType.APPLICATION_JSON_VALUE}) + public Flux streamFlux(@Valid @RequestBody TTSRequestDTO request) { + return pythonTtsProxyService.synthesizeStreamAiStream(request) .doOnSubscribe(sub -> log.debug("流式订阅开始")) .doOnCancel(() -> log.debug("流式请求被取消")) .doOnComplete(() -> log.debug("流式响应完成")) .doOnError(e -> log.error("流式处理错误", e)); } + @GetMapping("/audio/piece") public ResponseEntity> getPiece( @RequestParam String cacheKey, diff --git a/src/main/resources/templates/chat.html b/src/main/resources/templates/chat.html index 329b1af..92c99b0 100644 --- a/src/main/resources/templates/chat.html +++ b/src/main/resources/templates/chat.html @@ -594,6 +594,7 @@ checkPiece(); } + // 修改 doMessage 函数,改为调用新的流式接口 async function doMessage(input, message, button) { addMessage(message, 'user'); showTypingIndicator(); @@ -602,6 +603,9 @@ const requestData = { text: message, userid: userid, + username: username, + brandsid: brandsid, + subsidiaryid: subsidiaryid, usertype: usertype, authorization: authorization, voice: "zh-CN-XiaoxiaoNeural", @@ -610,26 +614,173 @@ voiceless: true }; - const response = await fetch(`${CONFIG.backendUrl}/api/tts/stream/query`, { + // 创建临时消息元素用于流式追加内容 + const tempMessageId = `temp-${Date.now()}`; + const messagesDiv = $('#chatMessages'); + hideTypingIndicator(); + + // 创建一个临时的AI消息容器 + const messageHtml = ` +
+
+
+
+ ${getCurrentTime()} +
+ + +
+
+
+
+ `; + messagesDiv.append(messageHtml); + scrollToBottom(); + + let fullText = ''; + let cacheKey = null; + let audioSize = 0; + let hasReceivedComplete = false; + + // 调用流式接口 + const response = await fetch(`${CONFIG.backendUrl}/api/tts/stream/queryFlux`, { method: "POST", - headers: { "Content-Type": "application/json;charset=UTF-8" }, + headers: { + "Content-Type": "application/json;charset=UTF-8", + "Accept": "application/x-ndjson, application/json" + }, body: JSON.stringify(requestData) }); - const data = await response.json(); - hideTypingIndicator(); - const replyText = (data.processedText || "") + (data.systemText || ""); - addMessage(replyText, 'ai'); + if (!response.ok) { + throw new Error(`HTTP ${response.status}: ${response.statusText}`); + } + + const reader = response.body.getReader(); + const decoder = new TextDecoder(); + let buffer = ''; + + while (true) { + const { done, value } = await reader.read(); + if (done) break; + + buffer += decoder.decode(value, { stream: true }); + const lines = buffer.split('\n'); + buffer = lines.pop() || ''; + + for (const line of lines) { + if (line.trim() === '') continue; + + try { + const data = JSON.parse(line); + console.log('收到数据:', data.message, data); + + // 根据消息类型处理 + switch(data.message) { + case 'ERP_CHUNK': + // ERP文本片段 - 实时显示 + if (data.processedText) { + fullText += data.processedText; + const messageContent = $(`#${tempMessageId} .message-content`); + // 直接显示HTML内容(因为后端返回的是HTML格式) + messageContent.html(fullText); + scrollToBottom(); + } + break; + + case 'ERP_COMPLETE': + // ERP完成,更新完整文本 + if (data.processedText) { + fullText = data.processedText; + const messageContent = $(`#${tempMessageId} .message-content`); + messageContent.html(fullText); + scrollToBottom(); + } + hasReceivedComplete = true; + console.log('ERP完成,文本长度:', fullText.length); + break; + + case 'TTS_SEGMENT': + // TTS音频片段 - 处理音频 + if (data.audioBase64) { + const blob = base64ToBlob(data.audioBase64); + const audio = new Audio(URL.createObjectURL(blob)); + audio.play().catch(err => console.log('播放失败:', err)); + } + if (data.cacheKey) { + cacheKey = data.cacheKey; + } + if (data.audioSize) { + audioSize = data.audioSize; + } + break; + + default: + // 兼容旧格式 + if (data.processedText) { + fullText += data.processedText; + const messageContent = $(`#${tempMessageId} .message-content`); + messageContent.html(fullText); + scrollToBottom(); + } + if (data.cacheKey) { + cacheKey = data.cacheKey; + } + break; + } + + // 如果是最后一包且还没有播放音频 + if (data.last === true && cacheKey && audioSize > 0) { + playByIndex(cacheKey, 0, audioSize); + } - const cacheKey = data.cacheKey; - const audioSize = data.audioSize; // 总分几段 + } catch (e) { + console.error('解析流式数据失败:', e, line); + } + } + } + // 流式结束后,处理可能的音频播放 + if (cacheKey && audioSize > 0) { + playByIndex(cacheKey, 0, audioSize); + } - playByIndex(cacheKey, 0, audioSize); + // 如果收到完整内容,直接显示,不需要额外处理 + if (!hasReceivedComplete && fullText) { + const messageContent = $(`#${tempMessageId} .message-content`); + messageContent.html(fullText); + } + + // 更新最终消息ID + const finalMessageId = `msg-${Date.now()}`; + const finalMessage = $(`#${tempMessageId}`).clone(); + finalMessage.attr('id', finalMessageId); + $(`#${tempMessageId}`).remove(); + messagesDiv.append(finalMessage); + + // 更新按钮的事件绑定 + $(`#${finalMessageId} .action-btn`).each(function() { + const onclick = $(this).attr('onclick'); + if (onclick) { + $(this).attr('onclick', onclick.replace(tempMessageId, finalMessageId)); + } + }); + + // 处理消息中的可点击按钮(如果有) + $(`#${finalMessageId} .message-content [data-action]`).each(function() { + const action = $(this).attr('data-action'); + const text = $(this).attr('data-text'); + if (action === 'reset') { + $(this).on('click', function() { + reset(text); + }); + } + }); } catch (error) { console.error('错误:', error); hideTypingIndicator(); + $(`#temp-${Date.now()}`).remove(); addMessage("服务异常,请重试", 'ai'); } finally { input.prop('disabled', false); @@ -639,18 +790,10 @@ } } - function base64ToBlob(base64) { - const byteCharacters = atob(base64); - const byteNumbers = new Array(byteCharacters.length); - for (let i = 0; i < byteCharacters.length; i++) { - byteNumbers[i] = byteCharacters.charCodeAt(i); - } - return new Blob([new Uint8Array(byteNumbers)], { type: 'audio/mpeg' }); - } - + // 修改原有的 handleNormalResponse 函数(如果需要的话) async function handleNormalResponse(requestData) { try { - const response = await fetch(`${CONFIG.backendUrl}/api/tts/stream/query`, { + const response = await fetch(`${CONFIG.backendUrl}/api/tts/stream/queryFlux`, { method: 'POST', headers: CONFIG.headers, body: JSON.stringify(requestData) @@ -658,6 +801,8 @@ if (!response.ok) { throw new Error(`HTTP ${response.status}: ${response.statusText}`); } + // 流式响应不需要在这里处理,由 doMessage 处理 + return response; } catch (error) { hideTypingIndicator(); throw error; @@ -666,6 +811,15 @@ } } + function base64ToBlob(base64) { + const byteCharacters = atob(base64); + const byteNumbers = new Array(byteCharacters.length); + for (let i = 0; i < byteCharacters.length; i++) { + byteNumbers[i] = byteCharacters.charCodeAt(i); + } + return new Blob([new Uint8Array(byteNumbers)], { type: 'audio/mpeg' }); + } + function getCurrentTime() { const now = new Date(); return now.getHours().toString().padStart(2, '0') + ':' + -- libgit2 0.22.2