# OSS.PipeLine
**Repository Path**: osscore/OSS.PipeLine
## Basic Information
- **Project Name**: OSS.PipeLine
- **Description**: 流式事件处理,微服务下业务生命周期管理,强化业务的流程管理,建立业务操作边界,打造标准化的业务执行单元,提高代码复用。
- **Primary Language**: Unknown
- **License**: GPL-3.0
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 4
- **Forks**: 3
- **Created**: 2021-06-01
- **Last Updated**: 2026-01-27
## Categories & Tags
**Categories**: process-engine
**Tags**: None
## README
# OSS 流程引擎与数据管道库
## 项目简介
- **OSS.DataPipe**:轻量级消息管道,抽象定义消息输入输出接口,简化业务侧消息生产、消费调用方式。
内部提供基于内存队列的默认实现(可扩展自定义消息队列实现)。
同时提供事件处理器,用以支持自定义事件重试策略。
- **OSS.Pipeline**:轻量级流程引擎,提供Activity、Branch等多种组件、支持自定义流程监控等功能。
组件间的消息传递基于 OSS.DataPipe 实现,进而支持组件的自动重试。
## 快速开始
### 安装
Install-Package OSS.DataPipe
Install-Package OSS.Pipeline
### 简单示例
#### OSS.Pipeline 示例
```csharp
using OSS.Pipeline;
using OSS.Common.Resp;
var node1 = new SimpleActivity("Node1", async (input) =>
{
Console.WriteLine($"Node1 is executing.......input:{input}");
return string.Empty;
});
var node3 = node1.Append("Node2", async (s) =>
{
Console.WriteLine("Node2 is executing.......");
return "Node2-Successful";
})
.Append("Node3", async (s) =>
{
Console.WriteLine($"Node3 is executing.......(Node2 result:{s})");
return "Ended";
});
// 定义流程
var pipeline = PipelineBuilder.Define("MyFlow", node1, node3);
// 执行流程
await pipeline.Run("测试数据"); // 或者 await startActivity.Run("测试数据")
```
#### 自定义流程监控
您可以实现 `IPipeMonitor` 接口来自定义流程监控逻辑,例如将监控数据发送到 ELK 或 Prometheus。
```csharp
// 可查看 IPipeMonitor 扩展不同阶段监控
public class MyPipelineMonitor : IPipeMonitor
{
///
/// Executed
///
public async Task Executed(PipeType pipeType, string pipeCode, object? inputPara, object? exeResult)
{
return Task.CompletedTask;
}
}
// 使用自定义监控
pipeline.SetMonitor(new MyPipelineMonitor());
```
#### OSS.DataPipe 业务侧调用示例
```csharp
using OSS.DataPipe;
// 注入数据消费方法,并得到数据输入句柄
var dataProducer = DataPipeFactory.CreateProducer(async (data) =>
{
Console.WriteLine($"消费数据: {data}");
return true;
});
// 通过数据输入句柄推送数据
await dataProducer.Push("测试数据");
// 等待测试数据消费完成
await Task.Delay(1000);
// 如果不再需要继续生产消息,通过此方法释放资源
DataPipeFactory.Release(dataProducer);
```
## 详细文档
### OSS.DataPipe
- [快速入门](docs/datapipe/quickstart.md)
- [核心组件](docs/datapipe/core-components.md)
- [高级特性](docs/datapipe/advanced-features.md)
### OSS.Pipeline
- [快速入门](docs/pipeline/quickstart.md)
- [核心组件](docs/pipeline/core-components.md)
- [高级特性](docs/pipeline/advanced-features.md)
## 扩展指南
### 自定义集成 RabbitMQ
OSS.DataPipe 支持自定义数据输入提供器,您可以轻松集成 RabbitMQ 等消息队列。
详细指南请参考:[RabbitMQ 集成指南](docs/extensions/rabbitmq-integration.md)
## 应用场景
### OSS.Pipeline 应用场景
- 订单处理流程
- 审批流程
- 数据同步流程
- 任务调度系统
- 事件驱动架构
### OSS.DataPipe 应用场景
- 消息队列
- 事件驱动架构
- 可靠任务执行
- 数据流转处理
- 异步处理
## 测试用例
查看 [Tests/OSS.Pipeline.ConsoleTests](https://github.com/your-repo/OSS.Pipeline/tree/main/src/Tests/OSS.Pipeline.ConsoleTests) 目录获取完整示例代码。
## 许可证
MIT License
## 贡献
欢迎提交 Issue 和 Pull Request!