From cee490635f16fd9073470cd44b9fde3b1a547749 Mon Sep 17 00:00:00 2001
From: zxstty
Date: Wed, 23 Apr 2025 11:24:51 +0800
Subject: [PATCH] Insert document parsing results in batches
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 data_chain/parser/service/parser_service.py | 35 +++++++++++++++++----
 1 file changed, 29 insertions(+), 6 deletions(-)

diff --git a/data_chain/parser/service/parser_service.py b/data_chain/parser/service/parser_service.py
index 47a4d20..7e881dd 100644
--- a/data_chain/parser/service/parser_service.py
+++ b/data_chain/parser/service/parser_service.py
@@ -124,7 +124,11 @@ class ParserService:
                         status=chunk['status']
                     )
                     chunk_entity_list.append(chunk_entity)
-                await ChunkManager.insert_chunks(chunk_entity_list)
+                    if len(chunk_entity_list) >= 1000:
+                        await ChunkManager.insert_chunks(chunk_entity_list)
+                        chunk_entity_list = []
+                if len(chunk_entity_list) > 0:
+                    await ChunkManager.insert_chunks(chunk_entity_list)
             else:
                 chunk_entity_list = []
                 for chunk in chunks:
@@ -137,7 +141,11 @@ class ParserService:
                         global_offset=chunk['global_offset'],
                     )
                     chunk_entity_list.append(chunk_entity)
-                await TemporaryChunkManager.insert_temprorary_chunks(chunk_entity_list)
+                    if len(chunk_entity_list) >= 1000:
+                        await TemporaryChunkManager.insert_temprorary_chunks(chunk_entity_list)
+                        chunk_entity_list = []
+                if len(chunk_entity_list) > 0:
+                    await TemporaryChunkManager.insert_temprorary_chunks(chunk_entity_list)
         except Exception as e:
             logging.error(f"Failed to upload chunk: {e}")
             raise e
@@ -155,7 +163,11 @@ class ParserService:
                     type=chunk_link['type'],
                 )
                 chunk_link_entity_list.append(chunk_link_entity)
-            await ChunkLinkManager.insert_chunk_links(chunk_link_entity_list)
+                if len(chunk_link_entity_list) >= 1000:
+                    await ChunkLinkManager.insert_chunk_links(chunk_link_entity_list)
+                    chunk_link_entity_list = []
+            if len(chunk_link_entity_list) > 0:
+                await ChunkLinkManager.insert_chunk_links(chunk_link_entity_list)
         except Exception as e:
             logging.error(f"Failed to upload chunk: {e}")
             raise e
@@ -189,7 +201,11 @@ class ParserService:
                     user_id=image['user_id'],
                 )
                 image_entity_list.append(image_entity)
-            await ImageManager.add_images(image_entity_list)
+                if len(image_entity_list) >= 1000:
+                    await ImageManager.add_images(image_entity_list)
+                    image_entity_list = []
+            if len(image_entity_list) > 0:
+                await ImageManager.add_images(image_entity_list)
         except Exception as e:
             logging.error(f"Failed to upload image: {e}")
             raise e
@@ -224,6 +240,7 @@ class ParserService:
             return
         try:
             if not is_temporary_document:
+                index = 0
                 doc = await DocumentManager.select_by_id(vectors[0]['doc_id'])
                 kb = await KnowledgeBaseManager.select_by_id(doc.kb_id)
                 vector_items_table = await PostgresDB.get_dynamic_vector_items_table(
@@ -231,7 +248,9 @@ class ParserService:
                     embedding_model_out_dimensions[kb.embedding_model]
                 )
                 await PostgresDB.create_table(vector_items_table)
-                await VectorItemsManager.add_all(vector_items_table, vectors)
+                while index <= len(vectors):
+                    await VectorItemsManager.add_all(vector_items_table, vectors[index:min(index+1000, len(vectors))])
+                    index += 1000
             else:
                 vector_entity_list = []
                 for vector in vectors:
@@ -241,7 +260,11 @@ class ParserService:
                             chunk_id=vector['chunk_id'],
                             vector=vector['vector'])
                     )
-                await TemporaryVectorItemsManager.add_all(vector_entity_list)
+                    if len(vector_entity_list) >= 1000:
+                        await TemporaryVectorItemsManager.add_all(vector_entity_list)
+                        vector_entity_list = []
+                if len(vector_entity_list) > 0:
+                    await TemporaryVectorItemsManager.add_all(vector_entity_list)
         except Exception as e:
             logging.error(f"Failed to upload chunk: {e}")
             raise e
--
Gitee
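
The hunks above all apply the same pattern: accumulate parsed entities in a list, flush the list to the database whenever it reaches 1000 items, and flush whatever remains after the loop. Below is a minimal standalone sketch of that pattern, not code from this repository; insert_in_batches, insert_batch, and BATCH_SIZE are hypothetical names standing in for managers such as ChunkManager.insert_chunks.

    from typing import Awaitable, Callable, Iterable, List, TypeVar

    T = TypeVar("T")
    BATCH_SIZE = 1000  # same flush threshold the patch uses

    async def insert_in_batches(
        items: Iterable[T],
        insert_batch: Callable[[List[T]], Awaitable[None]],
    ) -> None:
        """Accumulate items and flush them to the database in fixed-size batches."""
        buffer: List[T] = []
        for item in items:
            buffer.append(item)
            if len(buffer) >= BATCH_SIZE:
                await insert_batch(buffer)  # flush a full batch
                buffer = []
        if buffer:
            await insert_batch(buffer)  # flush the remainder (< BATCH_SIZE)

For a list that is already fully materialized, such as vectors in the last hunks, slicing in steps of 1000 (for start in range(0, len(vectors), 1000): ... vectors[start:start + 1000]) covers the same windows in order.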