# OSS.PipeLine **Repository Path**: osscore/OSS.PipeLine ## Basic Information - **Project Name**: OSS.PipeLine - **Description**: 流式事件处理,微服务下业务生命周期管理,强化业务的流程管理,建立业务操作边界,打造标准化的业务执行单元,提高代码复用。 - **Primary Language**: Unknown - **License**: GPL-3.0 - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 4 - **Forks**: 3 - **Created**: 2021-06-01 - **Last Updated**: 2026-01-27 ## Categories & Tags **Categories**: process-engine **Tags**: None ## README # OSS 流程引擎与数据管道库 ## 项目简介 - **OSS.DataPipe**:轻量级消息管道,抽象定义消息输入输出接口,简化业务侧消息生产、消费调用方式。
内部提供基于内存队列的默认实现(可扩展自定义消息队列实现)。
同时提供事件处理器,用以支持自定义事件重试策略。 - **OSS.Pipeline**:轻量级流程引擎,提供Activity、Branch等多种组件、支持自定义流程监控等功能。
组件间的消息传递基于 OSS.DataPipe 实现,进而支持组件的自动重试。 ## 快速开始 ### 安装 Install-Package OSS.DataPipe
Install-Package OSS.Pipeline ### 简单示例 #### OSS.Pipeline 示例 ```csharp using OSS.Pipeline; using OSS.Common.Resp; var node1 = new SimpleActivity("Node1", async (input) => { Console.WriteLine($"Node1 is executing.......input:{input}"); return string.Empty; }); var node3 = node1.Append("Node2", async (s) => { Console.WriteLine("Node2 is executing......."); return "Node2-Successful"; }) .Append("Node3", async (s) => { Console.WriteLine($"Node3 is executing.......(Node2 result:{s})"); return "Ended"; }); // 定义流程 var pipeline = PipelineBuilder.Define("MyFlow", node1, node3); // 执行流程 await pipeline.Run("测试数据"); // 或者 await startActivity.Run("测试数据") ``` #### 自定义流程监控 您可以实现 `IPipeMonitor` 接口来自定义流程监控逻辑,例如将监控数据发送到 ELK 或 Prometheus。 ```csharp // 可查看 IPipeMonitor 扩展不同阶段监控 public class MyPipelineMonitor : IPipeMonitor { /// /// Executed /// public async Task Executed(PipeType pipeType, string pipeCode, object? inputPara, object? exeResult) { return Task.CompletedTask; } } // 使用自定义监控 pipeline.SetMonitor(new MyPipelineMonitor()); ``` #### OSS.DataPipe 业务侧调用示例 ```csharp using OSS.DataPipe; // 注入数据消费方法,并得到数据输入句柄 var dataProducer = DataPipeFactory.CreateProducer(async (data) => { Console.WriteLine($"消费数据: {data}"); return true; }); // 通过数据输入句柄推送数据 await dataProducer.Push("测试数据"); // 等待测试数据消费完成 await Task.Delay(1000); // 如果不再需要继续生产消息,通过此方法释放资源 DataPipeFactory.Release(dataProducer); ``` ## 详细文档 ### OSS.DataPipe - [快速入门](docs/datapipe/quickstart.md) - [核心组件](docs/datapipe/core-components.md) - [高级特性](docs/datapipe/advanced-features.md) ### OSS.Pipeline - [快速入门](docs/pipeline/quickstart.md) - [核心组件](docs/pipeline/core-components.md) - [高级特性](docs/pipeline/advanced-features.md) ## 扩展指南 ### 自定义集成 RabbitMQ OSS.DataPipe 支持自定义数据输入提供器,您可以轻松集成 RabbitMQ 等消息队列。 详细指南请参考:[RabbitMQ 集成指南](docs/extensions/rabbitmq-integration.md) ## 应用场景 ### OSS.Pipeline 应用场景 - 订单处理流程 - 审批流程 - 数据同步流程 - 任务调度系统 - 事件驱动架构 ### OSS.DataPipe 应用场景 - 消息队列 - 事件驱动架构 - 可靠任务执行 - 数据流转处理 - 异步处理 ## 测试用例 查看 [Tests/OSS.Pipeline.ConsoleTests](https://github.com/your-repo/OSS.Pipeline/tree/main/src/Tests/OSS.Pipeline.ConsoleTests) 目录获取完整示例代码。 ## 许可证 MIT License ## 贡献 欢迎提交 Issue 和 Pull Request!