diff --git a/data_chain/apps/base/task/worker/export_dataset_worker.py b/data_chain/apps/base/task/worker/export_dataset_worker.py
index 082326a86ae6216814c7d38ca5e237e66748d948..d952f858075821778a56a1ff52053ff3683089c7 100644
--- a/data_chain/apps/base/task/worker/export_dataset_worker.py
+++ b/data_chain/apps/base/task/worker/export_dataset_worker.py
@@ -116,7 +116,8 @@ class ExportDataSetWorker(BaseWorker):
             cleaned_value = invalid_chars.sub('', value)
 
             # 额外处理常见问题字符(如替换冒号、斜杠等)
-            problematic_chars = {'\\': '', '/': '', '*': '', '?': '', '"': "'", '<': '', '>': '', ':': ''}
+            problematic_chars = {'\\': '', '/': '', '*': '',
+                                 '?': '', '"': "'", '<': '', '>': '', ':': ''}
             for char, replacement in problematic_chars.items():
                 cleaned_value = cleaned_value.replace(char, replacement)
 
@@ -219,7 +220,7 @@ class ExportDataSetWorker(BaseWorker):
             err = f"[ImportDataSetWorker] 任务不存在,task_id: {task_id}"
             logging.exception(err)
             return None
-        if task_entity.status == TaskStatus.CANCLED or TaskStatus.FAILED.value:
+        if task_entity.status == TaskStatus.CANCLED or task_entity.status == TaskStatus.FAILED.value:
             await MinIO.delete_object(
                 EXPORT_DATASET_PATH_IN_MINIO,
                 str(task_entity.id)
diff --git a/data_chain/apps/base/task/worker/export_knowledge_base_worker.py b/data_chain/apps/base/task/worker/export_knowledge_base_worker.py
index 4a139da2d8f5a77e893b5345a4911379d67cab39..8ca51dc4114e7c9c959530b55170cc6817ed176a 100644
--- a/data_chain/apps/base/task/worker/export_knowledge_base_worker.py
+++ b/data_chain/apps/base/task/worker/export_knowledge_base_worker.py
@@ -230,6 +230,6 @@ class ExportKnowledgeBaseWorker(BaseWorker):
             err = f"[ExportKnowledgeBaseWorker] 任务不存在,task_id: {task_id}"
             logging.exception(err)
             return None
-        if task_entity.status == TaskStatus.CANCLED or TaskStatus.FAILED.value:
+        if task_entity.status == TaskStatus.CANCLED or task_entity.status == TaskStatus.FAILED.value:
             await MinIO.delete_object(EXPORT_KB_PATH_IN_MINIO, str(task_id))
         return task_id
diff --git a/data_chain/apps/base/task/worker/import_dataset_worker.py b/data_chain/apps/base/task/worker/import_dataset_worker.py
index 86b9809295dadb7346e55cf741697e8904ba0c63..00ed4786123aed0ae34ff8f07ccce869db155db5 100644
--- a/data_chain/apps/base/task/worker/import_dataset_worker.py
+++ b/data_chain/apps/base/task/worker/import_dataset_worker.py
@@ -193,16 +193,21 @@ class ImportDataSetWorker(BaseWorker):
         databse_score = 0
         with open(config['PROMPT_PATH'], 'r', encoding='utf-8') as f:
             prompt_dict = yaml.load(f, Loader=yaml.SafeLoader)
-            cal_qa_score_prompt_template = prompt_dict.get('CAL_QA_SCORE_PROMPT', {})
-            cal_qa_score_prompt_template = cal_qa_score_prompt_template.get(language, '')
+            cal_qa_score_prompt_template = prompt_dict.get(
+                'CAL_QA_SCORE_PROMPT', {})
+            cal_qa_score_prompt_template = cal_qa_score_prompt_template.get(
+                language, '')
         for qa_entity in qa_entities:
             chunk = qa_entity.chunk
             question = qa_entity.question
             answer = qa_entity.answer
             sys_call = cal_qa_score_prompt_template.format(
-                fragment=TokenTool.get_k_tokens_words_from_content(chunk, llm.max_tokens//9*4),
-                question=TokenTool.get_k_tokens_words_from_content(question, llm.max_tokens//9),
-                answer=TokenTool.get_k_tokens_words_from_content(answer, llm.max_tokens//9*4)
+                fragment=TokenTool.get_k_tokens_words_from_content(
+                    chunk, llm.max_tokens//9*4),
+                question=TokenTool.get_k_tokens_words_from_content(
+                    question, llm.max_tokens//9),
+                answer=TokenTool.get_k_tokens_words_from_content(
+                    answer, llm.max_tokens//9*4)
             )
             usr_call = '请输出分数'
             score = await llm.nostream([], sys_call, usr_call)
@@ -278,7 +283,7 @@ class ImportDataSetWorker(BaseWorker):
             err = f"[ImportDataSetWorker] 任务不存在,task_id: {task_id}"
             logging.exception(err)
             return None
-        if task_entity.status == TaskStatus.CANCLED or TaskStatus.FAILED.value:
+        if task_entity.status == TaskStatus.CANCLED or task_entity.status == TaskStatus.FAILED.value:
             await DatasetManager.update_dataset_by_dataset_id(task_entity.op_id, {"status": DataSetStatus.DELETED.value})
             await MinIO.delete_object(
                 IMPORT_DATASET_PATH_IN_MINIO,
diff --git a/data_chain/apps/service/task_queue_service.py b/data_chain/apps/service/task_queue_service.py
index d9905abdf0d2ef29c67ffe7c5f7242891d8babe5..58fde2f7754814e5f7d709ce20403db3cdec4270 100644
--- a/data_chain/apps/service/task_queue_service.py
+++ b/data_chain/apps/service/task_queue_service.py
@@ -23,7 +23,6 @@ class TaskQueueService:
         st = time.time()
         pending_task_entities = await TaskManager.list_task_by_task_status(TaskStatus.PENDING.value)
         en = time.time()
-        logging.info(f"[TaskQueueService] 获取待处理任务耗时 {en-st} 秒")
         pending_task_ids = [
             task_entity.id for task_entity in pending_task_entities]
         pending_task_entities_in_db = await TaskQueueManager.get_tasks_by_ids(pending_task_ids)
@@ -38,14 +37,12 @@ class TaskQueueService:
         st = time.time()
         running_task_entities = await TaskManager.list_task_by_task_status(TaskStatus.RUNNING.value)
         en = time.time()
-        logging.info(f"[TaskQueueService] 获取运行中任务耗时 {en-st} 秒")
         for task_entity in running_task_entities:
             # 将所有任务取消
             try:
                 st = time.time()
                 flag = await BaseWorker.reinit(task_entity.id)
                 en = time.time()
-                logging.info(f"[TaskQueueService] 重新初始化任务耗时 {en-st} 秒")
                 if flag:
                     st = time.time()
                     task_need_pending_ids.append(task_entity.id)
@@ -64,7 +61,6 @@ class TaskQueueService:
                 await TaskQueueManager.update_task_by_ids(
                     task_need_pending_ids[i:i+batch_size], TaskStatus.PENDING)
             en = time.time()
-            logging.info(f"[TaskQueueService] 更新待处理任务状态耗时 {en-st} 秒")
         if len(task_need_delete_ids) > 0:
             st = time.time()
             for i in range(0, len(task_need_delete_ids), batch_size):
@@ -78,7 +74,6 @@ class TaskQueueService:
                 await TaskQueueManager.add_tasks(
                     task_entities_need_add[i:i+batch_size])
             en = time.time()
-            logging.info(f"[TaskQueueService] 批量添加任务耗时 {en-st} 秒")
 
     @staticmethod
     async def init_task(task_type: str, op_id: uuid.UUID) -> uuid.UUID:
diff --git a/data_chain/apps/service/team_service.py b/data_chain/apps/service/team_service.py
index cdb2e02ee2efca30a9c50bc9c107096f158e03bc..da1fc4c8d80d4f3636a069e408dcb6f2ec5e5ac1 100644
--- a/data_chain/apps/service/team_service.py
+++ b/data_chain/apps/service/team_service.py
@@ -77,7 +77,7 @@ class TeamService:
         elif req.team_type == TeamType.PUBLIC:
             total, team_entities = await TeamManager.list_public_team(user_sub, req)
         else:
-            total_mycreated, team_entities_mycreated = await TeamManager.list_team_mycreated_user_sub(user_sub, req)
+            total_mycreated, team_entities_mycreated = await TeamManager.list_team_mycreated_by_user_sub(user_sub, req)
             total_myjoined, team_entities_myjoined = await TeamManager.list_team_myjoined_by_user_sub(user_sub, req)
             total = total_mycreated + total_myjoined
             team_entities = team_entities_mycreated + team_entities_myjoined
diff --git a/data_chain/llm/llm.py b/data_chain/llm/llm.py
index bad100b5815dc1d85dcf46feb22eff745fd6a26c..e3038de80005ab8ca35dae1508a46be4001fcf0d 100644
--- a/data_chain/llm/llm.py
+++ b/data_chain/llm/llm.py
@@ -94,7 +94,7 @@ class LLM:
                 index = content[::-1].find(en_str[::-1])
                 if index != -1:
                     content = content[:len(content)-index]
-            logger.error(f"LLM nostream content: {content}")
+            logger.info(f"LLM nostream content: {content}")
         except Exception as e:
             err = f"[LLM] 非流式输出异常: {e}"
             logger.error("[LLM] %s", err)
diff --git a/data_chain/manager/chunk_manager.py b/data_chain/manager/chunk_manager.py
index 4ece74bba9378cdade6f27475cf76a1e2a78130d..f743f2ed5d8988eda266fc525878a8d52cec27e7 100644
--- a/data_chain/manager/chunk_manager.py
+++ b/data_chain/manager/chunk_manager.py
@@ -272,7 +272,6 @@ class ChunkManager():
                 # --------------------------
                 # 原有逻辑:执行查询与结果处理(完全保留)
                 # --------------------------
-                logging.error(f"执行向量查询SQL: {base_sql},参数: {params}")
                 result = await session.execute(text(base_sql), params)
                 rows = result.fetchall()
 
@@ -301,7 +300,7 @@ class ChunkManager():
 
                 # 可选:查询结束后恢复enable_seqscan(避免影响后续查询,会话结束也会自动恢复)
 
-                logging.error(f"向量查询耗时:{datetime.now()-st}")
+                logging.info(f"向量查询耗时:{datetime.now()-st}")
                 return chunk_entities
 
         except Exception as e:
@@ -384,7 +383,7 @@ class ChunkManager():
                 stmt = stmt.limit(top_k)
                 result = await session.execute(stmt)
                 chunk_entities = result.scalars().all()
-                logging.warning(
+                logging.info(
                     f"[ChunkManager] get_top_k_chunk_by_kb_id_keyword cost: {(datetime.now()-st).total_seconds()}s")
                 return chunk_entities
         except Exception as e:
@@ -453,7 +452,7 @@ class ChunkManager():
 
                 # 7. 日志输出
                 cost = (datetime.now() - st).total_seconds()
-                logging.warning(
+                logging.info(
                     f"[ChunkManager] BM25查询耗时: {cost}s "
                     f"| kb_id: {kb_id} | query: {query[:50]}... | 匹配数量: {len(chunk_entities)}"
                 )
diff --git a/data_chain/manager/team_manager.py b/data_chain/manager/team_manager.py
index 41e4af74fb944ef17fa9f3374057ce8c0db47c2d..850f09b0a99e7df0cd634d7628f039176696bea3 100644
--- a/data_chain/manager/team_manager.py
+++ b/data_chain/manager/team_manager.py
@@ -75,7 +75,7 @@ class TeamManager:
             raise e
 
     @staticmethod
-    async def list_team_mycreated_user_sub(user_sub: str, req: ListTeamRequest) -> list[TeamEntity]:
+    async def list_team_mycreated_by_user_sub(user_sub: str, req: ListTeamRequest) -> list[TeamEntity]:
         """列出我创建的团队"""
         try:
             async with await DataBase.get_session() as session:
diff --git a/data_chain/parser/tools/ocr_tool.py b/data_chain/parser/tools/ocr_tool.py
index d2db21503883bee61e5455259e111fb42df88750..0c29cf1a7f5afb8419290abb98bba38709077377 100644
--- a/data_chain/parser/tools/ocr_tool.py
+++ b/data_chain/parser/tools/ocr_tool.py
@@ -23,7 +23,8 @@ class OcrTool:
                 rec_model_dir=rec_model_dir,
                 cls_model_dir=cls_model_dir,
                 use_angle_cls=True,
-                lang="ch"
+                lang="ch",
+                show_log=False
             )
         else:
             model = None
diff --git a/data_chain/rag/dynamic_weighted_keyword_and_vector_searcher.py b/data_chain/rag/dynamic_weighted_keyword_and_vector_searcher.py
index 320601329d16a0602537e32316c64cf38a9673c2..f0420569a71c1c6f993cf9047c302728e835ca10 100644
--- a/data_chain/rag/dynamic_weighted_keyword_and_vector_searcher.py
+++ b/data_chain/rag/dynamic_weighted_keyword_and_vector_searcher.py
@@ -46,7 +46,7 @@ class DynamicKeywordVectorSearcher(BaseSearcher):
             try:
                 import time
                 start_time = time.time()
-                logging.error(
+                logging.info(
                     f"[DynamicKeywordVectorSearcher] 开始进行向量检索,top_k: {top_k-len(chunk_entities_get_by_keyword)-len(chunk_entities_get_by_dynamic_weighted_keyword)}")
                 chunk_entities_get_by_vector = await asyncio.wait_for(ChunkManager.get_top_k_chunk_by_kb_id_vector(kb_id, vector, top_k-len(chunk_entities_get_by_keyword)-len(chunk_entities_get_by_dynamic_weighted_keyword), doc_ids, banned_ids), timeout=300)
                 end_time = time.time()
@@ -58,7 +58,7 @@ class DynamicKeywordVectorSearcher(BaseSearcher):
                 err = f"[DynamicKeywordVectorSearcher] 向量检索失败,error: {e}, traceback: {traceback.format_exc()}"
                 logging.error(err)
                 continue
-            logging.error(
+            logging.info(
                 f"[DynamicKeywordVectorSearcher] chunk_entities_get_by_keyword: {len(chunk_entities_get_by_keyword)}, chunk_entities_get_by_dynamic_weighted_keyword: {len(chunk_entities_get_by_dynamic_weighted_keyword)}, chunk_entities_get_by_vector: {len(chunk_entities_get_by_vector)}")
             chunk_entities = chunk_entities_get_by_keyword + \
                 chunk_entities_get_by_dynamic_weighted_keyword + chunk_entities_get_by_vector