From 2ecdfddac1ce90052837c8edcdf16fde92cf7ce9 Mon Sep 17 00:00:00 2001
From: zxstty
Date: Fri, 7 Nov 2025 16:41:05 +0800
Subject: [PATCH 1/3] Disable PaddleOCR logging
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 data_chain/llm/llm.py                                    | 2 +-
 data_chain/manager/chunk_manager.py                      | 7 +++----
 data_chain/parser/tools/ocr_tool.py                      | 3 ++-
 .../rag/dynamic_weighted_keyword_and_vector_searcher.py  | 4 ++--
 4 files changed, 8 insertions(+), 8 deletions(-)

diff --git a/data_chain/llm/llm.py b/data_chain/llm/llm.py
index bad100b5..e3038de8 100644
--- a/data_chain/llm/llm.py
+++ b/data_chain/llm/llm.py
@@ -94,7 +94,7 @@ class LLM:
                 index = content[::-1].find(en_str[::-1])
                 if index != -1:
                     content = content[:len(content)-index]
-            logger.error(f"LLM nostream content: {content}")
+            logger.info(f"LLM nostream content: {content}")
         except Exception as e:
             err = f"[LLM] 非流式输出异常: {e}"
             logger.error("[LLM] %s", err)
diff --git a/data_chain/manager/chunk_manager.py b/data_chain/manager/chunk_manager.py
index 4ece74bb..f743f2ed 100644
--- a/data_chain/manager/chunk_manager.py
+++ b/data_chain/manager/chunk_manager.py
@@ -272,7 +272,6 @@ class ChunkManager():
             # --------------------------
             # 原有逻辑:执行查询与结果处理(完全保留)
             # --------------------------
-            logging.error(f"执行向量查询SQL: {base_sql},参数: {params}")
             result = await session.execute(text(base_sql), params)
             rows = result.fetchall()
 
@@ -301,7 +300,7 @@ class ChunkManager():
 
             # 可选:查询结束后恢复enable_seqscan(避免影响后续查询,会话结束也会自动恢复)
 
-            logging.error(f"向量查询耗时:{datetime.now()-st}")
+            logging.info(f"向量查询耗时:{datetime.now()-st}")
             return chunk_entities
 
         except Exception as e:
@@ -384,7 +383,7 @@ class ChunkManager():
             stmt = stmt.limit(top_k)
             result = await session.execute(stmt)
             chunk_entities = result.scalars().all()
-            logging.warning(
+            logging.info(
                 f"[ChunkManager] get_top_k_chunk_by_kb_id_keyword cost: {(datetime.now()-st).total_seconds()}s")
             return chunk_entities
         except Exception as e:
@@ -453,7 +452,7 @@ class ChunkManager():
 
             # 7. 日志输出
             cost = (datetime.now() - st).total_seconds()
-            logging.warning(
+            logging.info(
                 f"[ChunkManager] BM25查询耗时: {cost}s "
                 f"| kb_id: {kb_id} | query: {query[:50]}... | 匹配数量: {len(chunk_entities)}"
             )
diff --git a/data_chain/parser/tools/ocr_tool.py b/data_chain/parser/tools/ocr_tool.py
index d2db2150..0c29cf1a 100644
--- a/data_chain/parser/tools/ocr_tool.py
+++ b/data_chain/parser/tools/ocr_tool.py
@@ -23,7 +23,8 @@ class OcrTool:
                 rec_model_dir=rec_model_dir,
                 cls_model_dir=cls_model_dir,
                 use_angle_cls=True,
-                lang="ch"
+                lang="ch",
+                show_log=False
             )
         else:
             model = None
diff --git a/data_chain/rag/dynamic_weighted_keyword_and_vector_searcher.py b/data_chain/rag/dynamic_weighted_keyword_and_vector_searcher.py
index 32060132..f0420569 100644
--- a/data_chain/rag/dynamic_weighted_keyword_and_vector_searcher.py
+++ b/data_chain/rag/dynamic_weighted_keyword_and_vector_searcher.py
@@ -46,7 +46,7 @@ class DynamicKeywordVectorSearcher(BaseSearcher):
             try:
                 import time
                 start_time = time.time()
-                logging.error(
+                logging.info(
                     f"[DynamicKeywordVectorSearcher] 开始进行向量检索,top_k: {top_k-len(chunk_entities_get_by_keyword)-len(chunk_entities_get_by_dynamic_weighted_keyword)}")
                 chunk_entities_get_by_vector = await asyncio.wait_for(ChunkManager.get_top_k_chunk_by_kb_id_vector(kb_id, vector, top_k-len(chunk_entities_get_by_keyword)-len(chunk_entities_get_by_dynamic_weighted_keyword), doc_ids, banned_ids), timeout=300)
                 end_time = time.time()
@@ -58,7 +58,7 @@ class DynamicKeywordVectorSearcher(BaseSearcher):
                 err = f"[DynamicKeywordVectorSearcher] 向量检索失败,error: {e}, traceback: {traceback.format_exc()}"
                 logging.error(err)
                 continue
-            logging.error(
+            logging.info(
                 f"[DynamicKeywordVectorSearcher] chunk_entities_get_by_keyword: {len(chunk_entities_get_by_keyword)}, chunk_entities_get_by_dynamic_weighted_keyword: {len(chunk_entities_get_by_dynamic_weighted_keyword)}, chunk_entities_get_by_vector: {len(chunk_entities_get_by_vector)}")
             chunk_entities = chunk_entities_get_by_keyword + \
                 chunk_entities_get_by_dynamic_weighted_keyword + chunk_entities_get_by_vector
-- 
Gitee
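
[Note on patch 1] PaddleOCR 2.x exposes a `show_log` constructor flag, which
the ocr_tool.py hunk above sets to False to silence the library's per-call
output. Newer PaddleOCR releases dropped that flag, so a version-tolerant
variant might fall back to raising the library logger's level. The sketch
below is an assumption about that fallback (including the "ppocr" logger
name), not part of this patch.

    import logging

    def build_quiet_ocr(det_model_dir, rec_model_dir, cls_model_dir):
        from paddleocr import PaddleOCR
        kwargs = dict(
            det_model_dir=det_model_dir,
            rec_model_dir=rec_model_dir,
            cls_model_dir=cls_model_dir,
            use_angle_cls=True,
            lang="ch",
        )
        try:
            # PaddleOCR 2.x: the flag used by the patch above.
            return PaddleOCR(show_log=False, **kwargs)
        except TypeError:
            # Assumed fallback for releases without show_log:
            # quiet the library logger instead.
            logging.getLogger("ppocr").setLevel(logging.ERROR)
            return PaddleOCR(**kwargs)
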
From d63bf1a2cc23dbbba7c6b5d155ff21a60a6f82be Mon Sep 17 00:00:00 2001
From: zxstty
Date: Fri, 7 Nov 2025 16:43:27 +0800
Subject: [PATCH 2/3] Remove useless log prints
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 data_chain/apps/service/task_queue_service.py | 5 -----
 1 file changed, 5 deletions(-)

diff --git a/data_chain/apps/service/task_queue_service.py b/data_chain/apps/service/task_queue_service.py
index d9905abd..58fde2f7 100644
--- a/data_chain/apps/service/task_queue_service.py
+++ b/data_chain/apps/service/task_queue_service.py
@@ -23,7 +23,6 @@ class TaskQueueService:
             st = time.time()
             pending_task_entities = await TaskManager.list_task_by_task_status(TaskStatus.PENDING.value)
             en = time.time()
-            logging.info(f"[TaskQueueService] 获取待处理任务耗时 {en-st} 秒")
             pending_task_ids = [
                 task_entity.id for task_entity in pending_task_entities]
             pending_task_entities_in_db = await TaskQueueManager.get_tasks_by_ids(pending_task_ids)
@@ -38,14 +37,12 @@ class TaskQueueService:
             st = time.time()
             running_task_entities = await TaskManager.list_task_by_task_status(TaskStatus.RUNNING.value)
             en = time.time()
-            logging.info(f"[TaskQueueService] 获取运行中任务耗时 {en-st} 秒")
             for task_entity in running_task_entities:
                 # 将所有任务取消
                 try:
                     st = time.time()
                     flag = await BaseWorker.reinit(task_entity.id)
                     en = time.time()
-                    logging.info(f"[TaskQueueService] 重新初始化任务耗时 {en-st} 秒")
                     if flag:
                         st = time.time()
                         task_need_pending_ids.append(task_entity.id)
@@ -64,7 +61,6 @@ class TaskQueueService:
                     await TaskQueueManager.update_task_by_ids(
                         task_need_pending_ids[i:i+batch_size], TaskStatus.PENDING)
             en = time.time()
-            logging.info(f"[TaskQueueService] 更新待处理任务状态耗时 {en-st} 秒")
             if len(task_need_delete_ids) > 0:
                 st = time.time()
                 for i in range(0, len(task_need_delete_ids), batch_size):
@@ -78,7 +74,6 @@ class TaskQueueService:
                     await TaskQueueManager.add_tasks(
                         task_entities_need_add[i:i+batch_size])
             en = time.time()
-            logging.info(f"[TaskQueueService] 批量添加任务耗时 {en-st} 秒")
 
     @staticmethod
     async def init_task(task_type: str, op_id: uuid.UUID) -> uuid.UUID:
-- 
Gitee
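
[Note on patch 2] The deleted lines were per-step `logging.info` timing prints
around `st = time.time()` / `en = time.time()` pairs. If those measurements
are ever needed again, a DEBUG-level context manager keeps them available
without re-littering INFO output; this is an illustrative sketch, not code
from this repository.

    import logging
    import time
    from contextlib import contextmanager

    logger = logging.getLogger(__name__)

    @contextmanager
    def timed(label: str):
        # Time the wrapped block and log at DEBUG, so production
        # INFO logs stay quiet unless debugging is switched on.
        st = time.time()
        try:
            yield
        finally:
            logger.debug("%s took %.3fs", label, time.time() - st)

Usage inside an async method works unchanged, e.g.
`with timed("[TaskQueueService] list pending tasks"):` around the await.
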
From 6d8a766941c14477f55cd73e2657ee78c8a8c199 Mon Sep 17 00:00:00 2001
From: zxstty
Date: Mon, 10 Nov 2025 15:03:40 +0800
Subject: [PATCH 3/3] Fix bug where deleting a task also deleted successfully
 imported datasets
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../base/task/worker/export_dataset_worker.py       |  5 +++--
 .../task/worker/export_knowledge_base_worker.py     |  2 +-
 .../base/task/worker/import_dataset_worker.py       | 17 +++++++++++------
 data_chain/apps/service/team_service.py             |  2 +-
 data_chain/manager/team_manager.py                  |  2 +-
 5 files changed, 17 insertions(+), 11 deletions(-)

diff --git a/data_chain/apps/base/task/worker/export_dataset_worker.py b/data_chain/apps/base/task/worker/export_dataset_worker.py
index 082326a8..d952f858 100644
--- a/data_chain/apps/base/task/worker/export_dataset_worker.py
+++ b/data_chain/apps/base/task/worker/export_dataset_worker.py
@@ -116,7 +116,8 @@ class ExportDataSetWorker(BaseWorker):
             cleaned_value = invalid_chars.sub('', value)
 
             # 额外处理常见问题字符(如替换冒号、斜杠等)
-            problematic_chars = {'\\': '', '/': '', '*': '', '?': '', '"': "'", '<': '', '>': '', ':': ''}
+            problematic_chars = {'\\': '', '/': '', '*': '',
+                                 '?': '', '"': "'", '<': '', '>': '', ':': ''}
             for char, replacement in problematic_chars.items():
                 cleaned_value = cleaned_value.replace(char, replacement)
 
@@ -219,7 +220,7 @@ class ExportDataSetWorker(BaseWorker):
             err = f"[ImportDataSetWorker] 任务不存在,task_id: {task_id}"
             logging.exception(err)
             return None
-        if task_entity.status == TaskStatus.CANCLED or TaskStatus.FAILED.value:
+        if task_entity.status == TaskStatus.CANCLED or task_entity.status == TaskStatus.FAILED.value:
             await MinIO.delete_object(
                 EXPORT_DATASET_PATH_IN_MINIO,
                 str(task_entity.id)
diff --git a/data_chain/apps/base/task/worker/export_knowledge_base_worker.py b/data_chain/apps/base/task/worker/export_knowledge_base_worker.py
index 4a139da2..8ca51dc4 100644
--- a/data_chain/apps/base/task/worker/export_knowledge_base_worker.py
+++ b/data_chain/apps/base/task/worker/export_knowledge_base_worker.py
@@ -230,6 +230,6 @@ class ExportKnowledgeBaseWorker(BaseWorker):
             err = f"[ExportKnowledgeBaseWorker] 任务不存在,task_id: {task_id}"
             logging.exception(err)
             return None
-        if task_entity.status == TaskStatus.CANCLED or TaskStatus.FAILED.value:
+        if task_entity.status == TaskStatus.CANCLED or task_entity.status == TaskStatus.FAILED.value:
             await MinIO.delete_object(EXPORT_KB_PATH_IN_MINIO, str(task_id))
         return task_id
diff --git a/data_chain/apps/base/task/worker/import_dataset_worker.py b/data_chain/apps/base/task/worker/import_dataset_worker.py
index 86b98092..00ed4786 100644
--- a/data_chain/apps/base/task/worker/import_dataset_worker.py
+++ b/data_chain/apps/base/task/worker/import_dataset_worker.py
@@ -193,16 +193,21 @@ class ImportDataSetWorker(BaseWorker):
             databse_score = 0
             with open(config['PROMPT_PATH'], 'r', encoding='utf-8') as f:
                 prompt_dict = yaml.load(f, Loader=yaml.SafeLoader)
-            cal_qa_score_prompt_template = prompt_dict.get('CAL_QA_SCORE_PROMPT', {})
-            cal_qa_score_prompt_template = cal_qa_score_prompt_template.get(language, '')
+            cal_qa_score_prompt_template = prompt_dict.get(
+                'CAL_QA_SCORE_PROMPT', {})
+            cal_qa_score_prompt_template = cal_qa_score_prompt_template.get(
+                language, '')
             for qa_entity in qa_entities:
                 chunk = qa_entity.chunk
                 question = qa_entity.question
                 answer = qa_entity.answer
                 sys_call = cal_qa_score_prompt_template.format(
-                    fragment=TokenTool.get_k_tokens_words_from_content(chunk, llm.max_tokens//9*4),
-                    question=TokenTool.get_k_tokens_words_from_content(question, llm.max_tokens//9),
-                    answer=TokenTool.get_k_tokens_words_from_content(answer, llm.max_tokens//9*4)
+                    fragment=TokenTool.get_k_tokens_words_from_content(
+                        chunk, llm.max_tokens//9*4),
+                    question=TokenTool.get_k_tokens_words_from_content(
+                        question, llm.max_tokens//9),
+                    answer=TokenTool.get_k_tokens_words_from_content(
+                        answer, llm.max_tokens//9*4)
                 )
                 usr_call = '请输出分数'
                 score = await llm.nostream([], sys_call, usr_call)
@@ -278,7 +283,7 @@ class ImportDataSetWorker(BaseWorker):
             err = f"[ImportDataSetWorker] 任务不存在,task_id: {task_id}"
             logging.exception(err)
             return None
-        if task_entity.status == TaskStatus.CANCLED or TaskStatus.FAILED.value:
+        if task_entity.status == TaskStatus.CANCLED or task_entity.status == TaskStatus.FAILED.value:
             await DatasetManager.update_dataset_by_dataset_id(task_entity.op_id, {"status": DataSetStatus.DELETED.value})
             await MinIO.delete_object(
                 IMPORT_DATASET_PATH_IN_MINIO,
diff --git a/data_chain/apps/service/team_service.py b/data_chain/apps/service/team_service.py
index cdb2e02e..da1fc4c8 100644
--- a/data_chain/apps/service/team_service.py
+++ b/data_chain/apps/service/team_service.py
@@ -77,7 +77,7 @@ class TeamService:
         elif req.team_type == TeamType.PUBLIC:
             total, team_entities = await TeamManager.list_public_team(user_sub, req)
         else:
-            total_mycreated, team_entities_mycreated = await TeamManager.list_team_mycreated_user_sub(user_sub, req)
+            total_mycreated, team_entities_mycreated = await TeamManager.list_team_mycreated_by_user_sub(user_sub, req)
             total_myjoined, team_entities_myjoined = await TeamManager.list_team_myjoined_by_user_sub(user_sub, req)
             total = total_mycreated + total_myjoined
             team_entities = team_entities_mycreated + team_entities_myjoined
diff --git a/data_chain/manager/team_manager.py b/data_chain/manager/team_manager.py
index 41e4af74..850f09b0 100644
--- a/data_chain/manager/team_manager.py
+++ b/data_chain/manager/team_manager.py
@@ -75,7 +75,7 @@ class TeamManager:
             raise e
 
     @staticmethod
-    async def list_team_mycreated_user_sub(user_sub: str, req: ListTeamRequest) -> list[TeamEntity]:
+    async def list_team_mycreated_by_user_sub(user_sub: str, req: ListTeamRequest) -> list[TeamEntity]:
         """列出我创建的团队"""
         try:
             async with await DataBase.get_session() as session:
-- 
Gitee
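
[Note on patch 3] The dataset-deletion bug fixed above is a classic Python
precedence/truthiness slip: `x == A or B` parses as `(x == A) or B`, and since
`TaskStatus.FAILED.value` is a non-empty string, the cleanup branch ran for
every task status, deleting datasets whose import had succeeded. A minimal
repro with a stand-in enum (not the repository's actual TaskStatus class):

    from enum import Enum

    class TaskStatus(str, Enum):
        SUCCESS = "success"
        CANCLED = "cancled"   # spelling kept as in the codebase
        FAILED = "failed"

    status = TaskStatus.SUCCESS.value

    # Buggy form: evaluates to (status == CANCLED) or "failed" -> truthy.
    buggy = status == TaskStatus.CANCLED.value or TaskStatus.FAILED.value
    # Fixed form, equivalent to the patch's explicit second comparison.
    fixed = status in (TaskStatus.CANCLED.value, TaskStatus.FAILED.value)

    assert bool(buggy) is True    # cleanup would fire even on SUCCESS
    assert fixed is False         # cleanup correctly skipped
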