# webflux-rsocket-running **Repository Path**: coderwing/webflux-rsocket-running ## Basic Information - **Project Name**: webflux-rsocket-running - **Description**: WebFlux+RSocket - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2025-07-02 - **Last Updated**: 2025-10-25 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # WebFlux + RSocket 响应式全栈应用
![Spring Boot](https://img.shields.io/badge/Spring%20Boot-3.1.0-brightgreen.svg) ![Java](https://img.shields.io/badge/Java-17-orange.svg) ![WebFlux](https://img.shields.io/badge/WebFlux-Reactive-blue.svg) ![RSocket](https://img.shields.io/badge/RSocket-1.1.x-purple.svg) ![License](https://img.shields.io/badge/License-MIT-yellow.svg) 基于 Spring WebFlux + RSocket 的生产级响应式应用,集成完整的订单管理、实时通知、监控告警等企业级功能。
--- ## 📖 项目简介 这是一个展示**生产级响应式编程最佳实践**的完整项目,实现了从数据访问层到前端通信的全链路响应式架构。项目采用事件驱动设计,支持高并发、低延迟的实时通信场景。 ### 🎯 项目特点 - ✅ **完整业务场景**:订单管理、商品管理、用户认证等真实业务功能 - ✅ **RSocket 实时通信**:四种通信模式全覆盖(Request-Response、Fire-and-Forget、Request-Stream、Channel) - ✅ **响应式全栈**:从数据库到Web层全程无阻塞 - ✅ **事件驱动架构**:解耦业务逻辑,易于扩展 - ✅ **生产级特性**:缓存、限流、监控、异常处理一应俱全 - ✅ **可观测性**:集成 Prometheus 监控和健康检查 --- ## ✨ 核心功能 ### 🔐 1. 响应式安全认证 - **JWT Token 认证**:无状态的分布式认证方案 - **响应式安全过滤器**:基于 Spring Security Reactive - **密码加密**:BCrypt 安全加密 - **接口权限控制**:细粒度的访问控制 **接口示例**: ```bash # 用户登录 POST /auth/login Content-Type: application/json { "username": "admin", "password": "password123" } ``` ### 📦 2. 订单管理系统 完整的订单生命周期管理,展示复杂的响应式业务流程。 **核心功能**: - 订单创建(库存检查 → 价格计算 → 库存扣减 → 订单生成 → 通知发送) - 订单支付(状态校验 → 支付处理 → 状态更新 → 事件发布) - 订单取消(取消校验 → 库存恢复 → 状态变更 → 通知推送) - 超时订单自动取消(定时任务 → 批量处理) **接口示例**: ```bash # 创建订单 POST /api/orders Authorization: Bearer {token} # 支付订单 POST /api/orders/{orderId}/pay # 取消订单 POST /api/orders/{orderId}/cancel # 获取用户订单 GET /api/orders/user/{userId} # 批量取消超时订单 POST /api/orders/cancel-timeout?timeoutMinutes=30 ``` ### 🛍️ 3. 商品管理系统 **核心功能**: - 商品 CRUD 操作 - 库存管理(检查、扣减、恢复) - 商品搜索(关键词、价格区间、分类) - 热门商品排行 - 库存预警(低库存商品监控) **接口示例**: ```bash # 获取商品详情 GET /api/products/{id} # 商品搜索 GET /api/products/search?keyword=手机 # 价格区间查询 GET /api/products/price-range?minPrice=100&maxPrice=1000 # 获取热门商品 GET /api/products/top-selling?limit=10 # 库存检查 GET /api/products/{id}/stock-check?quantity=5 # 库存预警 GET /api/products/low-stock?threshold=10 ``` ### 🔔 4. RSocket 实时通知系统 这是项目的**核心亮点**,展示了 RSocket 四种通信模式的实际应用。 #### 4.1 Request-Stream(请求-流) 用户订阅实时通知,持续接收推送消息。 ```java // 客户端代码示例 rsocketRequester .route("notification.subscribe") .data(userId) .retrieveFlux(NotificationMessage.class) .subscribe(notification -> { System.out.println("收到通知: " + notification.getTitle()); }); ``` **特性**: - 自动心跳检测(每30秒) - 背压控制(限速10条/秒) - 缓冲区管理(最多100条消息) - 自动重试(失败重试3次) - 连接超时保护(30分钟) #### 4.2 Fire-and-Forget(发送不需响应) 高性能消息推送,适合不需要响应的场景。 ```java // 发送单条通知 rsocketRequester .route("notification.send") .data(notificationMessage) .send() .subscribe(); ``` #### 4.3 Request-Response(请求-响应) 系统消息广播,返回成功推送的用户数。 ```java // 系统广播 rsocketRequester .route("notification.broadcast") .data(systemMessage) .retrieveMono(Long.class) .subscribe(count -> { System.out.println("成功推送给 " + count + " 个用户"); }); ``` #### 4.4 Request-Channel(双向流) 批量消息处理,双向流式通信。 ```java // 批量发送通知 Flux messageFlux = Flux.just(msg1, msg2, msg3); rsocketRequester .route("notification.batch") .data(messageFlux) .retrieveFlux(String.class) .subscribe(result -> { System.out.println(result); // "✓ 用户 1001 推送成功" }); ``` **RSocket 路由清单**: | 路由 | 模式 | 功能描述 | |------------------------------|-----------------|--------------| | `notification.subscribe` | Request-Stream | 订阅实时通知 | | `notification.send` | Fire-and-Forget | 发送单条通知 | | `notification.batch` | Request-Channel | 批量发送通知 | | `notification.broadcast` | Request-Response| 广播系统消息 | | `notification.check-online` | Request-Response| 检查用户是否在线 | | `notification.online-count` | Request-Response| 获取在线用户数 | ### 💾 5. 多级缓存系统 **两层缓存架构**: - **L1 - Caffeine 本地缓存**:JVM 内存缓存,速度最快 - **L2 - Redis 分布式缓存**:跨实例共享,支持集群 **缓存策略**: ```java // 商品查询自动缓存(TTL 1小时) @ReactiveCacheable(cacheNames = "products", key = "#id") public Mono getProductById(Long id) { return productRepository.findById(id); } ``` ### 🛡️ 6. 三维度智能限流 基于 Bucket4j 实现的令牌桶限流算法。 **限流维度**: 1. **IP 限流**:防止单个 IP 恶意请求 2. **用户限流**:限制单个用户请求频率 3. **接口限流**:保护热点接口 **限流响应**: ```json { "error": "Rate limit exceeded", "message": "IP访问频率过高", "retryAfter": 5 } ``` ### 📊 7. 监控与指标 集成 **Micrometer + Prometheus**,实时收集业务指标。 **计数器指标**: - `orders.created` - 订单创建数 - `payments.success` - 支付成功数 - `payments.failure` - 支付失败数 - `rsocket.connections` - RSocket 连接数 - `notifications.sent` - 通知发送数 - `cache.hits` / `cache.misses` - 缓存命中/未命中 - `ratelimit.triggered` - 限流触发次数 **计时器指标**: - `order.processing.time` - 订单处理耗时 - `database.query.time` - 数据库查询耗时 - `cache.operation.time` - 缓存操作耗时 - `rsocket.message.time` - RSocket 消息处理耗时 **实时指标**: - `users.online` - 当前在线用户数 - `orders.pending` - 待处理订单数 - `products.low.stock` - 库存不足商品数 **访问监控端点**: ```bash # 健康检查 GET http://localhost:8088/actuator/health # Prometheus 指标 GET http://localhost:8088/actuator/prometheus # 应用信息 GET http://localhost:8088/actuator/info ``` ### 🎯 8. 事件驱动架构 使用 Spring 事件机制解耦业务逻辑。 **事件流**: ``` 订单服务 → 发布 OrderStatusChangedEvent → NotificationEventListener 监听 → RSocket 推送通知给用户 ``` **优点**: - Service 不依赖 RSocket 具体实现 - 易于单元测试 - 易于扩展其他通知方式(短信、邮件等) --- ## 🛠️ 技术栈 ### 核心框架 - **Spring Boot 3.1.0** - 基础框架 - **Spring WebFlux** - 响应式 Web - **Spring Security Reactive** - 响应式安全 - **Spring Data R2DBC** - 响应式数据访问 - **Spring RSocket** - RSocket 协议支持 ### 数据存储 - **MySQL 8.0+** - 业务数据存储 - **R2DBC MySQL** - 响应式数据库驱动 - **Redis 5.0+** - 分布式缓存 - **Lettuce** - 响应式 Redis 客户端 ### 中间件与工具 - **JWT (jjwt 0.11.5)** - 认证令牌 - **Bucket4j** - 分布式限流 - **Caffeine** - 本地缓存 - **Micrometer** - 指标收集 - **Prometheus** - 监控系统 - **Lombok** - 代码简化 - **Flyway** - 数据库版本管理 --- ## 📋 环境要求 | 组件 | 版本要求 | 说明 | |-------|---------|--------------| | Java | 17+ | 支持最新语法特性 | | Maven | 3.6+ | 依赖管理工具 | | MySQL | 8.0+ | 业务数据库 | | Redis | 5.0+ | 缓存和分布式锁 | --- ## 🚀 快速开始 ### 1. 克隆项目 ```bash git clone https://github.com/your-repo/webflux-rsocket-running.git cd webflux-rsocket-running ``` ### 2. 配置数据库 修改 `src/main/resources/application.yml`: ```yaml spring: # R2DBC MySQL 配置 r2dbc: url: r2dbc:mysql://localhost:3306/your_database?useSSL=false username: your_username password: your_password # Redis 配置 data: redis: host: localhost port: 6379 password: your_password # 如果没有密码则留空 ``` ### 3. 初始化数据库 项目使用 Flyway 自动管理数据库版本,首次启动会自动创建表结构。 ### 4. 启动应用 ```bash # 使用 Maven 启动 mvn spring-boot:run # 或者先打包再运行 mvn clean package java -jar target/webflux-rsocket-running-1.0.0.jar ``` ### 5. 验证服务 ```bash # 健康检查 curl http://localhost:8088/actuator/health # 用户登录 curl -X POST http://localhost:8088/auth/login \ -H "Content-Type: application/json" \ -d '{"username":"admin","password":"password123"}' ``` **服务端口**: - HTTP 服务:`http://localhost:8088` - RSocket 服务:`tcp://localhost:8089` --- ## 📡 API 文档 ### REST API 接口 #### 认证接口 ```bash POST /auth/login # 用户登录 ``` #### 订单接口 ```bash GET /api/orders/user/{userId} # 获取用户订单列表 GET /api/orders/number/{orderNumber} # 根据订单号查询 GET /api/orders/timeout # 获取超时待支付订单 POST /api/orders # 创建订单 POST /api/orders/{orderId}/pay # 支付订单 POST /api/orders/{orderId}/cancel # 取消订单 POST /api/orders/cancel-timeout # 批量取消超时订单 ``` #### 商品接口 ```bash GET /api/products/{id} # 获取商品详情 GET /api/products/available # 获取上架商品列表 GET /api/products/category/{categoryId} # 根据分类获取商品 GET /api/products/price-range # 价格区间查询 GET /api/products/search # 搜索商品 GET /api/products/top-selling # 获取热门商品 GET /api/products/{id}/stock-check # 检查库存 GET /api/products/low-stock # 获取库存预警商品 POST /api/products # 创建商品 PUT /api/products/{id} # 更新商品 DELETE /api/products/{id} # 删除商品 ``` ### RSocket API 接口 ```bash notification.subscribe # 订阅实时通知(Request-Stream) notification.send # 发送单条通知(Fire-and-Forget) notification.batch # 批量发送通知(Request-Channel) notification.broadcast # 广播系统消息(Request-Response) notification.check-online # 检查用户在线状态(Request-Response) notification.online-count # 获取在线用户数(Request-Response) ``` ### 监控接口 ```bash GET /actuator/health # 健康检查 GET /actuator/metrics # 指标列表 GET /actuator/prometheus # Prometheus 格式指标 GET /actuator/info # 应用信息 ``` --- ## 🏗️ 项目结构 ``` webflux-rsocket-running/ ├── src/main/java/com/running/ │ ├── controller/ # HTTP 控制器 │ │ ├── AuthController.java # 认证控制器 │ │ ├── OrderController.java # 订单控制器 │ │ ├── ProductController.java # 商品控制器 │ │ └── UserController.java # 用户控制器 │ ├── rsocket/ # RSocket 模块 │ │ ├── controller/ # RSocket 控制器 │ │ │ └── NotificationRSocketController.java │ │ ├── service/ # RSocket 服务 │ │ │ ├── RSocketNotificationService.java │ │ │ └── RSocketConnectionManager.java │ │ ├── event/ # 事件 │ │ │ ├── OrderStatusChangedEvent.java │ │ │ └── NotificationEventListener.java │ │ └── config/ │ │ └── RSocketConfig.java # RSocket 配置 │ ├── service/ # 业务服务 │ │ ├── auth/ │ │ │ └── AuthService.java # 认证服务 │ │ ├── OrderService.java # 订单服务 │ │ ├── ProductService.java # 商品服务 │ │ ├── UserService.java # 用户服务 │ │ └── NotificationService.java # 通知服务 │ ├── repository/ # 数据访问层 │ │ ├── OrderRepository.java │ │ ├── ProductRepository.java │ │ ├── UserRepository.java │ │ └── NotificationRepository.java │ ├── model/ # 数据模型 │ │ ├── entity/ # 实体类 │ │ ├── dto/ # 数据传输对象 │ │ └── req/ # 请求对象 │ ├── frame/ # 框架配置 │ │ ├── config/ │ │ │ ├── security/ # 安全配置 │ │ │ │ ├── SecurityConfig.java │ │ │ │ ├── JwtAuthenticationManager.java │ │ │ │ ├── JwtAuthenticationToken.java │ │ │ │ └── JwtServerAuthenticationConverter.java │ │ │ ├── cache/ # 缓存配置 │ │ │ │ ├── ReactiveCache.java │ │ │ │ └── RedisConfig.java │ │ │ ├── RateLimitConfig.java # 限流配置 │ │ │ └── SysConfig.java # 系统配置 │ │ ├── filter/ │ │ │ └── RateLimitWebFilter.java # 限流过滤器 │ │ ├── monitoring/ # 监控模块 │ │ │ ├── BusinessMetricsService.java │ │ │ ├── CustomHealthIndicator.java │ │ │ └── MetricsConfig.java │ │ └── exception/ # 异常处理 │ │ ├── BusinessException.java │ │ └── GlobalExceptionHandler.java │ ├── util/ │ │ └── JwtUtil.java # JWT 工具类 │ └── WebfluxRunningApplication.java # 启动类 ├── src/main/resources/ │ ├── application.yml # 应用配置 │ └── docs/ # 文档 │ ├── WebFlux教程.md │ └── RSocket教程.md └── pom.xml # Maven 配置 ``` --- ## 📚 学习资源 项目内置了详细的学习文档: - 📘 [WebFlux 教程](src/main/resources/docs/WebFlux教程.md) - WebFlux 核心概念和最佳实践 - 📗 [RSocket 教程](src/main/resources/docs/RSocket教程.md) - RSocket 协议和使用指南 --- ## 🔧 配置说明 ### JWT 配置 ```yaml jwt: secret: "your-secret-key-at-least-32-characters" # JWT 密钥(至少32字符) expiration: 864000 # 过期时间(秒),默认10天 ``` ### 缓存配置 ```yaml cache: default-ttl: PT1H # 默认缓存过期时间(ISO-8601格式,PT1H = 1小时) ``` ### RSocket 配置 ```yaml spring: rsocket: server: port: 8089 # RSocket 服务端口 transport: tcp # 传输协议(tcp/websocket) ``` ### 监控配置 ```yaml management: endpoints: web: exposure: include: health,info,metrics,prometheus endpoint: health: show-details: always # 显示详细健康信息 ``` --- ## 🧪 测试 ### 单元测试 ```bash mvn test ``` ### RSocket 集成测试 项目包含完整的 RSocket 集成测试: ```bash # 运行 RSocket 集成测试 mvn test -Dtest=RSocketIntegrationTest # 运行优化后的 RSocket 测试 mvn test -Dtest=OptimizedRSocketTest ``` --- ## 📈 性能特点 ### 响应式编程优势 1. **高并发处理能力** - 少量线程处理大量请求 - 异步非阻塞,避免线程等待 2. **资源利用率高** - 不为每个请求分配线程 - 内存占用更低 3. **背压控制** - 自动调节数据流速 - 防止系统过载 ### RSocket 优势 1. **双向通信** - 客户端和服务端均可主动推送 - 比 WebSocket 更强大 2. **多种通信模式** - Request-Response - Fire-and-Forget - Request-Stream - Request-Channel 3. **连接复用** - 单个连接多路复用 - 减少连接开销 4. **断线重连** - 自动重连机制 - 保证服务可用性 --- ## 🎓 最佳实践 ### 1. 避免阻塞操作 ```java // ❌ 错误:在响应式链中执行阻塞操作 public Mono getUser(Long id) { User user = userRepository.findById(id).block(); // 阻塞! return Mono.just(user); } // ✅ 正确:全程响应式 public Mono getUser(Long id) { return userRepository.findById(id); } ``` ### 2. 合理使用调度器 ```java // 对于必须的阻塞操作,使用专用的调度器 public Mono readFile(String path) { return Mono.fromCallable(() -> { // 阻塞的文件读取 return Files.readString(Path.of(path)); }).subscribeOn(Schedulers.boundedElastic()); // 使用弹性调度器 } ``` ### 3. 错误处理 ```java // 完善的错误处理 public Mono createOrder(OrderRequest request) { return validateRequest(request) .flatMap(this::processOrder) .onErrorResume(ValidationException.class, e -> Mono.error(new BusinessException("参数校验失败", e))) .onErrorResume(e -> Mono.error(new BusinessException("订单创建失败", e))) .doOnError(e -> log.error("创建订单失败", e)); } ``` ### 4. 资源清理 ```java // 确保资源正确清理 public Flux streamData() { return dataSource.stream() .doFinally(signalType -> { // 无论成功、失败还是取消,都会执行清理 cleanup(); }); } ``` --- ## 🚀 生产部署建议 ### 1. JVM 参数优化 ```bash java -jar app.jar \ -Xms2g -Xmx2g \ # 堆内存 -XX:+UseG1GC \ # 使用 G1 垃圾回收器 -XX:MaxGCPauseMillis=200 \ # 最大 GC 停顿时间 -XX:+HeapDumpOnOutOfMemoryError \ # OOM 时生成堆转储 -XX:HeapDumpPath=/tmp/heapdump.hprof ``` ### 2. 反应式线程池配置 ```yaml spring: reactor: schedulers: bounded-elastic: max-threads: 100 # 最大线程数 max-queued-tasks: 10000 # 最大队列任务数 ``` ### 3. 监控告警 - 配置 Prometheus + Grafana 监控面板 - 设置关键指标告警(错误率、响应时间、在线用户数) - 配置日志聚合(ELK Stack) ### 4. 数据库连接池 ```yaml spring: r2dbc: pool: initial-size: 10 # 初始连接数 max-size: 50 # 最大连接数 max-idle-time: 30m # 最大空闲时间 validation-query: SELECT 1 ``` --- ## ❓ 常见问题 ### Q1: 为什么选择 RSocket 而不是 WebSocket? **A**: RSocket 相比 WebSocket 有以下优势: - 四种通信模式(WebSocket 只有双向流) - 内置背压控制 - 更好的断线重连机制 - 连接复用和多路复用 - 更适合微服务间通信 ### Q2: 响应式编程的学习曲线陡峭吗? **A**: 确实有一定学习成本,但项目提供了: - 完整的代码示例 - 详细的注释说明 - 最佳实践指导 - 常见错误避免 建议先理解 Mono/Flux 的基本概念,再逐步深入。 ### Q3: 如何调试响应式代码? **A**: - 使用 `.log()` 操作符打印流日志 - 使用 Reactor Debug Agent 增强错误堆栈 - 使用 BlockHound 检测阻塞调用 - 善用 IDE 的响应式流调试功能 ### Q4: 项目适合什么样的场景? **A**: - ✅ 高并发场景 - ✅ 实时通信需求 - ✅ I/O 密集型应用 - ✅ 微服务架构 - ❌ CPU 密集型计算 --- ## 🤝 贡献指南 欢迎提交 Issue 和 Pull Request! ### 提交规范 ``` feat: 新功能 fix: 修复 Bug docs: 文档更新 style: 代码格式调整 refactor: 重构 test: 测试相关 chore: 构建/工具相关 ``` --- ## 📄 许可证 本项目采用 [MIT](LICENSE) 许可证。 --- ## 👨‍💻 作者 **coderwing** 如有问题或建议,欢迎通过 Issue 联系。 --- ## 🌟 致谢 感谢以下开源项目: - [Spring Framework](https://spring.io/) - [Project Reactor](https://projectreactor.io/) - [RSocket](https://rsocket.io/) - [R2DBC](https://r2dbc.io/) ---
⭐ 如果这个项目对你有帮助,请给个 Star!⭐