From f9486cd628dafa159d05ee3b0b1e6868790d42db Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=9C=E9=BE=99=E5=B0=91?= Date: Wed, 12 Oct 2022 11:58:49 +0800 Subject: [PATCH 1/9] =?UTF-8?q?=E6=8D=95=E8=8E=B7=E5=BC=82=E5=B8=B8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- deploy.sh | 2 +- .../core/network/mqtt/MqttService.java | 18 +++++++++++++++--- 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/deploy.sh b/deploy.sh index 57305fc..6ea63cf 100755 --- a/deploy.sh +++ b/deploy.sh @@ -4,7 +4,7 @@ # ./deploy upload 生成 maven jar 包并 commit set -e -VERSION="3.5.9" +VERSION="3.5.10-SNAPSHOT" echo "版本号:$VERSION" updateVersion() { diff --git a/framework-core/src/main/java/me/hekr/iotos/softgateway/core/network/mqtt/MqttService.java b/framework-core/src/main/java/me/hekr/iotos/softgateway/core/network/mqtt/MqttService.java index 1c6a58d..19339de 100644 --- a/framework-core/src/main/java/me/hekr/iotos/softgateway/core/network/mqtt/MqttService.java +++ b/framework-core/src/main/java/me/hekr/iotos/softgateway/core/network/mqtt/MqttService.java @@ -320,15 +320,27 @@ public class MqttService { int waitTime = iotOsConfig.getMqttConfig().getConnectTimeout() * 1000; // 优先发送注册信息 - sendRegisterMessage(waitTime); + try { + sendRegisterMessage(waitTime); + } catch (Exception e) { + log.error(e.getMessage(), e); + } // 然后发送添加拓扑信息 - sendAddTopoMessage(waitTime); + try { + sendAddTopoMessage(waitTime); + } catch (Exception e) { + log.error(e.getMessage(), e); + } // 处理完了登录和拓扑 // 发送其他数据 - sendOtherMessage(); + try { + sendOtherMessage(); + } catch (Exception e) { + log.error(e.getMessage(), e); + } } log.warn("收到打断信号,停止发送消息"); -- Gitee From cdfa8055f5a96343fcfdd869cdb5a541c9ee5fa8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=9C=E9=BE=99=E5=B0=91?= Date: Wed, 12 Oct 2022 12:00:59 +0800 Subject: [PATCH 2/9] =?UTF-8?q?=E9=98=9F=E5=88=97=E9=A2=84=E8=AD=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 杜龙少 --- .../hekr/iotos/softgateway/core/network/mqtt/MqttService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/framework-core/src/main/java/me/hekr/iotos/softgateway/core/network/mqtt/MqttService.java b/framework-core/src/main/java/me/hekr/iotos/softgateway/core/network/mqtt/MqttService.java index 19339de..c17bf6e 100644 --- a/framework-core/src/main/java/me/hekr/iotos/softgateway/core/network/mqtt/MqttService.java +++ b/framework-core/src/main/java/me/hekr/iotos/softgateway/core/network/mqtt/MqttService.java @@ -87,7 +87,7 @@ public class MqttService { () -> { checkAndLogQueueSize(registerQueue, 2, "register"); checkAndLogQueueSize(addTopoQueue, 2, "topo"); - checkAndLogQueueSize(queue, iotOsConfig.getKlinkQueueSize(), "klink"); + checkAndLogQueueSize(queue, 100, "klink"); }, 0, 3, -- Gitee From 94b17c833805f12af1dcbeac7e14c708918b57e9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=9C=E9=BE=99=E5=B0=91?= Date: Wed, 12 Oct 2022 12:33:46 +0800 Subject: [PATCH 3/9] =?UTF-8?q?=E9=85=8D=E7=BD=AE=E8=AE=BE=E5=A4=87?= =?UTF-8?q?=E7=9A=84=E5=9C=A8=E7=A6=BB=E7=BA=BF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 杜龙少 --- example/pom.xml | 2 +- framework-common-utils/pom.xml | 2 +- framework-core/pom.xml | 2 +- .../core/config/DeviceRemoteConfig.java | 14 ++++++++------ .../softgateway/core/network/mqtt/MqttService.java | 14 ++++++-------- framework-network-common/pom.xml | 2 +- framework-network-http/pom.xml | 2 +- framework-network-mqtt/pom.xml | 4 ++-- framework-network-tcp/pom.xml | 2 +- framework-network-udp/pom.xml | 2 +- pom.xml | 2 +- 11 files changed, 24 insertions(+), 24 deletions(-) diff --git a/example/pom.xml b/example/pom.xml index c3e5337..c1874f8 100644 --- a/example/pom.xml +++ b/example/pom.xml @@ -5,7 +5,7 @@ iotos-soft-gateway me.hekr.iotos.softgateway - 3.5.9 + 3.5.10-SNAPSHOT 8 diff --git a/framework-common-utils/pom.xml b/framework-common-utils/pom.xml index 04ecc30..eeb709e 100644 --- a/framework-common-utils/pom.xml +++ b/framework-common-utils/pom.xml @@ -5,7 +5,7 @@ iotos-soft-gateway me.hekr.iotos.softgateway - 3.5.9 + 3.5.10-SNAPSHOT 4.0.0 diff --git a/framework-core/pom.xml b/framework-core/pom.xml index 07fb96b..3d914b3 100644 --- a/framework-core/pom.xml +++ b/framework-core/pom.xml @@ -5,7 +5,7 @@ iotos-soft-gateway me.hekr.iotos.softgateway - 3.5.9 + 3.5.10-SNAPSHOT 4.0.0 jar diff --git a/framework-core/src/main/java/me/hekr/iotos/softgateway/core/config/DeviceRemoteConfig.java b/framework-core/src/main/java/me/hekr/iotos/softgateway/core/config/DeviceRemoteConfig.java index cb1c52b..9e8e9c4 100644 --- a/framework-core/src/main/java/me/hekr/iotos/softgateway/core/config/DeviceRemoteConfig.java +++ b/framework-core/src/main/java/me/hekr/iotos/softgateway/core/config/DeviceRemoteConfig.java @@ -81,9 +81,13 @@ public class DeviceRemoteConfig implements Serializable { public static void update(DeviceRemoteConfig d) { synchronized (SET) { - remove(d); - add(d); - log.info("after update: {}", getStatus()); + Optional optConfig = SET.stream().filter(e -> e.equals(d)).findAny(); + + if (optConfig.isPresent()) { + optConfig.get().data = d.data; + } else { + add(d); + } } } @@ -91,9 +95,7 @@ public class DeviceRemoteConfig implements Serializable { Objects.requireNonNull(d, "deviceRemoteConfig is null"); Objects.requireNonNull(d.getPk(), "pk is null"); Objects.requireNonNull(d.getDevId(), "devId is null"); - SET.add(d); - log.info("after add: {}", getStatus()); } public static void remove(DeviceRemoteConfig d) { @@ -119,7 +121,7 @@ public class DeviceRemoteConfig implements Serializable { } public static String getStatus() { - return "size: " + size() + ", devices: " + getAllSubDevices(); + return "deviceRemoteConfig size: " + size(); } /** diff --git a/framework-core/src/main/java/me/hekr/iotos/softgateway/core/network/mqtt/MqttService.java b/framework-core/src/main/java/me/hekr/iotos/softgateway/core/network/mqtt/MqttService.java index c17bf6e..8c8faa5 100644 --- a/framework-core/src/main/java/me/hekr/iotos/softgateway/core/network/mqtt/MqttService.java +++ b/framework-core/src/main/java/me/hekr/iotos/softgateway/core/network/mqtt/MqttService.java @@ -20,8 +20,6 @@ import me.hekr.iotos.softgateway.core.config.IotOsConfig; import me.hekr.iotos.softgateway.core.config.MqttConfig; import me.hekr.iotos.softgateway.core.enums.Action; import me.hekr.iotos.softgateway.core.klink.AddTopo; -import me.hekr.iotos.softgateway.core.klink.DevLogin; -import me.hekr.iotos.softgateway.core.klink.DevLogout; import me.hekr.iotos.softgateway.core.klink.DevSend; import me.hekr.iotos.softgateway.core.klink.Klink; import me.hekr.iotos.softgateway.core.klink.KlinkDev; @@ -407,7 +405,7 @@ public class MqttService { private void trySend(KlinkDev klink) { if (log.isDebugEnabled()) { - log.debug("尝试发送MQTT:{}", JsonUtil.toJson(klink)); + log.debug("尝试发送到MQTT:{}", JsonUtil.toJson(klink)); } String pk = klink.getPk(); @@ -427,22 +425,22 @@ public class MqttService { return; } + Action action = Action.of(klink.getAction()); // 报错就重试 for (int i = 0; i < MAX_RETRY_COUNT; i++) { try { doPublish(klink); // 发送成功,如果是发送在线,则设置为在线 - if (klink instanceof DevLogin && dev.isOffline()) { + if (action == Action.DEV_LOGIN && dev.isOffline()) { dev.setOnline(); - return; + break; } - if (klink instanceof DevLogout && dev.isOnline()) { + if (action == Action.DEV_LOGOUT && dev.isOnline()) { dev.setOffline(); - return; + break; } - break; } catch (MqttException e) { log.error("mqtt发布报错:" + e.getMessage() + ",第 " + (i + 1) + "次重试", e); ThreadUtil.sleep(1000); diff --git a/framework-network-common/pom.xml b/framework-network-common/pom.xml index 0eee8ef..a1de591 100644 --- a/framework-network-common/pom.xml +++ b/framework-network-common/pom.xml @@ -5,7 +5,7 @@ iotos-soft-gateway me.hekr.iotos.softgateway - 3.5.9 + 3.5.10-SNAPSHOT 4.0.0 diff --git a/framework-network-http/pom.xml b/framework-network-http/pom.xml index 98964d9..be57d73 100644 --- a/framework-network-http/pom.xml +++ b/framework-network-http/pom.xml @@ -5,7 +5,7 @@ iotos-soft-gateway me.hekr.iotos.softgateway - 3.5.9 + 3.5.10-SNAPSHOT 4.0.0 framework-network-http diff --git a/framework-network-mqtt/pom.xml b/framework-network-mqtt/pom.xml index 05c3385..2fdbbbd 100644 --- a/framework-network-mqtt/pom.xml +++ b/framework-network-mqtt/pom.xml @@ -5,7 +5,7 @@ iotos-soft-gateway me.hekr.iotos.softgateway - 3.5.9 + 3.5.10-SNAPSHOT 4.0.0 @@ -24,7 +24,7 @@ me.hekr.iotos.softgateway framework-network-common - 3.5.9 + 3.5.10-SNAPSHOT \ No newline at end of file diff --git a/framework-network-tcp/pom.xml b/framework-network-tcp/pom.xml index 86f5614..c599ab7 100644 --- a/framework-network-tcp/pom.xml +++ b/framework-network-tcp/pom.xml @@ -5,7 +5,7 @@ iotos-soft-gateway me.hekr.iotos.softgateway - 3.5.9 + 3.5.10-SNAPSHOT 4.0.0 diff --git a/framework-network-udp/pom.xml b/framework-network-udp/pom.xml index 514546e..380181e 100644 --- a/framework-network-udp/pom.xml +++ b/framework-network-udp/pom.xml @@ -5,7 +5,7 @@ iotos-soft-gateway me.hekr.iotos.softgateway - 3.5.9 + 3.5.10-SNAPSHOT 4.0.0 diff --git a/pom.xml b/pom.xml index e276cef..45426f0 100644 --- a/pom.xml +++ b/pom.xml @@ -18,7 +18,7 @@ me.hekr.iotos.softgateway iotos-soft-gateway - 3.5.9 + 3.5.10-SNAPSHOT ${project.groupId}:${project.artifactId} IoTOS 软网关 https://gitee.com/geekhekr/iotos-soft-gateway -- Gitee From 348a9e9f0639bb60cc6a57297617f4d89d717597 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=9C=E9=BE=99=E5=B0=91?= Date: Wed, 12 Oct 2022 15:18:56 +0800 Subject: [PATCH 4/9] =?UTF-8?q?=E6=97=A5=E5=BF=97=E7=BA=A7=E5=88=AB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../me/hekr/iotos/softgateway/core/klink/KlinkService.java | 2 +- .../iotos/softgateway/core/network/mqtt/MqttService.java | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/framework-core/src/main/java/me/hekr/iotos/softgateway/core/klink/KlinkService.java b/framework-core/src/main/java/me/hekr/iotos/softgateway/core/klink/KlinkService.java index d14dbc6..f13c752 100644 --- a/framework-core/src/main/java/me/hekr/iotos/softgateway/core/klink/KlinkService.java +++ b/framework-core/src/main/java/me/hekr/iotos/softgateway/core/klink/KlinkService.java @@ -443,7 +443,7 @@ public class KlinkService { Objects.requireNonNull(props, "mapper.getProps 必须不为 null"); Optional subsystemDev = DeviceRemoteConfig.getBySubSystemProperties(props); if (!subsystemDev.isPresent()) { - log.warn("没有配置映射设备信息,请在远程配置中进行配置,设备信息:{}", props); + log.debug("没有配置映射设备信息,请在远程配置中进行配置,设备信息:{}", props); } return subsystemDev; diff --git a/framework-core/src/main/java/me/hekr/iotos/softgateway/core/network/mqtt/MqttService.java b/framework-core/src/main/java/me/hekr/iotos/softgateway/core/network/mqtt/MqttService.java index 8c8faa5..cd4aff1 100644 --- a/framework-core/src/main/java/me/hekr/iotos/softgateway/core/network/mqtt/MqttService.java +++ b/framework-core/src/main/java/me/hekr/iotos/softgateway/core/network/mqtt/MqttService.java @@ -88,7 +88,7 @@ public class MqttService { checkAndLogQueueSize(queue, 100, "klink"); }, 0, - 3, + 60, TimeUnit.SECONDS); if (iotOsConfig.getMqttConfig().isDataChanged() @@ -139,8 +139,8 @@ public class MqttService { private void checkAndLogQueueSize(Queue queue, int threadhole, String type) { int size = queue.size(); - if (log.isTraceEnabled()) { - log.trace(type + " 队列还有 {} 个", size); + if (log.isInfoEnabled()) { + log.info(type + " 队列还有 {} 个", size); } if (size > threadhole) { -- Gitee From 1e12a5361ff143e4e4f70b574451a2044b000d03 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=9C=E9=BE=99=E5=B0=91?= Date: Wed, 12 Oct 2022 17:48:16 +0800 Subject: [PATCH 5/9] =?UTF-8?q?fix=20=E9=87=8D=E5=A4=8D=E5=8F=91=E9=80=81g?= =?UTF-8?q?etConfig?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 杜龙少 --- .../core/network/mqtt/MqttService.java | 25 +++++++++---------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/framework-core/src/main/java/me/hekr/iotos/softgateway/core/network/mqtt/MqttService.java b/framework-core/src/main/java/me/hekr/iotos/softgateway/core/network/mqtt/MqttService.java index cd4aff1..1c47e6e 100644 --- a/framework-core/src/main/java/me/hekr/iotos/softgateway/core/network/mqtt/MqttService.java +++ b/framework-core/src/main/java/me/hekr/iotos/softgateway/core/network/mqtt/MqttService.java @@ -97,10 +97,10 @@ public class MqttService { dataFullSendExecutor = Executors.newSingleThreadScheduledExecutor( - ThreadUtil.newNamedThreadFactory("dataFullSendExecutor", true)); + ThreadUtil.newNamedThreadFactory("checkKlinkQueueStatus", true)); dataFullSendExecutor.scheduleWithFixedDelay( () -> sendAllDeviceModelParams(iotOsConfig), - 60, + iotOsConfig.getMqttConfig().getDataFullInterval(), iotOsConfig.getMqttConfig().getDataFullInterval(), TimeUnit.SECONDS); } else { @@ -139,9 +139,7 @@ public class MqttService { private void checkAndLogQueueSize(Queue queue, int threadhole, String type) { int size = queue.size(); - if (log.isInfoEnabled()) { - log.info(type + " 队列还有 {} 个", size); - } + log.info(type + " 队列还有 {} 个", size); if (size > threadhole) { log.warn(type + " 队列未及时消费,还有: {} 个记录", size); @@ -432,15 +430,16 @@ public class MqttService { doPublish(klink); // 发送成功,如果是发送在线,则设置为在线 - if (action == Action.DEV_LOGIN && dev.isOffline()) { - dev.setOnline(); - break; - } - - if (action == Action.DEV_LOGOUT && dev.isOnline()) { - dev.setOffline(); - break; + switch (action) { + case DEV_LOGIN: + dev.setOnline(); + break; + case DEV_LOGOUT: + dev.setOffline(); + break; + default: } + break; } catch (MqttException e) { log.error("mqtt发布报错:" + e.getMessage() + ",第 " + (i + 1) + "次重试", e); ThreadUtil.sleep(1000); -- Gitee From cc7c68c34c51dd4ac3bfdc414b4f916b0298efdf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=9C=E9=BE=99=E5=B0=91?= Date: Wed, 12 Oct 2022 18:17:22 +0800 Subject: [PATCH 6/9] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E5=90=AF=E5=8A=A8?= =?UTF-8?q?=E6=97=B6=E6=9C=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 杜龙少 --- .../core/config/ResourcesConfig.java | 10 +++---- .../core/listener/SpringEventListener.java | 26 +++++++++++++++++++ .../core/network/mqtt/MqttCallBackImpl.java | 2 +- .../core/network/mqtt/MqttService.java | 2 +- 4 files changed, 33 insertions(+), 7 deletions(-) create mode 100644 framework-core/src/main/java/me/hekr/iotos/softgateway/core/listener/SpringEventListener.java diff --git a/framework-core/src/main/java/me/hekr/iotos/softgateway/core/config/ResourcesConfig.java b/framework-core/src/main/java/me/hekr/iotos/softgateway/core/config/ResourcesConfig.java index 4f6dce9..f787fb0 100644 --- a/framework-core/src/main/java/me/hekr/iotos/softgateway/core/config/ResourcesConfig.java +++ b/framework-core/src/main/java/me/hekr/iotos/softgateway/core/config/ResourcesConfig.java @@ -1,6 +1,5 @@ package me.hekr.iotos.softgateway.core.config; -import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import lombok.extern.slf4j.Slf4j; import me.hekr.iotos.softgateway.common.utils.ThreadPoolUtil; @@ -8,15 +7,16 @@ import me.hekr.iotos.softgateway.core.network.mqtt.MqttService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -/** @author iotos */ +/** + * @author iotos + */ @Slf4j @Component public class ResourcesConfig { @Autowired private MqttService mqttService; - @PostConstruct public void init() { - mqttService.init(); + mqttService.start(); } @PreDestroy @@ -24,7 +24,7 @@ public class ResourcesConfig { try { mqttService.close(); } catch (Exception e) { - log.error(e.getMessage(),e); + log.error(e.getMessage(), e); } ThreadPoolUtil.close(); diff --git a/framework-core/src/main/java/me/hekr/iotos/softgateway/core/listener/SpringEventListener.java b/framework-core/src/main/java/me/hekr/iotos/softgateway/core/listener/SpringEventListener.java new file mode 100644 index 0000000..a600784 --- /dev/null +++ b/framework-core/src/main/java/me/hekr/iotos/softgateway/core/listener/SpringEventListener.java @@ -0,0 +1,26 @@ +package me.hekr.iotos.softgateway.core.listener; + +import cn.hutool.core.thread.ThreadUtil; +import lombok.extern.slf4j.Slf4j; +import me.hekr.iotos.softgateway.core.config.ResourcesConfig; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.context.event.ApplicationReadyEvent; +import org.springframework.context.event.EventListener; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +public class SpringEventListener { + @Autowired private ResourcesConfig resourcesConfig; + + @EventListener(value = {ApplicationReadyEvent.class}) + public void onApplicationStartedEvent(ApplicationReadyEvent event) { + log.info("应用启动成功, 开始链接MQTT,初始化业务流程"); + // 只是为了看清楚日志 + for (int i = 0; i < 5; i++) { + log.info("业务初始化中..."); + ThreadUtil.safeSleep(1000); + } + resourcesConfig.init(); + } +} diff --git a/framework-core/src/main/java/me/hekr/iotos/softgateway/core/network/mqtt/MqttCallBackImpl.java b/framework-core/src/main/java/me/hekr/iotos/softgateway/core/network/mqtt/MqttCallBackImpl.java index 788d467..4dcc204 100644 --- a/framework-core/src/main/java/me/hekr/iotos/softgateway/core/network/mqtt/MqttCallBackImpl.java +++ b/framework-core/src/main/java/me/hekr/iotos/softgateway/core/network/mqtt/MqttCallBackImpl.java @@ -53,7 +53,7 @@ public class MqttCallBackImpl implements MqttCallback { // 连接丢失后,一般在这里面进行重连 log.warn("驱动已经连接断开,准备开始重连, cause: ", cause.getCause()); triggerConnectionLost(); - mqttService.init(); + mqttService.start(); } private void triggerConnectionLost() { diff --git a/framework-core/src/main/java/me/hekr/iotos/softgateway/core/network/mqtt/MqttService.java b/framework-core/src/main/java/me/hekr/iotos/softgateway/core/network/mqtt/MqttService.java index 1c47e6e..973c824 100644 --- a/framework-core/src/main/java/me/hekr/iotos/softgateway/core/network/mqtt/MqttService.java +++ b/framework-core/src/main/java/me/hekr/iotos/softgateway/core/network/mqtt/MqttService.java @@ -222,7 +222,7 @@ public class MqttService { } } - public void init() { + public void start() { connectExecutor.execute(this::loopConnect); } -- Gitee From 40386ae7d1c2fc69d8effbca1a5528a42c385c64 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=9C=E9=BE=99=E5=B0=91?= Date: Wed, 12 Oct 2022 18:32:28 +0800 Subject: [PATCH 7/9] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E6=97=A5=E5=BF=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 杜龙少 --- framework-core/pom.xml | 6 ++++++ .../core/network/mqtt/MqttService.java | 19 +++++++++++++++++-- 2 files changed, 23 insertions(+), 2 deletions(-) diff --git a/framework-core/pom.xml b/framework-core/pom.xml index 3d914b3..b694adf 100644 --- a/framework-core/pom.xml +++ b/framework-core/pom.xml @@ -38,5 +38,11 @@ framework-common-utils + + com.google.guava + guava + 31.1-jre + + \ No newline at end of file diff --git a/framework-core/src/main/java/me/hekr/iotos/softgateway/core/network/mqtt/MqttService.java b/framework-core/src/main/java/me/hekr/iotos/softgateway/core/network/mqtt/MqttService.java index 973c824..0dcbe18 100644 --- a/framework-core/src/main/java/me/hekr/iotos/softgateway/core/network/mqtt/MqttService.java +++ b/framework-core/src/main/java/me/hekr/iotos/softgateway/core/network/mqtt/MqttService.java @@ -1,15 +1,19 @@ package me.hekr.iotos.softgateway.core.network.mqtt; import cn.hutool.core.thread.ThreadUtil; +import com.google.common.base.Stopwatch; import java.util.List; import java.util.Optional; import java.util.Queue; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; @@ -44,6 +48,7 @@ import org.springframework.stereotype.Component; @Slf4j @Component public class MqttService { + private static final int CUSTOM_CODE_SPENT_SECONDS = 10; private static final int MAX_RETRY_COUNT = 3; private final Object registerLock = new Object(); private final Object addTopoLock = new Object(); @@ -179,8 +184,18 @@ public class MqttService { client.subscribe(iotOsConfig.getMqttConfig().getSubscribeTopic(), 0); ThreadUtil.safeSleep(1000); - triggerConnectedListeners(); - + log.info("开始执行 triggerConnectedListeners"); + Stopwatch stopWatch = Stopwatch.createStarted(); + CompletableFuture future = CompletableFuture.runAsync(this::triggerConnectedListeners); + try { + future.get(CUSTOM_CODE_SPENT_SECONDS, TimeUnit.SECONDS); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } catch (TimeoutException e) { + log.warn("执行 triggerConnectedListeners 太长,请将业务逻辑放在异步处理"); + } + stopWatch.stop(); + log.info("执行结束 triggerConnectedListeners 耗时 :{}", stopWatch); } catch (Exception e) { log.error("软件网关连接失败!" + e.getMessage(), e); if (e instanceof MqttSecurityException) { -- Gitee From 7f984cde1e39afa8dff595f35d42e3635713f03f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=9C=E9=BE=99=E5=B0=91?= Date: Mon, 17 Oct 2022 15:03:10 +0800 Subject: [PATCH 8/9] =?UTF-8?q?=E4=BC=98=E5=8C=96cloudSend=20=E8=AF=B4?= =?UTF-8?q?=E6=98=8E?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- RELEASE | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/RELEASE b/RELEASE index 787715f..69e4fbd 100644 --- a/RELEASE +++ b/RELEASE @@ -10,4 +10,4 @@ 2022-04-09 -1. 允许配置 cloudSendResp 是否自动回复,`connect.mqtt.autoCloudSendResp` 默认 true,自动回复,设置为 false \ No newline at end of file +1. 允许配置 cloudSendResp 是否自动回复,`connect.mqtt.autoCloudSendResp` 默认 true,自动回复; 设置为 false 需要手动回复 \ No newline at end of file -- Gitee From 34a87d7a4c46f88d32889f5da61169efe966e136 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=9C=E9=BE=99=E5=B0=91?= Date: Thu, 3 Nov 2022 14:36:42 +0800 Subject: [PATCH 9/9] =?UTF-8?q?=E5=8D=87=E7=BA=A7=E7=89=88=E6=9C=AC?= =?UTF-8?q?=E5=88=B0=203.5.10?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- deploy.sh | 2 +- example/pom.xml | 2 +- framework-common-utils/pom.xml | 2 +- framework-core/pom.xml | 2 +- framework-network-common/pom.xml | 2 +- framework-network-http/pom.xml | 2 +- framework-network-mqtt/pom.xml | 4 ++-- framework-network-tcp/pom.xml | 2 +- framework-network-udp/pom.xml | 2 +- pom.xml | 2 +- 10 files changed, 11 insertions(+), 11 deletions(-) diff --git a/deploy.sh b/deploy.sh index 6ea63cf..3f54b34 100755 --- a/deploy.sh +++ b/deploy.sh @@ -4,7 +4,7 @@ # ./deploy upload 生成 maven jar 包并 commit set -e -VERSION="3.5.10-SNAPSHOT" +VERSION="3.5.10" echo "版本号:$VERSION" updateVersion() { diff --git a/example/pom.xml b/example/pom.xml index c1874f8..2cf9def 100644 --- a/example/pom.xml +++ b/example/pom.xml @@ -5,7 +5,7 @@ iotos-soft-gateway me.hekr.iotos.softgateway - 3.5.10-SNAPSHOT + 3.5.10 8 diff --git a/framework-common-utils/pom.xml b/framework-common-utils/pom.xml index eeb709e..ff766ee 100644 --- a/framework-common-utils/pom.xml +++ b/framework-common-utils/pom.xml @@ -5,7 +5,7 @@ iotos-soft-gateway me.hekr.iotos.softgateway - 3.5.10-SNAPSHOT + 3.5.10 4.0.0 diff --git a/framework-core/pom.xml b/framework-core/pom.xml index b694adf..62635a0 100644 --- a/framework-core/pom.xml +++ b/framework-core/pom.xml @@ -5,7 +5,7 @@ iotos-soft-gateway me.hekr.iotos.softgateway - 3.5.10-SNAPSHOT + 3.5.10 4.0.0 jar diff --git a/framework-network-common/pom.xml b/framework-network-common/pom.xml index a1de591..1d73de4 100644 --- a/framework-network-common/pom.xml +++ b/framework-network-common/pom.xml @@ -5,7 +5,7 @@ iotos-soft-gateway me.hekr.iotos.softgateway - 3.5.10-SNAPSHOT + 3.5.10 4.0.0 diff --git a/framework-network-http/pom.xml b/framework-network-http/pom.xml index be57d73..70ef8b4 100644 --- a/framework-network-http/pom.xml +++ b/framework-network-http/pom.xml @@ -5,7 +5,7 @@ iotos-soft-gateway me.hekr.iotos.softgateway - 3.5.10-SNAPSHOT + 3.5.10 4.0.0 framework-network-http diff --git a/framework-network-mqtt/pom.xml b/framework-network-mqtt/pom.xml index 2fdbbbd..538c803 100644 --- a/framework-network-mqtt/pom.xml +++ b/framework-network-mqtt/pom.xml @@ -5,7 +5,7 @@ iotos-soft-gateway me.hekr.iotos.softgateway - 3.5.10-SNAPSHOT + 3.5.10 4.0.0 @@ -24,7 +24,7 @@ me.hekr.iotos.softgateway framework-network-common - 3.5.10-SNAPSHOT + 3.5.10 \ No newline at end of file diff --git a/framework-network-tcp/pom.xml b/framework-network-tcp/pom.xml index c599ab7..fe474d5 100644 --- a/framework-network-tcp/pom.xml +++ b/framework-network-tcp/pom.xml @@ -5,7 +5,7 @@ iotos-soft-gateway me.hekr.iotos.softgateway - 3.5.10-SNAPSHOT + 3.5.10 4.0.0 diff --git a/framework-network-udp/pom.xml b/framework-network-udp/pom.xml index 380181e..87213e7 100644 --- a/framework-network-udp/pom.xml +++ b/framework-network-udp/pom.xml @@ -5,7 +5,7 @@ iotos-soft-gateway me.hekr.iotos.softgateway - 3.5.10-SNAPSHOT + 3.5.10 4.0.0 diff --git a/pom.xml b/pom.xml index 45426f0..62a146a 100644 --- a/pom.xml +++ b/pom.xml @@ -18,7 +18,7 @@ me.hekr.iotos.softgateway iotos-soft-gateway - 3.5.10-SNAPSHOT + 3.5.10 ${project.groupId}:${project.artifactId} IoTOS 软网关 https://gitee.com/geekhekr/iotos-soft-gateway -- Gitee