|
|
@@ -1,11 +1,38 @@
|
|
|
package com.xunmei.mediator.api.service.impl;
|
|
|
|
|
|
+import cn.hutool.core.util.ObjectUtil;
|
|
|
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
|
|
|
+import com.xunmei.common.core.constant.CacheConstants;
|
|
|
import com.xunmei.common.core.constant.Constants;
|
|
|
+import com.xunmei.common.core.constant.SecurityConstants;
|
|
|
+import com.xunmei.common.core.domain.iot.domain.IotDevice;
|
|
|
import com.xunmei.common.core.domain.iot.domain.IotDvrChannel;
|
|
|
+import com.xunmei.common.core.domain.mediator.domain.MediatorCategory;
|
|
|
+import com.xunmei.common.core.domain.mediator.vo.EquipmentOrgVo;
|
|
|
import com.xunmei.mediator.api.mapper.IotDvrChannelMapper;
|
|
|
+import com.xunmei.mediator.api.service.IIotDeviceService;
|
|
|
import com.xunmei.mediator.api.service.IIotDvrChannelService;
|
|
|
+import com.xunmei.mediator.api.service.IMediatorCategoryService;
|
|
|
+import com.xunmei.mediator.api.service.NorthErrorService;
|
|
|
+import com.xunmei.mediator.domain.dto.redis.RedisKey;
|
|
|
+import com.xunmei.system.api.RemoteOrgService;
|
|
|
+import com.xunmei.system.api.domain.SysOrg;
|
|
|
+import com.xunmei.system.api.domain.north.NorthError;
|
|
|
+import com.xunmei.system.api.dto.device.DeviceDto;
|
|
|
+import com.xunmei.system.api.dto.protection.ReceiveErrorDto;
|
|
|
+import com.xunmei.system.api.util.LogUtils;
|
|
|
+import org.apache.commons.lang3.StringUtils;
|
|
|
+import org.springframework.context.annotation.Lazy;
|
|
|
+import org.springframework.data.redis.core.ListOperations;
|
|
|
+import org.springframework.data.redis.core.RedisTemplate;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
+import org.springframework.transaction.annotation.Transactional;
|
|
|
+
|
|
|
+import javax.annotation.Resource;
|
|
|
+import java.time.LocalDateTime;
|
|
|
+import java.util.*;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
+import java.util.stream.Collectors;
|
|
|
|
|
|
/**
|
|
|
* <p>
|
|
|
@@ -17,6 +44,19 @@ import org.springframework.stereotype.Service;
|
|
|
*/
|
|
|
@Service
|
|
|
public class IotDvrChannelServiceImpl extends ServiceImpl<IotDvrChannelMapper, IotDvrChannel> implements IIotDvrChannelService {
|
|
|
+ @Resource
|
|
|
+ private IMediatorCategoryService categoryService;
|
|
|
+ @Resource
|
|
|
+ private RemoteOrgService orgService;
|
|
|
+ @Resource
|
|
|
+ private RedisTemplate redisTemplate;
|
|
|
+ @Resource
|
|
|
+ private NorthErrorService northErrorService;
|
|
|
+
|
|
|
+ @Resource
|
|
|
+ @Lazy
|
|
|
+ private IIotDeviceService iIotDeviceService;
|
|
|
+
|
|
|
|
|
|
@Override
|
|
|
public IotDvrChannel findChannel(String channelCode, String equipmentCode, Long orgId) {
|
|
|
@@ -27,4 +67,235 @@ public class IotDvrChannelServiceImpl extends ServiceImpl<IotDvrChannelMapper, I
|
|
|
.last(Constants.LIMIT1)
|
|
|
.one();
|
|
|
}
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public ReceiveErrorDto saveChannel(List<DeviceDto> deviceDtoList, String branchId, String msgId, String inner) {
|
|
|
+ if (org.apache.commons.collections4.CollectionUtils.isEmpty(deviceDtoList)) {
|
|
|
+ return ReceiveErrorDto.error("通道数据为空");
|
|
|
+ } else {
|
|
|
+ MediatorCategory category = categoryService.findCategoryByCodeAndParentId("18", -1L);
|
|
|
+
|
|
|
+ DeviceDto deviceDto;
|
|
|
+ List<NorthError> errors = new ArrayList<>();
|
|
|
+ List<IotDvrChannel> list = new ArrayList<>();
|
|
|
+ String equipmentId = null;
|
|
|
+
|
|
|
+ deviceDto = deviceDtoList.get(0);
|
|
|
+ equipmentId = deviceDto.getEquipmentCode();
|
|
|
+ final SysOrg org = orgService.findByCode(deviceDto.getOrganizationGuid(), SecurityConstants.INNER);
|
|
|
+ String UK = UUID.randomUUID().toString();
|
|
|
+
|
|
|
+ if (org != null) {
|
|
|
+ //制作一个随时变化的key
|
|
|
+ Boolean flag = redisTemplate.hasKey(CacheConstants.CHANNEL_KEY);
|
|
|
+ if (!flag) {
|
|
|
+ //判断是否存在这个key
|
|
|
+ this.redisTemplate.opsForValue().set(CacheConstants.CHANNEL_KEY, UK);
|
|
|
+ } else {
|
|
|
+ UK = (String) this.redisTemplate.opsForValue().get(CacheConstants.CHANNEL_KEY);
|
|
|
+ }
|
|
|
+ //根据变化的key,缓存数据,供定时任务使用
|
|
|
+ EquipmentOrgVo eov = new EquipmentOrgVo();
|
|
|
+ eov.setEquipmentCode(equipmentId);
|
|
|
+ eov.setOrgId(org.getId());
|
|
|
+ redisTemplate.opsForList().rightPush(UK, eov);
|
|
|
+ redisTemplate.expire(UK, 1, TimeUnit.HOURS);
|
|
|
+ }
|
|
|
+
|
|
|
+ for (int i = 0; i < deviceDtoList.size(); i++) {
|
|
|
+ deviceDto = deviceDtoList.get(i);
|
|
|
+ if (StringUtils.isEmpty(deviceDto.getEquipmentCode())) {
|
|
|
+ NorthError error = new NorthError(msgId, branchId, "/data/channelList"
|
|
|
+ , deviceDto, "equipmentCode 为空");
|
|
|
+ errors.add(error);
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (StringUtils.isEmpty(deviceDto.getOrganizationGuid())) {
|
|
|
+ NorthError error = new NorthError(msgId, branchId, "/data/channelList"
|
|
|
+ , deviceDto, "organizationGuid 为空");
|
|
|
+ errors.add(error);
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (StringUtils.isEmpty(deviceDto.getChannelCode())) {
|
|
|
+ NorthError error = new NorthError(msgId, branchId, "/data/channelList"
|
|
|
+ , deviceDto, "channelCode 为空");
|
|
|
+ errors.add(error);
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (StringUtils.isEmpty(deviceDto.getChannelName())) {
|
|
|
+ NorthError error = new NorthError(msgId, branchId, "/data/channelList"
|
|
|
+ , deviceDto, "channelName 为空");
|
|
|
+ errors.add(error);
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ if (org == null) {
|
|
|
+ NorthError error = new NorthError(msgId, branchId, "/data/channelList"
|
|
|
+ , deviceDto, "orgGUID:" + deviceDto.getOrganizationGuid() + " 不正确");
|
|
|
+ errors.add(error);
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 先不做数据处理,先缓存所有待处理的数据,然后每5分钟处理一次缓存数据
|
|
|
+ */
|
|
|
+
|
|
|
+
|
|
|
+ deviceDto.setOrg(org);
|
|
|
+ IotDevice host = iIotDeviceService.findByHostCode(deviceDto.getEquipmentCode(), org.getId());
|
|
|
+ if (host == null) {
|
|
|
+ NorthError error = new NorthError(msgId, branchId, "/data/channelList"
|
|
|
+ , deviceDto, "equipmentCode:" + deviceDto.getEquipmentCode() + "该主机code不存在");
|
|
|
+ errors.add(error);
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ IotDvrChannel device = getChannelDevice(deviceDto, category);
|
|
|
+
|
|
|
+ list.add(device);
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ if (list.size() > 0) {
|
|
|
+ redisTemplate.opsForList().rightPushAll(UK + CacheConstants.AFTER_DATA, list);
|
|
|
+ redisTemplate.expire(UK + CacheConstants.AFTER_DATA, 1, TimeUnit.HOURS);
|
|
|
+ }
|
|
|
+ if (errors.size() > 0) {
|
|
|
+ northErrorService.saveErrorData(errors);
|
|
|
+ }
|
|
|
+
|
|
|
+ return ReceiveErrorDto.success();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private IotDvrChannel getChannelDevice(DeviceDto deviceDto, MediatorCategory category) {
|
|
|
+ IotDvrChannel device = new IotDvrChannel();
|
|
|
+ IotDvrChannel old = this.findChannel(deviceDto.getEquipmentCode(), deviceDto.getChannelCode(), deviceDto.getOrg().getId());
|
|
|
+ if (old != null) {
|
|
|
+ device = old;
|
|
|
+ }
|
|
|
+ device.setChannelName(deviceDto.getChannelName());
|
|
|
+ device.setHostCode(deviceDto.getEquipmentCode());
|
|
|
+ device.setOrgId(deviceDto.getOrg().getId());
|
|
|
+ device.setOrgPath(deviceDto.getOrg().getPath());
|
|
|
+ device.setOrgName(deviceDto.getOrg().getName());
|
|
|
+ device.setChannelCode(Integer.parseInt(deviceDto.getChannelCode()));
|
|
|
+ device.setChannelAddr(deviceDto.getNetAddress());
|
|
|
+ final LocalDateTime now = LocalDateTime.now();
|
|
|
+ device.setUpdateTime(now);
|
|
|
+ device.setCreateTime(now);
|
|
|
+
|
|
|
+ device.setSource(1);
|
|
|
+ device.setDeleted(0);
|
|
|
+ return device;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void channelCacheDeal() {
|
|
|
+ try {
|
|
|
+ Boolean flag = redisTemplate.hasKey(CacheConstants.CHANNEL_KEY);
|
|
|
+ if (!flag) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ Boolean isData = this.redisTemplate.hasKey(CacheConstants.CHANNEL_KEY);
|
|
|
+ if (!isData) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ String UK = (String) this.redisTemplate.opsForValue().get(CacheConstants.CHANNEL_KEY);
|
|
|
+
|
|
|
+ ListOperations<String, EquipmentOrgVo> lso = this.redisTemplate.opsForList();
|
|
|
+ List<EquipmentOrgVo> list = lso.range(UK, 0, -1);
|
|
|
+ int voSize = list.size();
|
|
|
+ if (voSize > 0) {
|
|
|
+ lso.trim(UK, voSize, -1L);
|
|
|
+ }
|
|
|
+
|
|
|
+ ListOperations<String, IotDvrChannel> cso = this.redisTemplate.opsForList();
|
|
|
+ List<IotDvrChannel> devices = cso.range(UK + RedisKey.AFTER_DATA, 0, -1);
|
|
|
+ int devSize = devices.size();
|
|
|
+ if (devSize > 0) {
|
|
|
+ cso.trim(UK + RedisKey.AFTER_DATA, devSize, -1L);
|
|
|
+ }
|
|
|
+
|
|
|
+ if (list.size() == 0) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ /**
|
|
|
+ * 获取要逻辑删除的主机code和orgId
|
|
|
+ */
|
|
|
+ Set<EquipmentOrgVo> eos = new HashSet<>();
|
|
|
+ eos.addAll(list);
|
|
|
+
|
|
|
+ List<EquipmentOrgVo> eosList = new ArrayList<>();
|
|
|
+ eosList.addAll(eos);
|
|
|
+ /**
|
|
|
+ * 获取要进行批量处理的通道信息
|
|
|
+ * 并通过主机code,通道id,机构id进行去重
|
|
|
+ */
|
|
|
+ List<IotDvrChannel> devis = devices.stream().collect(Collectors.collectingAndThen(
|
|
|
+ Collectors.toCollection(() -> new TreeSet<>(Comparator.comparing(o -> o.getChannelCode() + ";" + o.getHostCode() + ";" + o.getOrgId()))), ArrayList::new));
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 第一步 逻辑删除对应 主机的 通道数据
|
|
|
+ * 第二步 批量更新数据
|
|
|
+ */
|
|
|
+ this.batchUpdateDel(eosList);
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 自己写批量插入,更新方法,保证效率
|
|
|
+ */
|
|
|
+ //批量更新列表
|
|
|
+ List<IotDvrChannel> upList = new ArrayList<>();
|
|
|
+ //批量插入列表
|
|
|
+ List<IotDvrChannel> inList = new ArrayList<>();
|
|
|
+ if (devis.size() > 0) {
|
|
|
+ for (IotDvrChannel dev : devis) {
|
|
|
+ if (ObjectUtil.isNotEmpty(dev.getId())) {
|
|
|
+ upList.add(dev);
|
|
|
+ } else {
|
|
|
+ inList.add(dev);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if (upList.size() > 0) {
|
|
|
+ this.batchUpdateDevice(upList);
|
|
|
+ }
|
|
|
+
|
|
|
+ if (inList.size() > 0) {
|
|
|
+ this.batchInsertDevice(inList);
|
|
|
+ }
|
|
|
+
|
|
|
+ } catch (Exception e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ LogUtils.BASE_INFO_CHANNEL.error("【北向定时任务:{} 执行异常】【异常原因:{}】", "通道信息-DataDealJobService", e);
|
|
|
+ throw new RuntimeException(e.getMessage());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ @Transactional(rollbackFor = Exception.class)
|
|
|
+ public void batchUpdateDel(List<EquipmentOrgVo> eosList) {
|
|
|
+ baseMapper.batchUpdateDel(eosList);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ @Transactional(rollbackFor = Exception.class)
|
|
|
+ public void batchUpdateDevice(List<IotDvrChannel> upList) {
|
|
|
+ updateBatchById(upList);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ @Transactional(rollbackFor = Exception.class)
|
|
|
+ public void batchInsertDevice(List<IotDvrChannel> inList) {
|
|
|
+ saveBatch(inList);
|
|
|
+ }
|
|
|
}
|