package com.xly.milvus.service.impl; import cn.hutool.core.collection.ConcurrentHashSet; import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.StrUtil; import com.google.gson.JsonArray; import com.google.gson.JsonObject; import com.xly.milvus.service.AiGlobalAgentQuestionSqlEmitterService; import com.xly.milvus.service.VectorizationService; import com.xly.milvus.util.MapToJsonConverter; import com.xly.service.DynamicExeDbService; import io.milvus.v2.client.MilvusClientV2; import io.milvus.v2.common.ConsistencyLevel; import io.milvus.v2.common.DataType; import io.milvus.v2.common.IndexBuildState; import io.milvus.v2.common.IndexParam; import io.milvus.v2.service.collection.request.*; import io.milvus.v2.service.index.request.CreateIndexReq; import io.milvus.v2.service.index.request.DescribeIndexReq; import io.milvus.v2.service.index.request.DropIndexReq; import io.milvus.v2.service.index.response.DescribeIndexResp; import io.milvus.v2.service.vector.request.InsertReq; import io.milvus.v2.service.vector.request.SearchReq; import io.milvus.v2.service.vector.request.data.FloatVec; import io.milvus.v2.service.vector.response.InsertResp; import io.milvus.v2.service.vector.response.SearchResp; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import java.util.*; @Slf4j @Service("aiGlobalAgentQuestionSqlEmitterService") @RequiredArgsConstructor public class AiGlobalAgentQuestionSqlEmitterServiceImpl implements AiGlobalAgentQuestionSqlEmitterService { private final MilvusClientV2 milvusClient; private final VectorizationService vectorizationService; private final DynamicExeDbService dynamicExeDbService; private final String sProName ="Sp_Ai_AiGlobalAgentQuestionThread"; // 或者从配置文件读取 @Value("${milvus.vector.dimension:384}") private int VECTOR_DIM; // 缓存已加载的集合 private final Set loadedCollections = new ConcurrentHashSet<>(); /*** * @Author 钱豹 * @Date 13:06 2026/3/19 * @Param [] * @return void * @Description 插入数据 **/ @Override public void addAiGlobalAgentQuestionSqlEmitter(String sKey,Map data,String sQuestion,String sSqlContent,String cachType,String collectionName) { // 向量化 List vector = vectorizationService.textToVector(sKey); if (vector == null || vector.isEmpty()) { throw new RuntimeException("向量化失败"); } if(ObjectUtil.isEmpty(sSqlContent)){ sSqlContent = StrUtil.EMPTY; } // 2. 转换为Milvus格式 JsonObject row = convertToMilvusRow(data, vector,sQuestion,sSqlContent,cachType,sKey); //创建集合 // createCollection(collectionName); createCollectionIfNotExists(collectionName); // 3. 插入到Milvus InsertReq insertReq = InsertReq.builder() .collectionName(collectionName) .data(List.of(row)) .build(); InsertResp insertResp = milvusClient.insert(insertReq); //调用数据库插入数据库 Map searMap = dynamicExeDbService.getDoProMap(sProName, data); dynamicExeDbService.getCallPro(searMap, sProName); System.out.println("成功插入 " + insertResp.getInsertCnt() + " 条数据"); System.out.println(" - 数据预览:"); } @Override public Map queryAiGlobalAgentQuestionSqlEmitter(String searchText, String collectionName) { Map result = new HashMap<>(); log.info("开始相似度查询: collection={}, searchText={}", collectionName, searchText); // 2. 设置范围搜索参数 Map searchParams = new HashMap<>(); searchParams.put("nprobe", 10); // 对于 IP 度量,相似度范围在 [minScore, maxScore] searchParams.put("radius", 0.98); // 最小相似度 searchParams.put("range_filter", 1); // 最大相似度 // 1. 确保集合已加载 ensureCollectionLoaded(collectionName); // 1. 向量化搜索文本 List vectorList = vectorizationService.textToVector(searchText); if (vectorList == null || vectorList.isEmpty()) { throw new RuntimeException("向量化失败"); } // 2. 转换为 float[] float[] floatArray = new float[vectorList.size()]; for (int i = 0; i < vectorList.size(); i++) { floatArray[i] = vectorList.get(i); } // 查询最近插入的数据(按时间倒序) // QueryReq queryReq = QueryReq.builder() // .collectionName(collectionName) // .outputFields(Arrays.asList("sQuestion", "sSqlContent", "data_id", "create_time","metadata")) // .limit(100) // .build(); // QueryResp queryResp = milvusClient.query(queryReq); // 3. 创建 Milvus FloatVec 对象 FloatVec floatVec = new FloatVec(floatArray); // 4. 构建搜索请求 SearchReq searchReq = SearchReq.builder() .collectionName(collectionName) .data(Collections.singletonList(floatVec)) .annsField("vector") // 向量字段名 .topK(10) // 返回最相似的10条 .metricType(IndexParam.MetricType.IP) // 内积相似度 .outputFields(Arrays.asList("sQuestion", "sSqlContent", "data_id","cachType", "create_time","metadata")) .searchParams(searchParams) .build(); // 5. 执行搜索 SearchResp searchResp = milvusClient.search(searchReq); // 6. 处理结果 List> searchResults = searchResp.getSearchResults(); if(ObjectUtil.isEmpty(searchResults)){ return result; } List firstResultList = searchResults.get(0); if(ObjectUtil.isEmpty(firstResultList)){ return result; } firstResultList.sort((a, b) -> Float.compare(b.getScore(), a.getScore())); SearchResp.SearchResult item = firstResultList.get(0); Map itemMap = new HashMap<>(); itemMap.put("score", item.getScore()); itemMap.put("id", item.getId()); itemMap.putAll(item.getEntity()); return itemMap; } /** * 确保集合已加载 */ private void ensureCollectionLoaded(String collectionName) { try { // 如果已经加载过,直接返回 if (loadedCollections.contains(collectionName)) { return; } log.info("检查集合加载状态: {}", collectionName); // 检查集合是否存在 HasCollectionReq hasCollectionReq = HasCollectionReq.builder() .collectionName(collectionName) .build(); boolean exists = milvusClient.hasCollection(hasCollectionReq); if (!exists) { log.error("集合不存在: {}", collectionName); throw new RuntimeException("集合不存在: " + collectionName); } // 获取加载状态 GetLoadStateReq getLoadStateReq = GetLoadStateReq.builder() .collectionName(collectionName) .build(); boolean isLoaded = milvusClient.getLoadState(getLoadStateReq); if (!isLoaded) { log.info("加载集合到内存: {}", collectionName); // 加载集合 LoadCollectionReq loadCollectionReq = LoadCollectionReq.builder() .collectionName(collectionName) .build(); milvusClient.loadCollection(loadCollectionReq); // 等待加载完成 waitForCollectionLoaded(collectionName); loadedCollections.add(collectionName); log.info("集合加载完成: {}", collectionName); } else { loadedCollections.add(collectionName); log.info("集合已加载: {}", collectionName); } } catch (Exception e) { log.error("确保集合加载失败", e); throw new RuntimeException("集合加载失败: " + collectionName, e); } } /** * 等待集合加载完成 */ private void waitForCollectionLoaded(String collectionName) { int maxRetries = 30; int retryInterval = 1000; // 1秒 for (int i = 0; i < maxRetries; i++) { try { GetLoadStateReq getLoadStateReq = GetLoadStateReq.builder() .collectionName(collectionName) .build(); boolean isLoaded = milvusClient.getLoadState(getLoadStateReq); if (isLoaded) { log.info("集合加载状态确认: {}", collectionName); return; } Thread.sleep(retryInterval); } catch (Exception e) { log.warn("检查加载状态失败,重试 {}/{}", i + 1, maxRetries); } } throw new RuntimeException("集合加载超时: " + collectionName); } /** * 从实体对象构建Milvus插入数据 */ public JsonObject convertToMilvusRow(Map data, List vector,String sQuestion,String sSqlContent,String cachType,String sKey) { JsonObject row = new JsonObject(); // 添加向量 JsonArray vectorArray = new JsonArray(); vector.forEach(vectorArray::add); row.add("vector", vectorArray); // 添加文本字段 row.addProperty("sKey", sKey); row.addProperty("data_id", data.get("sId").toString()); row.addProperty("sQuestion", sQuestion); row.addProperty("sSqlContent", sSqlContent); row.addProperty("cachType", cachType); // 创建时间字段 - 必须提供! row.addProperty("create_time", System.currentTimeMillis()); // 创建时间字段 - 必须提供! // row.add("create_time", JsonValue.from(System.currentTimeMillis())); // 添加业务字段到metadata JsonObject metadata = MapToJsonConverter.convert(data); row.add("metadata", metadata); return row; } /** * 创建集合(如果不存在) */ public void createCollectionIfNotExists(String collectionName) { try { // 检查集合是否存在 HasCollectionReq hasCollectionReq = HasCollectionReq.builder() .collectionName(collectionName) .build(); boolean exists = milvusClient.hasCollection(hasCollectionReq); if (!exists) { createCollection(collectionName); log.info("集合 {} 创建成功", collectionName); } } catch (Exception e) { log.error("检查/创建集合失败: {}", collectionName, e); throw new RuntimeException("初始化Milvus集合失败", e); } } /** * 创建集合(定义字段结构) */ private void createCollection(String collectionName) { //删除现有集合 // DropCollectionReq dropCollectionReq = DropCollectionReq.builder() // .collectionName(collectionName) // .build(); // milvusClient.dropCollection(dropCollectionReq); // 定义字段列表 List fieldSchemas = Arrays.asList( // 1. 主键字段 CreateCollectionReq.FieldSchema.builder() .name("id") .dataType(DataType.Int64) .isPrimaryKey(true) .autoID(true) // 使用自动ID .description("主键ID") .build(), // 2. 向量字段 CreateCollectionReq.FieldSchema.builder() .name("vector") .dataType(DataType.FloatVector) .dimension(VECTOR_DIM) .description("向量字段,用于相似性搜索") .build(), // 3. 问题字段 CreateCollectionReq.FieldSchema.builder() .name("sQuestion") .dataType(DataType.VarChar) .maxLength(5000) .description("用户问题") .build(), // 4. SQL内容字段 - 设置为可空 CreateCollectionReq.FieldSchema.builder() .name("sSqlContent") .dataType(DataType.VarChar) .maxLength(50000) // SQL可能较长 .isPrimaryKey(false) .isNullable(true) // 设置为 true,允许为空 .description("SQL语句") .build(), // 4. 缓存类型 CreateCollectionReq.FieldSchema.builder() .name("cachType") .dataType(DataType.VarChar) .maxLength(100) // 缓存类型 .description("缓存类型") .build(), // 5. 数据ID字段 CreateCollectionReq.FieldSchema.builder() .name("data_id") .dataType(DataType.VarChar) .maxLength(500) // 增加最大长度 .description("原始数据ID") .build(), // 6. 创建时间字段 CreateCollectionReq.FieldSchema.builder() .name("create_time") .dataType(DataType.Int64) .description("创建时间戳") .build(), // 7. 元数据字段(使用JSON类型存储额外数据) CreateCollectionReq.FieldSchema.builder() .name("metadata") .dataType(DataType.JSON) .description("额外元数据") .build(), CreateCollectionReq.FieldSchema.builder() .name("sKey") .dataType(DataType.VarChar) .maxLength(1000) // 增加最大长度 .description("存入的vector转换前数据") .build() ); // 创建集合schema CreateCollectionReq.CollectionSchema schema = CreateCollectionReq.CollectionSchema.builder() .fieldSchemaList(fieldSchemas) .enableDynamicField(true) .build(); // 创建集合请求 CreateCollectionReq createCollectionReq = CreateCollectionReq.builder() .collectionName(collectionName) .collectionSchema(schema) .consistencyLevel(ConsistencyLevel.BOUNDED) .build(); // 执行创建集合 milvusClient.createCollection(createCollectionReq); //创建索引 createIndexesStepByStep(collectionName); } /* * 分步创建索引,便于监控每个索引的状态 */ private void createIndexesStepByStep(String collectionName) { log.info("开始为集合创建索引: {}", collectionName); createAllIndexes(collectionName); // // 1. 创建向量索引 // createVectorIndex(collectionName); // // // 2. 创建标量索引 // createScalarIndexes(collectionName); } /** * 创建向量索引 * IVF_FLAT 向量相似度搜索 常用的向量索引,平衡性能和召回率 * STL_SORT 标量字段排序 适用于数字、时间等需要排序的字段 * INVERTED 文本字段过滤 倒排索引,适用于文本字段的精确匹配 * TRIE 字符串前缀匹配 适用于前缀查询 * BITMAP 枚举值过滤 适用于低基数字段 */ private void createVectorIndex(String collectionName) { log.info("创建向量索引: {}", collectionName); Map extraParams = new HashMap<>(); extraParams.put("nlist", 128); IndexParam vectorIndex = IndexParam.builder() .fieldName("vector") .indexName("idx_vector") .indexType(IndexParam.IndexType.IVF_FLAT) .metricType(IndexParam.MetricType.IP) .extraParams(extraParams) .build(); CreateIndexReq createIndexReq = CreateIndexReq.builder() .collectionName(collectionName) .indexParams(Collections.singletonList(vectorIndex)) .sync(true) .timeout(60000L) .build(); milvusClient.createIndex(createIndexReq); log.info("向量索引创建完成"); } /** * 创建标量索引 */ private void createScalarIndexes(String collectionName) { log.info("创建标量索引: {}", collectionName); // 为 create_time 字段创建索引 IndexParam timeIndex = IndexParam.builder() .fieldName("create_time") .indexName("idx_create_time") .indexType(IndexParam.IndexType.STL_SORT) // 排序索引 .build(); CreateIndexReq timeIndexReq = CreateIndexReq.builder() .collectionName(collectionName) .indexParams(Collections.singletonList(timeIndex)) .sync(true) .timeout(30000L) .build(); milvusClient.createIndex(timeIndexReq); log.info("create_time 索引创建完成"); // 为 question 字段创建倒排索引(支持文本过滤) IndexParam questionIndex = IndexParam.builder() .fieldName("sQuestion") .indexName("idx_question") .indexType(IndexParam.IndexType.TRIE) // 倒排索引 .build(); CreateIndexReq questionIndexReq = CreateIndexReq.builder() .collectionName(collectionName) .indexParams(Collections.singletonList(questionIndex)) .sync(true) .timeout(30000L) .build(); milvusClient.createIndex(questionIndexReq); log.info("question 索引创建完成"); // 为 data_id 字段创建索引 IndexParam idIndex = IndexParam.builder() .fieldName("data_id") .indexName("idx_data_id") .indexType(IndexParam.IndexType.TRIE) .build(); CreateIndexReq idIndexReq = CreateIndexReq.builder() .collectionName(collectionName) .indexParams(Collections.singletonList(idIndex)) .sync(true) .timeout(30000L) .build(); milvusClient.createIndex(idIndexReq); log.info("data_id 索引创建完成"); } /** * 重建索引(解决索引未就绪问题) */ public boolean rebuildIndex(String collectionName) { log.info("========== 开始重建索引 =========="); log.info("集合名称: {}", collectionName); try { // 1. 先检查集合是否存在 HasCollectionReq hasReq = HasCollectionReq.builder() .collectionName(collectionName) .build(); if (!milvusClient.hasCollection(hasReq)) { log.error("集合不存在: {}", collectionName); return false; } // 2. 先释放集合(如果已加载) try { ReleaseCollectionReq releaseReq = ReleaseCollectionReq.builder() .collectionName(collectionName) .build(); milvusClient.releaseCollection(releaseReq); log.info("集合已释放: {}", collectionName); Thread.sleep(2000); // 等待释放完成 } catch (Exception e) { log.warn("释放集合失败(可能未加载): {}", e.getMessage()); } // 3. 删除原有索引 try { DropIndexReq dropIndexReq = DropIndexReq.builder() .collectionName(collectionName) .fieldName("vector") // 指定向量字段 .build(); milvusClient.dropIndex(dropIndexReq); log.info("原有索引已删除"); Thread.sleep(2000); // 等待删除完成 } catch (Exception e) { log.warn("删除索引失败(可能不存在): {}", e.getMessage()); } // 4. 创建新索引 createVectorIndex(collectionName); // 5. 等待索引就绪 boolean indexReady = waitForIndexReady(collectionName, "vector", 60); if (!indexReady) { log.error("索引未就绪"); return false; } // 6. 重新加载集合 LoadCollectionReq loadReq = LoadCollectionReq.builder() .collectionName(collectionName) .build(); milvusClient.loadCollection(loadReq); log.info("集合重新加载成功"); // 7. 验证加载状态 boolean loaded = waitForLoad(collectionName, 30); if (!loaded) { log.error("集合加载失败"); return false; } log.info("✅ 索引重建完成,集合已就绪: {}", collectionName); return true; } catch (Exception e) { log.error("重建索引失败", e); return false; } } /** * 等待索引就绪 */ private boolean waitForIndexReady(String collectionName, String fieldName, int timeoutSeconds) { log.info("等待索引就绪: {}.{},超时: {}秒", collectionName, fieldName, timeoutSeconds); for (int i = 0; i < timeoutSeconds; i++) { try { DescribeIndexReq describeIndexReq = DescribeIndexReq.builder() .collectionName(collectionName) .build(); DescribeIndexResp describeIndexResp = milvusClient.describeIndex(describeIndexReq); List indexDescs = describeIndexResp.getIndexDescriptions(); for (DescribeIndexResp.IndexDesc desc : indexDescs) { if (fieldName.equals(desc.getFieldName())) { IndexBuildState state = desc.getIndexState(); log.info("索引状态: {}, 进度: {}/{}", state, desc.getIndexedRows(), desc.getTotalRows()); if (state == IndexBuildState.Finished) { log.info("✅ 索引就绪"); return true; } else if (state == IndexBuildState.Failed) { log.error("❌ 索引构建失败: {}", desc.getIndexFailedReason()); return false; } break; } } Thread.sleep(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); log.warn("等待被中断"); return false; } catch (Exception e) { log.warn("检查索引状态失败: {}/{}", i + 1, timeoutSeconds); } } log.error("等待索引就绪超时"); return false; } /** * 等待集合加载完成 */ private boolean waitForLoad(String collectionName, int timeoutSeconds) { log.info("等待集合加载: {},超时: {}秒", collectionName, timeoutSeconds); for (int i = 0; i < timeoutSeconds; i++) { try { GetLoadStateReq loadStateReq = GetLoadStateReq.builder() .collectionName(collectionName) .build(); boolean isLoaded = milvusClient.getLoadState(loadStateReq); if (isLoaded) { log.info("✅ 集合加载完成"); return true; } Thread.sleep(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); return false; } catch (Exception e) { log.warn("检查加载状态失败: {}/{}", i + 1, timeoutSeconds); } } log.error("集合加载超时"); return false; } /** * 批量创建所有索引(向量索引 + 多个标量索引) */ private void createAllIndexes(String collectionName) { log.info("开始为集合批量创建索引: {}", collectionName); // 1. 准备所有索引参数 List allIndexParams = new ArrayList<>(); // 1.1 向量索引 Map vectorExtraParams = new HashMap<>(); vectorExtraParams.put("nlist", 256); // 聚类中心数:sqrt(384) * 13 ≈ 256 vectorExtraParams.put("nprobe", 32); // 搜索时检查的聚类数 IndexParam vectorIndex = IndexParam.builder() .fieldName("vector") .indexName("idx_vector_rebuild") .indexType(IndexParam.IndexType.IVF_FLAT) .metricType(IndexParam.MetricType.IP) .extraParams(vectorExtraParams) .build(); allIndexParams.add(vectorIndex); // 1.2 create_time 字段索引(用于时间范围查询) IndexParam timeIndex = IndexParam.builder() .fieldName("create_time") .indexName("idx_create_time") .indexType(IndexParam.IndexType.STL_SORT) // 排序索引 .build(); allIndexParams.add(timeIndex); // 1.3 question 字段倒排索引(用于文本过滤) IndexParam questionIndex = IndexParam.builder() .fieldName("sQuestion") .indexName("idx_question") .indexType(IndexParam.IndexType.TRIE) // 倒排索引 .build(); allIndexParams.add(questionIndex); // 1.4 data_id 字段索引(用于精确匹配) IndexParam idIndex = IndexParam.builder() .fieldName("data_id") .indexName("idx_data_id") .indexType(IndexParam.IndexType.TRIE) .build(); allIndexParams.add(idIndex); IndexParam idx_cach_type = IndexParam.builder() .fieldName("cachType") .indexName("idx_cach_type") .indexType(IndexParam.IndexType.TRIE) .build(); allIndexParams.add(idx_cach_type); IndexParam sKey = IndexParam.builder() .fieldName("sKey") .indexName("s_key") .indexType(IndexParam.IndexType.TRIE) .build(); allIndexParams.add(sKey); // 1.5 sql_content 字段索引(如果需要) // IndexParam sqlIndex = IndexParam.builder() // .fieldName("sql_content") // .indexName("idx_sql_content") // .indexType(IndexParam.IndexType.INVERTED) // .build(); // allIndexParams.add(sqlIndex); // 2. 批量创建索引 try { CreateIndexReq createIndexReq = CreateIndexReq.builder() .collectionName(collectionName) .indexParams(allIndexParams) // 一次性传入所有索引 .sync(true) // 同步等待 .timeout(120000L) // 总超时时间120秒 .build(); milvusClient.createIndex(createIndexReq); log.info("所有索引批量创建成功: {}", collectionName); } catch (Exception e) { log.error("批量创建索引失败: {}", e.getMessage()); createScalarIndexes(collectionName); } } }