Przeglądaj źródła

修改轨迹查询逻辑与抛锚状态逻辑

liangjianf 2 lat temu
rodzic
commit
6d2b5ebb4d

+ 2 - 1
beidou-track-geomesa/src/main/java/cn/com/taiji/track/config/KafkaConfig.java

@@ -31,8 +31,8 @@ public class KafkaConfig {
         factory.setConsumerFactory(consumerFactory());
         factory.setConcurrency(3);
         factory.getContainerProperties().setPollTimeout(3000);
-        factory.setBatchListener(true);
         factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
+        factory.setBatchListener(true);
         return factory;
     }
 
@@ -44,6 +44,7 @@ public class KafkaConfig {
         Map<String, Object> props = new HashMap<>();
         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
         props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,1000);
         props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

+ 2 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/track/constants/WarningCodeConstants.java

@@ -11,5 +11,7 @@ public class WarningCodeConstants {
 
     public static final String OFFLINE_WARNING = "TJ_CB_10_02";
 
+    public static final String ANCHOR_WARNING = "TJ_CB_01_03";
+
     public static final String FUSION_SHIP = "taiji_ax_ship_dynamic_fusion";
 }

+ 2 - 2
beidou-track-geomesa/src/main/java/cn/com/taiji/track/consumer/BeidouShipTrackConsumer.java

@@ -27,8 +27,8 @@ public class BeidouShipTrackConsumer {
     private IBeidouLocationService beidouLocationService;
 
     @KafkaListener(
-//            containerFactory = "kafkaContainerFactory",
-            groupId = "${taiji.kafka.consumer.beidou.group}", //测试时切换配置
+            containerFactory = "kafkaContainerFactory",
+//            groupId = "${taiji.kafka.consumer.beidou.group}", //测试时切换配置
             topics = {"${taiji.kafka.consumer.beidou.topic}"}
     )
     public void dynamicTrack(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) {

+ 4 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/track/entity/FocusShipEntity.java

@@ -22,6 +22,10 @@ public class FocusShipEntity {
      */
     private String focusId;
     /**
+     * 设备id
+     */
+    private String devideNo;
+    /**
      * 关注船舶id
      */
     private String shipId;

+ 30 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/track/entity/ShipStatusEntity.java

@@ -39,6 +39,11 @@ public class ShipStatusEntity{
     /** 在港状态变更时间 */
     @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone = "GMT+8")
     private Date inportChangeTime;
+    /**  0否 1是 */
+    private AnchorStatus isAnchor;
+    /** 在港状态变更时间 */
+    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone = "GMT+8")
+    private Date anchorChangeTime;
 
     public enum OnlineStatus {
         ONLINE("1","在线"),
@@ -58,4 +63,29 @@ public class ShipStatusEntity{
             this.text = text;
         }
     }
+    public enum AnchorStatus {
+        FALSE("0","否"),
+        TRUE("1","是");
+        @EnumValue
+        private final String value;
+        @JsonValue    //需要在前端展示哪个值就在哪个属性上加上该注解
+        private String text;
+
+        public String getValue() {
+            return value;
+        }
+
+        public static AnchorStatus getStatus(String start) {
+            if("true".equals(start)){
+                return TRUE;
+            }else{
+                return FALSE;
+            }
+        }
+
+        private AnchorStatus(String value,String text) {
+            this.value = value;
+            this.text = text;
+        }
+    }
 }

+ 2 - 2
beidou-track-geomesa/src/main/java/cn/com/taiji/track/handler/WebSocketHandler.java

@@ -182,7 +182,7 @@ public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketF
      * @param msg
      */
     public void sendMsgToSpecifyUser(String userId, String msg) {
-        log.info("推送数据--{},到指定用户{}", msg, userId);
+//        log.info("推送数据--{},到指定用户{}", msg, userId);
         ConcurrentHashMap<String, Object> userChannelMap = NettyConfig.getUserChannelMap();
         Channel channel = (Channel) userChannelMap.get(userId);
         if (channel != null) {
@@ -197,7 +197,7 @@ public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketF
      * @param msg
      */
     public void sendMsgToSpecifyChannel(ChannelId channelId, String msg) {
-        log.info("推送数据--{},到指定信道{}", msg, channelId);
+//        log.info("推送数据--{},到指定信道{}", msg, channelId);
         ChannelGroup channelGroup = NettyConfig.getChannelGroup();
         Channel channel = channelGroup.find(channelId);
         if (channel != null) {

+ 2 - 2
beidou-track-geomesa/src/main/java/cn/com/taiji/track/listener/FocusTrackChannelListener.java

@@ -119,14 +119,14 @@ public class FocusTrackChannelListener {
             List<Filter> filters = new ArrayList<>();
             if(entities != null) {
                 for (FocusShipEntity input : entities) {
-                    Filter filter = CQL.toFilter("deviceId = " + input.getShipId());
+                    Filter filter = CQL.toFilter("deviceId = \'" + input.getDevideNo() + "\'");
                     filters.add(filter);
                 }
                 Filter filter = ff.or(filters);
                 String cql = dto.getCql();
                 String realCql = dto.getRealCql();
                 if (StringUtils.hasText(cql)) {
-                    Filter cqlFilter = ECQL.toFilter(cql);
+                    Filter cqlFilter = CQL.toFilter(cql);
                     filter = ff.and(filter,cqlFilter);
                 }if (StringUtils.hasText(realCql)) {
                     Filter realCqlFilter = ECQL.toFilter(realCql);

+ 6 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/track/service/IShipStatusService.java

@@ -3,6 +3,8 @@ package cn.com.taiji.track.service;
 import cn.com.taiji.track.entity.ShipStatusEntity;
 import com.baomidou.mybatisplus.extension.service.IService;
 
+import java.util.Map;
+
 /**
  * @author kok20
  */
@@ -13,4 +15,8 @@ public interface IShipStatusService extends IService<ShipStatusEntity> {
     void cacheToMySql();
 
     void pushOnLineDeviceId(String deviceId);
+
+    void anchorCacheToMySql();
+
+    void pushAnchorDeviceId(String deviceId, String start);
 }

+ 50 - 13
beidou-track-geomesa/src/main/java/cn/com/taiji/track/service/impl/ShipStatusServiceImpl.java

@@ -7,10 +7,8 @@ import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Service;
 
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-import java.util.Vector;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * @author kok20
@@ -19,9 +17,9 @@ import java.util.Vector;
 @Slf4j
 public class ShipStatusServiceImpl extends ServiceImpl<ShipStatusMapper, ShipStatusEntity> implements IShipStatusService {
 
-    private Vector<ShipStatusEntity> entities= new Vector<>();
     private Vector<String> offLineDeviceIds= new Vector<>();
     private Vector<String> onLineDeviceIds= new Vector<>();
+    private Map<String, String> anchorMap= new ConcurrentHashMap<>();
 
     @Override
     public void pushOffLineDeviceId(String deviceId){
@@ -32,9 +30,16 @@ public class ShipStatusServiceImpl extends ServiceImpl<ShipStatusMapper, ShipSta
     public void pushOnLineDeviceId(String deviceId) {
         onLineDeviceIds.add(deviceId);
     }
+
+    @Override
+    public void pushAnchorDeviceId(String deviceId, String start) {
+        anchorMap.put(deviceId, start);
+    }
+
     @Override
     public void cacheToMySql() {
         Date begin = new Date();
+        List<ShipStatusEntity> saveList = new ArrayList<>();
         if(offLineDeviceIds.size() > 0){
             List<String> idList = new ArrayList<>(offLineDeviceIds);
             offLineDeviceIds.clear();
@@ -45,11 +50,11 @@ public class ShipStatusServiceImpl extends ServiceImpl<ShipStatusMapper, ShipSta
                     entity.setDeviceId(id);
                     entity.setIsOnline(ShipStatusEntity.OnlineStatus.OFFLINE);
                     entity.setOnlineChangeTime(new Date());
-                    entities.add(entity);
-                }else if (!ShipStatusEntity.OnlineStatus.OFFLINE.equals(entity.getIsOnline())) {
+                    saveList.add(entity);
+                }else if (ShipStatusEntity.OnlineStatus.ONLINE.equals(entity.getIsOnline())) {
                     entity.setIsOnline(ShipStatusEntity.OnlineStatus.OFFLINE);
                     entity.setOnlineChangeTime(new Date());
-                    entities.add(entity);
+                    saveList.add(entity);
                 }
             }
         }
@@ -63,15 +68,47 @@ public class ShipStatusServiceImpl extends ServiceImpl<ShipStatusMapper, ShipSta
                     entity.setDeviceId(id);
                     entity.setIsOnline(ShipStatusEntity.OnlineStatus.ONLINE);
                     entity.setOnlineChangeTime(new Date());
-                    entities.add(entity);
-                }else if (!ShipStatusEntity.OnlineStatus.ONLINE.equals(entity.getIsOnline())) {
+                    saveList.add(entity);
+                }else if (ShipStatusEntity.OnlineStatus.OFFLINE.equals(entity.getIsOnline())) {
                     entity.setIsOnline(ShipStatusEntity.OnlineStatus.ONLINE);
                     entity.setOnlineChangeTime(new Date());
-                    entities.add(entity);
+                    saveList.add(entity);
                 }
             }
         }
-        saveBatch(entities);
-        log.info("船舶状态写入数据库完成!耗时 {} 毫秒, 合计保存 {} 条记录", (new Date()).getTime() - begin.getTime(), entities.size());
+        if(!saveList.isEmpty()){
+            saveBatch(saveList);
+            log.info("船舶状态写入数据库完成!耗时 {} 毫秒, 合计保存 {} 条记录", (new Date()).getTime() - begin.getTime(), saveList.size());
+        }
+    }
+
+    @Override
+    public void anchorCacheToMySql() {
+        Date begin = new Date();
+        List<ShipStatusEntity> saveList = new ArrayList<>();
+        if(anchorMap.size() > 0){
+            Map<String,String> map = new HashMap<>(anchorMap);
+            anchorMap.clear();
+            for (String id : anchorMap.keySet()) {
+                String start = map.get(id);
+                ShipStatusEntity entity = baseMapper.selectById(id);
+                ShipStatusEntity.AnchorStatus status = ShipStatusEntity.AnchorStatus.getStatus(start);
+                if (entity == null) {
+                    entity = new ShipStatusEntity();
+                    entity.setDeviceId(id);
+                    entity.setIsAnchor(status);
+                    entity.setAnchorChangeTime(new Date());
+                    saveList.add(entity);
+                }else if (!status.equals(entity.getIsAnchor())) {
+                    entity.setIsAnchor(status);
+                    entity.setAnchorChangeTime(new Date());
+                    saveList.add(entity);
+                }
+            }
+        }
+        if(!saveList.isEmpty()){
+            saveBatch(saveList);
+            log.info("抛锚状态写入数据库完成!耗时 {} 毫秒, 合计保存 {} 条记录", (new Date()).getTime() - begin.getTime(), saveList.size());
+        }
     }
 }

+ 3 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/track/service/impl/WarningRecordServiceImpl.java

@@ -47,6 +47,9 @@ public class WarningRecordServiceImpl extends ServiceImpl<WarningRecordMapper, W
                             shipStatusService.pushOnLineDeviceId(entity.getDeviceId());
                         }
                     }
+                    if (WarningCodeConstants.ANCHOR_WARNING.equals(entity.getModelCode())) {
+                        shipStatusService.pushAnchorDeviceId(entity.getDeviceId(),entity.getStart());
+                    }
                 }
             }
         } catch (Exception e) {

+ 1 - 1
beidou-track-geomesa/src/main/resources/application-prod.yml

@@ -103,7 +103,7 @@ taiji:
       beidou:
         topic: taiji_ax_beidou_dynamic_ship
     consumer:
-      bootstrap-servers: 172.28.19.20:29092 #废弃
+      bootstrap-servers: 172.28.19.20:29092
       beidou:
         enable: true
         expiry: 70