diff --git a/deploy.sh b/deploy.sh index 56a4798384c3df707280b2b2c820b553d813039f..12266a95ebca261deada1b0961668459778d6759 100755 --- a/deploy.sh +++ b/deploy.sh @@ -4,7 +4,7 @@ # ./deploy upload 生成 maven jar 包并 commit set -e -VERSION="3.5.14" +VERSION="3.5.15" echo "版本号:$VERSION" updateVersion() { diff --git a/example/pom.xml b/example/pom.xml index 8086795bb55f1b36f23cfc2115732ba5256d5be6..fa15da7b3e216921610e1d7ce8de1259b4e859ce 100644 --- a/example/pom.xml +++ b/example/pom.xml @@ -5,7 +5,7 @@ iotos-soft-gateway me.hekr.iotos.softgateway - 3.5.14 + 3.5.15 8 diff --git a/framework-common-utils/pom.xml b/framework-common-utils/pom.xml index 7a64d10b01d170ed7c00c87e4d79bfa091d3aa7d..fbcd91a020dbb15dd2bfc20a65b8cea344fa77cf 100644 --- a/framework-common-utils/pom.xml +++ b/framework-common-utils/pom.xml @@ -5,7 +5,7 @@ iotos-soft-gateway me.hekr.iotos.softgateway - 3.5.14 + 3.5.15 4.0.0 diff --git a/framework-common-utils/src/main/java/me/hekr/iotos/softgateway/common/utils/NumberUtil.java b/framework-common-utils/src/main/java/me/hekr/iotos/softgateway/common/utils/NumberUtil.java new file mode 100644 index 0000000000000000000000000000000000000000..6e9e10d80254f25b5b16e145446b5c903c111ca5 --- /dev/null +++ b/framework-common-utils/src/main/java/me/hekr/iotos/softgateway/common/utils/NumberUtil.java @@ -0,0 +1,27 @@ +package me.hekr.iotos.softgateway.common.utils; + +/** + * @author du + */ +public class NumberUtil extends cn.hutool.core.util.NumberUtil { + public static Double getOrDefault0(Double value) { + if (value == null || value.isNaN() || value.isInfinite()) { + return 0D; + } + return value; + } + + public static Integer getOrDefault0(Integer value) { + if (value == null) { + return 0; + } + return value; + } + + public static Long getOrDefault0(Long value) { + if (value == null) { + return 0L; + } + return value; + } +} diff --git a/framework-common-utils/src/main/java/me/hekr/iotos/softgateway/common/utils/TimedCachedBlockingQueue.java b/framework-common-utils/src/main/java/me/hekr/iotos/softgateway/common/utils/TimedCachedBlockingQueue.java new file mode 100644 index 0000000000000000000000000000000000000000..6b33da769da8a1094b40e20c8e798562440156df --- /dev/null +++ b/framework-common-utils/src/main/java/me/hekr/iotos/softgateway/common/utils/TimedCachedBlockingQueue.java @@ -0,0 +1,168 @@ +package me.hekr.iotos.softgateway.common.utils; + +import cn.hutool.core.lang.Assert; +import cn.hutool.core.thread.ThreadFactoryBuilder; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingDeque; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadPoolExecutor.AbortPolicy; +import java.util.concurrent.TimeUnit; +import lombok.extern.slf4j.Slf4j; + +/** + * 定时, 固定缓存大小的queue。可以实现批量操作 + * + *

没到时间先缓存到Bag里面,Bag 有大小。 如果Bag 满则 打包发送到 Queue。 + * + *

Bag 没满但是有值,到时间也会被发送到Queue中 + * + * @author du + */ +@Slf4j +public class TimedCachedBlockingQueue { + + private final int queueSize; + private final String name; + private ScheduledExecutorService scheduledExecutorService; + private final BlockingDeque> bagQueue; + private final int batchSize; + private Bag bag; + /** + * 总数量为 batchSize * queueSize + * + * @param interval 定时周期 + * @param timeUnit 定时单位 + * @param batchSize 批量缓存大小 + * @param queueSize 队列大小 + */ + public TimedCachedBlockingQueue( + String name, int interval, TimeUnit timeUnit, int batchSize, int queueSize) { + this.name = name; + Assert.isTrue(interval > 0, "interval 必须大于0"); + Assert.notNull(timeUnit, "timeUnit 不能为null"); + Assert.isTrue(interval > 0, "batchSize 必须大于0"); + this.batchSize = batchSize; + this.queueSize = queueSize; + this.bagQueue = new LinkedBlockingDeque<>(queueSize); + scheduledExecutorService = + new ScheduledThreadPoolExecutor( + 1, + new ThreadFactoryBuilder() + .setNamePrefix("TimedCachedBlockingQueue-") + .setDaemon(true) + .build(), + new AbortPolicy()); + scheduledExecutorService.scheduleAtFixedRate( + () -> { + try { + checkAndPutBag(); + } catch (Exception e) { + log.error(e.getMessage(), e); + } + }, + interval, + interval, + timeUnit); + } + + /** 监测背包,如果不为null,说明有元素,入队 */ + private synchronized void checkAndPutBag() { + if (bagQueue.size() == queueSize) { + log.warn("TimedCachedBlockingQueue: {}, 队列满({}),建议加快消费或者加大队列长度", name, queueSize); + } + + if (log.isDebugEnabled()) { + log.debug( + "name: {} check and put bag into queue, current bag: {} , bag size: {}", + name, + bagQueue.size() + 1, + bag == null ? 0 : bag.getSize()); + } + if (bag != null) { + boolean offer = bagQueue.offer(bag); + if (offer) { + bag = null; + } + } + } + + /** + * 放入元素(阻塞) + * + * @param t + * @return + * @throws InterruptedException + */ + public synchronized TimedCachedBlockingQueue put(T t) throws InterruptedException { + if (bag == null) { + bag = new Bag<>(batchSize); + } + + boolean bagOffer = bag.offer(t); + boolean isFull = !bagOffer || bag.isFull(); + // 背包满了 + if (isFull) { + bagQueue.put(bag); + } + + return this; + } + + /** + * 拉取背包(注意该方法不能使用 synchronized, 会和 put 方法发生死锁) + * + * @param timeout + * @param timeUnit + * @return + * @throws InterruptedException + */ + public Bag poll(long timeout, TimeUnit timeUnit) throws InterruptedException { + Bag tBag = bagQueue.poll(timeout, timeUnit); + return tBag; + } + + /** + * 背包 + * + * @param + */ + public static class Bag { + private final ArrayBlockingQueue bagItemQueue; + private final int capacity; + + public Bag(int capacity) { + bagItemQueue = new ArrayBlockingQueue<>(capacity); + this.capacity = capacity; + } + + public boolean offer(T t) { + return bagItemQueue.offer(t); + } + + public boolean isFull() { + return bagItemQueue.size() == capacity; + } + + public List getAll() { + if (bagItemQueue.isEmpty()) { + return Collections.emptyList(); + } + + List list = new ArrayList<>(); + while (!bagItemQueue.isEmpty()) { + list.add(bagItemQueue.poll()); + } + + return list; + } + + public int getSize() { + return bagItemQueue.size(); + } + } +} diff --git a/framework-core/pom.xml b/framework-core/pom.xml index b3814cbbf6a9143fd0eb105620c1b3529d2e2d30..5a3745c5ffbd69e6a31782986e81287c33b12a8b 100644 --- a/framework-core/pom.xml +++ b/framework-core/pom.xml @@ -5,7 +5,7 @@ iotos-soft-gateway me.hekr.iotos.softgateway - 3.5.14 + 3.5.15 4.0.0 jar diff --git a/framework-network-common/pom.xml b/framework-network-common/pom.xml index 5c0b1c2c80ef9da130e87951f1c165b7174ff2db..2d5b093984999d6eb43ee9d29c16270ee59f8153 100644 --- a/framework-network-common/pom.xml +++ b/framework-network-common/pom.xml @@ -5,7 +5,7 @@ iotos-soft-gateway me.hekr.iotos.softgateway - 3.5.14 + 3.5.15 4.0.0 diff --git a/framework-network-http/pom.xml b/framework-network-http/pom.xml index 001691716e0e1b8d792d76944c9442ea9b7027d3..0fadcfc0359dc90f2f878a13f5c2a068d0f0494e 100644 --- a/framework-network-http/pom.xml +++ b/framework-network-http/pom.xml @@ -5,7 +5,7 @@ iotos-soft-gateway me.hekr.iotos.softgateway - 3.5.14 + 3.5.15 4.0.0 framework-network-http diff --git a/framework-network-mqtt/pom.xml b/framework-network-mqtt/pom.xml index b37226242003c671c94a7a7ee52528c4f6dc5256..00acbc3619b03348ae885b2e878091d630cc4b59 100644 --- a/framework-network-mqtt/pom.xml +++ b/framework-network-mqtt/pom.xml @@ -5,7 +5,7 @@ iotos-soft-gateway me.hekr.iotos.softgateway - 3.5.14 + 3.5.15 4.0.0 @@ -24,7 +24,7 @@ me.hekr.iotos.softgateway framework-network-common - 3.5.14 + 3.5.15 \ No newline at end of file diff --git a/framework-network-tcp/pom.xml b/framework-network-tcp/pom.xml index 7badc09fc085e09a424485d2531a891fc64ead40..784613e79b48cf6322a2adbe693a09d46f0c1951 100644 --- a/framework-network-tcp/pom.xml +++ b/framework-network-tcp/pom.xml @@ -5,7 +5,7 @@ iotos-soft-gateway me.hekr.iotos.softgateway - 3.5.14 + 3.5.15 4.0.0 diff --git a/framework-network-udp/pom.xml b/framework-network-udp/pom.xml index 6f750364ec7ba2b2411f3a783cba24cff43771a0..aa898d5dd282b032cccf922bbba67e954dd546e0 100644 --- a/framework-network-udp/pom.xml +++ b/framework-network-udp/pom.xml @@ -5,7 +5,7 @@ iotos-soft-gateway me.hekr.iotos.softgateway - 3.5.14 + 3.5.15 4.0.0 diff --git a/pom.xml b/pom.xml index f0c728ad7fcf292fe24f8b591886cb02fd28c54c..f76065cd2a687623f3f6e3f1f5acd1c83c10905f 100644 --- a/pom.xml +++ b/pom.xml @@ -18,7 +18,7 @@ me.hekr.iotos.softgateway iotos-soft-gateway - 3.5.14 + 3.5.15 ${project.groupId}:${project.artifactId} IoTOS 软网关 https://gitee.com/geekhekr/iotos-soft-gateway