diff --git a/RELEASE b/RELEASE index 787715f86742d00797e0120058580db099fedcc2..69e4fbdad43dc85a4c820c0845e01f6e86443a6b 100644 --- a/RELEASE +++ b/RELEASE @@ -10,4 +10,4 @@ 2022-04-09 -1. 允许配置 cloudSendResp 是否自动回复,`connect.mqtt.autoCloudSendResp` 默认 true,自动回复,设置为 false \ No newline at end of file +1. 允许配置 cloudSendResp 是否自动回复,`connect.mqtt.autoCloudSendResp` 默认 true,自动回复; 设置为 false 需要手动回复 \ No newline at end of file diff --git a/deploy.sh b/deploy.sh index 57305fc87c1dde27531f2994bbcbc2c92c6555e3..3f54b3422bd73369ea2a1dad1bb45b02b50b7cc7 100755 --- a/deploy.sh +++ b/deploy.sh @@ -4,7 +4,7 @@ # ./deploy upload 生成 maven jar 包并 commit set -e -VERSION="3.5.9" +VERSION="3.5.10" echo "版本号:$VERSION" updateVersion() { diff --git a/example/pom.xml b/example/pom.xml index c3e5337f6320efdec3a6ebde6681c7b7e6d8eac8..2cf9def22890f94f60123e4f6ea2b12844a212ea 100644 --- a/example/pom.xml +++ b/example/pom.xml @@ -5,7 +5,7 @@ iotos-soft-gateway me.hekr.iotos.softgateway - 3.5.9 + 3.5.10 8 diff --git a/framework-common-utils/pom.xml b/framework-common-utils/pom.xml index 04ecc304178f2f492efb06b3bc9488f76d92701e..ff766ee425e3b40920f5dfc106823c6166dbd474 100644 --- a/framework-common-utils/pom.xml +++ b/framework-common-utils/pom.xml @@ -5,7 +5,7 @@ iotos-soft-gateway me.hekr.iotos.softgateway - 3.5.9 + 3.5.10 4.0.0 diff --git a/framework-core/pom.xml b/framework-core/pom.xml index 07fb96b6a2be8b115bf3b3429f65c0140efac3cb..62635a09df35d7873c2067fc54c6dfab29ae27b1 100644 --- a/framework-core/pom.xml +++ b/framework-core/pom.xml @@ -5,7 +5,7 @@ iotos-soft-gateway me.hekr.iotos.softgateway - 3.5.9 + 3.5.10 4.0.0 jar @@ -38,5 +38,11 @@ framework-common-utils + + com.google.guava + guava + 31.1-jre + + \ No newline at end of file diff --git a/framework-core/src/main/java/me/hekr/iotos/softgateway/core/config/DeviceRemoteConfig.java b/framework-core/src/main/java/me/hekr/iotos/softgateway/core/config/DeviceRemoteConfig.java index cb1c52b04ed5cc2e2abfa7efa8031226b7e7ced5..9e8e9c456c3a1ba3af13b9f5f2fd0d80f6b56ec8 100644 --- a/framework-core/src/main/java/me/hekr/iotos/softgateway/core/config/DeviceRemoteConfig.java +++ b/framework-core/src/main/java/me/hekr/iotos/softgateway/core/config/DeviceRemoteConfig.java @@ -81,9 +81,13 @@ public class DeviceRemoteConfig implements Serializable { public static void update(DeviceRemoteConfig d) { synchronized (SET) { - remove(d); - add(d); - log.info("after update: {}", getStatus()); + Optional optConfig = SET.stream().filter(e -> e.equals(d)).findAny(); + + if (optConfig.isPresent()) { + optConfig.get().data = d.data; + } else { + add(d); + } } } @@ -91,9 +95,7 @@ public class DeviceRemoteConfig implements Serializable { Objects.requireNonNull(d, "deviceRemoteConfig is null"); Objects.requireNonNull(d.getPk(), "pk is null"); Objects.requireNonNull(d.getDevId(), "devId is null"); - SET.add(d); - log.info("after add: {}", getStatus()); } public static void remove(DeviceRemoteConfig d) { @@ -119,7 +121,7 @@ public class DeviceRemoteConfig implements Serializable { } public static String getStatus() { - return "size: " + size() + ", devices: " + getAllSubDevices(); + return "deviceRemoteConfig size: " + size(); } /** diff --git a/framework-core/src/main/java/me/hekr/iotos/softgateway/core/config/ResourcesConfig.java b/framework-core/src/main/java/me/hekr/iotos/softgateway/core/config/ResourcesConfig.java index 4f6dce979c558b2dd3db0b9b9238b6565f51595c..f787fb01a00abaf2cf885944da9e22ac8301e9cb 100644 --- a/framework-core/src/main/java/me/hekr/iotos/softgateway/core/config/ResourcesConfig.java +++ b/framework-core/src/main/java/me/hekr/iotos/softgateway/core/config/ResourcesConfig.java @@ -1,6 +1,5 @@ package me.hekr.iotos.softgateway.core.config; -import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import lombok.extern.slf4j.Slf4j; import me.hekr.iotos.softgateway.common.utils.ThreadPoolUtil; @@ -8,15 +7,16 @@ import me.hekr.iotos.softgateway.core.network.mqtt.MqttService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -/** @author iotos */ +/** + * @author iotos + */ @Slf4j @Component public class ResourcesConfig { @Autowired private MqttService mqttService; - @PostConstruct public void init() { - mqttService.init(); + mqttService.start(); } @PreDestroy @@ -24,7 +24,7 @@ public class ResourcesConfig { try { mqttService.close(); } catch (Exception e) { - log.error(e.getMessage(),e); + log.error(e.getMessage(), e); } ThreadPoolUtil.close(); diff --git a/framework-core/src/main/java/me/hekr/iotos/softgateway/core/klink/KlinkService.java b/framework-core/src/main/java/me/hekr/iotos/softgateway/core/klink/KlinkService.java index d14dbc60e08f09345e8da4182c2fdb07b1d30903..f13c752299c67033c6ba000945efbb6b8e0e4260 100644 --- a/framework-core/src/main/java/me/hekr/iotos/softgateway/core/klink/KlinkService.java +++ b/framework-core/src/main/java/me/hekr/iotos/softgateway/core/klink/KlinkService.java @@ -443,7 +443,7 @@ public class KlinkService { Objects.requireNonNull(props, "mapper.getProps 必须不为 null"); Optional subsystemDev = DeviceRemoteConfig.getBySubSystemProperties(props); if (!subsystemDev.isPresent()) { - log.warn("没有配置映射设备信息,请在远程配置中进行配置,设备信息:{}", props); + log.debug("没有配置映射设备信息,请在远程配置中进行配置,设备信息:{}", props); } return subsystemDev; diff --git a/framework-core/src/main/java/me/hekr/iotos/softgateway/core/listener/SpringEventListener.java b/framework-core/src/main/java/me/hekr/iotos/softgateway/core/listener/SpringEventListener.java new file mode 100644 index 0000000000000000000000000000000000000000..a60078445c80d169a9930fb3e0d41f09282c72dc --- /dev/null +++ b/framework-core/src/main/java/me/hekr/iotos/softgateway/core/listener/SpringEventListener.java @@ -0,0 +1,26 @@ +package me.hekr.iotos.softgateway.core.listener; + +import cn.hutool.core.thread.ThreadUtil; +import lombok.extern.slf4j.Slf4j; +import me.hekr.iotos.softgateway.core.config.ResourcesConfig; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.context.event.ApplicationReadyEvent; +import org.springframework.context.event.EventListener; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +public class SpringEventListener { + @Autowired private ResourcesConfig resourcesConfig; + + @EventListener(value = {ApplicationReadyEvent.class}) + public void onApplicationStartedEvent(ApplicationReadyEvent event) { + log.info("应用启动成功, 开始链接MQTT,初始化业务流程"); + // 只是为了看清楚日志 + for (int i = 0; i < 5; i++) { + log.info("业务初始化中..."); + ThreadUtil.safeSleep(1000); + } + resourcesConfig.init(); + } +} diff --git a/framework-core/src/main/java/me/hekr/iotos/softgateway/core/network/mqtt/MqttCallBackImpl.java b/framework-core/src/main/java/me/hekr/iotos/softgateway/core/network/mqtt/MqttCallBackImpl.java index 788d46706ef2ab6d3bd67c0e8801e1bfdfe438a6..4dcc204fc0fddcaad1058131df23e6e9776fb6fd 100644 --- a/framework-core/src/main/java/me/hekr/iotos/softgateway/core/network/mqtt/MqttCallBackImpl.java +++ b/framework-core/src/main/java/me/hekr/iotos/softgateway/core/network/mqtt/MqttCallBackImpl.java @@ -53,7 +53,7 @@ public class MqttCallBackImpl implements MqttCallback { // 连接丢失后,一般在这里面进行重连 log.warn("驱动已经连接断开,准备开始重连, cause: ", cause.getCause()); triggerConnectionLost(); - mqttService.init(); + mqttService.start(); } private void triggerConnectionLost() { diff --git a/framework-core/src/main/java/me/hekr/iotos/softgateway/core/network/mqtt/MqttService.java b/framework-core/src/main/java/me/hekr/iotos/softgateway/core/network/mqtt/MqttService.java index 1c6a58de64fd1169845bb34f054a395c721ffd45..0dcbe18c15e8311b29c69fa03de3f1453c3e4705 100644 --- a/framework-core/src/main/java/me/hekr/iotos/softgateway/core/network/mqtt/MqttService.java +++ b/framework-core/src/main/java/me/hekr/iotos/softgateway/core/network/mqtt/MqttService.java @@ -1,15 +1,19 @@ package me.hekr.iotos.softgateway.core.network.mqtt; import cn.hutool.core.thread.ThreadUtil; +import com.google.common.base.Stopwatch; import java.util.List; import java.util.Optional; import java.util.Queue; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; @@ -20,8 +24,6 @@ import me.hekr.iotos.softgateway.core.config.IotOsConfig; import me.hekr.iotos.softgateway.core.config.MqttConfig; import me.hekr.iotos.softgateway.core.enums.Action; import me.hekr.iotos.softgateway.core.klink.AddTopo; -import me.hekr.iotos.softgateway.core.klink.DevLogin; -import me.hekr.iotos.softgateway.core.klink.DevLogout; import me.hekr.iotos.softgateway.core.klink.DevSend; import me.hekr.iotos.softgateway.core.klink.Klink; import me.hekr.iotos.softgateway.core.klink.KlinkDev; @@ -46,6 +48,7 @@ import org.springframework.stereotype.Component; @Slf4j @Component public class MqttService { + private static final int CUSTOM_CODE_SPENT_SECONDS = 10; private static final int MAX_RETRY_COUNT = 3; private final Object registerLock = new Object(); private final Object addTopoLock = new Object(); @@ -87,10 +90,10 @@ public class MqttService { () -> { checkAndLogQueueSize(registerQueue, 2, "register"); checkAndLogQueueSize(addTopoQueue, 2, "topo"); - checkAndLogQueueSize(queue, iotOsConfig.getKlinkQueueSize(), "klink"); + checkAndLogQueueSize(queue, 100, "klink"); }, 0, - 3, + 60, TimeUnit.SECONDS); if (iotOsConfig.getMqttConfig().isDataChanged() @@ -99,10 +102,10 @@ public class MqttService { dataFullSendExecutor = Executors.newSingleThreadScheduledExecutor( - ThreadUtil.newNamedThreadFactory("dataFullSendExecutor", true)); + ThreadUtil.newNamedThreadFactory("checkKlinkQueueStatus", true)); dataFullSendExecutor.scheduleWithFixedDelay( () -> sendAllDeviceModelParams(iotOsConfig), - 60, + iotOsConfig.getMqttConfig().getDataFullInterval(), iotOsConfig.getMqttConfig().getDataFullInterval(), TimeUnit.SECONDS); } else { @@ -141,9 +144,7 @@ public class MqttService { private void checkAndLogQueueSize(Queue queue, int threadhole, String type) { int size = queue.size(); - if (log.isTraceEnabled()) { - log.trace(type + " 队列还有 {} 个", size); - } + log.info(type + " 队列还有 {} 个", size); if (size > threadhole) { log.warn(type + " 队列未及时消费,还有: {} 个记录", size); @@ -183,8 +184,18 @@ public class MqttService { client.subscribe(iotOsConfig.getMqttConfig().getSubscribeTopic(), 0); ThreadUtil.safeSleep(1000); - triggerConnectedListeners(); - + log.info("开始执行 triggerConnectedListeners"); + Stopwatch stopWatch = Stopwatch.createStarted(); + CompletableFuture future = CompletableFuture.runAsync(this::triggerConnectedListeners); + try { + future.get(CUSTOM_CODE_SPENT_SECONDS, TimeUnit.SECONDS); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } catch (TimeoutException e) { + log.warn("执行 triggerConnectedListeners 太长,请将业务逻辑放在异步处理"); + } + stopWatch.stop(); + log.info("执行结束 triggerConnectedListeners 耗时 :{}", stopWatch); } catch (Exception e) { log.error("软件网关连接失败!" + e.getMessage(), e); if (e instanceof MqttSecurityException) { @@ -226,7 +237,7 @@ public class MqttService { } } - public void init() { + public void start() { connectExecutor.execute(this::loopConnect); } @@ -320,15 +331,27 @@ public class MqttService { int waitTime = iotOsConfig.getMqttConfig().getConnectTimeout() * 1000; // 优先发送注册信息 - sendRegisterMessage(waitTime); + try { + sendRegisterMessage(waitTime); + } catch (Exception e) { + log.error(e.getMessage(), e); + } // 然后发送添加拓扑信息 - sendAddTopoMessage(waitTime); + try { + sendAddTopoMessage(waitTime); + } catch (Exception e) { + log.error(e.getMessage(), e); + } // 处理完了登录和拓扑 // 发送其他数据 - sendOtherMessage(); + try { + sendOtherMessage(); + } catch (Exception e) { + log.error(e.getMessage(), e); + } } log.warn("收到打断信号,停止发送消息"); @@ -395,7 +418,7 @@ public class MqttService { private void trySend(KlinkDev klink) { if (log.isDebugEnabled()) { - log.debug("尝试发送MQTT:{}", JsonUtil.toJson(klink)); + log.debug("尝试发送到MQTT:{}", JsonUtil.toJson(klink)); } String pk = klink.getPk(); @@ -415,20 +438,21 @@ public class MqttService { return; } + Action action = Action.of(klink.getAction()); // 报错就重试 for (int i = 0; i < MAX_RETRY_COUNT; i++) { try { doPublish(klink); // 发送成功,如果是发送在线,则设置为在线 - if (klink instanceof DevLogin && dev.isOffline()) { - dev.setOnline(); - return; - } - - if (klink instanceof DevLogout && dev.isOnline()) { - dev.setOffline(); - return; + switch (action) { + case DEV_LOGIN: + dev.setOnline(); + break; + case DEV_LOGOUT: + dev.setOffline(); + break; + default: } break; } catch (MqttException e) { diff --git a/framework-network-common/pom.xml b/framework-network-common/pom.xml index 0eee8efe2a2bd8c202674884b9a0e6c8044124aa..1d73de4af9450d06359eaf203a359d2b26a29f5f 100644 --- a/framework-network-common/pom.xml +++ b/framework-network-common/pom.xml @@ -5,7 +5,7 @@ iotos-soft-gateway me.hekr.iotos.softgateway - 3.5.9 + 3.5.10 4.0.0 diff --git a/framework-network-http/pom.xml b/framework-network-http/pom.xml index 98964d955093a6fde9c5b6ae3b484be85f865af1..70ef8b46da90f94f15c087aa427d59dd66932dab 100644 --- a/framework-network-http/pom.xml +++ b/framework-network-http/pom.xml @@ -5,7 +5,7 @@ iotos-soft-gateway me.hekr.iotos.softgateway - 3.5.9 + 3.5.10 4.0.0 framework-network-http diff --git a/framework-network-mqtt/pom.xml b/framework-network-mqtt/pom.xml index 05c3385f87dae2e79a9c0a116f9f234c8e16f9f1..538c80378e4d934be87ca58f5ae257bbfd8c1593 100644 --- a/framework-network-mqtt/pom.xml +++ b/framework-network-mqtt/pom.xml @@ -5,7 +5,7 @@ iotos-soft-gateway me.hekr.iotos.softgateway - 3.5.9 + 3.5.10 4.0.0 @@ -24,7 +24,7 @@ me.hekr.iotos.softgateway framework-network-common - 3.5.9 + 3.5.10 \ No newline at end of file diff --git a/framework-network-tcp/pom.xml b/framework-network-tcp/pom.xml index 86f56148bdb3f70d5a976d6c3c631df56583996d..fe474d5e3d761c9393bf4f8b5977e3a2c5ccd228 100644 --- a/framework-network-tcp/pom.xml +++ b/framework-network-tcp/pom.xml @@ -5,7 +5,7 @@ iotos-soft-gateway me.hekr.iotos.softgateway - 3.5.9 + 3.5.10 4.0.0 diff --git a/framework-network-udp/pom.xml b/framework-network-udp/pom.xml index 514546e3c0fbba09df6de2923d12ba77116f40be..87213e7ed3d8b606773f65b23374787b27be59a0 100644 --- a/framework-network-udp/pom.xml +++ b/framework-network-udp/pom.xml @@ -5,7 +5,7 @@ iotos-soft-gateway me.hekr.iotos.softgateway - 3.5.9 + 3.5.10 4.0.0 diff --git a/pom.xml b/pom.xml index e276cefad27709ae1a146c0d1304907a1208ba85..62a146a48a5a0e324bcb6d8eb66c4cb849ca1ef1 100644 --- a/pom.xml +++ b/pom.xml @@ -18,7 +18,7 @@ me.hekr.iotos.softgateway iotos-soft-gateway - 3.5.9 + 3.5.10 ${project.groupId}:${project.artifactId} IoTOS 软网关 https://gitee.com/geekhekr/iotos-soft-gateway