Selaa lähdekoodia

Websocket消息发送工具类调整

jingyuanchao 1 vuosi sitten
vanhempi
commit
e68abe8db7

+ 43 - 43
soc-modules/soc-modules-mediator/src/main/java/com/xunmei/mediator/api/protection/service/impl/IotAlarmHostServiceImpl.java

@@ -16,18 +16,19 @@ import com.xunmei.mediator.websocket.constant.WebSocketConstants;
 import com.xunmei.mediator.websocket.dto.WebsocketExecuteReq;
 import com.xunmei.mediator.websocket.dto.WebsocketResult;
 import com.xunmei.mediator.websocket.enums.ProductEnums;
-import com.xunmei.mediator.websocket.holder.WebSocketSessionHolder;
 import com.xunmei.mediator.websocket.service.RouterService;
 import com.xunmei.mediator.websocket.utils.IotServerUtils;
+import com.xunmei.mediator.websocket.utils.WebSocketUtils;
 import com.xunmei.system.api.domain.iot.IotDeviceInfo;
 import com.xunmei.system.api.enums.ProtectionStatus;
 import com.xunmei.system.api.util.LogUtils;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
+
 import javax.annotation.Resource;
 import java.io.Serializable;
 import java.time.LocalDateTime;
-import java.util.*;
+import java.util.StringJoiner;
 
 @Service
 public class IotAlarmHostServiceImpl implements IIotAlarmHostService, RouterService {
@@ -42,35 +43,34 @@ public class IotAlarmHostServiceImpl implements IIotAlarmHostService, RouterServ
     ProtectionLogMapper protectionLogMapper;
 
 
-
     @Transactional(rollbackFor = Exception.class)
-    public void changeAlarmHostStatus(WebsocketExecuteReq req){
+    public void changeAlarmHostStatus(WebsocketExecuteReq req) {
         try {
-            LogUtils.STATUS_INFO_DEFENCEAREA.info("【iot服务报警主机设备状态变更事件】【iotToken:{}】【msgId:{}】【接收参数:{}】", req.getToken(), req.getId(),JSON.toJSONString(req));
-            if (req.getData() != null){
+            LogUtils.STATUS_INFO_DEFENCEAREA.info("【iot服务报警主机设备状态变更事件】【iotToken:{}】【msgId:{}】【接收参数:{}】", req.getToken(), req.getId(), JSON.toJSONString(req));
+            if (req.getData() != null) {
                 JSONObject object = (JSONObject) req.getData();
                 String status = object.getString("status");
                 String updateTime = object.getString("time");
 
                 QueryWrapper<IotDeviceInfo> wrapper = new QueryWrapper<>();
                 wrapper.lambda()
-                        .eq(IotDeviceInfo::getDeviceProduct,req.getProductName())
-                        .eq(IotDeviceInfo::getDeviceCode,req.getDeviceName())
-                        .eq(IotDeviceInfo::getIotToken,req.getToken())
-                        .eq(IotDeviceInfo::getDeleted,0);
+                        .eq(IotDeviceInfo::getDeviceProduct, req.getProductName())
+                        .eq(IotDeviceInfo::getDeviceCode, req.getDeviceName())
+                        .eq(IotDeviceInfo::getIotToken, req.getToken())
+                        .eq(IotDeviceInfo::getDeleted, 0);
                 IotDeviceInfo deviceInfo = iIotDeviceInfoService.getOne(wrapper);
-                if (deviceInfo != null){
-                    if ("Online".equals(status)){
+                if (deviceInfo != null) {
+                    if ("Online".equals(status)) {
                         deviceInfo.setNetStatus("1");
-                    }else if ("Offline".equals(status)){
+                    } else if ("Offline".equals(status)) {
                         deviceInfo.setNetStatus("2");
-                    }else {
+                    } else {
                         deviceInfo.setNetStatus("0");
                     }
                     iIotDeviceInfoService.updateById(deviceInfo);
                 }
             }
-        }catch (Exception e){
+        } catch (Exception e) {
             e.printStackTrace();
             LogUtils.STATUS_INFO_DEFENCEAREA.error("处理报警主机设备状态改变事件出错", e);
             throw new RuntimeException(e);
@@ -78,31 +78,31 @@ public class IotAlarmHostServiceImpl implements IIotAlarmHostService, RouterServ
     }
 
     @Transactional(rollbackFor = Exception.class)
-    public void changeSubSystemStatus(WebsocketExecuteReq req){
+    public void changeSubSystemStatus(WebsocketExecuteReq req) {
         try {
-            LogUtils.STATUS_INFO_DEFENCEAREA.info("【iot服务报警主机布撤防状态变更事件】【iotToken:{}】【msgId:{}】【接收参数:{}】", req.getToken(), req.getId(),JSON.toJSONString(req));
-            if (req.getData() != null){
+            LogUtils.STATUS_INFO_DEFENCEAREA.info("【iot服务报警主机布撤防状态变更事件】【iotToken:{}】【msgId:{}】【接收参数:{}】", req.getToken(), req.getId(), JSON.toJSONString(req));
+            if (req.getData() != null) {
                 JSONObject object = (JSONObject) req.getData();
                 Integer subSystemId = object.getInteger("id");
                 String status = object.getString("status");
 
                 QueryWrapper<IotAlarmSubsystem> wrapper = new QueryWrapper<>();
                 wrapper.lambda()
-                        .eq(IotAlarmSubsystem::getAlarmHostCode,req.getDeviceName())
-                        .eq(IotAlarmSubsystem::getCode,String.valueOf(subSystemId))
-                        .eq(IotAlarmSubsystem::getIotToken,req.getToken())
-                        .eq(IotAlarmSubsystem::getDeleted,0);
+                        .eq(IotAlarmSubsystem::getAlarmHostCode, req.getDeviceName())
+                        .eq(IotAlarmSubsystem::getCode, String.valueOf(subSystemId))
+                        .eq(IotAlarmSubsystem::getIotToken, req.getToken())
+                        .eq(IotAlarmSubsystem::getDeleted, 0);
                 IotAlarmSubsystem subsystem = subsystemService.getOne(wrapper);
 
-                if (subsystem != null){
+                if (subsystem != null) {
                     Integer dbStatus = subsystem.getStatus();
                     //Arm:布防 DisArm:撤防 Unknown:未知
                     //0:撤防,1:布防,2:未知
-                    if ("Arm".equals(status)){
+                    if ("Arm".equals(status)) {
                         subsystem.setStatus(ProtectionStatus.PROTECTION.ordinal());
-                    }else if ("DisArm".equals(status)){
+                    } else if ("DisArm".equals(status)) {
                         subsystem.setStatus(ProtectionStatus.REMOVAL.ordinal());
-                    }else {
+                    } else {
                         subsystem.setStatus(ProtectionStatus.UNKNOWN.ordinal());
                     }
                     subsystem.setStatusUpdateTime(LocalDateTime.now());
@@ -114,7 +114,7 @@ public class IotAlarmHostServiceImpl implements IIotAlarmHostService, RouterServ
                     this.saveProtectionLog(subsystem);
                 }
             }
-        }catch (Exception e){
+        } catch (Exception e) {
             e.printStackTrace();
             LogUtils.STATUS_INFO_DEFENCEAREA.error("处理报警主机布撤防状态改变事件出错", e);
             throw new RuntimeException(e);
@@ -125,36 +125,36 @@ public class IotAlarmHostServiceImpl implements IIotAlarmHostService, RouterServ
     public void subSystemControl(Serializable id, boolean isArm) {
         try {
             IotAlarmSubsystem subsystem = subsystemService.getById(id);
-            if (subsystem != null){
+            if (subsystem != null) {
                 QueryWrapper<IotDeviceInfo> wrapper = new QueryWrapper<>();
                 wrapper.lambda()
-                        .eq(IotDeviceInfo::getDeviceCode,subsystem.getAlarmHostCode())
-                        .eq(IotDeviceInfo::getIotToken,subsystem.getIotToken());
+                        .eq(IotDeviceInfo::getDeviceCode, subsystem.getAlarmHostCode())
+                        .eq(IotDeviceInfo::getIotToken, subsystem.getIotToken());
                 IotDeviceInfo deviceInfo = iIotDeviceInfoService.getOne(wrapper);
-                if (deviceInfo != null){
+                if (deviceInfo != null) {
                     JSONObject args = new JSONObject();
-                    args.put("id",Integer.valueOf(subsystem.getCode()));
-                    args.put("isArm",isArm);
+                    args.put("id", Integer.valueOf(subsystem.getCode()));
+                    args.put("isArm", isArm);
                     String topic = "/things/" + deviceInfo.getDeviceProduct() + "/" + deviceInfo.getDeviceCode() + "/service/invoke";
                     JSONObject headers = new JSONObject();
                     headers.put("productName", deviceInfo.getDeviceProduct());
                     headers.put("deviceName", deviceInfo.getDeviceCode());
-                    LogUtils.DEVICE_CONTROL_LOG.info("报警主机布撤防控制topic:{},控制参数:{}",topic,args.toJSONString());
+                    LogUtils.DEVICE_CONTROL_LOG.info("报警主机布撤防控制topic:{},控制参数:{}", topic, args.toJSONString());
                     WebsocketResult websocketResult = IotServerUtils.invokeIotServer(topic, headers, WebSocketConstants.SUB_SYSTEM_CONTROL, args);
                     LogUtils.WEBSOCKET_MSG.info("布撤防控制指令:{}", JacksonUtils.toJSONString(websocketResult));
-                    boolean invoked = WebSocketSessionHolder.sendMessage(subsystem.getIotToken(), JacksonUtils.toJSONString(websocketResult));
-                    if (!invoked){
+                    boolean invoked = WebSocketUtils.sendMessage(subsystem.getIotToken(), JacksonUtils.toJSONString(websocketResult));
+                    if (!invoked) {
                         LogUtils.DEVICE_CONTROL_LOG.error("报警主机布撤防控制失败,指令下发iot服务失败");
                         throw new RuntimeException();
                     }
-                }else {
-                    LogUtils.DEVICE_CONTROL_LOG.error("报警主机布撤防控制失败,未找到对应的报警主机,报警控制器编码:{}",id);
+                } else {
+                    LogUtils.DEVICE_CONTROL_LOG.error("报警主机布撤防控制失败,未找到对应的报警主机,报警控制器编码:{}", id);
                     throw new RuntimeException();
                 }
             }
-        }catch (Exception e){
+        } catch (Exception e) {
             e.printStackTrace();
-            LogUtils.DEVICE_CONTROL_LOG.error("报警主机布撤防控制失败",e);
+            LogUtils.DEVICE_CONTROL_LOG.error("报警主机布撤防控制失败", e);
             throw new RuntimeException(e);
         }
     }
@@ -188,12 +188,12 @@ public class IotAlarmHostServiceImpl implements IIotAlarmHostService, RouterServ
     @Transactional(rollbackFor = Exception.class)
     public Object execute(WebsocketExecuteReq req) {
         try {
-            if (WebSocketConstants.ON_LINE_STATUS.equals(req.getEvent())){
+            if (WebSocketConstants.ON_LINE_STATUS.equals(req.getEvent())) {
                 changeAlarmHostStatus(req);
-            }else if(WebSocketConstants.SUB_SYSTEM_STATUS.equals(req.getEvent())){
+            } else if (WebSocketConstants.SUB_SYSTEM_STATUS.equals(req.getEvent())) {
                 changeSubSystemStatus(req);
             }
-        }catch (Exception e){
+        } catch (Exception e) {
             e.printStackTrace();
             throw new RuntimeException(e);
         }

+ 7 - 10
soc-modules/soc-modules-mediator/src/main/java/com/xunmei/mediator/api/video/service/impl/IotDvrHardDiskDetectionServiceImpl.java

@@ -8,6 +8,7 @@ import com.xunmei.common.core.constant.Constants;
 import com.xunmei.common.core.constant.SecurityConstants;
 import com.xunmei.common.core.domain.iot.domain.IotDvrHardDiskDetection;
 import com.xunmei.common.core.domain.iot.domain.IotDvrHardDiskDetectionLog;
+import com.xunmei.common.core.enums.iot.DeviceTypeEnum;
 import com.xunmei.common.core.util.BeanHelper;
 import com.xunmei.common.core.utils.DateUtils;
 import com.xunmei.common.core.utils.IDHelper;
@@ -23,10 +24,9 @@ import com.xunmei.mediator.iot.service.IIotDeviceInfoService;
 import com.xunmei.mediator.util.CheckDataUtil;
 import com.xunmei.mediator.websocket.constant.WebSocketConstants;
 import com.xunmei.mediator.websocket.dto.WebsocketExecuteReq;
-import com.xunmei.common.core.enums.iot.DeviceTypeEnum;
-import com.xunmei.mediator.websocket.dto.WebsocketResult;
 import com.xunmei.mediator.websocket.enums.ProductEnums;
 import com.xunmei.mediator.websocket.service.RouterService;
+import com.xunmei.mediator.websocket.utils.IotServerUtils;
 import com.xunmei.system.api.RemoteOrgService;
 import com.xunmei.system.api.domain.SysOrg;
 import com.xunmei.system.api.domain.iot.IotDeviceInfo;
@@ -93,21 +93,18 @@ public class IotDvrHardDiskDetectionServiceImpl extends ServiceImpl<IotDvrHardDi
             dto.setEquipmentCode(recorderHardDiskDetectionReq.getDvsCode());
             dto.setCheckStatus(recorderHardDiskDetectionReq.getCheckStatus());
             dto.setCheckTime(recorderHardDiskDetectionReq.getCheckTime());
-            List<Map<String, Object>> maps = new ArrayList<>();
-            /*for (JSONObject object : recorderHardDiskDetectionReq.getDetailInfo()) {
-                Map javaObject = object.toJavaObject(Map.class);
-                maps.add(javaObject);
-            }*/
             dto.setDetailInfo(recorderHardDiskDetectionReq.getDetailInfo());
-
             String token = req.getToken();
             SysOrg sysOrg = iotServerInfoService.selectOrgByToken(token);
             dto.setOrganizationGuid(sysOrg.getCode());
             dto.setToken(token);
             dto.setDeviceName(req.getDeviceName());
             dto.setProductName(req.getProductName());
-            WebsocketResult result = WebsocketResult.of(saveData(dto, req.getId()), req.getTopic(), req.getId());
-            return result;
+            final ReceiveErrorDto res = saveData(dto, req.getId());
+            //构建返回数据
+            String topic = req.getTopic() + "/reply";
+            final JSONObject errorDto = IotServerUtils.dealReceiveErrorDto(res);
+            return IotServerUtils.invokeUpLinkServer(topic, req.getProductName(), req.getDeviceName(), req.getEvent(), errorDto, req.getId());
         } catch (IllegalAccessException e) {
             throw new RuntimeException(e);
         }

+ 3 - 1
soc-modules/soc-modules-mediator/src/main/java/com/xunmei/mediator/api/video/service/impl/VideoIntegrityCheckServiceImpl.java

@@ -33,6 +33,7 @@ import com.xunmei.mediator.websocket.enums.TopicTypeEnums;
 import com.xunmei.mediator.websocket.holder.WebSocketSessionHolder;
 import com.xunmei.mediator.websocket.service.RouterService;
 import com.xunmei.mediator.websocket.utils.IotServerUtils;
+import com.xunmei.mediator.websocket.utils.WebSocketUtils;
 import com.xunmei.system.api.RemoteOrgService;
 import com.xunmei.system.api.domain.SysOrg;
 import com.xunmei.system.api.domain.iot.IotDeviceInfo;
@@ -102,6 +103,7 @@ public class VideoIntegrityCheckServiceImpl extends ServiceImpl<VideoIntegrityCh
         checkDto.setOrganizationGuid(sysOrg.getCode());
         //执行业务
         final ReceiveErrorDto dto = this.saveData(checkDto, req.getId());
+        //构建返回数据
         String topic = req.getTopic() + "/reply";
         final JSONObject errorDto = IotServerUtils.dealReceiveErrorDto(dto);
         return IotServerUtils.invokeUpLinkServer(topic, req.getProductName(), req.getDeviceName(), req.getEvent(), errorDto, req.getId());
@@ -120,7 +122,7 @@ public class VideoIntegrityCheckServiceImpl extends ServiceImpl<VideoIntegrityCh
             object.put("recordDate", Arrays.asList(DateUtil.format(DateUtil.offsetDay(new Date(), -1), Constants.DAILY_FORMAT)));
             WebsocketResult websocketResult = IotServerUtils.invokeDownLinkServer(TopicTypeEnums.PRODUCT_SERVICE_INVOKE, iotDeviceInfo.getDeviceProduct(), iotDeviceInfo.getDeviceCode(), WebSocketConstants.GET_RECORD_INFOS_SERVICES, object);
             LogUtils.WEBSOCKET_MSG.info("获取录像完整性数据:{}", JacksonUtils.toJSONString(websocketResult));
-            WebSocketSessionHolder.sendAll(JacksonUtils.toJSONString(websocketResult));
+            WebSocketUtils.sendAll(JacksonUtils.toJSONString(websocketResult));
         }
 
     }

+ 1 - 2
soc-modules/soc-modules-mediator/src/main/java/com/xunmei/mediator/websocket/handler/SocWebSocketHandler.java

@@ -173,7 +173,7 @@ public class SocWebSocketHandler extends AbstractWebSocketHandler {
             WebSocketUtils.sendMessage(session, result);
             LogUtils.WEBSOCKET_MSG.info("中心平台返回消息,token:{},内容:{}", token, JacksonUtils.toJSONString(result));
         } catch (Exception e) {
-            LogUtils.WEBSOCKET_MSG.error("转换消息内容时出错:{}", e);
+            LogUtils.WEBSOCKET_MSG.error("消息处理失败:{}", e);
         }
     }
 
@@ -243,5 +243,4 @@ public class SocWebSocketHandler extends AbstractWebSocketHandler {
     public boolean supportsPartialMessages() {
         return false;
     }
-
 }

+ 5 - 32
soc-modules/soc-modules-mediator/src/main/java/com/xunmei/mediator/websocket/holder/WebSocketSessionHolder.java

@@ -1,8 +1,10 @@
 package com.xunmei.mediator.websocket.holder;
 
 import cn.hutool.core.util.ObjectUtil;
+import com.xunmei.common.core.utils.JacksonUtils;
 import com.xunmei.common.core.utils.StringUtils;
 import com.xunmei.common.redis.utils.RedisUtils;
+import com.xunmei.mediator.websocket.dto.WebsocketResult;
 import com.xunmei.system.api.util.LogUtils;
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
@@ -119,36 +121,7 @@ public class WebSocketSessionHolder {
     }
 
 
-    public static void sendAll(String message) {
-        if (ObjectUtil.isEmpty(USER_SESSION_MAP)) {
-            LogUtils.WEBSOCKET_MSG.error("消息广播失败,没有Iot服务在线,消息内容:{}", message);
-            return;
-        }
-        try {
-            for (WebSocketSession session : USER_SESSION_MAP.values()) {
-                session.sendMessage(new TextMessage(message));
-                String hostAddress = session.getRemoteAddress().getAddress().getHostAddress();
-                LogUtils.WEBSOCKET_MSG.info("消息广播成功,发送目标ip:{},消息内容:{}", hostAddress, message);
-            }
-        } catch (IOException e) {
-            log.error("消息广播失败:{}", e);
-        }
-    }
-
-    public static boolean sendMessage(String token,String message) {
-        try {
-            WebSocketSession webSocketSession = USER_SESSION_MAP.get(token);
-            if (null == webSocketSession){
-                LogUtils.WEBSOCKET_MSG.error("消息广播失败,未找到对应在线的iot服务,iot服务:{},消息内容:{}", token,message);
-                return false;
-            }
-            webSocketSession.sendMessage(new TextMessage(message));
-            String hostAddress = webSocketSession.getRemoteAddress().getAddress().getHostAddress();
-            LogUtils.WEBSOCKET_MSG.info("消息广播成功,发送目标ip:{},消息内容:{}", hostAddress, message);
-            return true;
-        } catch (IOException e) {
-            log.error("消息广播失败:{}", e);
-            return false;
-        }
-    }
+   public static Map<String,WebSocketSession> map(){
+        return USER_SESSION_MAP;
+   }
 }

+ 4 - 3
soc-modules/soc-modules-mediator/src/main/java/com/xunmei/mediator/websocket/service/impl/WebsocketServiceImpl.java

@@ -37,6 +37,7 @@ import com.xunmei.mediator.websocket.redis.WebsocketPublisher;
 import com.xunmei.mediator.websocket.service.RouterService;
 import com.xunmei.mediator.websocket.service.WebsocketService;
 import com.xunmei.mediator.websocket.utils.IotServerUtils;
+import com.xunmei.mediator.websocket.utils.WebSocketUtils;
 import com.xunmei.system.api.domain.iot.IotDeviceInfo;
 import com.xunmei.system.api.domain.iot.IotDeviceInfoExtend;
 import com.xunmei.system.api.domain.websocket.RedisWebsocketMsg;
@@ -92,7 +93,7 @@ public class WebsocketServiceImpl implements WebsocketService, RouterService {
         args.put("categories", categories);
         WebsocketResult websocketResult = IotServerUtils.invokeIotServer(iotServerDeviceTopic, new JSONObject(), WebSocketConstants.GET_DEVICE_BASE_INFOS, args);
         LogUtils.WEBSOCKET_MSG.info("获取设备基础数据:{}", JacksonUtils.toJSONString(websocketResult));
-        WebSocketSessionHolder.sendAll(JacksonUtils.toJSONString(websocketResult));
+        WebSocketUtils.sendAll(JacksonUtils.toJSONString(websocketResult));
     }
 
     @Override
@@ -102,7 +103,7 @@ public class WebsocketServiceImpl implements WebsocketService, RouterService {
         args.put("productNames", productNames);
         WebsocketResult websocketResult = IotServerUtils.invokeIotServer(iotServerDeviceTopic, new JSONObject(), WebSocketConstants.GET_DVS_DEVICE_INFOS, args);
         LogUtils.WEBSOCKET_MSG.info("获取dvs下基础数据:{}", JacksonUtils.toJSONString(websocketResult));
-        WebSocketSessionHolder.sendAll(JacksonUtils.toJSONString(websocketResult));
+        WebSocketUtils.sendAll(JacksonUtils.toJSONString(websocketResult));
     }
 
     @Override
@@ -112,7 +113,7 @@ public class WebsocketServiceImpl implements WebsocketService, RouterService {
         args.put("productNames", productNames);
         WebsocketResult websocketResult = IotServerUtils.invokeIotServer(iotServerDeviceTopic, new JSONObject(), WebSocketConstants.GET_ALARM_HOST_DEVICE_INFOS, args);
         LogUtils.WEBSOCKET_MSG.info("获取报警主机下数据:{}", JacksonUtils.toJSONString(websocketResult));
-        WebSocketSessionHolder.sendAll(JacksonUtils.toJSONString(websocketResult));
+        WebSocketUtils.sendAll(JacksonUtils.toJSONString(websocketResult));
     }
 
     /**

+ 19 - 0
soc-modules/soc-modules-mediator/src/main/java/com/xunmei/mediator/websocket/utils/IotServerUtils.java

@@ -34,6 +34,15 @@ public class IotServerUtils {
         return websocketResult;
     }
 
+    /**
+     * 构建主动调用主机service的消息
+     * @param topicTypeEnums
+     * @param produceName
+     * @param deviceName
+     * @param invokeMethod
+     * @param object
+     * @return
+     */
     public static WebsocketResult invokeDownLinkServer(TopicTypeEnums topicTypeEnums,String produceName,String deviceName,String invokeMethod,JSONObject object){
         Date date = new Date();
         String id = UUID.randomUUID().toString();
@@ -70,6 +79,16 @@ public class IotServerUtils {
         return iotWebsocketResult;
     }
 
+    /**
+     * 接受主机上报事件后,构建返回的消息
+     * @param topic
+     * @param produceName
+     * @param deviceName
+     * @param invokeMethod
+     * @param object
+     * @param replyId
+     * @return
+     */
     public static WebsocketResult invokeUpLinkServer(String topic,String produceName,String deviceName,String invokeMethod,JSONObject object,String replyId){
         WebsocketResult iotWebsocketResult = new WebsocketResult();
         //Iot消息透穿至主机 固定Topic

+ 53 - 41
soc-modules/soc-modules-mediator/src/main/java/com/xunmei/mediator/websocket/utils/WebSocketUtils.java

@@ -1,12 +1,14 @@
 package com.xunmei.mediator.websocket.utils;
 
 import cn.hutool.core.collection.CollUtil;
+import cn.hutool.core.util.ObjectUtil;
 import com.xunmei.common.core.utils.JacksonUtils;
 import com.xunmei.common.redis.utils.RedisUtils;
 import com.xunmei.mediator.websocket.dto.WebSocketMessageDto;
 import com.xunmei.mediator.websocket.dto.WebsocketResult;
 import com.xunmei.mediator.websocket.dto.WebsocketUrlInfo;
 import com.xunmei.mediator.websocket.holder.WebSocketSessionHolder;
+import com.xunmei.system.api.util.LogUtils;
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
@@ -18,6 +20,7 @@ import org.springframework.web.socket.WebSocketSession;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.function.Consumer;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -34,18 +37,6 @@ import static com.xunmei.mediator.websocket.constant.WebSocketConstants.WEB_SOCK
 @Slf4j
 @NoArgsConstructor(access = AccessLevel.PRIVATE)
 public class WebSocketUtils {
-
-    /**
-     * 向指定的WebSocket会话发送消息
-     *
-     * @param sessionKey 要发送消息的用户id
-     * @param message    要发送的消息内容
-     */
-    public static void sendMessage(String sessionKey, String message) {
-        WebSocketSession session = WebSocketSessionHolder.getSessions(sessionKey);
-        sendMessage(session, message);
-    }
-
     /**
      * 订阅WebSocket消息主题,并提供一个消费者函数来处理接收到的消息
      *
@@ -117,13 +108,6 @@ public class WebSocketUtils {
         sendMessage(session, new TextMessage(message));
     }
 
-    public static void sendMessage(WebSocketSession session, WebsocketResult message) {
-        if (message == null) {
-            return;
-        }
-        sendMessage(session, new TextMessage(JacksonUtils.toJSONString(message)));
-    }
-
     /**
      * 向指定的WebSocket会话发送WebSocket消息对象
      *
@@ -142,28 +126,56 @@ public class WebSocketUtils {
         }
     }
 
-    /**
-     * 解析websocket topic 路径参数
-     *
-     * @param url
-     * @return
-     */
-    public static WebsocketUrlInfo resolveUrlInfo(String url) {
-        // 正则表达式模式
-        Pattern pattern = Pattern.compile("^/things/(\\w+)/(\\w+)/service/invoke$");
-        // 创建Matcher对象
-        Matcher matcher = pattern.matcher(url);
-        // 检查是否匹配成功
-        if (matcher.find()) {
-            // 提取动态参数
-            String productName = matcher.group(1);
-            String deviceName = matcher.group(2);
-            WebsocketUrlInfo websocketUrlInfo = new WebsocketUrlInfo();
-            websocketUrlInfo.setDeviceName(deviceName);
-            websocketUrlInfo.setProductName(productName);
-            return websocketUrlInfo;
-        } else {
-            return null;
+
+    public static void sendAll(String message) {
+        final Map<String, WebSocketSession> USER_SESSION_MAP = WebSocketSessionHolder.map();
+        if (ObjectUtil.isEmpty(USER_SESSION_MAP)) {
+            LogUtils.WEBSOCKET_MSG.error("消息广播失败,没有Iot服务在线,消息内容:{}", message);
+            return;
+        }
+        try {
+            for (WebSocketSession session : USER_SESSION_MAP.values()) {
+                session.sendMessage(new TextMessage(message));
+                String hostAddress = session.getRemoteAddress().getAddress().getHostAddress();
+                LogUtils.WEBSOCKET_MSG.info("消息广播成功,发送目标ip:{},消息内容:{}", hostAddress, message);
+            }
+        } catch (IOException e) {
+            log.error("消息广播失败:{}", e);
+        }
+    }
+
+    public static boolean sendMessage(String token, String message) {
+        final Map<String, WebSocketSession> USER_SESSION_MAP = WebSocketSessionHolder.map();
+        try {
+            WebSocketSession webSocketSession = USER_SESSION_MAP.get(token);
+            if (null == webSocketSession) {
+                LogUtils.WEBSOCKET_MSG.error("消息广播失败,未找到对应在线的iot服务,iot服务:{},消息内容:{}", token, message);
+                return false;
+            }
+            webSocketSession.sendMessage(new TextMessage(message));
+            String hostAddress = webSocketSession.getRemoteAddress().getAddress().getHostAddress();
+            LogUtils.WEBSOCKET_MSG.info("消息广播成功,发送目标ip:{},消息内容:{}", hostAddress, message);
+            return true;
+        } catch (IOException e) {
+            log.error("消息广播失败:{}", e);
+            return false;
+        }
+    }
+
+    public static boolean sendMessage(WebSocketSession webSocketSession, WebsocketResult message) {
+        final Map<String, WebSocketSession> USER_SESSION_MAP = WebSocketSessionHolder.map();
+        try {
+            if (webSocketSession == null || !webSocketSession.isOpen()) {
+                return false;
+            }
+            final String msg = JacksonUtils.toJSONString(message);
+            webSocketSession.sendMessage(new TextMessage(msg));
+            String hostAddress = webSocketSession.getRemoteAddress().getAddress().getHostAddress();
+            LogUtils.WEBSOCKET_MSG.info("消息广播成功,发送目标ip:{},消息内容:{}", hostAddress, msg);
+            return true;
+        } catch (IOException e) {
+            log.error("消息广播失败:{}", e);
+            return false;
         }
     }
 }