Browse Source

北向接口路由转发代码提交

jingyuanchao 1 year ago
parent
commit
f90ac69c79

+ 4 - 2
soc-modules/soc-modules-mediator/src/main/java/com/xunmei/mediator/api/video/service/impl/VideoIntegrityCheckServiceImpl.java

@@ -2,6 +2,7 @@ package com.xunmei.mediator.api.video.service.impl;
 
 import cn.hutool.core.util.ObjectUtil;
 import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson2.JSONObject;
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.baomidou.mybatisplus.core.toolkit.IdWorker;
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
@@ -27,8 +28,8 @@ import com.xunmei.mediator.domain.dto.videoIntegrityCheck.VideoIntegrityCheckReq
 import com.xunmei.mediator.util.RedisUtil;
 import com.xunmei.mediator.websocket.constant.WebSocketConstants;
 import com.xunmei.mediator.websocket.dto.WebsocketResult;
+import com.xunmei.mediator.websocket.holder.WebSocketSessionHolder;
 import com.xunmei.mediator.websocket.service.RouterService;
-import com.xunmei.mediator.websocket.utils.WebSocketUtils;
 import com.xunmei.system.api.RemoteOrgService;
 import com.xunmei.system.api.domain.SysOrg;
 import com.xunmei.system.api.domain.north.NorthError;
@@ -95,8 +96,9 @@ public class VideoIntegrityCheckServiceImpl extends ServiceImpl<VideoIntegrityCh
         websocketResult.setTimestamp(new Date());
         HashMap<String, Object> hashMap = new HashMap<>();
         hashMap.put(WebSocketConstants.SERVICE, WebSocketConstants.GET_RECORD_INFOS_SERVICES);
+        hashMap.put(WebSocketConstants.ARGS,new  JSONObject());
         websocketResult.setPayload(hashMap);
-        WebSocketUtils.publishAll(JacksonUtils.toJSONString(websocketResult));
+        WebSocketSessionHolder.sendAll(JacksonUtils.toJSONString(websocketResult));
 
     }
 

+ 8 - 2
soc-modules/soc-modules-mediator/src/main/java/com/xunmei/mediator/websocket/constant/WebSocketConstants.java

@@ -34,10 +34,16 @@ public interface WebSocketConstants {
     // ------------------ IOT提供的服务能力开始 ------------------
 
     /**
-     * 服务端主动请求 固定字段
+     * 服务端主动请求/客户端回复消息 固定字段
      */
     String SERVICE="service";
 
+
+    /**
+     * 客户端主动上报 固定字段
+     */
+    String TRIGGER="trigger";
+
     /**
      * 客户端主动上报 固定字段
      */
@@ -45,7 +51,7 @@ public interface WebSocketConstants {
 
 
     /**
-     * 客户端主动上报 固定字段
+     * 服务端主动请求/客户端回复消息/客户端主动上报  固定字段
      */
     String ARGS="args";
 

+ 6 - 1
soc-modules/soc-modules-mediator/src/main/java/com/xunmei/mediator/websocket/handler/SocWebSocketHandler.java

@@ -116,13 +116,18 @@ public class SocWebSocketHandler extends AbstractWebSocketHandler {
                 Map map = JSON.parseObject(obj.toString(), Map.class);
                 //上报事件
                 String event = (String) map.get(WebSocketConstants.EVENT);
+                String service = (String) map.get(WebSocketConstants.SERVICE);
+                String trigger = (String) map.get(WebSocketConstants.TRIGGER);
+
+                String routingKey=ObjectUtil.isNotEmpty(event) ? event : service;
+
                 //上报消息内容
                 JSONObject args = (JSONObject) map.get(WebSocketConstants.ARGS);
                 if (ObjectUtil.isEmpty(event)||args.isEmpty()){
                     log.error("消息内容为空:{}",message.getPayload());
                     return;
                 }
-                RouterService routeService = RouterServiceHandler.getRouteService(event);
+                RouterService routeService = RouterServiceHandler.getRouteService(routingKey);
                 routeService.execute(event,args);
                 //todo 返回iot消息
             }

+ 13 - 1
soc-modules/soc-modules-mediator/src/main/java/com/xunmei/mediator/websocket/holder/WebSocketSessionHolder.java

@@ -6,10 +6,11 @@ import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
 import org.redisson.api.RBucket;
 import org.redisson.api.RedissonClient;
+import org.springframework.web.socket.TextMessage;
 import org.springframework.web.socket.WebSocketSession;
 
+import java.io.IOException;
 import java.time.Duration;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -100,4 +101,15 @@ public class WebSocketSessionHolder {
     public static Boolean existSession(String sessionKey) {
         return USER_SESSION_MAP.containsKey(sessionKey);
     }
+
+
+    public static void sendAll(String message) {
+        try {
+            for (WebSocketSession session : USER_SESSION_MAP.values()) {
+                session.sendMessage(new TextMessage(message));
+            }
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
 }