8
diff --git a/framework-common-utils/pom.xml b/framework-common-utils/pom.xml
index 7a64d10b01d170ed7c00c87e4d79bfa091d3aa7d..fbcd91a020dbb15dd2bfc20a65b8cea344fa77cf 100644
--- a/framework-common-utils/pom.xml
+++ b/framework-common-utils/pom.xml
@@ -5,7 +5,7 @@
iotos-soft-gateway
me.hekr.iotos.softgateway
- 3.5.14
+ 3.5.15
4.0.0
diff --git a/framework-common-utils/src/main/java/me/hekr/iotos/softgateway/common/utils/NumberUtil.java b/framework-common-utils/src/main/java/me/hekr/iotos/softgateway/common/utils/NumberUtil.java
new file mode 100644
index 0000000000000000000000000000000000000000..6e9e10d80254f25b5b16e145446b5c903c111ca5
--- /dev/null
+++ b/framework-common-utils/src/main/java/me/hekr/iotos/softgateway/common/utils/NumberUtil.java
@@ -0,0 +1,27 @@
+package me.hekr.iotos.softgateway.common.utils;
+
+/**
+ * @author du
+ */
+public class NumberUtil extends cn.hutool.core.util.NumberUtil {
+ public static Double getOrDefault0(Double value) {
+ if (value == null || value.isNaN() || value.isInfinite()) {
+ return 0D;
+ }
+ return value;
+ }
+
+ public static Integer getOrDefault0(Integer value) {
+ if (value == null) {
+ return 0;
+ }
+ return value;
+ }
+
+ public static Long getOrDefault0(Long value) {
+ if (value == null) {
+ return 0L;
+ }
+ return value;
+ }
+}
diff --git a/framework-common-utils/src/main/java/me/hekr/iotos/softgateway/common/utils/TimedCachedBlockingQueue.java b/framework-common-utils/src/main/java/me/hekr/iotos/softgateway/common/utils/TimedCachedBlockingQueue.java
new file mode 100644
index 0000000000000000000000000000000000000000..6b33da769da8a1094b40e20c8e798562440156df
--- /dev/null
+++ b/framework-common-utils/src/main/java/me/hekr/iotos/softgateway/common/utils/TimedCachedBlockingQueue.java
@@ -0,0 +1,168 @@
+package me.hekr.iotos.softgateway.common.utils;
+
+import cn.hutool.core.lang.Assert;
+import cn.hutool.core.thread.ThreadFactoryBuilder;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor.AbortPolicy;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * 定时, 固定缓存大小的queue。可以实现批量操作
+ *
+ * 没到时间先缓存到Bag里面,Bag 有大小。 如果Bag 满则 打包发送到 Queue。
+ *
+ *
Bag 没满但是有值,到时间也会被发送到Queue中
+ *
+ * @author du
+ */
+@Slf4j
+public class TimedCachedBlockingQueue {
+
+ private final int queueSize;
+ private final String name;
+ private ScheduledExecutorService scheduledExecutorService;
+ private final BlockingDeque> bagQueue;
+ private final int batchSize;
+ private Bag bag;
+ /**
+ * 总数量为 batchSize * queueSize
+ *
+ * @param interval 定时周期
+ * @param timeUnit 定时单位
+ * @param batchSize 批量缓存大小
+ * @param queueSize 队列大小
+ */
+ public TimedCachedBlockingQueue(
+ String name, int interval, TimeUnit timeUnit, int batchSize, int queueSize) {
+ this.name = name;
+ Assert.isTrue(interval > 0, "interval 必须大于0");
+ Assert.notNull(timeUnit, "timeUnit 不能为null");
+ Assert.isTrue(interval > 0, "batchSize 必须大于0");
+ this.batchSize = batchSize;
+ this.queueSize = queueSize;
+ this.bagQueue = new LinkedBlockingDeque<>(queueSize);
+ scheduledExecutorService =
+ new ScheduledThreadPoolExecutor(
+ 1,
+ new ThreadFactoryBuilder()
+ .setNamePrefix("TimedCachedBlockingQueue-")
+ .setDaemon(true)
+ .build(),
+ new AbortPolicy());
+ scheduledExecutorService.scheduleAtFixedRate(
+ () -> {
+ try {
+ checkAndPutBag();
+ } catch (Exception e) {
+ log.error(e.getMessage(), e);
+ }
+ },
+ interval,
+ interval,
+ timeUnit);
+ }
+
+ /** 监测背包,如果不为null,说明有元素,入队 */
+ private synchronized void checkAndPutBag() {
+ if (bagQueue.size() == queueSize) {
+ log.warn("TimedCachedBlockingQueue: {}, 队列满({}),建议加快消费或者加大队列长度", name, queueSize);
+ }
+
+ if (log.isDebugEnabled()) {
+ log.debug(
+ "name: {} check and put bag into queue, current bag: {} , bag size: {}",
+ name,
+ bagQueue.size() + 1,
+ bag == null ? 0 : bag.getSize());
+ }
+ if (bag != null) {
+ boolean offer = bagQueue.offer(bag);
+ if (offer) {
+ bag = null;
+ }
+ }
+ }
+
+ /**
+ * 放入元素(阻塞)
+ *
+ * @param t
+ * @return
+ * @throws InterruptedException
+ */
+ public synchronized TimedCachedBlockingQueue put(T t) throws InterruptedException {
+ if (bag == null) {
+ bag = new Bag<>(batchSize);
+ }
+
+ boolean bagOffer = bag.offer(t);
+ boolean isFull = !bagOffer || bag.isFull();
+ // 背包满了
+ if (isFull) {
+ bagQueue.put(bag);
+ }
+
+ return this;
+ }
+
+ /**
+ * 拉取背包(注意该方法不能使用 synchronized, 会和 put 方法发生死锁)
+ *
+ * @param timeout
+ * @param timeUnit
+ * @return
+ * @throws InterruptedException
+ */
+ public Bag poll(long timeout, TimeUnit timeUnit) throws InterruptedException {
+ Bag tBag = bagQueue.poll(timeout, timeUnit);
+ return tBag;
+ }
+
+ /**
+ * 背包
+ *
+ * @param
+ */
+ public static class Bag {
+ private final ArrayBlockingQueue bagItemQueue;
+ private final int capacity;
+
+ public Bag(int capacity) {
+ bagItemQueue = new ArrayBlockingQueue<>(capacity);
+ this.capacity = capacity;
+ }
+
+ public boolean offer(T t) {
+ return bagItemQueue.offer(t);
+ }
+
+ public boolean isFull() {
+ return bagItemQueue.size() == capacity;
+ }
+
+ public List getAll() {
+ if (bagItemQueue.isEmpty()) {
+ return Collections.emptyList();
+ }
+
+ List list = new ArrayList<>();
+ while (!bagItemQueue.isEmpty()) {
+ list.add(bagItemQueue.poll());
+ }
+
+ return list;
+ }
+
+ public int getSize() {
+ return bagItemQueue.size();
+ }
+ }
+}
diff --git a/framework-core/pom.xml b/framework-core/pom.xml
index b3814cbbf6a9143fd0eb105620c1b3529d2e2d30..5a3745c5ffbd69e6a31782986e81287c33b12a8b 100644
--- a/framework-core/pom.xml
+++ b/framework-core/pom.xml
@@ -5,7 +5,7 @@
iotos-soft-gateway
me.hekr.iotos.softgateway
- 3.5.14
+ 3.5.15
4.0.0
jar
diff --git a/framework-network-common/pom.xml b/framework-network-common/pom.xml
index 5c0b1c2c80ef9da130e87951f1c165b7174ff2db..2d5b093984999d6eb43ee9d29c16270ee59f8153 100644
--- a/framework-network-common/pom.xml
+++ b/framework-network-common/pom.xml
@@ -5,7 +5,7 @@
iotos-soft-gateway
me.hekr.iotos.softgateway
- 3.5.14
+ 3.5.15
4.0.0
diff --git a/framework-network-http/pom.xml b/framework-network-http/pom.xml
index 001691716e0e1b8d792d76944c9442ea9b7027d3..0fadcfc0359dc90f2f878a13f5c2a068d0f0494e 100644
--- a/framework-network-http/pom.xml
+++ b/framework-network-http/pom.xml
@@ -5,7 +5,7 @@
iotos-soft-gateway
me.hekr.iotos.softgateway
- 3.5.14
+ 3.5.15
4.0.0
framework-network-http
diff --git a/framework-network-mqtt/pom.xml b/framework-network-mqtt/pom.xml
index b37226242003c671c94a7a7ee52528c4f6dc5256..00acbc3619b03348ae885b2e878091d630cc4b59 100644
--- a/framework-network-mqtt/pom.xml
+++ b/framework-network-mqtt/pom.xml
@@ -5,7 +5,7 @@
iotos-soft-gateway
me.hekr.iotos.softgateway
- 3.5.14
+ 3.5.15
4.0.0
@@ -24,7 +24,7 @@
me.hekr.iotos.softgateway
framework-network-common
- 3.5.14
+ 3.5.15
\ No newline at end of file
diff --git a/framework-network-tcp/pom.xml b/framework-network-tcp/pom.xml
index 7badc09fc085e09a424485d2531a891fc64ead40..784613e79b48cf6322a2adbe693a09d46f0c1951 100644
--- a/framework-network-tcp/pom.xml
+++ b/framework-network-tcp/pom.xml
@@ -5,7 +5,7 @@
iotos-soft-gateway
me.hekr.iotos.softgateway
- 3.5.14
+ 3.5.15
4.0.0
diff --git a/framework-network-udp/pom.xml b/framework-network-udp/pom.xml
index 6f750364ec7ba2b2411f3a783cba24cff43771a0..aa898d5dd282b032cccf922bbba67e954dd546e0 100644
--- a/framework-network-udp/pom.xml
+++ b/framework-network-udp/pom.xml
@@ -5,7 +5,7 @@
iotos-soft-gateway
me.hekr.iotos.softgateway
- 3.5.14
+ 3.5.15
4.0.0
diff --git a/pom.xml b/pom.xml
index f0c728ad7fcf292fe24f8b591886cb02fd28c54c..f76065cd2a687623f3f6e3f1f5acd1c83c10905f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -18,7 +18,7 @@
me.hekr.iotos.softgateway
iotos-soft-gateway
- 3.5.14
+ 3.5.15
${project.groupId}:${project.artifactId}
IoTOS 软网关
https://gitee.com/geekhekr/iotos-soft-gateway