|
|
@@ -0,0 +1,189 @@
|
|
|
+package com.xunmei.mediator.api.airconditioner.service.impl;
|
|
|
+
|
|
|
+import cn.hutool.core.util.ObjectUtil;
|
|
|
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
|
|
+import com.baomidou.mybatisplus.core.toolkit.IdWorker;
|
|
|
+import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
|
|
|
+import com.xunmei.common.core.constant.CacheConstants;
|
|
|
+import com.xunmei.common.core.domain.iot.domain.IotAirConditioner;
|
|
|
+import com.xunmei.common.core.enums.iot.UpsStatus;
|
|
|
+import com.xunmei.common.core.thread.ThreadPoolConfig;
|
|
|
+import com.xunmei.common.redis.utils.RedisUtils;
|
|
|
+import com.xunmei.mediator.api.airconditioner.dto.AirConditionerDeviceDataReq;
|
|
|
+import com.xunmei.mediator.api.airconditioner.dto.AirConditionerDeviceStatusReq;
|
|
|
+import com.xunmei.mediator.api.airconditioner.mapper.IotAirConditionerMapper;
|
|
|
+import com.xunmei.mediator.api.airconditioner.service.IotAirConditionerService;
|
|
|
+import com.xunmei.mediator.api.north.service.NorthErrorService;
|
|
|
+import com.xunmei.mediator.api.north.service.impl.NorthStatisticsSyncService;
|
|
|
+import com.xunmei.mediator.util.CheckDataUtil;
|
|
|
+import com.xunmei.mediator.util.RedisCheckRepeatDataUtil;
|
|
|
+import com.xunmei.system.api.domain.north.NorthError;
|
|
|
+import com.xunmei.system.api.dto.DataPageDto;
|
|
|
+import com.xunmei.system.api.dto.protection.ReceiveErrorDto;
|
|
|
+import com.xunmei.system.api.enums.DataType;
|
|
|
+import com.xunmei.system.api.util.LogUtils;
|
|
|
+import com.xunmei.system.api.vo.SysOrgVO;
|
|
|
+import io.netty.util.internal.StringUtil;
|
|
|
+import org.springframework.beans.BeanUtils;
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.beans.factory.annotation.Qualifier;
|
|
|
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
|
|
+import org.springframework.stereotype.Service;
|
|
|
+
|
|
|
+import java.time.LocalDateTime;
|
|
|
+import java.util.*;
|
|
|
+import java.util.concurrent.Future;
|
|
|
+
|
|
|
+/**
|
|
|
+ * <p>
|
|
|
+ * 服务实现类
|
|
|
+ * </p>
|
|
|
+ *
|
|
|
+ * @author jingYuanChao
|
|
|
+ * @since 2024-06-25
|
|
|
+ */
|
|
|
+@Service
|
|
|
+public class IotAirConditionerServiceImpl extends ServiceImpl<IotAirConditionerMapper, IotAirConditioner> implements IotAirConditionerService {
|
|
|
+ @Autowired
|
|
|
+ @Qualifier(ThreadPoolConfig.SOC_EXECUTOR)
|
|
|
+ private ThreadPoolTaskExecutor threadPoolTaskExecutor;
|
|
|
+ @Autowired
|
|
|
+ NorthStatisticsSyncService northStatisticsSyncService;
|
|
|
+ @Autowired
|
|
|
+ NorthErrorService northErrorService;
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public ReceiveErrorDto saveDeviceData(DataPageDto<AirConditionerDeviceDataReq> request, String msgId, String path) {
|
|
|
+ try { //验证基础分页信息
|
|
|
+ ReceiveErrorDto dto = CheckDataUtil.checkObjFieldIsNull(request);
|
|
|
+ if (!dto.getSuccess()) {
|
|
|
+ return dto;
|
|
|
+ }
|
|
|
+ List<AirConditionerDeviceDataReq> airConditionerDeviceList = RedisCheckRepeatDataUtil.isCompleted(request, AirConditionerDeviceDataReq.class);
|
|
|
+ LogUtils.BASE_INFO_UPS_DEVICE.info("判断空调基础信息是否已获取完整分页数据:{}", !airConditionerDeviceList.isEmpty());
|
|
|
+ if (airConditionerDeviceList.isEmpty()) {
|
|
|
+ return ReceiveErrorDto.success();
|
|
|
+ }
|
|
|
+ Map<String, Object> dataMap = checkParmaAndBuildData(airConditionerDeviceList, msgId, path);
|
|
|
+ doSyncTask(() -> saveData(dataMap,airConditionerDeviceList.size()));
|
|
|
+ } catch (Exception e) {
|
|
|
+ LogUtils.BASE_INFO_UPS_DEVICE.error("保存空调基础信息发生异常:{}", e);
|
|
|
+ return ReceiveErrorDto.error(e.getMessage());
|
|
|
+ }
|
|
|
+ return ReceiveErrorDto.success();
|
|
|
+ }
|
|
|
+
|
|
|
+ private void saveData(Map<String, Object> map,int size) {
|
|
|
+
|
|
|
+ List<NorthError> errors = (List<NorthError>) map.get("errors");
|
|
|
+ List<IotAirConditioner> insertList = (List<IotAirConditioner>) map.get("insertList");
|
|
|
+ List<IotAirConditioner> updateList = (List<IotAirConditioner>) map.get("updateList");
|
|
|
+ List<Long> orgIdList = (List<Long>) map.get("orgIdList");
|
|
|
+ this.northStatisticsSyncService.saveOrUpdateBaseCountByDataType(DataType.airConditionerCount.getIndex(), size, false);
|
|
|
+
|
|
|
+ try {
|
|
|
+ northErrorService.saveErrorData(errors);
|
|
|
+ //this.updateDel(orgIdList);
|
|
|
+ if (ObjectUtil.isNotEmpty(insertList)) {
|
|
|
+ this.saveOrUpdateBatch(insertList);
|
|
|
+ }
|
|
|
+ if (ObjectUtil.isNotEmpty(updateList)) {
|
|
|
+ this.saveOrUpdateBatch(updateList);
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ LogUtils.BASE_INFO_UPS_DEVICE.error("批量保存ups失败,{}", e);
|
|
|
+ throw new RuntimeException(e.getMessage());
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ private void doSyncTask(Runnable runnable) throws RuntimeException {
|
|
|
+ Future<?> future = threadPoolTaskExecutor.submit(runnable);
|
|
|
+ try {
|
|
|
+ future.get(); // 这将阻塞直到任务完成,并捕获任何异常
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ Thread.currentThread().interrupt(); // 恢复中断状态
|
|
|
+ LogUtils.BASE_INFO_UPS_DEVICE.error("异步任务被中断:{}", e.getMessage());
|
|
|
+ } catch (Exception e) {
|
|
|
+ LogUtils.BASE_INFO_UPS_DEVICE.error("异步任务执行异常:{}", e.getMessage());
|
|
|
+ throw new RuntimeException(e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private Map<String, Object> checkParmaAndBuildData(List<AirConditionerDeviceDataReq> airConditionerDeviceList, String msgId, String path) throws IllegalAccessException {
|
|
|
+ List<SysOrgVO> orgList = RedisUtils.getCacheList(CacheConstants.ORG_CACHE_LIST_KEY);
|
|
|
+ Map<String, Object> map = new HashMap<>();
|
|
|
+ List<NorthError> errors = new ArrayList<>();
|
|
|
+ List<Long> list = new ArrayList<>();
|
|
|
+ List<IotAirConditioner> insertList = new ArrayList<>();
|
|
|
+ List<IotAirConditioner> updateList = new ArrayList<>();
|
|
|
+ List<IotAirConditioner> conditionerList = findList();
|
|
|
+ NorthError error = null;
|
|
|
+ for (AirConditionerDeviceDataReq req : airConditionerDeviceList) {
|
|
|
+ error = CheckDataUtil.checkObjFieldIsNull(req, msgId, path, null);
|
|
|
+ if (error != null) {
|
|
|
+ errors.add(error);
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ Optional<SysOrgVO> orgOptional = orgList.stream().filter(r -> r.getCode().equals(req.getOrgCode())).findFirst();
|
|
|
+ if (!orgOptional.isPresent()) {
|
|
|
+ error = new NorthError(msgId, path, req, "orgCode:" + req.getOrgCode() + "不正确");
|
|
|
+ errors.add(error);
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ SysOrgVO org = orgOptional.get();
|
|
|
+ if (!list.contains(org.getId())) {
|
|
|
+ list.add(org.getId());
|
|
|
+ }
|
|
|
+ IotAirConditioner newAirConditioner = populateAirConditioner(req, org);
|
|
|
+ //判断老数据中是否有新数据
|
|
|
+ Optional<IotAirConditioner> optional = conditionerList.stream().filter(r -> ObjectUtil.equal(r.getUniqueCode(), newAirConditioner.getUniqueCode())).findAny();
|
|
|
+ if (optional.isPresent()) {
|
|
|
+ IotAirConditioner airConditioner = optional.get();
|
|
|
+ BeanUtils.copyProperties(newAirConditioner, airConditioner, "id");
|
|
|
+ airConditioner.setUpdateTime(LocalDateTime.now());
|
|
|
+ updateList.add(airConditioner);
|
|
|
+ } else {
|
|
|
+ insertList.add(newAirConditioner);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ map.put("insertList", insertList);
|
|
|
+ map.put("updateList", updateList);
|
|
|
+ map.put("errors", errors);
|
|
|
+ map.put("orgIdList", list);
|
|
|
+ return map;
|
|
|
+ }
|
|
|
+
|
|
|
+ private IotAirConditioner populateAirConditioner(AirConditionerDeviceDataReq req, SysOrgVO org) {
|
|
|
+ return IotAirConditioner.builder()
|
|
|
+ .id(IdWorker.getId())
|
|
|
+ .deviceName(req.getDeviceName())
|
|
|
+ .orgId(org.getId())
|
|
|
+ .orgName(org.getShortName())
|
|
|
+ .orgPath(org.getPath())
|
|
|
+ .hostCode(req.getHostCode())
|
|
|
+ .uniqueCode(IotAirConditioner.getUniqueCode(org.getCode(), req.getHostCode(), req.getDeviceCode()))
|
|
|
+ .status(UpsStatus.UN_KNOW.getStatus())
|
|
|
+ .deviceType(req.getDeviceType())
|
|
|
+ .deviceCode(req.getDeviceCode())
|
|
|
+ .info(StringUtil.EMPTY_STRING)
|
|
|
+ .createTime(LocalDateTime.now())
|
|
|
+ .updateTime(LocalDateTime.now())
|
|
|
+ .deleted(0)
|
|
|
+ .build();
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ private List<IotAirConditioner> findList() {
|
|
|
+ LambdaQueryWrapper<IotAirConditioner> wrapper = new LambdaQueryWrapper<>();
|
|
|
+ wrapper.eq(IotAirConditioner::getDeleted, 0);
|
|
|
+ return baseMapper.selectList(wrapper);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public ReceiveErrorDto saveDeviceStatus(AirConditionerDeviceStatusReq request, String msgId, String path) {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+}
|