|
|
@@ -1,6 +1,7 @@
|
|
|
package com.xunmei.mediator.api.service.impl;
|
|
|
|
|
|
import cn.hutool.core.util.ObjectUtil;
|
|
|
+import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
|
|
|
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
|
|
|
import com.xunmei.common.core.constant.CacheConstants;
|
|
|
import com.xunmei.common.core.constant.Constants;
|
|
|
@@ -44,8 +45,7 @@ import java.util.stream.Collectors;
|
|
|
*/
|
|
|
@Service
|
|
|
public class IotDvrChannelServiceImpl extends ServiceImpl<IotDvrChannelMapper, IotDvrChannel> implements IIotDvrChannelService {
|
|
|
- @Resource
|
|
|
- private IMediatorCategoryService categoryService;
|
|
|
+
|
|
|
@Resource
|
|
|
private RemoteOrgService orgService;
|
|
|
@Resource
|
|
|
@@ -74,8 +74,6 @@ public class IotDvrChannelServiceImpl extends ServiceImpl<IotDvrChannelMapper, I
|
|
|
if (ObjectUtil.isEmpty(deviceDtoList)) {
|
|
|
return ReceiveErrorDto.error("通道数据为空");
|
|
|
}
|
|
|
- MediatorCategory category = categoryService.findCategoryByCodeAndParentId("18", -1L);
|
|
|
-
|
|
|
DeviceDto deviceDto;
|
|
|
List<NorthError> errors = new ArrayList<>();
|
|
|
List<IotDvrChannel> list = new ArrayList<>();
|
|
|
@@ -85,7 +83,7 @@ public class IotDvrChannelServiceImpl extends ServiceImpl<IotDvrChannelMapper, I
|
|
|
if (org != null) {
|
|
|
//制作一个随时变化的key
|
|
|
Boolean flag = redisTemplate.hasKey(CacheConstants.CHANNEL_KEY);
|
|
|
- if (!flag) {
|
|
|
+ if (Boolean.FALSE.equals(flag)) {
|
|
|
//判断是否存在这个key
|
|
|
this.redisTemplate.opsForValue().set(CacheConstants.CHANNEL_KEY, UK);
|
|
|
} else {
|
|
|
@@ -139,7 +137,7 @@ public class IotDvrChannelServiceImpl extends ServiceImpl<IotDvrChannelMapper, I
|
|
|
|
|
|
|
|
|
/**
|
|
|
- * 先不做数据处理,先缓存所有待处理的数据,然后每5分钟处理一次缓存数据
|
|
|
+ * 先不做数据处理,先缓存所有待处理的数据,然后每10分钟处理一次缓存数据 http://localhost:8080/mediator/channelCacheDeal
|
|
|
*/
|
|
|
|
|
|
|
|
|
@@ -152,7 +150,7 @@ public class IotDvrChannelServiceImpl extends ServiceImpl<IotDvrChannelMapper, I
|
|
|
continue;
|
|
|
}
|
|
|
|
|
|
- IotDvrChannel device = getChannelDevice(deviceDto, category);
|
|
|
+ IotDvrChannel device = getChannelDevice(deviceDto);
|
|
|
|
|
|
list.add(device);
|
|
|
|
|
|
@@ -163,15 +161,17 @@ public class IotDvrChannelServiceImpl extends ServiceImpl<IotDvrChannelMapper, I
|
|
|
redisTemplate.expire(UK + CacheConstants.AFTER_DATA, 1, TimeUnit.HOURS);
|
|
|
}
|
|
|
northErrorService.saveErrorData(errors);
|
|
|
-
|
|
|
return ReceiveErrorDto.success();
|
|
|
}
|
|
|
|
|
|
- private IotDvrChannel getChannelDevice(DeviceDto deviceDto, MediatorCategory category) {
|
|
|
+ private IotDvrChannel getChannelDevice(DeviceDto deviceDto) {
|
|
|
IotDvrChannel device = new IotDvrChannel();
|
|
|
- IotDvrChannel old = this.findChannel(deviceDto.getEquipmentCode(), deviceDto.getChannelCode(), deviceDto.getOrg().getId());
|
|
|
+ final LocalDateTime now = LocalDateTime.now();
|
|
|
+ IotDvrChannel old = this.findChannel(deviceDto.getChannelCode(), deviceDto.getEquipmentCode(), deviceDto.getOrg().getId());
|
|
|
if (old != null) {
|
|
|
device = old;
|
|
|
+ }else {
|
|
|
+ device.setCreateTime(now);
|
|
|
}
|
|
|
device.setChannelName(deviceDto.getChannelName());
|
|
|
device.setHostCode(deviceDto.getEquipmentCode());
|
|
|
@@ -180,10 +180,7 @@ public class IotDvrChannelServiceImpl extends ServiceImpl<IotDvrChannelMapper, I
|
|
|
device.setOrgName(deviceDto.getOrg().getShortName());
|
|
|
device.setChannelCode(Integer.parseInt(deviceDto.getChannelCode()));
|
|
|
device.setChannelAddr(deviceDto.getNetAddress());
|
|
|
- final LocalDateTime now = LocalDateTime.now();
|
|
|
device.setUpdateTime(now);
|
|
|
- device.setCreateTime(now);
|
|
|
-
|
|
|
device.setSource(1);
|
|
|
device.setDeleted(0);
|
|
|
return device;
|
|
|
@@ -194,12 +191,7 @@ public class IotDvrChannelServiceImpl extends ServiceImpl<IotDvrChannelMapper, I
|
|
|
public void channelCacheDeal() {
|
|
|
try {
|
|
|
Boolean flag = redisTemplate.hasKey(CacheConstants.CHANNEL_KEY);
|
|
|
- if (!flag) {
|
|
|
- return;
|
|
|
- }
|
|
|
-
|
|
|
- Boolean isData = this.redisTemplate.hasKey(CacheConstants.CHANNEL_KEY);
|
|
|
- if (!isData) {
|
|
|
+ if (Boolean.FALSE.equals(flag)) {
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
@@ -207,33 +199,17 @@ public class IotDvrChannelServiceImpl extends ServiceImpl<IotDvrChannelMapper, I
|
|
|
|
|
|
ListOperations<String, EquipmentOrgVo> lso = this.redisTemplate.opsForList();
|
|
|
List<EquipmentOrgVo> list = lso.range(UK, 0, -1);
|
|
|
- int voSize = list.size();
|
|
|
- if (voSize > 0) {
|
|
|
- lso.trim(UK, voSize, -1L);
|
|
|
+ if (ObjectUtil.isEmpty(list)) {
|
|
|
+ return;
|
|
|
}
|
|
|
-
|
|
|
+ lso.trim(UK, list.size(), -1L);
|
|
|
ListOperations<String, IotDvrChannel> cso = this.redisTemplate.opsForList();
|
|
|
List<IotDvrChannel> devices = cso.range(UK + RedisKey.AFTER_DATA, 0, -1);
|
|
|
- int devSize = devices.size();
|
|
|
- if (devSize > 0) {
|
|
|
- cso.trim(UK + RedisKey.AFTER_DATA, devSize, -1L);
|
|
|
- }
|
|
|
-
|
|
|
- if (list.size() == 0) {
|
|
|
+ if (ObjectUtil.isEmpty(devices)) {
|
|
|
return;
|
|
|
}
|
|
|
- /**
|
|
|
- * 获取要逻辑删除的主机code和orgId
|
|
|
- */
|
|
|
- Set<EquipmentOrgVo> eos = new HashSet<>();
|
|
|
- eos.addAll(list);
|
|
|
-
|
|
|
- List<EquipmentOrgVo> eosList = new ArrayList<>();
|
|
|
- eosList.addAll(eos);
|
|
|
- /**
|
|
|
- * 获取要进行批量处理的通道信息
|
|
|
- * 并通过主机code,通道id,机构id进行去重
|
|
|
- */
|
|
|
+ cso.trim(UK + RedisKey.AFTER_DATA, devices.size(), -1L);
|
|
|
+ //获取要进行批量处理的通道信息,并通过主机code,通道id,机构id进行去重
|
|
|
List<IotDvrChannel> devis = devices.stream().collect(Collectors.collectingAndThen(
|
|
|
Collectors.toCollection(() -> new TreeSet<>(Comparator.comparing(o -> o.getChannelCode() + ";" + o.getHostCode() + ";" + o.getOrgId()))), ArrayList::new));
|
|
|
|
|
|
@@ -242,36 +218,18 @@ public class IotDvrChannelServiceImpl extends ServiceImpl<IotDvrChannelMapper, I
|
|
|
* 第一步 逻辑删除对应 主机的 通道数据
|
|
|
* 第二步 批量更新数据
|
|
|
*/
|
|
|
- this.batchUpdateDel(eosList);
|
|
|
-
|
|
|
- /**
|
|
|
- * 自己写批量插入,更新方法,保证效率
|
|
|
- */
|
|
|
- //批量更新列表
|
|
|
- List<IotDvrChannel> upList = new ArrayList<>();
|
|
|
+ final List<EquipmentOrgVo> collect = list.stream().distinct().collect(Collectors.toList());
|
|
|
+ this.batchUpdateDel(collect);
|
|
|
//批量插入列表
|
|
|
- List<IotDvrChannel> inList = new ArrayList<>();
|
|
|
- if (devis.size() > 0) {
|
|
|
- for (IotDvrChannel dev : devis) {
|
|
|
- if (ObjectUtil.isNotEmpty(dev.getId())) {
|
|
|
- upList.add(dev);
|
|
|
- } else {
|
|
|
- inList.add(dev);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- if (upList.size() > 0) {
|
|
|
- this.batchUpdateDevice(upList);
|
|
|
- }
|
|
|
-
|
|
|
- if (inList.size() > 0) {
|
|
|
- this.batchInsertDevice(inList);
|
|
|
- }
|
|
|
+ final List<IotDvrChannel> inList = devis.stream().filter(d -> ObjectUtil.isEmpty(d.getId())).collect(Collectors.toList());
|
|
|
+ //批量更新列表
|
|
|
+ final List<IotDvrChannel> upList = devis.stream().filter(d -> ObjectUtil.isNotEmpty(d.getId())).collect(Collectors.toList());
|
|
|
+ this.batchUpdateDevice(upList);
|
|
|
+ this.batchInsertDevice(inList);
|
|
|
|
|
|
} catch (Exception e) {
|
|
|
e.printStackTrace();
|
|
|
- LogUtils.BASE_INFO_CHANNEL.error("【北向定时任务:{} 执行异常】【异常原因:{}】", "通道信息-DataDealJobService", e);
|
|
|
+ LogUtils.BASE_INFO_CHANNEL.error("【北向定时任务:{} 执行异常】【异常原因:{}】", "通道信息-DataDealJobService", e.getMessage());
|
|
|
throw new RuntimeException(e.getMessage());
|
|
|
}
|
|
|
}
|
|
|
@@ -279,18 +237,30 @@ public class IotDvrChannelServiceImpl extends ServiceImpl<IotDvrChannelMapper, I
|
|
|
@Override
|
|
|
@Transactional(rollbackFor = Exception.class)
|
|
|
public void batchUpdateDel(List<EquipmentOrgVo> eosList) {
|
|
|
- baseMapper.batchUpdateDel(eosList);
|
|
|
+ for (EquipmentOrgVo equipmentOrgVo : eosList) {
|
|
|
+ LambdaUpdateWrapper<IotDvrChannel> wrapper = new LambdaUpdateWrapper<IotDvrChannel>();
|
|
|
+ wrapper.eq(IotDvrChannel::getHostCode, equipmentOrgVo.getEquipmentCode());
|
|
|
+ wrapper.set(IotDvrChannel::getOrgId, equipmentOrgVo.getOrgId());
|
|
|
+ wrapper.set(IotDvrChannel::getDeleted, 1);
|
|
|
+ update(wrapper);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
@Transactional(rollbackFor = Exception.class)
|
|
|
public void batchUpdateDevice(List<IotDvrChannel> upList) {
|
|
|
+ if (ObjectUtil.isEmpty(upList)) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
updateBatchById(upList);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
@Transactional(rollbackFor = Exception.class)
|
|
|
public void batchInsertDevice(List<IotDvrChannel> inList) {
|
|
|
+ if (ObjectUtil.isEmpty(inList)) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
saveBatch(inList);
|
|
|
}
|
|
|
}
|