From 04f4fe5b200e9cf922e7f9ebd6796b802b514096 Mon Sep 17 00:00:00 2001 From: zxstty Date: Mon, 27 Oct 2025 21:23:42 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E6=95=B0=E6=8D=AE=E5=BA=93?= =?UTF-8?q?=E8=BF=9E=E6=8E=A5=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- data_chain/manager/chunk_manager.py | 2 -- ...mic_weighted_keyword_and_vector_searcher.py | 18 +++++++++++------- data_chain/stores/database/database.py | 7 ++++++- 3 files changed, 17 insertions(+), 10 deletions(-) diff --git a/data_chain/manager/chunk_manager.py b/data_chain/manager/chunk_manager.py index a741ba8..d0c9ac6 100644 --- a/data_chain/manager/chunk_manager.py +++ b/data_chain/manager/chunk_manager.py @@ -380,8 +380,6 @@ class ChunkManager(): # 按相似度分数排序(逻辑不变) stmt = stmt.order_by(similarity_score.desc()) stmt = stmt.limit(top_k) - - # 执行最终查询(逻辑不变) result = await session.execute(stmt) chunk_entities = result.scalars().all() logging.warning( diff --git a/data_chain/rag/dynamic_weighted_keyword_and_vector_searcher.py b/data_chain/rag/dynamic_weighted_keyword_and_vector_searcher.py index 3f2e6d6..a5467d4 100644 --- a/data_chain/rag/dynamic_weighted_keyword_and_vector_searcher.py +++ b/data_chain/rag/dynamic_weighted_keyword_and_vector_searcher.py @@ -11,7 +11,7 @@ from data_chain.embedding.embedding import Embedding from data_chain.entities.enum import SearchMethod -class KeywordVectorSearcher(BaseSearcher): +class DynamicKeywordVectorSearcher(BaseSearcher): """ 关键词向量检索 """ @@ -34,28 +34,32 @@ class KeywordVectorSearcher(BaseSearcher): kb_id, query, max(top_k//3, 1), doc_ids, banned_ids) banned_ids += [chunk_entity.id for chunk_entity in chunk_entities_get_by_keyword] keywords, weights = TokenTool.get_top_k_keywords_and_weights(query) - logging.error(f"[KeywordVectorSearcher] keywords: {keywords}, weights: {weights}") + logging.error(f"[DynamicKeywordVectorSearcher] keywords: {keywords}, weights: {weights}") + import time + start_time = time.time() chunk_entities_get_by_dynamic_weighted_keyword = await ChunkManager.get_top_k_chunk_by_kb_id_dynamic_weighted_keyword(kb_id, keywords, weights, top_k//2, doc_ids, banned_ids) + end_time = time.time() + logging.info(f"[DynamicKeywordVectorSearcher] 动态关键字检索成功完成,耗时: {end_time - start_time:.2f}秒") banned_ids += [chunk_entity.id for chunk_entity in chunk_entities_get_by_dynamic_weighted_keyword] chunk_entities_get_by_vector = [] for _ in range(3): try: import time start_time = time.time() - logging.error(f"[KeywordVectorSearcher] 开始进行向量检索,top_k: {top_k-len(chunk_entities_get_by_keyword)-len(chunk_entities_get_by_dynamic_weighted_keyword)}") + logging.error(f"[DynamicKeywordVectorSearcher] 开始进行向量检索,top_k: {top_k-len(chunk_entities_get_by_keyword)-len(chunk_entities_get_by_dynamic_weighted_keyword)}") chunk_entities_get_by_vector = await asyncio.wait_for(ChunkManager.get_top_k_chunk_by_kb_id_vector(kb_id, vector, top_k-len(chunk_entities_get_by_keyword)-len(chunk_entities_get_by_dynamic_weighted_keyword), doc_ids, banned_ids), timeout=20) end_time = time.time() - logging.info(f"[KeywordVectorSearcher] 向量检索成功完成,耗时: {end_time - start_time:.2f}秒") + logging.info(f"[DynamicKeywordVectorSearcher] 向量检索成功完成,耗时: {end_time - start_time:.2f}秒") break except Exception as e: import traceback - err = f"[KeywordVectorSearcher] 向量检索失败,error: {e}, traceback: {traceback.format_exc()}" + err = f"[DynamicKeywordVectorSearcher] 向量检索失败,error: {e}, traceback: {traceback.format_exc()}" logging.error(err) continue - logging.error(f"[KeywordVectorSearcher] chunk_entities_get_by_keyword: {len(chunk_entities_get_by_keyword)}, chunk_entities_get_by_dynamic_weighted_keyword: {len(chunk_entities_get_by_dynamic_weighted_keyword)}, chunk_entities_get_by_vector: {len(chunk_entities_get_by_vector)}") + logging.error(f"[DynamicKeywordVectorSearcher] chunk_entities_get_by_keyword: {len(chunk_entities_get_by_keyword)}, chunk_entities_get_by_dynamic_weighted_keyword: {len(chunk_entities_get_by_dynamic_weighted_keyword)}, chunk_entities_get_by_vector: {len(chunk_entities_get_by_vector)}") chunk_entities = chunk_entities_get_by_keyword + chunk_entities_get_by_dynamic_weighted_keyword + chunk_entities_get_by_vector except Exception as e: - err = f"[KeywordVectorSearcher] 关键词向量检索失败,error: {e}" + err = f"[DynamicKeywordVectorSearcher] 关键词向量检索失败,error: {e}" logging.exception(err) return [] return chunk_entities diff --git a/data_chain/stores/database/database.py b/data_chain/stores/database/database.py index 0eb976d..7d15991 100644 --- a/data_chain/stores/database/database.py +++ b/data_chain/stores/database/database.py @@ -603,11 +603,16 @@ class DataBase: database_url = f"opengauss+asyncpg://{config['DATABASE_USER']}:{encoded_password}@{config['DATABASE_HOST']}:{config['DATABASE_PORT']}/{config['DATABASE_DB']}" else: database_url = f"postgresql+asyncpg://{config['DATABASE_USER']}:{encoded_password}@{config['DATABASE_HOST']}:{config['DATABASE_PORT']}/{config['DATABASE_DB']}" + import os + pool_size = os.cpu_count() + if pool_size is None: + pool_size = 5 engine = create_async_engine( database_url, echo=False, pool_recycle=300, - pool_pre_ping=True + pool_pre_ping=True, + pool_size=pool_size ) init_all_table_flag = False -- Gitee