package com.xly.milvus.service.impl; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.collection.ConcurrentHashSet; import cn.hutool.core.date.DateUtil; import cn.hutool.core.thread.ThreadUtil; import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; import com.google.common.reflect.TypeToken; import com.google.gson.Gson; import com.google.gson.JsonArray; import com.google.gson.JsonObject; import com.xly.milvus.service.MilvusService; import com.xly.milvus.service.VectorizationService; import com.xly.milvus.util.MapToJsonConverter; import com.xly.milvus.util.MilvusTimeUtil; import com.xly.service.DynamicExeDbService; import com.xly.tts.bean.TTSResponseDTO; import io.milvus.v2.client.MilvusClientV2; import io.milvus.v2.common.ConsistencyLevel; import io.milvus.v2.common.DataType; import io.milvus.v2.common.IndexParam; import io.milvus.v2.service.collection.request.*; import io.milvus.v2.service.vector.request.DeleteReq; import io.milvus.v2.service.vector.request.InsertReq; import io.milvus.v2.service.vector.request.SearchReq; import io.milvus.v2.service.vector.request.data.FloatVec; import io.milvus.v2.service.vector.response.InsertResp; import io.milvus.v2.service.vector.response.SearchResp; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import java.lang.reflect.Type; import java.math.BigDecimal; import java.time.LocalDate; import java.time.LocalDateTime; import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.util.*; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; /** * 向量化服务实现 - 使用LangChain4j的All-MiniLM-L6-v2模型 */ @Slf4j @Service(value = "milvusService") @RequiredArgsConstructor public class MilvusServiceImpl implements MilvusService { private final MilvusClientV2 milvusClient; private final VectorizationService vectorizationService; private final DynamicExeDbService dynamicExeDbService; private static final long NULL_TIMESTAMP = -1L; // 日期格式常量 private static final DateTimeFormatter ISO_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss"); // 或者从配置文件读取 @Value("${milvus.vector.dimension:384}") private int VECTOR_DIM; // 缓存已经初始化过的 Milvus 集合(线程安全) private final Set loadedCollections = new ConcurrentHashSet<>(); /*** * @Author 钱豹 * @Date 22:18 2026/3/24 * @Param [reqMap] * @return void * @Description 初始化结构以及数据 **/ @Override public TTSResponseDTO initDataToMilvus(Map reqMap) { if(ObjectUtil.isEmpty(reqMap.get("sInputTabelName"))){ return TTSResponseDTO.builder() .code(-1) .message("输入表名") .build(); } if(ObjectUtil.isEmpty(reqMap.get("sVectorfiled"))){ return TTSResponseDTO.builder() .code(-1) .message("向量库标量字段") .build(); } if(ObjectUtil.isEmpty(reqMap.get("sVectorjson"))){ return TTSResponseDTO.builder() .code(-1) .message("向量化内容JSON") .build(); } String sInputTabelName = reqMap.get("sInputTabelName").toString(); String sVectorfiled = reqMap.get("sVectorfiled").toString(); String sVectorjson = reqMap.get("sVectorjson").toString(); //创建集合 createCollectionIfNotExists(sInputTabelName, sVectorfiled, sVectorjson,true); String tUpdateDate = DateUtil.now(); String tUpdateDateUp = getUpdateDateUp(sInputTabelName); //获取需要同步地数据 List> data = getAddData(sInputTabelName,tUpdateDate, tUpdateDateUp); if(ObjectUtil.isNotEmpty(data)){ //插入数据 long num= addDataToCollection(sInputTabelName, sVectorfiled, sVectorjson,data); } addAiMilvusVectorRecord(sInputTabelName,tUpdateDate, tUpdateDateUp); return TTSResponseDTO.builder() .code(200) .message("success") .build(); } public String getUpdateDateUp(String sInputTabelName) { Map serDataMap = new HashMap<>(); String sSql ="SELECT DATE_FORMAT(tUpdateDate,'%Y-%m-%d %H:%i:%s') AS tUpdateDate FROM ai_milvus_vector_record WHERE sInputTabelName = #{sInputTabelName}"; serDataMap.put("sInputTabelName",sInputTabelName); List> data = this.dynamicExeDbService.findSql(serDataMap,sSql); if(ObjectUtil.isEmpty(data)){ return "2000-03-24"; } return data.get(0).get("tUpdateDate").toString(); } /*** * @Author 钱豹 * @Date 22:24 2026/3/24 * @Param * @return * @Description 获取需要同步地数据 **/ public List> getAddData(String sInputTabelName,String tUpdateDate,String tUpdateDateUp) { //获取需要同步地数据 Map serDataMap = new HashMap<>(); serDataMap.put("tUpdateDate",tUpdateDate); serDataMap.put("tUpdateDateUp",tUpdateDateUp); String sSql = String.format("SELECT * FROM %s WHERE tUpdateDate >= #{tUpdateDateUp} AND tUpdateDate < #{tUpdateDate}",sInputTabelName); return this.dynamicExeDbService.findSql(serDataMap,sSql); } /*** * @Author 钱豹 * @Date 22:32 2026/3/24 * @Param [sInputTabelName, tUpdateDate] * @return java.util.List> * @Description 获取更新地数据 **/ public void addAiMilvusVectorRecord(String sInputTabelName,String tUpdateDate,String tUpdateDateUp) { //获取需要同步地数据 delAiMilvusVectorRecord(sInputTabelName); Map dMap = new HashMap<>(); dMap.put("sInputTabelName",sInputTabelName); dMap.put("tUpdateDate",tUpdateDate); dMap.put("tUpdateDateUp",tUpdateDateUp); String sSql = String.format("INSERT INTO ai_milvus_vector_record(sId,sInputTabelName,tUpdateDate,tUpdateDateUp)VALUES(newId(),#{sInputTabelName},#{tUpdateDate},#{tUpdateDateUp})"); dynamicExeDbService.addSql(dMap,sSql); } /*** * @Author 钱豹 * @Date 22:32 2026/3/24 * @Param [sInputTabelName, tUpdateDate] * @return java.util.List> * @Description 获取更新地数据 **/ public void delAiMilvusVectorRecord(String sInputTabelName) { //获取需要同步地数据 Map dMap = new HashMap<>(); dMap.put("sInputTabelName",sInputTabelName); String sSql = "DELETE FROM ai_milvus_vector_record WHERE sInputTabelName = #{sInputTabelName}"; dynamicExeDbService.delSql(dMap,sSql); } /*** * @Author 钱豹 * @Date 16:31 2026/3/24 * @Param [collectionName] * @return void * @Description 创建集合 **/ @Override public void createCollectionIfNotExists(String collectionName,String sVectorfiled,String sVectorjson,Boolean bRset) { createCollection(collectionName,sVectorfiled,bRset); log.info("集合 {} 创建成功", collectionName); // 集合缓存:只加载一次 if (!loadedCollections.contains(collectionName)) { loadedCollections.add(collectionName); } } /*** * @Author 钱豹 * @Date 20:59 2026/3/24 * @Param [collectionName, sVectorfiled, sVectorjson, bRset] * @return void * @Description 新增数据集合 **/ @Override public long addDataToCollection(String collectionName, String sVectorfiled, String sVectorjson,List> data){ // 1. 参数校验(防止空参数导致崩溃) if (ObjectUtil.isEmpty(collectionName) || CollUtil.isEmpty(data)) { throw new IllegalArgumentException("参数异常:集合名/slaveId/数据不能为空"); } // 1. 转换为Milvus格式 List rows = convertToMilvusRow(data, sVectorfiled, sVectorjson); if (CollUtil.isEmpty(rows)) { return 0l; // 无数据直接返回 } // 3.先删除再插入 // 1. 构建删除请求 // 过滤条件:匹配唯一键 // 核心:从 data 中提取所有 sSlaveId,批量删除 List slaveIdList = data.stream() .map(map -> map.get("sSlaveId")) // 取每条数据的slaveId .filter(Objects::nonNull) .map(String::valueOf) .distinct() // 去重 .toList(); if (slaveIdList.isEmpty()) { throw new RuntimeException("未获取到slaveId,无法删除旧数据"); } // 拼接 Milvus 删除条件:sSlaveId in ['111','222','333'] String filter = String.format("sSlaveId in [%s]", slaveIdList.stream() .map(id -> "'" + id + "'") .collect(Collectors.joining(",")) ); // 批量删除 DeleteReq deleteReq = DeleteReq.builder() .collectionName(collectionName) .filter(filter) .build(); milvusClient.delete(deleteReq); // 短暂等待 Milvus 数据同步 ThreadUtil.sleep(100); // 4. 插入到Milvus(批量) InsertReq insertReq = InsertReq.builder() .collectionName(collectionName) .data(rows) .build(); InsertResp insertResp = milvusClient.insert(insertReq); return insertResp.getInsertCnt(); } /*** * @Author 钱豹 * @Date 13:29 2026/3/25 * @Param [sVectorfiled] * @return java.util.Map * @Description 返回组装动态内容 **/ @Override public Map getMilvusFiled(String sVectorfiled,String sVectorfiledAll,String sVectorfiledShow){ List sFileds = new ArrayList<>(); List filedsShow = new ArrayList<>(); List sFiledDescriptions = new ArrayList<>(); List sFiledDescriptionsAll = new ArrayList<>(); List> titleList = new LinkedList<>(); String[] sVectorfiledArray = sVectorfiled.split(","); for(String sVectorfiledOne : sVectorfiledArray){ String[] sVectorfiledOneArray = sVectorfiledOne.split(":"); String sDescriptions = sVectorfiledOneArray[0]; String sName = sVectorfiledOneArray[1]; sFileds.add(sName); // 处理描述中可能包含的换行,保持缩进一致 String formattedDesc =String.format("%s: %s", sName, sDescriptions); sFiledDescriptions.add(formattedDesc); } String[] sVectorfiledShowArray = sVectorfiledShow.split(","); for(String sVectorfiledShowOne : sVectorfiledShowArray){ Map title = new HashMap<>(4); String[] sVectorfiledOneArray = sVectorfiledShowOne.split(":"); String sDescriptions = sVectorfiledOneArray[0]; String sName = sVectorfiledOneArray[1]; filedsShow.add(sName); title.put("sName",sName); title.put("sTitle",sDescriptions); titleList.add(title); } String[] sVectorfiledArrayAll = sVectorfiledAll.split(","); for(String sVectorfiledOne : sVectorfiledArrayAll){ String[] sVectorfiledOneArray = sVectorfiledOne.split(":"); String sDescriptions = sVectorfiledOneArray[0]; String sName = sVectorfiledOneArray[1]; String formattedDesc =String.format("%s: %s", sName, sDescriptions); sFiledDescriptionsAll.add(formattedDesc); } Map rMap = new HashMap<>(); rMap.put("sMilvusFiled", String.join(",", sFileds)); rMap.put("sMilvusFiledDescription", String.join(",", sFiledDescriptions)); rMap.put("sMilvusFiledDescriptionAll", String.join(",", sFiledDescriptionsAll)); rMap.put("filedsShow", filedsShow); rMap.put("title", titleList); return rMap; } @Override public List> getDataToCollection(String collectionName, String milvusFilter,String searchText,Integer size,List fields){ log.info("开始相似度查询: collection={}, searchText={}", collectionName, searchText); // 2. 设置范围搜索参数 Map searchParams = new HashMap<>(); searchParams.put("nprobe", 10); // 对于 IP 度量,相似度范围在 [minScore, maxScore] searchParams.put("radius", 0.9); // 最小相似度 searchParams.put("range_filter", 1); // 最大相似度 // 1. 确保集合已加载 // ensureCollectionLoaded(collectionName); // 1. 向量化搜索文本 List vectorList = vectorizationService.textToVector(searchText); if (vectorList == null || vectorList.isEmpty()) { throw new RuntimeException("向量化失败"); } // 2. 转换为 float[] float[] floatArray = new float[vectorList.size()]; for (int i = 0; i < vectorList.size(); i++) { floatArray[i] = vectorList.get(i); } if(ObjectUtil.isEmpty(fields)){ fields = new ArrayList<>(); } fields.add("sSlaveId"); fields.add("metadata"); // 3. 创建 Milvus FloatVec 对象 FloatVec floatVec = new FloatVec(floatArray); // 4. 构建搜索请求 SearchReq searchReq = SearchReq.builder() .collectionName(collectionName) .data(Collections.singletonList(floatVec)) .annsField("vector") // 向量字段名 .topK(size) // 返回最相似的10条 .metricType(IndexParam.MetricType.IP) // 内积相似度 .outputFields(fields) // .searchParams(searchParams) .filter(milvusFilter) .build(); // 5. 执行搜索 SearchResp searchResp = milvusClient.search(searchReq); // 6. 处理结果 return processMilvusResults(searchResp); } /** * 判断 Milvus 过滤条件是否有效(支持 TEXT_MATCH 全文检索) * @param milvusFilter 过滤条件字符串 * @return true: 有效条件, false: 无效条件 */ public boolean isValidMilvusFilter(String milvusFilter) { // 1. 空值判断 if (milvusFilter == null || milvusFilter.trim().isEmpty()) { return false; } String filter = milvusFilter.trim(); // 2. 基本格式检查:不能是纯布尔值 if ("true".equalsIgnoreCase(filter) || "false".equalsIgnoreCase(filter)) { return false; } // 3. 【修改】检查是否包含有效的操作符(增加 TEXT_MATCH 支持) boolean hasValidOperator = filter.matches(".*[=!<>]=?.*") || filter.contains(" like ") || filter.toUpperCase().contains("TEXT_MATCH"); if (!hasValidOperator) { return false; } // 4. 对于复合条件,递归检查 if (filter.contains("&&") || filter.contains("||")) { // 分割复合条件(简单处理,生产环境需要更完善的解析) String[] conditions = splitConditions(filter); for (String condition : conditions) { if (!isValidCondition(condition)) { return false; } } return true; } // 5. 检查单个条件 return isValidCondition(filter); } /** * 拆分复合条件(处理括号嵌套) */ private String[] splitConditions(String filter) { List conditions = new ArrayList<>(); StringBuilder current = new StringBuilder(); int parentheses = 0; for (int i = 0; i < filter.length(); i++) { char c = filter.charAt(i); if (c == '(') { parentheses++; current.append(c); } else if (c == ')') { parentheses--; current.append(c); } else if (parentheses == 0 && (filter.startsWith("&&", i) || filter.startsWith("||", i))) { // 遇到顶层操作符,分割条件 if (current.length() > 0) { conditions.add(current.toString().trim()); current = new StringBuilder(); } i += 1; // 跳过操作符的第二个字符 } else { current.append(c); } } if (current.length() > 0) { conditions.add(current.toString().trim()); } return conditions.toArray(new String[0]); } /** * 验证单个条件(支持 TEXT_MATCH 和普通条件) */ private boolean isValidCondition(String condition) { if (condition == null || condition.trim().isEmpty()) { return false; } condition = condition.trim(); // 去除外层括号 while (condition.startsWith("(") && condition.endsWith(")")) { condition = condition.substring(1, condition.length() - 1).trim(); } // 1. 【新增】检查 TEXT_MATCH 语法 if (condition.toUpperCase().contains("TEXT_MATCH")) { return isValidTextMatch(condition); } // 2. 检查普通条件 return isValidSimpleCondition(condition); } /** * 【新增】验证 TEXT_MATCH 语法 * 格式:TEXT_MATCH(字段名, '关键词') * 或:TEXT_MATCH(字段名, "关键词") */ private boolean isValidTextMatch(String condition) { // 匹配 TEXT_MATCH(字段名, '关键词') 或 TEXT_MATCH(字段名, "关键词") Pattern pattern = Pattern.compile( "TEXT_MATCH\\s*\\(\\s*([a-zA-Z_][a-zA-Z0-9_]*)\\s*,\\s*['\"]([^'\"]*)['\"]\\s*\\)", Pattern.CASE_INSENSITIVE ); Matcher matcher = pattern.matcher(condition); if (!matcher.matches()) { log.warn("无效的 TEXT_MATCH 语法: {}", condition); return false; } String fieldName = matcher.group(1); String keyword = matcher.group(2); // 检查字段名不能为空 if (fieldName == null || fieldName.trim().isEmpty()) { log.warn("TEXT_MATCH 字段名不能为空: {}", condition); return false; } // 检查关键词不能为空 if (keyword == null || keyword.trim().isEmpty()) { log.warn("TEXT_MATCH 关键词不能为空: {}", condition); return false; } return true; } /** * 验证简单条件(不包含 && 和 ||,不包含 TEXT_MATCH) */ private boolean isValidSimpleCondition(String condition) { if (condition == null || condition.trim().isEmpty()) { return false; } condition = condition.trim(); // 匹配简单条件的正则 // 格式:字段名 操作符 值 // 字段名:字母开头,包含字母数字下划线 // 操作符:==, !=, >=, <=, >, <, like // 值:单引号字符串 或 数字 String regex = "^\\s*([a-zA-Z_][a-zA-Z0-9_]*)\\s*" + // 字段名 "(==|!=|>=|<=|>|<|like)\\s*" + // 操作符 "('([^'\\\\]|\\\\.)*'|\\d+(\\.\\d+)?)\\s*$"; // 值 if (!condition.matches(regex)) { return false; } // 【修改】额外检查:like 操作符的限制 if (condition.contains(" like ")) { String value = condition.split("like")[1].trim(); if (!value.contains("%")) { log.warn("like 操作符必须包含 % 通配符: {}", condition); return false; } // 【新增】检查是否包含前后都有通配符的模式(Milvus 不支持) if (value.matches("'%.*%'")) { log.warn("Milvus 不支持前后都有通配符的 like: {}", condition); return false; } } return true; } /** * 处理 Milvus 查询结果 */ private List> processMilvusResults(SearchResp response) { List> results = new ArrayList<>(); if (response == null) { log.warn("Milvus 响应为空"); return results; } List> searchResults = response.getSearchResults(); if (searchResults == null || searchResults.isEmpty()) { log.warn("Milvus 搜索结果为空"); return results; } // 遍历每个查询的结果集(通常只有一个查询) for (List resultList : searchResults) { // 遍历每个搜索结果 for (SearchResp.SearchResult result : resultList) { // 获取实体字段数据 Map entity = result.getEntity(); Map metadata = new HashMap<>(); if(ObjectUtil.isNotEmpty(entity.get("metadata"))){ JsonObject obj = (JsonObject) entity.get("metadata"); metadata.putAll( jsonObjectToMap(obj)); } // 获取相似度分数 Float score = result.getScore(); if (score != null) { metadata.put("score", score); } // 将所有字段添加到结果中 // item.putAll(entity); results.add(metadata); } } log.info("处理完成,共 {} 条搜索结果", results.size()); return results; } /** * JsonObject 转 Map */ public static Map jsonObjectToMap(JsonObject jsonObject) { Gson gson = new Gson(); Type type = new TypeToken>(){}.getType(); return gson.fromJson(jsonObject, type); } /** * 从实体对象构建Milvus插入数据 */ public List convertToMilvusRow(List> data, String sVectorfiled,String sVectorjson) { List rows = new ArrayList<>(); if (CollUtil.isEmpty(data)) { return rows; } // 批量遍历,逐个转换 for (Map map : data) { JsonObject jsonObject = convertToMilvusRowOne(map, sVectorfiled, sVectorjson); rows.add(jsonObject); } return rows; } /*** * @Author 钱豹 * @Date 21:31 2026/3/24 * @Param [data, sVectorfiled, sVectorjson] * @return com.google.gson.JsonObject * @Description 单个转换 **/ public JsonObject convertToMilvusRowOne(Map data, String sVectorfiled,String sVectorjson) { // ====================== 修复 1:使用真实的向量化文本 ====================== // 从 sVectorjson 或 data 中获取要向量化的字段值 StringBuffer vectorText = new StringBuffer(); getVectorText(data, vectorText, sVectorjson); // 向量化 List vector = vectorizationService.textToVector(vectorText.toString()); if (vector == null || vector.isEmpty()) { throw new RuntimeException("向量化失败,文本内容:" + vectorText); } JsonObject row = new JsonObject(); // 添加向量 JsonArray vectorArray = new JsonArray(); vector.forEach(vectorArray::add); row.add("vector", vectorArray); // ====================== 修复 2:sSlaveId 空值安全 ====================== Object slaveIdObj = data.get("sSlaveId"); if (slaveIdObj == null) { throw new RuntimeException("数据中缺少 sSlaveId 字段,无法插入Milvus"); } row.addProperty("sSlaveId", slaveIdObj.toString()); // 创建时间 row.addProperty("create_time", System.currentTimeMillis()); // 业务元数据 JsonObject metadata = MapToJsonConverter.convert(data); row.add("metadata", metadata); // 动态字段 String[] sVectorfiledArray = sVectorfiled.split(","); for (String vectorFieldOne : sVectorfiledArray) { String[] fieldArr = vectorFieldOne.split(":"); if (fieldArr.length < 2) { continue; } String fieldName = fieldArr[1]; Object value = ObjectUtil.isEmpty(data.get(fieldName)) ? getDefaultData(fieldName) : data.get(fieldName); // 通用类型安全添加(你之前的优化方法) addToJsonObject(row, fieldName, value); } return row; } private void getVectorText(Map data, StringBuffer vectorText,String sVectorjson){ // 动态字段 String[] sVectorjsonArray = sVectorjson.split(";"); for (String sVectorjsonOne : sVectorjsonArray) { String sText; String[] fieldArr = sVectorjsonOne.split(":"); if (fieldArr.length < 2) { continue; } String fieldName = fieldArr[1]; Object value = ObjectUtil.isEmpty(data.get(fieldName)) ? getDefaultData(fieldName) : data.get(fieldName); if (ObjectUtil.isEmpty(value)) { sText = StrUtil.EMPTY; }else{ sText = value.toString(); } vectorText.append(" ").append(fieldArr[0]).append(sText); } } /*******************************************************内部方法********************************************************************************/ /** * 安全将 Object 加入 Gson JsonObject,自动识别类型 */ private void addToJsonObject(JsonObject row, String fieldName, Object value) { if (value == null) { row.addProperty(fieldName, ""); return; } // 基本类型直接添加 if(fieldName.startsWith("t")){ if(ObjectUtil.isNotEmpty(value)){ long date = MilvusTimeUtil.toTimestamp(value.toString()); row.addProperty(fieldName,date); }else{ row.addProperty(fieldName,NULL_TIMESTAMP); } }else if (value instanceof String) { row.addProperty(fieldName, (String) value); } else if (value instanceof Number) { row.addProperty(fieldName, (Number) value); } else if (value instanceof Boolean) { row.addProperty(fieldName, (Boolean) value); }else if (value instanceof List) { // List / 数组类型 JsonArray jsonArray = new JsonArray(); for (Object item : (List) value) { addJsonElement(jsonArray, item); } row.add(fieldName, jsonArray); } else { // 其他对象转字符串 row.addProperty(fieldName, value.toString()); } } /** * 解析字符串为时间戳 * 支持多种格式 */ private static long parseStringToTimestamp(String dateStr) { if (dateStr == null || dateStr.trim().isEmpty()) { return NULL_TIMESTAMP; } try { // 1. 尝试 ISO 8601 格式(如:2025-07-21T09:26:09) if (dateStr.matches("\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}")) { LocalDateTime ldt = LocalDateTime.parse(dateStr, ISO_FORMATTER); return ldt.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(); } // 2. 尝试 yyyy-MM-dd HH:mm:ss 格式 if (dateStr.matches("\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}")) { DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); LocalDateTime ldt = LocalDateTime.parse(dateStr, formatter); return ldt.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(); } // 3. 尝试 yyyy-MM-dd 格式(当天 00:00:00) if (dateStr.matches("\\d{4}-\\d{2}-\\d{2}")) { LocalDate ld = LocalDate.parse(dateStr); return ld.atStartOfDay(ZoneId.systemDefault()).toInstant().toEpochMilli(); } // 4. 尝试直接解析为时间戳数字(如果字符串全是数字) if (dateStr.matches("\\d+")) { return Long.parseLong(dateStr); } // 5. 如果都不匹配,返回空值 System.err.println("无法解析的日期格式: " + dateStr); return NULL_TIMESTAMP; } catch (Exception e) { System.err.println("日期解析失败: " + dateStr + ", 错误: " + e.getMessage()); return NULL_TIMESTAMP; } } /** * 递归处理 List 元素(支持无限层嵌套 List) */ private void addJsonElement(JsonArray jsonArray, Object item) { if (item == null) { jsonArray.add((String) null); return; } if (item instanceof String) { jsonArray.add((String) item); } else if (item instanceof Number) { jsonArray.add((Number) item); } else if (item instanceof Boolean) { jsonArray.add((Boolean) item); } else if (item instanceof List) { // 递归处理嵌套列表 JsonArray nestedArray = new JsonArray(); for (Object nestedItem : (List) item) { addJsonElement(nestedArray, nestedItem); } jsonArray.add(nestedArray); } else { jsonArray.add(item.toString()); } } /** * 创建集合(定义字段结构) */ private void createCollection(String collectionName,String sVectorfiled,Boolean bRset) { // 6. 重新加载集合 LoadCollectionReq loadReq = LoadCollectionReq.builder() .collectionName(collectionName) .build(); //是否删除集合 重新创建 if (bRset){ this.delAiMilvusVectorRecord(collectionName); // 1. 删除旧集合 milvusClient.dropCollection(DropCollectionReq.builder() .collectionName(collectionName) .build()); } // 检查集合是否存在 HasCollectionReq hasCollectionReq = HasCollectionReq.builder() .collectionName(collectionName) .build(); boolean exists = milvusClient.hasCollection(hasCollectionReq); if (exists) { return; } // 定义字段列表 List fieldSchemas = new ArrayList<>(); // 1. 准备所有索引参数 List allIndexParams = new ArrayList<>(); //定义字段列表 // 1. 主键字段 fieldSchemas.add( CreateCollectionReq.FieldSchema.builder() .name("id") .dataType(DataType.Int64) .isPrimaryKey(true) .autoID(true) // 使用自动ID .description("主键ID") .build()); // 3. 主键字段 fieldSchemas.add(CreateCollectionReq.FieldSchema.builder() .name("sSlaveId") .dataType(DataType.VarChar) // .isPrimaryKey(true) //索引创建 .maxLength(100) .description("原始数据主键ID") .build()); // 2. 向量字段 fieldSchemas.add(CreateCollectionReq.FieldSchema.builder() .name("vector") .dataType(DataType.FloatVector) .dimension(VECTOR_DIM) .description("向量字段,用于相似性搜索") .build()); // 4. 创建时间字段 fieldSchemas.add( CreateCollectionReq.FieldSchema.builder() .name("create_time") .dataType(DataType.Int64) .description("创建时间戳") .build()); // 5. 元数据字段(使用JSON类型存储额外数据) fieldSchemas.add(CreateCollectionReq.FieldSchema.builder() .name("metadata") .dataType(DataType.JSON) .description("额外元数据") .build()); //动态字段创建 String[] sVectorfiledArray = sVectorfiled.split(","); for(String sVectorfiledOne : sVectorfiledArray){ String[] sVectorfiledOneArray = sVectorfiledOne.split(":"); String sName = sVectorfiledOneArray[1]; String sDescription = sVectorfiledOneArray[0]; DataType dataType = processField(sVectorfiledOneArray[1]); fieldSchemas.add(CreateCollectionReq.FieldSchema.builder() .name(sName) .dataType(dataType) .description(sDescription) .isPrimaryKey(false) // 如果不是主键 .isNullable(true) // 允许为空 // .defaultValue("") // 如果有默认值 // SQL可能较长 .maxLength(1000) .build()); } //创建索引 createAllIndexes(sVectorfiled,allIndexParams); // 创建集合schema CreateCollectionReq.CollectionSchema schema = CreateCollectionReq.CollectionSchema.builder() .fieldSchemaList(fieldSchemas) .enableDynamicField(true) .build(); // 创建集合请求 CreateCollectionReq createCollectionReq = CreateCollectionReq.builder() .collectionName(collectionName) .collectionSchema(schema) .indexParams(allIndexParams)//索引集合 .consistencyLevel(ConsistencyLevel.BOUNDED) .build(); // 执行创建集合 milvusClient.createCollection(createCollectionReq); milvusClient.loadCollection(loadReq); log.info("集合重新加载成功"); } /** * 批量创建所有索引(向量索引 + 多个标量索引) */ private void createAllIndexes(String sVectorfiled,List allIndexParams) { // 1.1 向量索引 Map vectorExtraParams = new HashMap<>(8); vectorExtraParams.put("nlist", 256); // 聚类中心数:sqrt(384) * 13 ≈ 256 vectorExtraParams.put("nprobe", 32); // 搜索时检查的聚类数 IndexParam vectorIndex = IndexParam.builder() .fieldName("vector") .indexName("idx_vector_rebuild") .indexType(IndexParam.IndexType.IVF_FLAT) .metricType(IndexParam.MetricType.IP) .extraParams(vectorExtraParams) .build(); allIndexParams.add(vectorIndex); // 1.2 create_time 字段索引(用于时间范围查询) IndexParam timeIndex = IndexParam.builder() .fieldName("create_time") .indexName("idx_create_time") .indexType(IndexParam.IndexType.STL_SORT) // 排序索引 .build(); allIndexParams.add(timeIndex); // 1.4 data_id 字段索引(用于精确匹配) IndexParam idIndex = IndexParam.builder() .fieldName("sSlaveId") .indexName("idx_data_id") .indexType(IndexParam.IndexType.TRIE) .build(); allIndexParams.add(idIndex); //动态字段创建 String[] sVectorfiledArray = sVectorfiled.split(","); for(String sVectorfiledOne : sVectorfiledArray){ String[] sVectorfiledOneArray = sVectorfiledOne.split(":"); String sName = sVectorfiledOneArray[1]; IndexParam.IndexType indexType =indexField(sVectorfiledOneArray[1]); allIndexParams.add(IndexParam.builder() .fieldName(sName) .indexName(sName) .indexType(indexType) .build()); } } /*** * @Author 钱豹 * @Date 21:10 2026/3/24 * @Param * @return * @Description //TODO **/ public Object getDefaultData(String sKey) { if(sKey.startsWith("d") || sKey.startsWith("i")){ return BigDecimal.ZERO; }else if(sKey.startsWith("b")){ return false; }else{ return StrUtil.EMPTY; } } /*** * @Author 钱豹 * @Date 20:44 2026/3/24 * @Param [sKey] * @return io.milvus.v2.common.DataType * @Description 字段类型 **/ public DataType processField(String sKey) { if(sKey.startsWith("d")){ return DataType.Double; }else if(sKey.startsWith("i")){ return DataType.Int32; }else if(sKey.startsWith("b")){ return DataType.Bool; }else if(sKey.startsWith("t")){ return DataType.Int64; }else{ return DataType.VarChar; } } /*** * @Author 钱豹 * @Date 20:44 2026/3/24 * @Param [sKey] * @return io.milvus.v2.common.DataType * @Description 索引类型 **/ public IndexParam.IndexType indexField(String sKey) { if(sKey.startsWith("d") || sKey.startsWith("i")){ return IndexParam.IndexType.STL_SORT; }else if(sKey.startsWith("t")){ return IndexParam.IndexType.STL_SORT; }else{ return IndexParam.IndexType.TRIE; } } }