# webflux-rsocket-running
**Repository Path**: coderwing/webflux-rsocket-running
## Basic Information
- **Project Name**: webflux-rsocket-running
- **Description**: WebFlux+RSocket
- **Primary Language**: Unknown
- **License**: Not specified
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 0
- **Forks**: 0
- **Created**: 2025-07-02
- **Last Updated**: 2025-10-25
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
# WebFlux + RSocket 响应式全栈应用





基于 Spring WebFlux + RSocket 的生产级响应式应用,集成完整的订单管理、实时通知、监控告警等企业级功能。
---
## 📖 项目简介
这是一个展示**生产级响应式编程最佳实践**的完整项目,实现了从数据访问层到前端通信的全链路响应式架构。项目采用事件驱动设计,支持高并发、低延迟的实时通信场景。
### 🎯 项目特点
- ✅ **完整业务场景**:订单管理、商品管理、用户认证等真实业务功能
- ✅ **RSocket 实时通信**:四种通信模式全覆盖(Request-Response、Fire-and-Forget、Request-Stream、Channel)
- ✅ **响应式全栈**:从数据库到Web层全程无阻塞
- ✅ **事件驱动架构**:解耦业务逻辑,易于扩展
- ✅ **生产级特性**:缓存、限流、监控、异常处理一应俱全
- ✅ **可观测性**:集成 Prometheus 监控和健康检查
---
## ✨ 核心功能
### 🔐 1. 响应式安全认证
- **JWT Token 认证**:无状态的分布式认证方案
- **响应式安全过滤器**:基于 Spring Security Reactive
- **密码加密**:BCrypt 安全加密
- **接口权限控制**:细粒度的访问控制
**接口示例**:
```bash
# 用户登录
POST /auth/login
Content-Type: application/json
{
"username": "admin",
"password": "password123"
}
```
### 📦 2. 订单管理系统
完整的订单生命周期管理,展示复杂的响应式业务流程。
**核心功能**:
- 订单创建(库存检查 → 价格计算 → 库存扣减 → 订单生成 → 通知发送)
- 订单支付(状态校验 → 支付处理 → 状态更新 → 事件发布)
- 订单取消(取消校验 → 库存恢复 → 状态变更 → 通知推送)
- 超时订单自动取消(定时任务 → 批量处理)
**接口示例**:
```bash
# 创建订单
POST /api/orders
Authorization: Bearer {token}
# 支付订单
POST /api/orders/{orderId}/pay
# 取消订单
POST /api/orders/{orderId}/cancel
# 获取用户订单
GET /api/orders/user/{userId}
# 批量取消超时订单
POST /api/orders/cancel-timeout?timeoutMinutes=30
```
### 🛍️ 3. 商品管理系统
**核心功能**:
- 商品 CRUD 操作
- 库存管理(检查、扣减、恢复)
- 商品搜索(关键词、价格区间、分类)
- 热门商品排行
- 库存预警(低库存商品监控)
**接口示例**:
```bash
# 获取商品详情
GET /api/products/{id}
# 商品搜索
GET /api/products/search?keyword=手机
# 价格区间查询
GET /api/products/price-range?minPrice=100&maxPrice=1000
# 获取热门商品
GET /api/products/top-selling?limit=10
# 库存检查
GET /api/products/{id}/stock-check?quantity=5
# 库存预警
GET /api/products/low-stock?threshold=10
```
### 🔔 4. RSocket 实时通知系统
这是项目的**核心亮点**,展示了 RSocket 四种通信模式的实际应用。
#### 4.1 Request-Stream(请求-流)
用户订阅实时通知,持续接收推送消息。
```java
// 客户端代码示例
rsocketRequester
.route("notification.subscribe")
.data(userId)
.retrieveFlux(NotificationMessage.class)
.subscribe(notification -> {
System.out.println("收到通知: " + notification.getTitle());
});
```
**特性**:
- 自动心跳检测(每30秒)
- 背压控制(限速10条/秒)
- 缓冲区管理(最多100条消息)
- 自动重试(失败重试3次)
- 连接超时保护(30分钟)
#### 4.2 Fire-and-Forget(发送不需响应)
高性能消息推送,适合不需要响应的场景。
```java
// 发送单条通知
rsocketRequester
.route("notification.send")
.data(notificationMessage)
.send()
.subscribe();
```
#### 4.3 Request-Response(请求-响应)
系统消息广播,返回成功推送的用户数。
```java
// 系统广播
rsocketRequester
.route("notification.broadcast")
.data(systemMessage)
.retrieveMono(Long.class)
.subscribe(count -> {
System.out.println("成功推送给 " + count + " 个用户");
});
```
#### 4.4 Request-Channel(双向流)
批量消息处理,双向流式通信。
```java
// 批量发送通知
Flux messageFlux = Flux.just(msg1, msg2, msg3);
rsocketRequester
.route("notification.batch")
.data(messageFlux)
.retrieveFlux(String.class)
.subscribe(result -> {
System.out.println(result); // "✓ 用户 1001 推送成功"
});
```
**RSocket 路由清单**:
| 路由 | 模式 | 功能描述 |
|------------------------------|-----------------|--------------|
| `notification.subscribe` | Request-Stream | 订阅实时通知 |
| `notification.send` | Fire-and-Forget | 发送单条通知 |
| `notification.batch` | Request-Channel | 批量发送通知 |
| `notification.broadcast` | Request-Response| 广播系统消息 |
| `notification.check-online` | Request-Response| 检查用户是否在线 |
| `notification.online-count` | Request-Response| 获取在线用户数 |
### 💾 5. 多级缓存系统
**两层缓存架构**:
- **L1 - Caffeine 本地缓存**:JVM 内存缓存,速度最快
- **L2 - Redis 分布式缓存**:跨实例共享,支持集群
**缓存策略**:
```java
// 商品查询自动缓存(TTL 1小时)
@ReactiveCacheable(cacheNames = "products", key = "#id")
public Mono getProductById(Long id) {
return productRepository.findById(id);
}
```
### 🛡️ 6. 三维度智能限流
基于 Bucket4j 实现的令牌桶限流算法。
**限流维度**:
1. **IP 限流**:防止单个 IP 恶意请求
2. **用户限流**:限制单个用户请求频率
3. **接口限流**:保护热点接口
**限流响应**:
```json
{
"error": "Rate limit exceeded",
"message": "IP访问频率过高",
"retryAfter": 5
}
```
### 📊 7. 监控与指标
集成 **Micrometer + Prometheus**,实时收集业务指标。
**计数器指标**:
- `orders.created` - 订单创建数
- `payments.success` - 支付成功数
- `payments.failure` - 支付失败数
- `rsocket.connections` - RSocket 连接数
- `notifications.sent` - 通知发送数
- `cache.hits` / `cache.misses` - 缓存命中/未命中
- `ratelimit.triggered` - 限流触发次数
**计时器指标**:
- `order.processing.time` - 订单处理耗时
- `database.query.time` - 数据库查询耗时
- `cache.operation.time` - 缓存操作耗时
- `rsocket.message.time` - RSocket 消息处理耗时
**实时指标**:
- `users.online` - 当前在线用户数
- `orders.pending` - 待处理订单数
- `products.low.stock` - 库存不足商品数
**访问监控端点**:
```bash
# 健康检查
GET http://localhost:8088/actuator/health
# Prometheus 指标
GET http://localhost:8088/actuator/prometheus
# 应用信息
GET http://localhost:8088/actuator/info
```
### 🎯 8. 事件驱动架构
使用 Spring 事件机制解耦业务逻辑。
**事件流**:
```
订单服务
→ 发布 OrderStatusChangedEvent
→ NotificationEventListener 监听
→ RSocket 推送通知给用户
```
**优点**:
- Service 不依赖 RSocket 具体实现
- 易于单元测试
- 易于扩展其他通知方式(短信、邮件等)
---
## 🛠️ 技术栈
### 核心框架
- **Spring Boot 3.1.0** - 基础框架
- **Spring WebFlux** - 响应式 Web
- **Spring Security Reactive** - 响应式安全
- **Spring Data R2DBC** - 响应式数据访问
- **Spring RSocket** - RSocket 协议支持
### 数据存储
- **MySQL 8.0+** - 业务数据存储
- **R2DBC MySQL** - 响应式数据库驱动
- **Redis 5.0+** - 分布式缓存
- **Lettuce** - 响应式 Redis 客户端
### 中间件与工具
- **JWT (jjwt 0.11.5)** - 认证令牌
- **Bucket4j** - 分布式限流
- **Caffeine** - 本地缓存
- **Micrometer** - 指标收集
- **Prometheus** - 监控系统
- **Lombok** - 代码简化
- **Flyway** - 数据库版本管理
---
## 📋 环境要求
| 组件 | 版本要求 | 说明 |
|-------|---------|--------------|
| Java | 17+ | 支持最新语法特性 |
| Maven | 3.6+ | 依赖管理工具 |
| MySQL | 8.0+ | 业务数据库 |
| Redis | 5.0+ | 缓存和分布式锁 |
---
## 🚀 快速开始
### 1. 克隆项目
```bash
git clone https://github.com/your-repo/webflux-rsocket-running.git
cd webflux-rsocket-running
```
### 2. 配置数据库
修改 `src/main/resources/application.yml`:
```yaml
spring:
# R2DBC MySQL 配置
r2dbc:
url: r2dbc:mysql://localhost:3306/your_database?useSSL=false
username: your_username
password: your_password
# Redis 配置
data:
redis:
host: localhost
port: 6379
password: your_password # 如果没有密码则留空
```
### 3. 初始化数据库
项目使用 Flyway 自动管理数据库版本,首次启动会自动创建表结构。
### 4. 启动应用
```bash
# 使用 Maven 启动
mvn spring-boot:run
# 或者先打包再运行
mvn clean package
java -jar target/webflux-rsocket-running-1.0.0.jar
```
### 5. 验证服务
```bash
# 健康检查
curl http://localhost:8088/actuator/health
# 用户登录
curl -X POST http://localhost:8088/auth/login \
-H "Content-Type: application/json" \
-d '{"username":"admin","password":"password123"}'
```
**服务端口**:
- HTTP 服务:`http://localhost:8088`
- RSocket 服务:`tcp://localhost:8089`
---
## 📡 API 文档
### REST API 接口
#### 认证接口
```bash
POST /auth/login # 用户登录
```
#### 订单接口
```bash
GET /api/orders/user/{userId} # 获取用户订单列表
GET /api/orders/number/{orderNumber} # 根据订单号查询
GET /api/orders/timeout # 获取超时待支付订单
POST /api/orders # 创建订单
POST /api/orders/{orderId}/pay # 支付订单
POST /api/orders/{orderId}/cancel # 取消订单
POST /api/orders/cancel-timeout # 批量取消超时订单
```
#### 商品接口
```bash
GET /api/products/{id} # 获取商品详情
GET /api/products/available # 获取上架商品列表
GET /api/products/category/{categoryId} # 根据分类获取商品
GET /api/products/price-range # 价格区间查询
GET /api/products/search # 搜索商品
GET /api/products/top-selling # 获取热门商品
GET /api/products/{id}/stock-check # 检查库存
GET /api/products/low-stock # 获取库存预警商品
POST /api/products # 创建商品
PUT /api/products/{id} # 更新商品
DELETE /api/products/{id} # 删除商品
```
### RSocket API 接口
```bash
notification.subscribe # 订阅实时通知(Request-Stream)
notification.send # 发送单条通知(Fire-and-Forget)
notification.batch # 批量发送通知(Request-Channel)
notification.broadcast # 广播系统消息(Request-Response)
notification.check-online # 检查用户在线状态(Request-Response)
notification.online-count # 获取在线用户数(Request-Response)
```
### 监控接口
```bash
GET /actuator/health # 健康检查
GET /actuator/metrics # 指标列表
GET /actuator/prometheus # Prometheus 格式指标
GET /actuator/info # 应用信息
```
---
## 🏗️ 项目结构
```
webflux-rsocket-running/
├── src/main/java/com/running/
│ ├── controller/ # HTTP 控制器
│ │ ├── AuthController.java # 认证控制器
│ │ ├── OrderController.java # 订单控制器
│ │ ├── ProductController.java # 商品控制器
│ │ └── UserController.java # 用户控制器
│ ├── rsocket/ # RSocket 模块
│ │ ├── controller/ # RSocket 控制器
│ │ │ └── NotificationRSocketController.java
│ │ ├── service/ # RSocket 服务
│ │ │ ├── RSocketNotificationService.java
│ │ │ └── RSocketConnectionManager.java
│ │ ├── event/ # 事件
│ │ │ ├── OrderStatusChangedEvent.java
│ │ │ └── NotificationEventListener.java
│ │ └── config/
│ │ └── RSocketConfig.java # RSocket 配置
│ ├── service/ # 业务服务
│ │ ├── auth/
│ │ │ └── AuthService.java # 认证服务
│ │ ├── OrderService.java # 订单服务
│ │ ├── ProductService.java # 商品服务
│ │ ├── UserService.java # 用户服务
│ │ └── NotificationService.java # 通知服务
│ ├── repository/ # 数据访问层
│ │ ├── OrderRepository.java
│ │ ├── ProductRepository.java
│ │ ├── UserRepository.java
│ │ └── NotificationRepository.java
│ ├── model/ # 数据模型
│ │ ├── entity/ # 实体类
│ │ ├── dto/ # 数据传输对象
│ │ └── req/ # 请求对象
│ ├── frame/ # 框架配置
│ │ ├── config/
│ │ │ ├── security/ # 安全配置
│ │ │ │ ├── SecurityConfig.java
│ │ │ │ ├── JwtAuthenticationManager.java
│ │ │ │ ├── JwtAuthenticationToken.java
│ │ │ │ └── JwtServerAuthenticationConverter.java
│ │ │ ├── cache/ # 缓存配置
│ │ │ │ ├── ReactiveCache.java
│ │ │ │ └── RedisConfig.java
│ │ │ ├── RateLimitConfig.java # 限流配置
│ │ │ └── SysConfig.java # 系统配置
│ │ ├── filter/
│ │ │ └── RateLimitWebFilter.java # 限流过滤器
│ │ ├── monitoring/ # 监控模块
│ │ │ ├── BusinessMetricsService.java
│ │ │ ├── CustomHealthIndicator.java
│ │ │ └── MetricsConfig.java
│ │ └── exception/ # 异常处理
│ │ ├── BusinessException.java
│ │ └── GlobalExceptionHandler.java
│ ├── util/
│ │ └── JwtUtil.java # JWT 工具类
│ └── WebfluxRunningApplication.java # 启动类
├── src/main/resources/
│ ├── application.yml # 应用配置
│ └── docs/ # 文档
│ ├── WebFlux教程.md
│ └── RSocket教程.md
└── pom.xml # Maven 配置
```
---
## 📚 学习资源
项目内置了详细的学习文档:
- 📘 [WebFlux 教程](src/main/resources/docs/WebFlux教程.md) - WebFlux 核心概念和最佳实践
- 📗 [RSocket 教程](src/main/resources/docs/RSocket教程.md) - RSocket 协议和使用指南
---
## 🔧 配置说明
### JWT 配置
```yaml
jwt:
secret: "your-secret-key-at-least-32-characters" # JWT 密钥(至少32字符)
expiration: 864000 # 过期时间(秒),默认10天
```
### 缓存配置
```yaml
cache:
default-ttl: PT1H # 默认缓存过期时间(ISO-8601格式,PT1H = 1小时)
```
### RSocket 配置
```yaml
spring:
rsocket:
server:
port: 8089 # RSocket 服务端口
transport: tcp # 传输协议(tcp/websocket)
```
### 监控配置
```yaml
management:
endpoints:
web:
exposure:
include: health,info,metrics,prometheus
endpoint:
health:
show-details: always # 显示详细健康信息
```
---
## 🧪 测试
### 单元测试
```bash
mvn test
```
### RSocket 集成测试
项目包含完整的 RSocket 集成测试:
```bash
# 运行 RSocket 集成测试
mvn test -Dtest=RSocketIntegrationTest
# 运行优化后的 RSocket 测试
mvn test -Dtest=OptimizedRSocketTest
```
---
## 📈 性能特点
### 响应式编程优势
1. **高并发处理能力**
- 少量线程处理大量请求
- 异步非阻塞,避免线程等待
2. **资源利用率高**
- 不为每个请求分配线程
- 内存占用更低
3. **背压控制**
- 自动调节数据流速
- 防止系统过载
### RSocket 优势
1. **双向通信**
- 客户端和服务端均可主动推送
- 比 WebSocket 更强大
2. **多种通信模式**
- Request-Response
- Fire-and-Forget
- Request-Stream
- Request-Channel
3. **连接复用**
- 单个连接多路复用
- 减少连接开销
4. **断线重连**
- 自动重连机制
- 保证服务可用性
---
## 🎓 最佳实践
### 1. 避免阻塞操作
```java
// ❌ 错误:在响应式链中执行阻塞操作
public Mono getUser(Long id) {
User user = userRepository.findById(id).block(); // 阻塞!
return Mono.just(user);
}
// ✅ 正确:全程响应式
public Mono getUser(Long id) {
return userRepository.findById(id);
}
```
### 2. 合理使用调度器
```java
// 对于必须的阻塞操作,使用专用的调度器
public Mono readFile(String path) {
return Mono.fromCallable(() -> {
// 阻塞的文件读取
return Files.readString(Path.of(path));
}).subscribeOn(Schedulers.boundedElastic()); // 使用弹性调度器
}
```
### 3. 错误处理
```java
// 完善的错误处理
public Mono createOrder(OrderRequest request) {
return validateRequest(request)
.flatMap(this::processOrder)
.onErrorResume(ValidationException.class, e ->
Mono.error(new BusinessException("参数校验失败", e)))
.onErrorResume(e ->
Mono.error(new BusinessException("订单创建失败", e)))
.doOnError(e -> log.error("创建订单失败", e));
}
```
### 4. 资源清理
```java
// 确保资源正确清理
public Flux streamData() {
return dataSource.stream()
.doFinally(signalType -> {
// 无论成功、失败还是取消,都会执行清理
cleanup();
});
}
```
---
## 🚀 生产部署建议
### 1. JVM 参数优化
```bash
java -jar app.jar \
-Xms2g -Xmx2g \ # 堆内存
-XX:+UseG1GC \ # 使用 G1 垃圾回收器
-XX:MaxGCPauseMillis=200 \ # 最大 GC 停顿时间
-XX:+HeapDumpOnOutOfMemoryError \ # OOM 时生成堆转储
-XX:HeapDumpPath=/tmp/heapdump.hprof
```
### 2. 反应式线程池配置
```yaml
spring:
reactor:
schedulers:
bounded-elastic:
max-threads: 100 # 最大线程数
max-queued-tasks: 10000 # 最大队列任务数
```
### 3. 监控告警
- 配置 Prometheus + Grafana 监控面板
- 设置关键指标告警(错误率、响应时间、在线用户数)
- 配置日志聚合(ELK Stack)
### 4. 数据库连接池
```yaml
spring:
r2dbc:
pool:
initial-size: 10 # 初始连接数
max-size: 50 # 最大连接数
max-idle-time: 30m # 最大空闲时间
validation-query: SELECT 1
```
---
## ❓ 常见问题
### Q1: 为什么选择 RSocket 而不是 WebSocket?
**A**: RSocket 相比 WebSocket 有以下优势:
- 四种通信模式(WebSocket 只有双向流)
- 内置背压控制
- 更好的断线重连机制
- 连接复用和多路复用
- 更适合微服务间通信
### Q2: 响应式编程的学习曲线陡峭吗?
**A**: 确实有一定学习成本,但项目提供了:
- 完整的代码示例
- 详细的注释说明
- 最佳实践指导
- 常见错误避免
建议先理解 Mono/Flux 的基本概念,再逐步深入。
### Q3: 如何调试响应式代码?
**A**:
- 使用 `.log()` 操作符打印流日志
- 使用 Reactor Debug Agent 增强错误堆栈
- 使用 BlockHound 检测阻塞调用
- 善用 IDE 的响应式流调试功能
### Q4: 项目适合什么样的场景?
**A**:
- ✅ 高并发场景
- ✅ 实时通信需求
- ✅ I/O 密集型应用
- ✅ 微服务架构
- ❌ CPU 密集型计算
---
## 🤝 贡献指南
欢迎提交 Issue 和 Pull Request!
### 提交规范
```
feat: 新功能
fix: 修复 Bug
docs: 文档更新
style: 代码格式调整
refactor: 重构
test: 测试相关
chore: 构建/工具相关
```
---
## 📄 许可证
本项目采用 [MIT](LICENSE) 许可证。
---
## 👨💻 作者
**coderwing**
如有问题或建议,欢迎通过 Issue 联系。
---
## 🌟 致谢
感谢以下开源项目:
- [Spring Framework](https://spring.io/)
- [Project Reactor](https://projectreactor.io/)
- [RSocket](https://rsocket.io/)
- [R2DBC](https://r2dbc.io/)
---
⭐ 如果这个项目对你有帮助,请给个 Star!⭐