diff --git a/RELEASE b/RELEASE
index 787715f86742d00797e0120058580db099fedcc2..69e4fbdad43dc85a4c820c0845e01f6e86443a6b 100644
--- a/RELEASE
+++ b/RELEASE
@@ -10,4 +10,4 @@
2022-04-09
-1. 允许配置 cloudSendResp 是否自动回复,`connect.mqtt.autoCloudSendResp` 默认 true,自动回复,设置为 false
\ No newline at end of file
+1. 允许配置 cloudSendResp 是否自动回复,`connect.mqtt.autoCloudSendResp` 默认 true,自动回复; 设置为 false 需要手动回复
\ No newline at end of file
diff --git a/deploy.sh b/deploy.sh
index 57305fc87c1dde27531f2994bbcbc2c92c6555e3..3f54b3422bd73369ea2a1dad1bb45b02b50b7cc7 100755
--- a/deploy.sh
+++ b/deploy.sh
@@ -4,7 +4,7 @@
# ./deploy upload 生成 maven jar 包并 commit
set -e
-VERSION="3.5.9"
+VERSION="3.5.10"
echo "版本号:$VERSION"
updateVersion() {
diff --git a/example/pom.xml b/example/pom.xml
index c3e5337f6320efdec3a6ebde6681c7b7e6d8eac8..2cf9def22890f94f60123e4f6ea2b12844a212ea 100644
--- a/example/pom.xml
+++ b/example/pom.xml
@@ -5,7 +5,7 @@
iotos-soft-gateway
me.hekr.iotos.softgateway
- 3.5.9
+ 3.5.10
8
diff --git a/framework-common-utils/pom.xml b/framework-common-utils/pom.xml
index 04ecc304178f2f492efb06b3bc9488f76d92701e..ff766ee425e3b40920f5dfc106823c6166dbd474 100644
--- a/framework-common-utils/pom.xml
+++ b/framework-common-utils/pom.xml
@@ -5,7 +5,7 @@
iotos-soft-gateway
me.hekr.iotos.softgateway
- 3.5.9
+ 3.5.10
4.0.0
diff --git a/framework-core/pom.xml b/framework-core/pom.xml
index 07fb96b6a2be8b115bf3b3429f65c0140efac3cb..62635a09df35d7873c2067fc54c6dfab29ae27b1 100644
--- a/framework-core/pom.xml
+++ b/framework-core/pom.xml
@@ -5,7 +5,7 @@
iotos-soft-gateway
me.hekr.iotos.softgateway
- 3.5.9
+ 3.5.10
4.0.0
jar
@@ -38,5 +38,11 @@
framework-common-utils
+
+ com.google.guava
+ guava
+ 31.1-jre
+
+
\ No newline at end of file
diff --git a/framework-core/src/main/java/me/hekr/iotos/softgateway/core/config/DeviceRemoteConfig.java b/framework-core/src/main/java/me/hekr/iotos/softgateway/core/config/DeviceRemoteConfig.java
index cb1c52b04ed5cc2e2abfa7efa8031226b7e7ced5..9e8e9c456c3a1ba3af13b9f5f2fd0d80f6b56ec8 100644
--- a/framework-core/src/main/java/me/hekr/iotos/softgateway/core/config/DeviceRemoteConfig.java
+++ b/framework-core/src/main/java/me/hekr/iotos/softgateway/core/config/DeviceRemoteConfig.java
@@ -81,9 +81,13 @@ public class DeviceRemoteConfig implements Serializable {
public static void update(DeviceRemoteConfig d) {
synchronized (SET) {
- remove(d);
- add(d);
- log.info("after update: {}", getStatus());
+ Optional optConfig = SET.stream().filter(e -> e.equals(d)).findAny();
+
+ if (optConfig.isPresent()) {
+ optConfig.get().data = d.data;
+ } else {
+ add(d);
+ }
}
}
@@ -91,9 +95,7 @@ public class DeviceRemoteConfig implements Serializable {
Objects.requireNonNull(d, "deviceRemoteConfig is null");
Objects.requireNonNull(d.getPk(), "pk is null");
Objects.requireNonNull(d.getDevId(), "devId is null");
-
SET.add(d);
- log.info("after add: {}", getStatus());
}
public static void remove(DeviceRemoteConfig d) {
@@ -119,7 +121,7 @@ public class DeviceRemoteConfig implements Serializable {
}
public static String getStatus() {
- return "size: " + size() + ", devices: " + getAllSubDevices();
+ return "deviceRemoteConfig size: " + size();
}
/**
diff --git a/framework-core/src/main/java/me/hekr/iotos/softgateway/core/config/ResourcesConfig.java b/framework-core/src/main/java/me/hekr/iotos/softgateway/core/config/ResourcesConfig.java
index 4f6dce979c558b2dd3db0b9b9238b6565f51595c..f787fb01a00abaf2cf885944da9e22ac8301e9cb 100644
--- a/framework-core/src/main/java/me/hekr/iotos/softgateway/core/config/ResourcesConfig.java
+++ b/framework-core/src/main/java/me/hekr/iotos/softgateway/core/config/ResourcesConfig.java
@@ -1,6 +1,5 @@
package me.hekr.iotos.softgateway.core.config;
-import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import me.hekr.iotos.softgateway.common.utils.ThreadPoolUtil;
@@ -8,15 +7,16 @@ import me.hekr.iotos.softgateway.core.network.mqtt.MqttService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-/** @author iotos */
+/**
+ * @author iotos
+ */
@Slf4j
@Component
public class ResourcesConfig {
@Autowired private MqttService mqttService;
- @PostConstruct
public void init() {
- mqttService.init();
+ mqttService.start();
}
@PreDestroy
@@ -24,7 +24,7 @@ public class ResourcesConfig {
try {
mqttService.close();
} catch (Exception e) {
- log.error(e.getMessage(),e);
+ log.error(e.getMessage(), e);
}
ThreadPoolUtil.close();
diff --git a/framework-core/src/main/java/me/hekr/iotos/softgateway/core/klink/KlinkService.java b/framework-core/src/main/java/me/hekr/iotos/softgateway/core/klink/KlinkService.java
index d14dbc60e08f09345e8da4182c2fdb07b1d30903..f13c752299c67033c6ba000945efbb6b8e0e4260 100644
--- a/framework-core/src/main/java/me/hekr/iotos/softgateway/core/klink/KlinkService.java
+++ b/framework-core/src/main/java/me/hekr/iotos/softgateway/core/klink/KlinkService.java
@@ -443,7 +443,7 @@ public class KlinkService {
Objects.requireNonNull(props, "mapper.getProps 必须不为 null");
Optional subsystemDev = DeviceRemoteConfig.getBySubSystemProperties(props);
if (!subsystemDev.isPresent()) {
- log.warn("没有配置映射设备信息,请在远程配置中进行配置,设备信息:{}", props);
+ log.debug("没有配置映射设备信息,请在远程配置中进行配置,设备信息:{}", props);
}
return subsystemDev;
diff --git a/framework-core/src/main/java/me/hekr/iotos/softgateway/core/listener/SpringEventListener.java b/framework-core/src/main/java/me/hekr/iotos/softgateway/core/listener/SpringEventListener.java
new file mode 100644
index 0000000000000000000000000000000000000000..a60078445c80d169a9930fb3e0d41f09282c72dc
--- /dev/null
+++ b/framework-core/src/main/java/me/hekr/iotos/softgateway/core/listener/SpringEventListener.java
@@ -0,0 +1,26 @@
+package me.hekr.iotos.softgateway.core.listener;
+
+import cn.hutool.core.thread.ThreadUtil;
+import lombok.extern.slf4j.Slf4j;
+import me.hekr.iotos.softgateway.core.config.ResourcesConfig;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.context.event.ApplicationReadyEvent;
+import org.springframework.context.event.EventListener;
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+public class SpringEventListener {
+ @Autowired private ResourcesConfig resourcesConfig;
+
+ @EventListener(value = {ApplicationReadyEvent.class})
+ public void onApplicationStartedEvent(ApplicationReadyEvent event) {
+ log.info("应用启动成功, 开始链接MQTT,初始化业务流程");
+ // 只是为了看清楚日志
+ for (int i = 0; i < 5; i++) {
+ log.info("业务初始化中...");
+ ThreadUtil.safeSleep(1000);
+ }
+ resourcesConfig.init();
+ }
+}
diff --git a/framework-core/src/main/java/me/hekr/iotos/softgateway/core/network/mqtt/MqttCallBackImpl.java b/framework-core/src/main/java/me/hekr/iotos/softgateway/core/network/mqtt/MqttCallBackImpl.java
index 788d46706ef2ab6d3bd67c0e8801e1bfdfe438a6..4dcc204fc0fddcaad1058131df23e6e9776fb6fd 100644
--- a/framework-core/src/main/java/me/hekr/iotos/softgateway/core/network/mqtt/MqttCallBackImpl.java
+++ b/framework-core/src/main/java/me/hekr/iotos/softgateway/core/network/mqtt/MqttCallBackImpl.java
@@ -53,7 +53,7 @@ public class MqttCallBackImpl implements MqttCallback {
// 连接丢失后,一般在这里面进行重连
log.warn("驱动已经连接断开,准备开始重连, cause: ", cause.getCause());
triggerConnectionLost();
- mqttService.init();
+ mqttService.start();
}
private void triggerConnectionLost() {
diff --git a/framework-core/src/main/java/me/hekr/iotos/softgateway/core/network/mqtt/MqttService.java b/framework-core/src/main/java/me/hekr/iotos/softgateway/core/network/mqtt/MqttService.java
index 1c6a58de64fd1169845bb34f054a395c721ffd45..0dcbe18c15e8311b29c69fa03de3f1453c3e4705 100644
--- a/framework-core/src/main/java/me/hekr/iotos/softgateway/core/network/mqtt/MqttService.java
+++ b/framework-core/src/main/java/me/hekr/iotos/softgateway/core/network/mqtt/MqttService.java
@@ -1,15 +1,19 @@
package me.hekr.iotos.softgateway.core.network.mqtt;
import cn.hutool.core.thread.ThreadUtil;
+import com.google.common.base.Stopwatch;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
@@ -20,8 +24,6 @@ import me.hekr.iotos.softgateway.core.config.IotOsConfig;
import me.hekr.iotos.softgateway.core.config.MqttConfig;
import me.hekr.iotos.softgateway.core.enums.Action;
import me.hekr.iotos.softgateway.core.klink.AddTopo;
-import me.hekr.iotos.softgateway.core.klink.DevLogin;
-import me.hekr.iotos.softgateway.core.klink.DevLogout;
import me.hekr.iotos.softgateway.core.klink.DevSend;
import me.hekr.iotos.softgateway.core.klink.Klink;
import me.hekr.iotos.softgateway.core.klink.KlinkDev;
@@ -46,6 +48,7 @@ import org.springframework.stereotype.Component;
@Slf4j
@Component
public class MqttService {
+ private static final int CUSTOM_CODE_SPENT_SECONDS = 10;
private static final int MAX_RETRY_COUNT = 3;
private final Object registerLock = new Object();
private final Object addTopoLock = new Object();
@@ -87,10 +90,10 @@ public class MqttService {
() -> {
checkAndLogQueueSize(registerQueue, 2, "register");
checkAndLogQueueSize(addTopoQueue, 2, "topo");
- checkAndLogQueueSize(queue, iotOsConfig.getKlinkQueueSize(), "klink");
+ checkAndLogQueueSize(queue, 100, "klink");
},
0,
- 3,
+ 60,
TimeUnit.SECONDS);
if (iotOsConfig.getMqttConfig().isDataChanged()
@@ -99,10 +102,10 @@ public class MqttService {
dataFullSendExecutor =
Executors.newSingleThreadScheduledExecutor(
- ThreadUtil.newNamedThreadFactory("dataFullSendExecutor", true));
+ ThreadUtil.newNamedThreadFactory("checkKlinkQueueStatus", true));
dataFullSendExecutor.scheduleWithFixedDelay(
() -> sendAllDeviceModelParams(iotOsConfig),
- 60,
+ iotOsConfig.getMqttConfig().getDataFullInterval(),
iotOsConfig.getMqttConfig().getDataFullInterval(),
TimeUnit.SECONDS);
} else {
@@ -141,9 +144,7 @@ public class MqttService {
private void checkAndLogQueueSize(Queue> queue, int threadhole, String type) {
int size = queue.size();
- if (log.isTraceEnabled()) {
- log.trace(type + " 队列还有 {} 个", size);
- }
+ log.info(type + " 队列还有 {} 个", size);
if (size > threadhole) {
log.warn(type + " 队列未及时消费,还有: {} 个记录", size);
@@ -183,8 +184,18 @@ public class MqttService {
client.subscribe(iotOsConfig.getMqttConfig().getSubscribeTopic(), 0);
ThreadUtil.safeSleep(1000);
- triggerConnectedListeners();
-
+ log.info("开始执行 triggerConnectedListeners");
+ Stopwatch stopWatch = Stopwatch.createStarted();
+ CompletableFuture future = CompletableFuture.runAsync(this::triggerConnectedListeners);
+ try {
+ future.get(CUSTOM_CODE_SPENT_SECONDS, TimeUnit.SECONDS);
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException(e);
+ } catch (TimeoutException e) {
+ log.warn("执行 triggerConnectedListeners 太长,请将业务逻辑放在异步处理");
+ }
+ stopWatch.stop();
+ log.info("执行结束 triggerConnectedListeners 耗时 :{}", stopWatch);
} catch (Exception e) {
log.error("软件网关连接失败!" + e.getMessage(), e);
if (e instanceof MqttSecurityException) {
@@ -226,7 +237,7 @@ public class MqttService {
}
}
- public void init() {
+ public void start() {
connectExecutor.execute(this::loopConnect);
}
@@ -320,15 +331,27 @@ public class MqttService {
int waitTime = iotOsConfig.getMqttConfig().getConnectTimeout() * 1000;
// 优先发送注册信息
- sendRegisterMessage(waitTime);
+ try {
+ sendRegisterMessage(waitTime);
+ } catch (Exception e) {
+ log.error(e.getMessage(), e);
+ }
// 然后发送添加拓扑信息
- sendAddTopoMessage(waitTime);
+ try {
+ sendAddTopoMessage(waitTime);
+ } catch (Exception e) {
+ log.error(e.getMessage(), e);
+ }
// 处理完了登录和拓扑
// 发送其他数据
- sendOtherMessage();
+ try {
+ sendOtherMessage();
+ } catch (Exception e) {
+ log.error(e.getMessage(), e);
+ }
}
log.warn("收到打断信号,停止发送消息");
@@ -395,7 +418,7 @@ public class MqttService {
private void trySend(KlinkDev klink) {
if (log.isDebugEnabled()) {
- log.debug("尝试发送MQTT:{}", JsonUtil.toJson(klink));
+ log.debug("尝试发送到MQTT:{}", JsonUtil.toJson(klink));
}
String pk = klink.getPk();
@@ -415,20 +438,21 @@ public class MqttService {
return;
}
+ Action action = Action.of(klink.getAction());
// 报错就重试
for (int i = 0; i < MAX_RETRY_COUNT; i++) {
try {
doPublish(klink);
// 发送成功,如果是发送在线,则设置为在线
- if (klink instanceof DevLogin && dev.isOffline()) {
- dev.setOnline();
- return;
- }
-
- if (klink instanceof DevLogout && dev.isOnline()) {
- dev.setOffline();
- return;
+ switch (action) {
+ case DEV_LOGIN:
+ dev.setOnline();
+ break;
+ case DEV_LOGOUT:
+ dev.setOffline();
+ break;
+ default:
}
break;
} catch (MqttException e) {
diff --git a/framework-network-common/pom.xml b/framework-network-common/pom.xml
index 0eee8efe2a2bd8c202674884b9a0e6c8044124aa..1d73de4af9450d06359eaf203a359d2b26a29f5f 100644
--- a/framework-network-common/pom.xml
+++ b/framework-network-common/pom.xml
@@ -5,7 +5,7 @@
iotos-soft-gateway
me.hekr.iotos.softgateway
- 3.5.9
+ 3.5.10
4.0.0
diff --git a/framework-network-http/pom.xml b/framework-network-http/pom.xml
index 98964d955093a6fde9c5b6ae3b484be85f865af1..70ef8b46da90f94f15c087aa427d59dd66932dab 100644
--- a/framework-network-http/pom.xml
+++ b/framework-network-http/pom.xml
@@ -5,7 +5,7 @@
iotos-soft-gateway
me.hekr.iotos.softgateway
- 3.5.9
+ 3.5.10
4.0.0
framework-network-http
diff --git a/framework-network-mqtt/pom.xml b/framework-network-mqtt/pom.xml
index 05c3385f87dae2e79a9c0a116f9f234c8e16f9f1..538c80378e4d934be87ca58f5ae257bbfd8c1593 100644
--- a/framework-network-mqtt/pom.xml
+++ b/framework-network-mqtt/pom.xml
@@ -5,7 +5,7 @@
iotos-soft-gateway
me.hekr.iotos.softgateway
- 3.5.9
+ 3.5.10
4.0.0
@@ -24,7 +24,7 @@
me.hekr.iotos.softgateway
framework-network-common
- 3.5.9
+ 3.5.10
\ No newline at end of file
diff --git a/framework-network-tcp/pom.xml b/framework-network-tcp/pom.xml
index 86f56148bdb3f70d5a976d6c3c631df56583996d..fe474d5e3d761c9393bf4f8b5977e3a2c5ccd228 100644
--- a/framework-network-tcp/pom.xml
+++ b/framework-network-tcp/pom.xml
@@ -5,7 +5,7 @@
iotos-soft-gateway
me.hekr.iotos.softgateway
- 3.5.9
+ 3.5.10
4.0.0
diff --git a/framework-network-udp/pom.xml b/framework-network-udp/pom.xml
index 514546e3c0fbba09df6de2923d12ba77116f40be..87213e7ed3d8b606773f65b23374787b27be59a0 100644
--- a/framework-network-udp/pom.xml
+++ b/framework-network-udp/pom.xml
@@ -5,7 +5,7 @@
iotos-soft-gateway
me.hekr.iotos.softgateway
- 3.5.9
+ 3.5.10
4.0.0
diff --git a/pom.xml b/pom.xml
index e276cefad27709ae1a146c0d1304907a1208ba85..62a146a48a5a0e324bcb6d8eb66c4cb849ca1ef1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -18,7 +18,7 @@
me.hekr.iotos.softgateway
iotos-soft-gateway
- 3.5.9
+ 3.5.10
${project.groupId}:${project.artifactId}
IoTOS 软网关
https://gitee.com/geekhekr/iotos-soft-gateway