# LLMScore_python **Repository Path**: chen_qinxu/llmscore_python ## Basic Information - **Project Name**: LLMScore_python - **Description**: No description available - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: main - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 1 - **Created**: 2025-08-08 - **Last Updated**: 2025-08-08 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # 项目介绍:大模型端 > 传送门🚪:[LLM Score 项目的 Java 后端](https://gitee.com/rf818/llmscore_java) 本项目是大模型评分的 python 后端代码。主要功能是从kafka消息队列中拿取消息(每次一条)然后进行prompt提示处理,对LLM 返回对结果也通过kafka消息队列传回Java后端,保证可靠性传输。 ![项目架构图](readme_files/大模型智能质检.jpg) # Quick Start 直接运行 `main.py` 即基于 `FastAPI` 和 `Uvicorn` 启动Restful Web 应用,并暴露`/score` API接口,方便进行快速测试,主要是使用kafka消息队列确保稳定性。 在`config/uvicorn-config.yaml` 文件中定义了web应用的地址和端口。 `config/llm-config.yaml`文件可以调整LLM 的超参数,Prompt等。 `config/qianfan-config.yaml`中调整百度大模型平台千帆的secret-key、access-key。 > 注意!由于考虑到像千帆api请求上限异常、评分条数不足等情况,将kafka的`session_time_out_ms` 设置为6分钟。 > 所以如果重启python 后端服务时,kafka不会立马将分配给重启的服务,需要等待6分钟,判定上一个服务“死亡”,重启的服务才会被分配消息进行消费。 > 而request_time_out_ms默认是40s一次,所以在6分钟内会有多次的请求kafka超时错误,这是正常现象。 # 项目功能 技术栈:FastAPI、Uvicorn、Kafka、Nacos、qianfan SDK - 实现 `kafka consumer` 用于消费并推送结果信息 - 基于`qianfan SDK` 实现单轮、多轮对话 - 基于`Fast API` 和 `Uvicorn` 实现 Python Web 应用, 方便简单使用和后续开发维护 - 基于`nacos-python` 实现LLM动态配置,模型上线后能够动态调整超参数。 ### `main.py` - `main.py` 用于启动Fast API 应用。 ```python if __name__ == '__main__': app_string = 'main:app' uvicorn.run(app_string, host=uvicorn_config['host'], port=uvicorn_config['port'], reload=uvicorn_config['reload'], log_config=uvicorn_config['log-config']) ``` - 在启动时,进行`nacos`的初始化、qianfan SDK 实例注册,同时启动`kafka consumer` 的初始化 ```python @app.on_event("startup") async def startup_event(): # 先初始化好配置 init_nacos() # 更新本地配置 chat_comp_singleton.update(qianfan_config) # 在一个单独的线程中运行 initial_consumer 函数 logger.info("启动kafka consumer") asyncio.create_task(initial_consumer(llm_config, kafka_config=kafka_config)) ``` - `nacos`初始化时,实例化一个`NacosHelper` 同时初始化配置监听器。 `NacosHelper` 内部维护一个`nacos client` 、实现配置拉取、推送配置、添加配置监听器等功能。 `nacos`会监控`config/` 目录下的所有配置文件,即项目上线后可以方便的在`nacos` 后台中更新任何配置文件,而不必经历:杀停应用->修改本地配置文件->应用上线部署 ```python def init_nacos(): nacos = NacosHelper(nacos_endpoint, nacos_namespace_id) # 注册配置变更监控回调 # Todo: 测试一下配置更新功能 nacos.add_conf_watcher(nacos_data_id_list, nacos_group_name, nacos_config_callback) ``` --- ### `kafka_consumer.py` - `initial_consumer` 对 kafkaConsumer进行初始化,同时启动消费者监听器。当有消息时,就会启动`consume_produce_integreate` 方法进行消费。 ```python async def initial_consumer(llm_config=None, kafka_config=None): loop = asyncio.get_event_loop() KAFKA_TOPIC_CONSUMER = kafka_config['kafka']['consumer']['topic'] KAFKA_CONSUMER_GROUP = kafka_config['kafka']['consumer']['group'] KAFKA_BOOTSTRAP_SERVERS = kafka_config['kafka']['bootstrap_servers'] # 后续配置 # log.info(f'Initializing KafkaConsumer for topic {KAFKA_TOPIC_CONSUMER}, group_id {group_id}' # f' and using bootstrap servers {KAFKA_BOOTSTRAP_SERVERS}') consumer = AIOKafkaConsumer( KAFKA_TOPIC_CONSUMER, loop=loop, bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS, group_id=KAFKA_CONSUMER_GROUP, auto_offset_reset="earliest", value_deserializer=json.loads, enable_auto_commit=True, max_poll_records=1, session_timeout_ms=60000, ) # get cluster layout and join group await consumer.start() try: # Consume messages async for msg in consumer: try: await consume_produce_integrate(msg, llm_config, kafka_config) except Exception as e: logger.error(e) finally: # Will leave consumer group; perform autocommit if enabled. await consumer.stop() ``` - `consume_produce_integrate` 方法拿到 LLM 评分对结果,使用 kafkaProducer 向另外一个主题发送批改后的 JSON 结果。在Java 后端响应的会有一个消费者监听着该主题。 ```python async def consume_produce_integrate(msg, llm_config=None, kafka_config=None): json_post_list = msg.value prompt_prefix = llm_config['prompt']['prefix'] prompt_output = llm_config['prompt']['output'] dialog_content = json_post_list['stt'] try: llm_output = await score_by_llm_integrate(dialog_content, json_post_list, llm_config, prompt_output, prompt_prefix) logger.info("正在返回大模型输出结果->{}", llm_output) await produce(llm_output, kafka_config) except JsonFormatError as jfe: logger.error("大模型处理Json格式出错!将记录->{}", jfe.llm_output) output = { "mark": -1, "data": jfe.llm_output } await produce(output, kafka_config) ``` - `produce` 方法用于异步启动一个kafka生产者,发送批改后的结果 ```python async def produce(llm_output, kafka_config): KAFKA_TOPIC_PROVIDER = kafka_config['kafka']['provider']['topic'] KAFKA_BOOTSTRAP_SERVERS = kafka_config['kafka']['bootstrap_servers'] loop = asyncio.get_event_loop() producer = AIOKafkaProducer( loop=loop, bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS, value_serializer=lambda v: json.dumps(v, ensure_ascii=False).encode(), ) # Get cluster layout and initial topic/partition leadership information await producer.start() try: # Produce message await producer.send_and_wait(KAFKA_TOPIC_PROVIDER, llm_output) finally: # Wait for all pending messages to be delivered or expire. await producer.stop() ``` - `consume_produce_integrate`方法读取文件中的prompt设置,同时以integrate的方法将评分内容发送给LLM。评分采用免费的、基于qianfan SDK的Yi-34B模型。 ```python async def consume_produce_integrate(msg, llm_config=None, kafka_config=None): json_post_list = msg.value prompt_prefix = llm_config['prompt']['prefix'] prompt_output = llm_config['prompt']['output'] dialog_content = json_post_list['stt'] try: llm_output = await score_by_llm_integrate(dialog_content, json_post_list, llm_config, prompt_output, prompt_prefix) logger.info("正在返回大模型输出结果->{}", llm_output) await produce(llm_output, kafka_config) except JsonFormatError as jfe: logger.error("大模型处理Json格式出错!将记录->{}", jfe.llm_output) output = { "mark": -1, "data": jfe.llm_output } await produce(output, kafka_config) ``` - `score_by_llm_integrate`方法继续传递数据,在`score_integrate` 中正式调用LLM。 对LLM返回的结果进行检查,如果长度不合规、或者返回的结果不是JSON格式,那么就进行重新评分。(建议使用另外一个模型+低温度系数,确保更好的纠错效果) ```python async def score_by_llm_integrate(dialog_content, json_post_list, llm_config, prompt_output, prompt_prefix): resp = await score_integrate(dialog_content, prompt_prefix, prompt_output, llm_config) json_data = json.loads(resp) if len(json_data['reasons']) < 15: logger.error(f"评分出错,大小{len(json_data['reasons'])},->\n{resp}") send_content = f''' {prompt_prefix}{dialog_content} {llm_config['prompt']['stand-content']['error']} 你的评分:{resp} {llm_config['prompt']['output-error']} {llm_config['prompt']['integrate-error']} {llm_config['prompt']['score-error']} ''' resp_redo = await chat_with_llm(send_content, llm_config) await ensure_json_format(llm_config, resp_redo['body']['result']) json_data = json.loads(resp_redo['body']['result']) return { "id": json_post_list['id'], "result": json_data['reasons'] } ``` --- ### `llm_func.py` - `score_integrate`方法将要发送的内容进行拼合,送入`get_response_from_llm` 方法进行打分。 ```python async def score_integrate(dialog_content, prompt_prefix, prompt_output, llm_config=None): send_content = f"{llm_config['prompt']['few-shot']}\n{prompt_prefix}{dialog_content}\n{llm_config['prompt']['stand-content']['total']}\n{llm_config['prompt']['integrate']}\n{prompt_output}" return await get_response_from_llm(llm_config, send_content) ``` - `get_response_from_llm`方法,用于判断是Iterative 方法还是 Integrate方法。拿到结果后需要判断一下是否符合JSON格式。 ```python async def get_response_from_llm(llm_config, send_content_first, send_content_second=None): if send_content_second is None: resp = await chat_with_llm(send_content_first, llm_config) else: resp = await chat_with_llm_multi_turn(await chat_with_llm(send_content_first, llm_config), send_content_first, send_content_second, llm_config) await ensure_json_format(llm_config, resp['body']['result']) return resp['body']['result'] ```