Bläddra i källkod

修改实时轨迹接口更新逻辑

liangjianf 2 år sedan
förälder
incheckning
43df1692b6

+ 0 - 1
beidou-track-geomesa/src/main/java/cn/com/taiji/track/consumer/BeidouShipTrackConsumer.java

@@ -32,7 +32,6 @@ public class BeidouShipTrackConsumer {
             topics = {"${taiji.kafka.consumer.beidou.topic}"}
     )
     public void dynamicTrack(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) {
-//        beidouTrackService.beidouDynamicShipToRedis(records);
         beidouTrackService.beidouDynamicShipToCache(records);
         beidouTrackService.beidouDynamicShipToKafka(records);
         beidouLocationService.beidouDynamicShipToCache(records);

+ 6 - 1
beidou-track-geomesa/src/main/java/cn/com/taiji/track/dto/BeidouShipLocationDTO.java

@@ -45,6 +45,7 @@ public class BeidouShipLocationDTO extends IGeomesaTrackDTO implements Serializa
     private String isInport;
     private String isAnchor;
     private String deviceStatus;
+    private String xwDeptId;
 
     @Override
     public boolean checkPoint() {
@@ -98,6 +99,7 @@ public class BeidouShipLocationDTO extends IGeomesaTrackDTO implements Serializa
             attributes.append("isInport:String,");
             attributes.append("isAnchor:String,");
             attributes.append("deviceStatus:String,");
+            attributes.append("xwDeptId:String,");
 
             attributes.append("layerType:String,");
             attributes.append("syncTime:Date");
@@ -131,6 +133,7 @@ public class BeidouShipLocationDTO extends IGeomesaTrackDTO implements Serializa
         }else{
             builder.set("deviceStatus", 0);
         }
+        builder.set("xwDeptId", xwDeptId);
 
         builder.set("layerType", getLayerType());
         builder.set("syncTime", new Date());
@@ -141,7 +144,7 @@ public class BeidouShipLocationDTO extends IGeomesaTrackDTO implements Serializa
     @Override
     public String[] getPropList(){
         return new String[] {
-//                "deviceId",
+                "deviceId",
                 "location",
                 "shipType",
                 "workType",
@@ -162,6 +165,7 @@ public class BeidouShipLocationDTO extends IGeomesaTrackDTO implements Serializa
                 "isInport",
                 "isAnchor",
                 "deviceStatus",
+                "xwDeptId",
                 "layerType",
                 "syncTime"
         };
@@ -191,6 +195,7 @@ public class BeidouShipLocationDTO extends IGeomesaTrackDTO implements Serializa
                 isInport,
                 isAnchor,
                 deviceStatus,
+                xwDeptId,
                 getLayerType(),
                 new Date()
         };

+ 18 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/track/entity/ShipStatusEntity.java

@@ -1,5 +1,6 @@
 package cn.com.taiji.track.entity;
 
+import cn.com.taiji.track.constants.WarningCodeConstants;
 import com.baomidou.mybatisplus.annotation.EnumValue;
 import com.baomidou.mybatisplus.annotation.TableId;
 import com.baomidou.mybatisplus.annotation.TableName;
@@ -54,6 +55,14 @@ public class ShipStatusEntity{
         @JsonValue    //需要在前端展示哪个值就在哪个属性上加上该注解
         private String text;
 
+        public static OnlineStatus getStatus(String start) {
+            if("true".equals(start)){
+                return ONLINE;
+            }else{
+                return OFFLINE;
+            }
+        }
+
         public String getValue() {
             return value;
         }
@@ -71,6 +80,15 @@ public class ShipStatusEntity{
         @JsonValue    //需要在前端展示哪个值就在哪个属性上加上该注解
         private String text;
 
+        public static InportStatus getStatus(String inportCode) {
+            switch (inportCode) {
+                case WarningCodeConstants.INPORT_WARNING:
+                    return TRUE;
+                default:
+                    return FALSE;
+            }
+        }
+
         public String getValue() {
             return value;
         }

+ 6 - 2
beidou-track-geomesa/src/main/java/cn/com/taiji/track/mapper/BeidouLocationMapper.java

@@ -17,10 +17,14 @@ import java.util.List;
  */
 @Mapper
 public interface BeidouLocationMapper extends BaseMapper<BeidouLocationEntity> {
-    @Select("SELECT l.*,s.is_online,s.is_inport,s.is_anchor,d.dispose_type " +
+    @Select("SELECT l.*,s.is_online,s.is_inport,s.is_anchor,d.dispose_type,dept.xw_dept_id,\n" +
+            "IFNULL(l.ship_name,ship_id) AS ship_name\n" +
             "FROM ax_beidou_ship_location l\n" +
             "LEFT JOIN ax_beidou_ship_status s on s.device_id = l.device_id\n" +
-            "LEFT JOIN ax_beidou_ship_dispose d on d.devide_no = l.device_id")
+            "LEFT JOIN ax_beidou_ship_dispose d on d.devide_no = l.device_id\n" +
+            "LEFT JOIN ax_beidou_ship_archives a on a.devide_no = l.device_id\n" +
+            "LEFT JOIN ax_beidou_dept dept on dept.pscbh = a.police_station_id\n" +
+            "ORDER BY device_id,location_time")
     List<BeidouShipLocationDTO> listLocation();
 }
 

+ 1 - 1
beidou-track-geomesa/src/main/java/cn/com/taiji/track/schedule/LocationSchedule.java

@@ -17,7 +17,7 @@ public class LocationSchedule {
     @Autowired
     private IBeidouLocationService beidouLocationService;
 
-    @Scheduled(cron = "*/3 * * * * ?")
+    @Scheduled(cron = "*/30 * * * * ?")
     private void listLocationToCache() {
         beidouLocationService.listLocationToCache();
     }

+ 4 - 1
beidou-track-geomesa/src/main/java/cn/com/taiji/track/schedule/PersistenceSchedule.java

@@ -36,7 +36,10 @@ public class PersistenceSchedule {
         beidouLocationService.cacheBeidouToMySql();
     }
     @Scheduled(cron = "*/10 * * * * ?")
-    private void cacheStatusToMySql() { shipStatusService.cacheToMySql();}
+    private void cacheStatusToMySql() {
+//        shipStatusService.cacheToMySql();
+        shipStatusService.onlineCacheToMySql();
+    }
     @Scheduled(cron = "*/10 * * * * ?")
     private void anchorCacheToMySql() { shipStatusService.anchorCacheToMySql();}
     @Scheduled(cron = "*/10 * * * * ?")

+ 4 - 6
beidou-track-geomesa/src/main/java/cn/com/taiji/track/service/IShipStatusService.java

@@ -10,12 +10,6 @@ import java.util.Map;
  */
 public interface IShipStatusService extends IService<ShipStatusEntity> {
 
-    void pushOffLineDeviceId(String deviceId);
-
-    void cacheToMySql();
-
-    void pushOnLineDeviceId(String deviceId);
-
     void anchorCacheToMySql();
 
     void pushAnchorDeviceId(String deviceId, String start);
@@ -23,4 +17,8 @@ public interface IShipStatusService extends IService<ShipStatusEntity> {
     void pushinportDeviceId(String deviceId, String inportWarning);
 
     void InportCacheToMySql();
+
+    void pushOnLineDeviceId(String deviceId, String start);
+
+    void onlineCacheToMySql();
 }

+ 11 - 5
beidou-track-geomesa/src/main/java/cn/com/taiji/track/service/impl/BeidouLocationServiceImpl.java

@@ -66,7 +66,7 @@ public class BeidouLocationServiceImpl extends ServiceImpl<BeidouLocationMapper,
                     BigDecimal latitudeDecimal = new BigDecimal(locationEntity.getLatitude());
                     locationEntity.setLatitude(LatLngUtil.latLng2Dfm(latitudeDecimal.doubleValue()));
                     locationEntities.add(locationEntity);
-                    shipStatusService.pushOnLineDeviceId(locationEntity.getDeviceId());
+                    shipStatusService.pushOnLineDeviceId(locationEntity.getDeviceId(),"true");
                 }
             }
         } catch (Exception e) {
@@ -81,12 +81,13 @@ public class BeidouLocationServiceImpl extends ServiceImpl<BeidouLocationMapper,
             List<BeidouLocationEntity> saveList = new ArrayList<>(locationEntities);
             locationEntities.clear();
             saveBatch(saveList);
-//            log.info("位置数据写入数据库完成!耗时 {} 毫秒, 合计保存 {} 条记录", (new Date()).getTime() - begin.getTime(), saveList.size());
+            log.info("位置数据写入数据库完成!耗时 {} 毫秒, 合计保存 {} 条记录", (new Date()).getTime() - begin.getTime(), saveList.size());
         }
     }
 
     @Override
     public void listLocationToCache() {
+        Date begin = new Date();
         List<BeidouShipLocationDTO> results = baseMapper.listLocation();
         if (results.size() > 0) {
             FeatureReader<SimpleFeatureType, SimpleFeature> reader = null;
@@ -114,9 +115,13 @@ public class BeidouLocationServiceImpl extends ServiceImpl<BeidouLocationMapper,
                     reader = redisDataStore.getFeatureReader(query, Transaction.AUTO_COMMIT);
                     if (reader.hasNext()) {
                         append = false;
-                        String[] propList = dto.getPropList();
-                        Object[] valueList = dto.getValueList();
-                        redisDataStore.getFeatureSource(dto.getTypeName()).modifyFeatures(propList, valueList, filter);
+                        SimpleFeature feature=reader.next();
+                        String locationTime = feature.getAttribute("locationTime").toString();
+                        if(!locationTime.equals(String.valueOf(dto.getValueList()[6]))){
+                            String[] propList = dto.getPropList();
+                            Object[] valueList = dto.getValueList();
+                            redisDataStore.getFeatureSource(dto.getTypeName()).modifyFeatures(propList, valueList, filter);
+                        }
                     }
                     reader.close();
                     reader = null;
@@ -147,6 +152,7 @@ public class BeidouLocationServiceImpl extends ServiceImpl<BeidouLocationMapper,
                     e.printStackTrace();
                 }
             }
+//            log.info("位置数据插入geomesa完成!耗时 {} 毫秒, 合计保存 {} 条记录", (new Date()).getTime() - begin.getTime(), results.size());
         }
     }
 }

+ 78 - 106
beidou-track-geomesa/src/main/java/cn/com/taiji/track/service/impl/ShipStatusServiceImpl.java

@@ -4,7 +4,6 @@ import cn.com.taiji.track.constants.WarningCodeConstants;
 import cn.com.taiji.track.entity.ShipStatusEntity;
 import cn.com.taiji.track.mapper.ShipStatusMapper;
 import cn.com.taiji.track.service.IShipStatusService;
-import com.alibaba.druid.util.StringUtils;
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Service;
@@ -19,19 +18,13 @@ import java.util.concurrent.ConcurrentHashMap;
 @Slf4j
 public class ShipStatusServiceImpl extends ServiceImpl<ShipStatusMapper, ShipStatusEntity> implements IShipStatusService {
 
-    private Vector<String> offLineDeviceIds= new Vector<>();
-    private Vector<String> onLineDeviceIds= new Vector<>();
+    private Map<String, String> onlineMap= new ConcurrentHashMap<>();
     private Map<String, String> anchorMap= new ConcurrentHashMap<>();
     private Map<String, String> inportMap= new ConcurrentHashMap<>();
 
     @Override
-    public void pushOffLineDeviceId(String deviceId){
-        offLineDeviceIds.add(deviceId);
-    }
-
-    @Override
-    public void pushOnLineDeviceId(String deviceId) {
-        onLineDeviceIds.add(deviceId);
+    public void pushOnLineDeviceId(String deviceId, String start) {
+        onlineMap.put(deviceId, start);
     }
 
     @Override
@@ -45,75 +38,34 @@ public class ShipStatusServiceImpl extends ServiceImpl<ShipStatusMapper, ShipSta
     }
 
     @Override
-    public void cacheToMySql() {
-        Date begin = new Date();
-        List<ShipStatusEntity> saveList = new ArrayList<>();
-        if(offLineDeviceIds.size() > 0){
-            List<String> idList = new ArrayList<>(offLineDeviceIds);
-            offLineDeviceIds.clear();
-            for (String id : idList) {
-                ShipStatusEntity entity = baseMapper.selectById(id);
-                if (entity == null) {
-                    entity = new ShipStatusEntity();
-                    entity.setDeviceId(id);
-                    entity.setIsOnline(ShipStatusEntity.OnlineStatus.OFFLINE);
-                    entity.setOnlineChangeTime(new Date());
-                    saveList.add(entity);
-                }else if (ShipStatusEntity.OnlineStatus.ONLINE.equals(entity.getIsOnline())) {
-                    entity.setIsOnline(ShipStatusEntity.OnlineStatus.OFFLINE);
-                    entity.setOnlineChangeTime(new Date());
-                    saveList.add(entity);
-                }
-            }
-        }
-        if(onLineDeviceIds.size() > 0){
-            List<String> idList = new ArrayList<>(onLineDeviceIds);
-            onLineDeviceIds.clear();
-            for (String id : idList) {
-                ShipStatusEntity entity = baseMapper.selectById(id);
-                if (entity == null) {
-                    entity = new ShipStatusEntity();
-                    entity.setDeviceId(id);
-                    entity.setIsOnline(ShipStatusEntity.OnlineStatus.ONLINE);
-                    entity.setOnlineChangeTime(new Date());
-                    saveList.add(entity);
-                }else if (ShipStatusEntity.OnlineStatus.OFFLINE.equals(entity.getIsOnline())) {
-                    entity.setIsOnline(ShipStatusEntity.OnlineStatus.ONLINE);
-                    entity.setOnlineChangeTime(new Date());
-                    saveList.add(entity);
-                }
-            }
-        }
-        if(!saveList.isEmpty()){
-            saveBatch(saveList);
-            log.info("船舶状态写入数据库完成!耗时 {} 毫秒, 合计保存 {} 条记录", (new Date()).getTime() - begin.getTime(), saveList.size());
-        }
-    }
-
-    @Override
     public void anchorCacheToMySql() {
         Date begin = new Date();
         List<ShipStatusEntity> saveList = new ArrayList<>();
         if(anchorMap.size() > 0){
             Map<String,String> map = new HashMap<>(anchorMap);
             anchorMap.clear();
+            List<ShipStatusEntity> statusEntityList = baseMapper.selectBatchIds(map.keySet());
+            if(statusEntityList.size() > 0) {
+                for(ShipStatusEntity statusEntity : statusEntityList){
+                    String start = map.get(statusEntity.getDeviceId());
+                    ShipStatusEntity.AnchorStatus status = ShipStatusEntity.AnchorStatus.getStatus(start);
+                    if(!status.equals(statusEntity.getIsAnchor())){
+                        statusEntity.setIsAnchor(status);
+                        saveList.add(statusEntity);
+                    }
+                    map.remove(statusEntity.getDeviceId());
+                }
+            }
             for (String id : map.keySet()) {
                 String start = map.get(id);
-                ShipStatusEntity entity = baseMapper.selectById(id);
                 ShipStatusEntity.AnchorStatus status = ShipStatusEntity.AnchorStatus.getStatus(start);
-                if (entity == null) {
-                    entity = new ShipStatusEntity();
-                    entity.setDeviceId(id);
-                    entity.setIsOnline(ShipStatusEntity.OnlineStatus.ONLINE);
-                    entity.setOnlineChangeTime(new Date());
-                    entity.setIsAnchor(status);
-                    entity.setAnchorChangeTime(new Date());
-                    saveList.add(entity);
-                }else if (!status.equals(entity.getIsAnchor())) {
-                    entity.setIsAnchor(status);
-                    entity.setAnchorChangeTime(new Date());
-                    saveList.add(entity);
-                }
+                ShipStatusEntity entity = new ShipStatusEntity();
+                entity.setDeviceId(id);
+                entity.setIsOnline(ShipStatusEntity.OnlineStatus.ONLINE);
+                entity.setOnlineChangeTime(new Date());
+                entity.setIsAnchor(status);
+                entity.setAnchorChangeTime(new Date());
+                saveList.add(entity);
             }
         }
         if(!saveList.isEmpty()){
@@ -128,48 +80,68 @@ public class ShipStatusServiceImpl extends ServiceImpl<ShipStatusMapper, ShipSta
         if (inportMap.size() > 0) {
             Map<String, String> map = new HashMap<>(inportMap);
             inportMap.clear();
-            for (String deviceId : map.keySet()) {
-                String inportCode = map.get(deviceId);
-                ShipStatusEntity entity = baseMapper.selectById(deviceId);
-                switch (inportCode) {
-                    case WarningCodeConstants.INPORT_WARNING:
-                        if (entity == null) {
-                            entity = new ShipStatusEntity();
-                            entity.setDeviceId(deviceId);
-                            entity.setIsOnline(ShipStatusEntity.OnlineStatus.ONLINE);
-                            entity.setOnlineChangeTime(new Date());
-                            entity.setIsInport(ShipStatusEntity.InportStatus.TRUE);
-                            entity.setInportChangeTime(new Date());
-                            saveList.add(entity);
-                        } else {
-                            entity.setIsInport(ShipStatusEntity.InportStatus.TRUE);
-                            entity.setInportChangeTime(new Date());
-                            saveList.add(entity);
-                        }
-                        break;
-                    case WarningCodeConstants.OUTPROT_WARNING:
-                        if (entity == null) {
-                            entity = new ShipStatusEntity();
-                            entity.setDeviceId(deviceId);
-                            entity.setIsOnline(ShipStatusEntity.OnlineStatus.ONLINE);
-                            entity.setOnlineChangeTime(new Date());
-                            entity.setIsInport(ShipStatusEntity.InportStatus.FALSE);
-                            entity.setInportChangeTime(new Date());
-                            saveList.add(entity);
-                        } else {
-                            entity.setIsInport(ShipStatusEntity.InportStatus.FALSE);
-                            entity.setInportChangeTime(new Date());
-                            saveList.add(entity);
-                        }
-                        break;
-                    default:
-                        break;
+            List<ShipStatusEntity> statusEntityList = baseMapper.selectBatchIds(map.keySet());
+            if(statusEntityList.size() > 0) {
+                for(ShipStatusEntity statusEntity : statusEntityList){
+                    String inportCode = map.get(statusEntity.getDeviceId());
+                    ShipStatusEntity.InportStatus status = ShipStatusEntity.InportStatus.getStatus(inportCode);
+                    if(!status.equals(statusEntity.getIsAnchor())){
+                        statusEntity.setIsInport(status);
+                        saveList.add(statusEntity);
+                    }
+                    map.remove(statusEntity.getDeviceId());
                 }
             }
+            for (String id : map.keySet()) {
+                String inportCode = map.get(id);
+                ShipStatusEntity.InportStatus status = ShipStatusEntity.InportStatus.getStatus(inportCode);
+                ShipStatusEntity entity = new ShipStatusEntity();
+                entity.setDeviceId(id);
+                entity.setIsOnline(ShipStatusEntity.OnlineStatus.ONLINE);
+                entity.setOnlineChangeTime(new Date());
+                entity.setIsInport(status);
+                entity.setInportChangeTime(new Date());
+                saveList.add(entity);
+            }
             if (!saveList.isEmpty()) {
                 saveBatch(saveList);
                 log.info("进出港状态写入数据库完成!耗时 {} 毫秒, 合计保存 {} 条记录", (new Date()).getTime() - begin.getTime(), saveList.size());
             }
         }
     }
+
+    @Override
+    public void onlineCacheToMySql() {
+        Date begin = new Date();
+        List<ShipStatusEntity> saveList = new ArrayList<>();
+        if(onlineMap.size() > 0) {
+            Map<String, String> map = new HashMap<>(onlineMap);
+            onlineMap.clear();
+            List<ShipStatusEntity> statusEntityList = baseMapper.selectBatchIds(map.keySet());
+            if(statusEntityList.size() > 0){
+                for(ShipStatusEntity statusEntity : statusEntityList){
+                    String start = map.get(statusEntity.getDeviceId());
+                    ShipStatusEntity.OnlineStatus status = ShipStatusEntity.OnlineStatus.getStatus(start);
+                    if(!status.equals(statusEntity.getIsOnline())){
+                        statusEntity.setIsOnline(status);
+                        saveList.add(statusEntity);
+                    }
+                    map.remove(statusEntity.getDeviceId());
+                }
+            }
+            for (String id : map.keySet()) {
+                String start = map.get(id);
+                ShipStatusEntity.OnlineStatus status = ShipStatusEntity.OnlineStatus.getStatus(start);
+                ShipStatusEntity entity = new ShipStatusEntity();
+                entity.setDeviceId(id);
+                entity.setIsOnline(status);
+                entity.setOnlineChangeTime(new Date());
+                saveList.add(entity);
+            }
+        }
+        if(!saveList.isEmpty()){
+            saveBatch(saveList);
+            log.info("船舶状态写入数据库完成!耗时 {} 毫秒, 合计保存 {} 条记录", (new Date()).getTime() - begin.getTime(), saveList.size());
+        }
+    }
 }

+ 6 - 5
beidou-track-geomesa/src/main/java/cn/com/taiji/track/service/impl/WarningRecordServiceImpl.java

@@ -42,11 +42,12 @@ public class WarningRecordServiceImpl extends ServiceImpl<WarningRecordMapper, W
                     entity.setUpdateTime(new Date());
                     entities.add(entity);
                     if (WarningCodeConstants.OFFLINE_WARNING.equals(entity.getModelCode())) {
-                        if("true".equals(entity.getStart())){
-                            shipStatusService.pushOffLineDeviceId(entity.getDeviceId());
-                        }else{
-                            shipStatusService.pushOnLineDeviceId(entity.getDeviceId());
-                        }
+                        shipStatusService.pushOnLineDeviceId(entity.getDeviceId(),entity.getStart());
+//                        if("true".equals(entity.getStart())){
+//                            shipStatusService.pushOffLineDeviceId(entity.getDeviceId());
+//                        }else{
+//                            shipStatusService.pushOnLineDeviceId(entity.getDeviceId());
+//                        }
                     }
                     if (WarningCodeConstants.ANCHOR_WARNING.equals(entity.getModelCode())) {
                         shipStatusService.pushAnchorDeviceId(entity.getDeviceId(),entity.getStart());

+ 2 - 2
beidou-track-geomesa/src/main/resources/application-prod.yml

@@ -106,10 +106,10 @@ taiji:
       beidou:
         topic: taiji_ax_beidou_dynamic_ship
     consumer:
-      bootstrap-servers: 10.112.89.239:9092
+      bootstrap-servers: 172.28.19.20:29092
       beidou:
         enable: true
-        expiry: 70
+        expiry: 3600
         topic: sgAxBeidouTrack
         group: sgAxBeidouTrack—${random.uuid}
       warning: