|
|
@@ -34,6 +34,7 @@ import com.xunmei.mediator.websocket.holder.WebSocketSessionHolder;
|
|
|
import com.xunmei.mediator.websocket.redis.WebsocketPublisher;
|
|
|
import com.xunmei.mediator.websocket.service.RouterService;
|
|
|
import com.xunmei.mediator.websocket.service.WebsocketService;
|
|
|
+import com.xunmei.mediator.websocket.utils.IotServerUtils;
|
|
|
import com.xunmei.system.api.domain.iot.IotDeviceInfo;
|
|
|
import com.xunmei.system.api.domain.iot.IotDeviceInfoExtend;
|
|
|
import com.xunmei.system.api.domain.websocket.RedisWebsocketMsg;
|
|
|
@@ -71,28 +72,41 @@ public class WebsocketServiceImpl implements WebsocketService, RouterService {
|
|
|
private IotAlarmSubsystemService iotAlarmSubsystemService;
|
|
|
|
|
|
|
|
|
- private static final String LOCK_DVS_ACCPET_KEY_PREFIX = "websocket:lock:base_info_dvs_";
|
|
|
+ private static final String LOCK_DVS_BASE_INFO = "websocket:lock:dvs_base_info";
|
|
|
|
|
|
- private static final String LOCK_ALARM_HOST_ACCPET_KEY_PREFIX = "websocket:lock:base_info_alarm_host_";
|
|
|
+ private static final String LOCK_DEVICE_BASE_INFO = "websocket:lock:device_base_info";
|
|
|
|
|
|
- private static final String LOCK_SUBSYSTEM_INPUT_ACCPET_KEY_PREFIX = "websocket:lock:base_info_subSystem_input_";
|
|
|
+ private static final String LOCK_ALARM_HOST_BASE_INFO = "websocket:lock:alarmHost_base_info";
|
|
|
|
|
|
+ //获取基础数据topic
|
|
|
+ private static final String iotServerDeviceTopic = "/things/IoTServer/IoTServerDevice/service/invoke";
|
|
|
|
|
|
@Override
|
|
|
public void invokeBaseDeviceInfoForSchedule() {
|
|
|
|
|
|
- WebsocketResult websocketResult = new WebsocketResult();
|
|
|
- websocketResult.setId(UUID.randomUUID().toString());
|
|
|
- websocketResult.setTimestamp(new Date());
|
|
|
- HashMap<String, Object> hashMap = new HashMap<>();
|
|
|
- hashMap.put(WebSocketConstants.SERVICE, WebSocketConstants.GET_DEVICE_BASE_INFOS);
|
|
|
- JSONObject object = new JSONObject();
|
|
|
- object.put("categories", Arrays.asList(WebSocketConstants.DVS, WebSocketConstants.ALARM_HOST));
|
|
|
- hashMap.put(WebSocketConstants.ARGS, object);
|
|
|
- websocketResult.setPayload(hashMap);
|
|
|
- WebSocketSessionHolder.sendAll(JacksonUtils.toJSONString(websocketResult));
|
|
|
+ List<String> categories = Arrays.asList(WebSocketConstants.DVS, WebSocketConstants.ALARM_HOST);
|
|
|
+ JSONObject args = new JSONObject();
|
|
|
+ args.put("categories", categories);
|
|
|
|
|
|
+ IotServerUtils.invokeIotServer(iotServerDeviceTopic,WebSocketConstants.GET_DEVICE_BASE_INFOS,args);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void invokeDvsBaseInfoForSchedule() {
|
|
|
+ String[] productNames = ProductEnums.DVS.getProductName();
|
|
|
+ JSONObject args = new JSONObject();
|
|
|
+ args.put("productNames", productNames);
|
|
|
+
|
|
|
+ IotServerUtils.invokeIotServer(iotServerDeviceTopic,WebSocketConstants.GET_DVS_DEVICE_INFOS,args);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void invokeAlarmHostBaseInfoForSchedule() {
|
|
|
+ String[] productNames = ProductEnums.ALARM_HOST.getProductName();
|
|
|
+ JSONObject args = new JSONObject();
|
|
|
+ args.put("productNames", productNames);
|
|
|
|
|
|
+ IotServerUtils.invokeIotServer(iotServerDeviceTopic,WebSocketConstants.GET_ALARM_HOST_DEVICE_INFOS,args);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
@@ -152,11 +166,17 @@ public class WebsocketServiceImpl implements WebsocketService, RouterService {
|
|
|
}
|
|
|
|
|
|
|
|
|
+ /**
|
|
|
+ * dvs下通道、硬盘、dvs扩展信息基础信息
|
|
|
+ * @param dvsBaseInfo
|
|
|
+ * @param token
|
|
|
+ * @throws Exception
|
|
|
+ */
|
|
|
@Async
|
|
|
@Transactional(rollbackFor = Exception.class)
|
|
|
public void dealDvsBaseInfo(DvsBaseInfo dvsBaseInfo, String token) throws Exception {
|
|
|
//添加分布式锁
|
|
|
- String lockKey = LOCK_DVS_ACCPET_KEY_PREFIX + token;
|
|
|
+ String lockKey = LOCK_DVS_BASE_INFO + token;
|
|
|
lockAndExecute(lockKey, () -> {
|
|
|
if (StringUtils.isEmpty(token) || dvsBaseInfo == null) {
|
|
|
log.error("同步视频类基础信息失败。原因:token或DvsBaseInfo为null");
|
|
|
@@ -211,13 +231,20 @@ public class WebsocketServiceImpl implements WebsocketService, RouterService {
|
|
|
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * 报警主机和dvs基础信息
|
|
|
+ * @param deviceBaseInfos
|
|
|
+ * @param token
|
|
|
+ * @throws Exception
|
|
|
+ */
|
|
|
+ @Async
|
|
|
@Override
|
|
|
@Transactional(rollbackFor = Exception.class)
|
|
|
- public void dealBaseDeviceInfo(List<DeviceBaseInfo> alarmHostInfos, String token) throws Exception {
|
|
|
+ public void dealBaseDeviceInfo(List<DeviceBaseInfo> deviceBaseInfos, String token) throws Exception {
|
|
|
//添加分布式锁
|
|
|
- String lockKey = LOCK_ALARM_HOST_ACCPET_KEY_PREFIX + token;
|
|
|
+ String lockKey = LOCK_DEVICE_BASE_INFO + token;
|
|
|
lockAndExecute(lockKey, () -> {
|
|
|
- if (StringUtils.isEmpty(token) || alarmHostInfos.isEmpty()) {
|
|
|
+ if (StringUtils.isEmpty(token) || deviceBaseInfos.isEmpty()) {
|
|
|
log.error("同步主机基础信息失败。原因:token或alarmHostInfos为空");
|
|
|
return;
|
|
|
}
|
|
|
@@ -241,22 +268,29 @@ public class WebsocketServiceImpl implements WebsocketService, RouterService {
|
|
|
//更新主机集合
|
|
|
List<IotDeviceInfo> updateHostList = new ArrayList();
|
|
|
|
|
|
- processHostDevices(alarmHostInfos, serverInfo, token, addHostList, updateHostList);
|
|
|
+ processHostDevices(deviceBaseInfos, serverInfo, token, addHostList, updateHostList);
|
|
|
|
|
|
saveOrUpdateHostBatches(addHostList, updateHostList);
|
|
|
|
|
|
|
|
|
} catch (Exception var8) {
|
|
|
- log.error("同步报警主机基础信息过程中出现异常", var8);
|
|
|
+ log.error("同步主机基础信息过程中出现异常", var8);
|
|
|
}
|
|
|
});
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * 报警主机下 子系统及防区基础数据和报警主机扩展信息
|
|
|
+ * @param alarmHostBaseInfo
|
|
|
+ * @param token
|
|
|
+ * @throws Exception
|
|
|
+ */
|
|
|
+ @Async
|
|
|
@Override
|
|
|
@Transactional(rollbackFor = Exception.class)
|
|
|
- public void dealSubSystemAndInput(AlarmHostBaseInfo alarmHostBaseInfo, String token) throws Exception {
|
|
|
+ public void dealAlarmHostBaseInfo(AlarmHostBaseInfo alarmHostBaseInfo, String token) throws Exception {
|
|
|
//添加分布式锁
|
|
|
- String lockKey = LOCK_SUBSYSTEM_INPUT_ACCPET_KEY_PREFIX + token;
|
|
|
+ String lockKey = LOCK_ALARM_HOST_BASE_INFO + token;
|
|
|
lockAndExecute(lockKey, () -> {
|
|
|
if (StringUtils.isEmpty(token) || alarmHostBaseInfo == null) {
|
|
|
log.error("同步报警主机子系统和防区信息失败。原因:token或alarmHostBaseInfo为空");
|
|
|
@@ -598,23 +632,23 @@ public class WebsocketServiceImpl implements WebsocketService, RouterService {
|
|
|
/**
|
|
|
* 处理报警主机数据
|
|
|
*
|
|
|
- * @param alarmHostInfos
|
|
|
+ * @param deviceBaseInfos
|
|
|
* @param serverInfo
|
|
|
* @param token
|
|
|
* @param addList
|
|
|
* @param updateList
|
|
|
*/
|
|
|
- private void processHostDevices(List<DeviceBaseInfo> alarmHostInfos, IotServerInfo serverInfo, String token, List<IotDeviceInfo> addList, List<IotDeviceInfo> updateList) {
|
|
|
- if (ObjectUtil.isEmpty(alarmHostInfos)) {
|
|
|
+ private void processHostDevices(List<DeviceBaseInfo> deviceBaseInfos, IotServerInfo serverInfo, String token, List<IotDeviceInfo> addList, List<IotDeviceInfo> updateList) {
|
|
|
+ if (ObjectUtil.isEmpty(deviceBaseInfos)) {
|
|
|
log.error("同步主机基础信息,获取主机信息为空");
|
|
|
} else {
|
|
|
- for (DeviceBaseInfo alarmHostInfo : alarmHostInfos) {
|
|
|
- IotDeviceInfo info = iIotDeviceInfoService.selectByTypeAndCode(token, getDeviceType(alarmHostInfo.getCategories()), alarmHostInfo.getDeviceName());
|
|
|
+ for (DeviceBaseInfo deviceBaseInfo : deviceBaseInfos) {
|
|
|
+ IotDeviceInfo info = iIotDeviceInfoService.selectByTypeAndCode(token, getDeviceType(deviceBaseInfo.getCategories()), deviceBaseInfo.getDeviceName());
|
|
|
if (info == null) {
|
|
|
- info = createHostInfo(alarmHostInfo, serverInfo);
|
|
|
+ info = createHostInfo(deviceBaseInfo, serverInfo);
|
|
|
addList.add(info);
|
|
|
} else {
|
|
|
- dealHostInfo(alarmHostInfo, info, serverInfo);
|
|
|
+ dealHostInfo(deviceBaseInfo, info, serverInfo);
|
|
|
updateList.add(info);
|
|
|
}
|
|
|
}
|
|
|
@@ -840,28 +874,29 @@ public class WebsocketServiceImpl implements WebsocketService, RouterService {
|
|
|
StringJoiner result = new StringJoiner(",");
|
|
|
result.add(WebSocketConstants.GET_DEVICE_BASE_INFOS);
|
|
|
result.add(WebSocketConstants.GET_ALARM_HOST_DEVICE_INFOS);
|
|
|
+ result.add(WebSocketConstants.GET_DVS_DEVICE_INFOS);
|
|
|
return result.toString();
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
+ @Transactional(rollbackFor = Exception.class)
|
|
|
public Object execute(WebsocketExecuteReq req) {
|
|
|
try {
|
|
|
if (WebSocketConstants.GET_DEVICE_BASE_INFOS.equals(req.getEvent())) {
|
|
|
JSONObject object = (JSONObject) req.getData();
|
|
|
- if (object != null) {
|
|
|
+ if (object != null){
|
|
|
List<DeviceBaseInfo> deviceBaseInfos = (List<DeviceBaseInfo>) object.get("deviceBaseInfos");
|
|
|
- chooseMethod(deviceBaseInfos);
|
|
|
+ dealBaseDeviceInfo(deviceBaseInfos,req.getToken());
|
|
|
}
|
|
|
} else if (WebSocketConstants.GET_ALARM_HOST_DEVICE_INFOS.equals(req.getEvent())) {
|
|
|
- dealSubSystemAndInput((AlarmHostBaseInfo) req.getData(), req.getToken());
|
|
|
+ dealAlarmHostBaseInfo((AlarmHostBaseInfo) req.getData(), req.getToken());
|
|
|
+ }else if(WebSocketConstants.GET_DVS_DEVICE_INFOS.equals(req.getEvent())){
|
|
|
+ dealDvsBaseInfo((DvsBaseInfo) req.getData(),req.getToken());
|
|
|
}
|
|
|
} catch (Exception e) {
|
|
|
-
|
|
|
+ e.printStackTrace();
|
|
|
+ throw new RuntimeException(e);
|
|
|
}
|
|
|
return null;
|
|
|
}
|
|
|
-
|
|
|
- private void chooseMethod(List<DeviceBaseInfo> deviceBaseInfos) {
|
|
|
- //new HashMap<String,List>()
|
|
|
- }
|
|
|
}
|