Quellcode durchsuchen

Merge branch 'master' of http://120.25.74.229:8003/ax-data-recieve/ax-beidou

minghao-chen vor 2 Jahren
Ursprung
Commit
b669eea49c
29 geänderte Dateien mit 1026 neuen und 26 gelöschten Zeilen
  1. 2 0
      beidou-track-geomesa/src/main/java/cn/com/taiji/track/config/AbstractRedisDataStoreConfig.java
  2. 27 0
      beidou-track-geomesa/src/main/java/cn/com/taiji/track/config/BeidouShipRedisDataStoreConfig.java
  3. 4 0
      beidou-track-geomesa/src/main/java/cn/com/taiji/track/constants/WarningCodeConstants.java
  4. 1 1
      beidou-track-geomesa/src/main/java/cn/com/taiji/track/consumer/BeidouShipTrackConsumer.java
  5. 20 0
      beidou-track-geomesa/src/main/java/cn/com/taiji/track/dto/AnchorTrackQueryDTO.java
  6. 198 0
      beidou-track-geomesa/src/main/java/cn/com/taiji/track/dto/BeidouShipLocationDTO.java
  7. 14 0
      beidou-track-geomesa/src/main/java/cn/com/taiji/track/entity/BeidouLocationEntity.java
  8. 12 12
      beidou-track-geomesa/src/main/java/cn/com/taiji/track/entity/BeidouTrackEntity.java
  9. 18 1
      beidou-track-geomesa/src/main/java/cn/com/taiji/track/entity/ShipStatusEntity.java
  10. 0 2
      beidou-track-geomesa/src/main/java/cn/com/taiji/track/entity/WarningRecordEntity.java
  11. 65 0
      beidou-track-geomesa/src/main/java/cn/com/taiji/track/event/AnchorTrackChannelAddEvent.java
  12. 29 0
      beidou-track-geomesa/src/main/java/cn/com/taiji/track/event/AnchorTrackChannelRemoveEvent.java
  13. 167 0
      beidou-track-geomesa/src/main/java/cn/com/taiji/track/handler/AnchorTrackHandler.java
  14. 1 1
      beidou-track-geomesa/src/main/java/cn/com/taiji/track/handler/FocusTrackHandler.java
  15. 1 1
      beidou-track-geomesa/src/main/java/cn/com/taiji/track/handler/HistoryTrackHandler.java
  16. 97 0
      beidou-track-geomesa/src/main/java/cn/com/taiji/track/listener/AnchorTrackChannelListener.java
  17. 1 0
      beidou-track-geomesa/src/main/java/cn/com/taiji/track/listener/HistoryTrackChannelListener.java
  18. 9 1
      beidou-track-geomesa/src/main/java/cn/com/taiji/track/mapper/BeidouLocationMapper.java
  19. 25 0
      beidou-track-geomesa/src/main/java/cn/com/taiji/track/schedule/LocationSchedule.java
  20. 6 3
      beidou-track-geomesa/src/main/java/cn/com/taiji/track/schedule/PersistenceSchedule.java
  21. 153 0
      beidou-track-geomesa/src/main/java/cn/com/taiji/track/server/anchorTrackServer.java
  22. 2 0
      beidou-track-geomesa/src/main/java/cn/com/taiji/track/service/IBeidouLocationService.java
  23. 1 0
      beidou-track-geomesa/src/main/java/cn/com/taiji/track/service/IBeidouTrackService.java
  24. 4 0
      beidou-track-geomesa/src/main/java/cn/com/taiji/track/service/IShipStatusService.java
  25. 96 1
      beidou-track-geomesa/src/main/java/cn/com/taiji/track/service/impl/BeidouLocationServiceImpl.java
  26. 0 1
      beidou-track-geomesa/src/main/java/cn/com/taiji/track/service/impl/BeidouTrackServiceImpl.java
  27. 62 1
      beidou-track-geomesa/src/main/java/cn/com/taiji/track/service/impl/ShipStatusServiceImpl.java
  28. 7 0
      beidou-track-geomesa/src/main/java/cn/com/taiji/track/service/impl/WarningRecordServiceImpl.java
  29. 4 1
      beidou-track-geomesa/src/main/resources/application-prod.yml

+ 2 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/track/config/AbstractRedisDataStoreConfig.java

@@ -41,4 +41,6 @@ public abstract class AbstractRedisDataStoreConfig {
     }
 
     abstract SimpleFeatureType sft(RedisDataStore redisDataStore, IGeomesaTrackDTO geomesaTrackDTO) throws IOException;
+
+    abstract SimpleFeatureType sftLocation(RedisDataStore redisDataStore, IGeomesaTrackDTO geomesaLocationDTO) throws IOException;
 }

+ 27 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/track/config/BeidouShipRedisDataStoreConfig.java

@@ -1,5 +1,6 @@
 package cn.com.taiji.track.config;
 
+import cn.com.taiji.track.dto.BeidouShipLocationDTO;
 import cn.com.taiji.track.dto.BeidouShipTrackDTO;
 import cn.com.taiji.track.dto.IGeomesaTrackDTO;
 import cn.hutool.core.util.StrUtil;
@@ -46,6 +47,24 @@ public class BeidouShipRedisDataStoreConfig extends AbstractRedisDataStoreConfig
         return sft;
     }
 
+    @Override
+    @Bean(name = "sftLocation")
+    public SimpleFeatureType sftLocation(RedisDataStore redisDataStore, IGeomesaTrackDTO geomesaLocationDTO) throws IOException {
+        SimpleFeatureType sft = SimpleFeatureTypes.createType(geomesaLocationDTO.getTypeName(),  geomesaLocationDTO.getSimpleFeatureType().toString());
+        sft.getUserData().put("geomesa.feature.expiry", StrUtil.format("syncTime({} seconds)", expiry));
+
+        try {
+            redisDataStore.getFeatureSource(geomesaLocationDTO.getTypeName()).removeFeatures(Filter.INCLUDE);
+            redisDataStore.removeSchema(geomesaLocationDTO.getTypeName());
+        } catch (Exception e) {
+            log.error("可能属于首次构建,SCHEMA 并不存在");
+            e.printStackTrace();
+        }
+
+        redisDataStore.createSchema(sft);
+        return sft;
+    }
+
     @Bean(name = "geomesaTrackDTO")
     public BeidouShipTrackDTO geomesaTrackDTO() {
         BeidouShipTrackDTO dto = new BeidouShipTrackDTO();
@@ -53,4 +72,12 @@ public class BeidouShipRedisDataStoreConfig extends AbstractRedisDataStoreConfig
         dto.setExpirySeconds(expiry);
         return dto;
     }
+
+    @Bean(name = "geomesaLocationDTO")
+    public BeidouShipLocationDTO geomesaLocationDTO() {
+        BeidouShipLocationDTO dto = new BeidouShipLocationDTO();
+        dto.setKafkaConsumerGroup(group);
+        dto.setExpirySeconds(expiry);
+        return dto;
+    }
 }

+ 4 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/track/constants/WarningCodeConstants.java

@@ -13,5 +13,9 @@ public class WarningCodeConstants {
 
     public static final String ANCHOR_WARNING = "TJ_CB_01_03";
 
+    public static final String INPORT_WARNING = "TJ_CB_05_02";
+
+    public static final String OUTPROT_WARNING = "TJ_CB_05_09";
+
     public static final String FUSION_SHIP = "taiji_ax_ship_dynamic_fusion";
 }

+ 1 - 1
beidou-track-geomesa/src/main/java/cn/com/taiji/track/consumer/BeidouShipTrackConsumer.java

@@ -32,7 +32,7 @@ public class BeidouShipTrackConsumer {
             topics = {"${taiji.kafka.consumer.beidou.topic}"}
     )
     public void dynamicTrack(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) {
-        beidouTrackService.beidouDynamicShipToRedis(records);
+//        beidouTrackService.beidouDynamicShipToRedis(records);
         beidouTrackService.beidouDynamicShipToCache(records);
         beidouTrackService.beidouDynamicShipToKafka(records);
         beidouLocationService.beidouDynamicShipToCache(records);

+ 20 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/track/dto/AnchorTrackQueryDTO.java

@@ -0,0 +1,20 @@
+package cn.com.taiji.track.dto;
+
+import com.alibaba.fastjson.JSON;
+import lombok.Data;
+
+
+/**
+ * @author kok20
+ */
+@Data
+public class AnchorTrackQueryDTO {
+    private String deviceId;
+    private String startTime;
+    private String endTime;
+
+    @Override
+    public String toString() {
+        return JSON.toJSONString(this);
+    }
+}

+ 198 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/track/dto/BeidouShipLocationDTO.java

@@ -0,0 +1,198 @@
+package cn.com.taiji.track.dto;
+
+import cn.com.taiji.track.entity.ShipStatusEntity;
+import cn.hutool.core.util.StrUtil;
+import com.fasterxml.jackson.annotation.JsonFormat;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.geotools.feature.simple.SimpleFeatureBuilder;
+import org.geotools.util.factory.Hints;
+import org.opengis.feature.simple.SimpleFeature;
+
+import java.io.Serializable;
+import java.util.Date;
+
+
+/**
+ * @author kok20
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class BeidouShipLocationDTO extends IGeomesaTrackDTO implements Serializable {
+
+    private static final long serialVersionUID = -8983779632713666636L;
+
+    private String deviceId;
+    private String location;
+    private String shipType;
+    private String workType;
+    private String workWay;
+    private String sendTime;
+    private String locationTime;
+    private String online;
+    private String shipLength;
+    private String shipWidth;
+    private String texture;
+    private String longitude;
+    private String latitude;
+    private String direction;
+    private String speed;
+    private String kwh;
+    private String shipName;
+    private String isOnline;
+    private String isInport;
+    private String isAnchor;
+    private String deviceStatus;
+
+    @Override
+    public boolean checkPoint() {
+        if (location != null) {
+            longitude = location.substring(location.indexOf("(")+1,location.indexOf(" ",location.indexOf("(")));
+            latitude = location.substring(location.indexOf(" ",location.indexOf("("))+1,location.indexOf(")"));
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    public String getTypeName() { return "beidou-data"; }
+
+    @Override
+    public String getLayerType() {
+        return "geo_beidou_ship";
+    }
+
+    @Override
+    public String getWsLayerName() {
+        return "beidou-ship";
+    }
+
+    @Override
+    public String getFid() {
+        return deviceId;
+    }
+
+    @Override
+    public StringBuilder getSimpleFeatureType(){
+            StringBuilder attributes = new StringBuilder();
+            attributes.append("deviceId:String:index=true,");
+            attributes.append("*location:Point:srid=4326,");
+            attributes.append("shipType:String,");
+            attributes.append("workType:String,");
+            attributes.append("workWay:String,");
+            attributes.append("sendTime:String,");
+            attributes.append("locationTime:String,");
+            attributes.append("online:String,");
+            attributes.append("shipLength:String,");
+            attributes.append("shipWidth:String,");
+            attributes.append("texture:String,");
+            attributes.append("longitude:String,");
+            attributes.append("latitude:String,");
+            attributes.append("direction:String,");
+            attributes.append("speed:String,");
+            attributes.append("kwh:String,");
+            attributes.append("shipName:String,");
+            attributes.append("isOnline:String,");
+            attributes.append("isInport:String,");
+            attributes.append("isAnchor:String,");
+            attributes.append("deviceStatus:String,");
+
+            attributes.append("layerType:String,");
+            attributes.append("syncTime:Date");
+            return attributes;
+    }
+
+    @Override
+    public SimpleFeature toSimpleFeature(SimpleFeatureBuilder builder, String fid) {
+        builder.set("deviceId", deviceId);
+        builder.set("location", location);
+        builder.set("shipType", shipType);
+        builder.set("workType", workType);
+        builder.set("workWay", workWay);
+        builder.set("sendTime", sendTime);
+        builder.set("locationTime", locationTime);
+        builder.set("online", online);
+        builder.set("shipLength", shipLength);
+        builder.set("shipWidth", shipWidth);
+        builder.set("texture", texture);
+        builder.set("longitude", longitude);
+        builder.set("latitude", latitude);
+        builder.set("direction", direction);
+        builder.set("speed", speed);
+        builder.set("kwh", kwh);
+        builder.set("shipName", shipName);
+        builder.set("isOnline", isOnline);
+        builder.set("isInport", isInport);
+        builder.set("isAnchor", isAnchor);
+        if(deviceStatus!=null){
+            builder.set("deviceStatus", deviceStatus);
+        }else{
+            builder.set("deviceStatus", 0);
+        }
+
+        builder.set("layerType", getLayerType());
+        builder.set("syncTime", new Date());
+        builder.featureUserData(Hints.USE_PROVIDED_FID, Boolean.TRUE);
+        return builder.buildFeature(fid);
+    }
+
+    @Override
+    public String[] getPropList(){
+        return new String[] {
+//                "deviceId",
+                "location",
+                "shipType",
+                "workType",
+                "workWay",
+                "sendTime",
+                "locationTime",
+                "online",
+                "shipLength",
+                "shipWidth",
+                "texture",
+                "longitude",
+                "latitude",
+                "direction",
+                "speed",
+                "kwh",
+                "shipName",
+                "isOnline",
+                "isInport",
+                "isAnchor",
+                "deviceStatus",
+                "layerType",
+                "syncTime"
+        };
+    }
+
+    @Override
+    public Object[] getValueList() {
+        return new Object[] {
+                deviceId,
+                location,
+                shipType,
+                workType,
+                workWay,
+                sendTime,
+                locationTime,
+                online,
+                shipLength,
+                shipWidth,
+                texture,
+                longitude,
+                latitude,
+                direction,
+                speed,
+                kwh,
+                shipName,
+                isOnline,
+                isInport,
+                isAnchor,
+                deviceStatus,
+                getLayerType(),
+                new Date()
+        };
+    }
+}

+ 14 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/track/entity/BeidouLocationEntity.java

@@ -1,11 +1,15 @@
 package cn.com.taiji.track.entity;
 
+import com.alibaba.fastjson.JSON;
 import com.baomidou.mybatisplus.annotation.TableId;
 import com.baomidou.mybatisplus.annotation.TableName;
+import com.fasterxml.jackson.annotation.JsonFormat;
 import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 
+import java.util.Date;
+
 /**
  * @author wudskq
  */
@@ -18,9 +22,19 @@ public class BeidouLocationEntity {
     @TableId
     private String deviceId;
     private String location;
+    private String shipType;
+    private String workType;
+    private String workWay;
+    private String sendTime;
     private String locationTime;
+    private String online;
+    private String shipLength;
+    private String shipWidth;
+    private String texture;
     private String longitude;
     private String latitude;
+    private String direction;
+    private String speed;
     private String kwh;
     private String shipName;
 }

+ 12 - 12
beidou-track-geomesa/src/main/java/cn/com/taiji/track/entity/BeidouTrackEntity.java

@@ -18,19 +18,19 @@ public class BeidouTrackEntity extends BaseEntity {
 
     private Long trackId;
     private String deviceId;
-    private Long shipType;
-    private Long workType;
-    private Integer workWay;
+    private String shipType;
+    private String workType;
+    private String workWay;
     private String sendTime;
     private String locationTime;
-    private Integer online;
-    private Integer shipLength;
-    private Integer shipWidth;
-    private Integer texture;
-    private Double longitude;
-    private Double latitude;
-    private Double direction;
-    private Double speed;
-    private Integer kwh;
+    private String online;
+    private String shipLength;
+    private String shipWidth;
+    private String texture;
+    private String longitude;
+    private String latitude;
+    private String direction;
+    private String speed;
+    private String kwh;
     private String shipName;
 }

+ 18 - 1
beidou-track-geomesa/src/main/java/cn/com/taiji/track/entity/ShipStatusEntity.java

@@ -35,7 +35,7 @@ public class ShipStatusEntity{
     @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone = "GMT+8")
     private Date onlineChangeTime;
     /**  0否 1是 */
-    private String isInport;
+    private InportStatus isInport;
     /** 在港状态变更时间 */
     @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone = "GMT+8")
     private Date inportChangeTime;
@@ -63,6 +63,23 @@ public class ShipStatusEntity{
             this.text = text;
         }
     }
+    public enum InportStatus {
+        FALSE("0","否"),
+        TRUE("1","是");
+        @EnumValue
+        private final String value;
+        @JsonValue    //需要在前端展示哪个值就在哪个属性上加上该注解
+        private String text;
+
+        public String getValue() {
+            return value;
+        }
+
+        private InportStatus(String value,String text) {
+            this.value = value;
+            this.text = text;
+        }
+    }
     public enum AnchorStatus {
         FALSE("0","否"),
         TRUE("1","是");

+ 0 - 2
beidou-track-geomesa/src/main/java/cn/com/taiji/track/entity/WarningRecordEntity.java

@@ -36,8 +36,6 @@ public class WarningRecordEntity extends BaseEntity{
     private String areaId;
     /** 起始预警信号 */
     private String start;
-    /** 处置状态 */
-    private String status;
     /** 轨迹信息 */
     private String dynamicShip;
     /** 预警时间 */

+ 65 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/track/event/AnchorTrackChannelAddEvent.java

@@ -0,0 +1,65 @@
+package cn.com.taiji.track.event;
+
+import cn.com.taiji.track.dto.AnchorTrackQueryDTO;
+import cn.com.taiji.track.dto.HistoryTrackQueryDTO;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelId;
+import org.springframework.context.ApplicationEvent;
+
+/**
+ * @author chenfangchao
+ * @title: NettyEvent
+ * @projectName ax-geomesa-redis
+ * @description: TODO
+ * @date 2022/9/22 6:00 PM
+ */
+public class AnchorTrackChannelAddEvent extends ApplicationEvent {
+
+    private ChannelHandlerContext ctx;
+
+    private ChannelId channelId;
+
+    private AnchorTrackQueryDTO dto;
+
+    public AnchorTrackChannelAddEvent(Object source, ChannelHandlerContext ctx, ChannelId channelId, AnchorTrackQueryDTO dto) {
+        super(source);
+        this.ctx = ctx;
+        this.channelId = channelId;
+        if(null != dto){
+            this.dto = dto;
+        }
+    }
+
+    public ChannelHandlerContext getCtx() {
+        return ctx;
+    }
+
+    public void setCtx(ChannelHandlerContext ctx) {
+        this.ctx = ctx;
+    }
+
+    public ChannelId getChannelId() {
+        return channelId;
+    }
+
+    public void setChannelId(ChannelId channelId) {
+        this.channelId = channelId;
+    }
+
+    public AnchorTrackQueryDTO getDto() {
+        return dto;
+    }
+
+    public void setDto(AnchorTrackQueryDTO dto) {
+        this.dto = dto;
+    }
+
+    @Override
+    public String toString() {
+        return "ChannelAddEvent{" +
+                "ctx=" + ctx +
+                ", channelId=" + channelId +
+                ", dto='" + dto.toString() + '\'' +
+                '}';
+    }
+}

+ 29 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/track/event/AnchorTrackChannelRemoveEvent.java

@@ -0,0 +1,29 @@
+package cn.com.taiji.track.event;
+
+import io.netty.channel.ChannelId;
+import org.springframework.context.ApplicationEvent;
+
+/**
+ * @author chenfangchao
+ * @title: ChannelRemoveEvent
+ * @projectName ax-geomesa-redis
+ * @description: TODO
+ * @date 2022/9/22 6:00 PM
+ */
+public class AnchorTrackChannelRemoveEvent extends ApplicationEvent {
+
+    private ChannelId channelId;
+
+    public AnchorTrackChannelRemoveEvent(Object source, ChannelId channelId) {
+        super(source);
+        this.channelId = channelId;
+    }
+
+    public ChannelId getChannelId() {
+        return channelId;
+    }
+
+    public void setChannelId(ChannelId channelId) {
+        this.channelId = channelId;
+    }
+}

+ 167 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/track/handler/AnchorTrackHandler.java

@@ -0,0 +1,167 @@
+package cn.com.taiji.track.handler;
+
+import cn.com.taiji.track.config.NettyConfig;
+import cn.com.taiji.track.dto.AnchorTrackQueryDTO;
+import cn.com.taiji.track.dto.HistoryTrackQueryDTO;
+import cn.com.taiji.track.event.AnchorTrackChannelAddEvent;
+import cn.com.taiji.track.event.AnchorTrackChannelRemoveEvent;
+import cn.com.taiji.track.event.HistoryTrackChannelAddEvent;
+import cn.com.taiji.track.event.HistoryTrackChannelRemoveEvent;
+import com.alibaba.fastjson.JSONObject;
+import io.netty.channel.*;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
+import io.netty.handler.timeout.IdleState;
+import io.netty.handler.timeout.IdleStateEvent;
+import io.netty.util.AttributeKey;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.ApplicationEventPublisher;
+import org.springframework.stereotype.Component;
+
+import java.util.Objects;
+
+
+/**
+ * @author wudskq
+ */
+@Slf4j
+@Component
+@ChannelHandler.Sharable
+public class AnchorTrackHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
+
+    Logger logger = LoggerFactory.getLogger(AnchorTrackHandler.class);
+
+    @Autowired
+    private ApplicationEventPublisher applicationEventPublisher;
+
+    /**
+     * 一旦连接,第一个被执行
+     *
+     * @param ctx
+     * @throws Exception
+     */
+    @Override
+    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
+        log.info("handlerAdded 被调用" + ctx.channel().id().asLongText());
+        // 添加到channelGroup 通道组
+        NettyConfig.getChannelGroup().add(ctx.channel());
+    }
+
+    @Override
+    public void channelActive(ChannelHandlerContext ctx) throws Exception {
+        NettyConfig.getChannelGroup().add(ctx.channel());
+        log.info("与客户端建立连接,通道开启!信道ID为 " + ctx.channel().id());
+    }
+    //读就绪事件
+    @Override
+    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
+        AnchorTrackQueryDTO dto = JSONObject.parseObject(msg.text(), AnchorTrackQueryDTO.class);
+        log.info("本次会话(信道)查询条件为:" + msg.text());
+        if (!NettyConfig.getUserChannelMap().containsKey(dto)) {
+            NettyConfig.getUserChannelMap().put(ctx.channel().id().asShortText(),dto.toString());
+            // 将用户ID作为自定义属性加入到channel中,方便随时channel中获取用户ID
+            AttributeKey<String> key = AttributeKey.valueOf("anchor-track");
+            ctx.channel().attr(key).setIfAbsent(dto.toString());
+            applicationEventPublisher.publishEvent(new AnchorTrackChannelAddEvent(this,ctx,ctx.channel().id(), dto));
+            log.info("发送启动新线程event成功!");
+        } else {
+            // 前端定时请求,保持心跳连接,避免服务端误删通道
+            ctx.channel().writeAndFlush(new TextWebSocketFrame("keep alive success!"));
+        }
+        // 回复消息
+//        ctx.channel().writeAndFlush(new TextWebSocketFrame("此次会话CQL-ID为 "+ ctx.channel().id().asShortText() + " 查询的条件为:" + dto.toString() + "与服务器成功通信!"));
+    }
+
+    /**
+     * 心跳检测相关方法 - 会主动调用handlerRemoved
+     *
+     * @param ctx
+     * @param evt
+     * @throws Exception
+     */
+    @Override
+    public void userEventTriggered(final ChannelHandlerContext ctx, Object evt) throws Exception {
+        if (evt instanceof IdleStateEvent) {
+            IdleStateEvent event = (IdleStateEvent) evt;
+            if (event.state() == IdleState.ALL_IDLE) {
+                //清除超时会话
+                ChannelFuture writeAndFlush = ctx.writeAndFlush("you will close");
+                writeAndFlush.addListener(new ChannelFutureListener() {
+                    @Override
+                    public void operationComplete(ChannelFuture future) throws Exception {
+                        ctx.channel().close();
+                    }
+                });
+            }
+        } else {
+            super.userEventTriggered(ctx, evt);
+        }
+    }
+
+    /**
+     * 移除通道及关联用户
+     *
+     * @param ctx
+     * @throws Exception
+     */
+    @Override
+    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
+        log.info("handlerRemoved 被调用" + ctx.channel().id().asLongText());
+        // 删除通道
+        ChannelGroup channelGroup = NettyConfig.getChannelGroup();
+        if (Objects.nonNull(channelGroup)) {
+            channelGroup.remove(ctx.channel());
+            NettyConfig.getUserChannelMap().remove(ctx.channel().id().asShortText());
+            applicationEventPublisher.publishEvent(new AnchorTrackChannelRemoveEvent(this, ctx.channel().id()));
+        }
+        removeUserId(ctx);
+    }
+
+    /**
+     * 删除用户与channel的对应关系
+     *
+     * @param ctx
+     */
+    private void removeUserId(ChannelHandlerContext ctx) {
+        AttributeKey<String> key = AttributeKey.valueOf("userId");
+        String userId = ctx.channel().attr(key).get();
+        if (StringUtils.isNotBlank(userId)) {
+            NettyConfig.getUserChannelMap().remove(userId);
+            log.info("删除用户与channel的对应关系,userId:{}", userId);
+        }
+    }
+    /**
+     * 异常处理
+     *
+     * @param ctx
+     * @param cause
+     * @throws Exception
+     */
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+        log.info("异常:{}", cause.getMessage());
+        // 删除通道
+        NettyConfig.getChannelGroup().remove(ctx.channel());
+        removeUserId(ctx);
+        ctx.close();
+    }
+
+    /**
+     * 推送消息给指定信道
+     *
+     * @param channelId
+     * @param msg
+     */
+    public void sendMsgToSpecifyChannel(ChannelId channelId, String msg) {
+//        log.info("推送数据--{},到指定信道{}", msg, channelId);
+        ChannelGroup channelGroup = NettyConfig.getChannelGroup();
+        Channel channel = channelGroup.find(channelId);
+        if (channel != null) {
+            channel.writeAndFlush(new TextWebSocketFrame(msg));
+        }
+    }
+}

+ 1 - 1
beidou-track-geomesa/src/main/java/cn/com/taiji/track/handler/FocusTrackHandler.java

@@ -154,7 +154,7 @@ public class FocusTrackHandler extends SimpleChannelInboundHandler<TextWebSocket
      * @param msg
      */
     public void sendMsgToSpecifyChannel(ChannelId channelId, String msg) {
-        log.info("推送数据--{},到指定信道{}", msg, channelId);
+//        log.info("推送数据--{},到指定信道{}", msg, channelId);
         ChannelGroup channelGroup = NettyConfig.getChannelGroup();
         Channel channel = channelGroup.find(channelId);
         if (channel != null) {

+ 1 - 1
beidou-track-geomesa/src/main/java/cn/com/taiji/track/handler/HistoryTrackHandler.java

@@ -154,7 +154,7 @@ public class HistoryTrackHandler extends SimpleChannelInboundHandler<TextWebSock
      * @param msg
      */
     public void sendMsgToSpecifyChannel(ChannelId channelId, String msg) {
-        log.info("推送数据--{},到指定信道{}", msg, channelId);
+//        log.info("推送数据--{},到指定信道{}", msg, channelId);
         ChannelGroup channelGroup = NettyConfig.getChannelGroup();
         Channel channel = channelGroup.find(channelId);
         if (channel != null) {

+ 97 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/track/listener/AnchorTrackChannelListener.java

@@ -0,0 +1,97 @@
+package cn.com.taiji.track.listener;
+
+import cn.com.taiji.track.constants.WarningCodeConstants;
+import cn.com.taiji.track.dto.AnchorTrackQueryDTO;
+import cn.com.taiji.track.dto.HistoryTrackQueryDTO;
+import cn.com.taiji.track.entity.BeidouTrackEntity;
+import cn.com.taiji.track.entity.WarningRecordEntity;
+import cn.com.taiji.track.event.AnchorTrackChannelAddEvent;
+import cn.com.taiji.track.event.HistoryTrackChannelAddEvent;
+import cn.com.taiji.track.handler.AnchorTrackHandler;
+import cn.com.taiji.track.handler.HistoryTrackHandler;
+import cn.com.taiji.track.service.IBeidouTrackService;
+import cn.com.taiji.track.service.IWarningRecordService;
+import cn.hutool.json.JSONUtil;
+import com.alibaba.fastjson.JSONObject;
+import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.netty.util.AttributeKey;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.event.EventListener;
+import org.springframework.stereotype.Component;
+import org.springframework.util.StringUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author chenfangchao
+ * @title: BeidouChannelListener
+ * @projectName ax-geomesa-redis
+ * @description: TODO
+ * @date 2022/9/22 6:00 PM
+ */
+@Component
+@Slf4j
+public class AnchorTrackChannelListener {
+    @Autowired
+    private AnchorTrackHandler historyTrackHandler;
+    @Autowired
+    private IWarningRecordService warningRecordService;
+
+    private static final ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("thread-pool-channel-anchor-track").build();
+
+    ThreadPoolExecutor threadPoolExecutor  = new ThreadPoolExecutor(2,2,5,
+            TimeUnit.MINUTES,new ArrayBlockingQueue<>(10),threadFactory,new ThreadPoolExecutor.AbortPolicy());
+    /**
+     * 监听新增用户信道
+     *
+     * @param event
+     */
+    @EventListener
+    public void listenerNettyAddChannel(AnchorTrackChannelAddEvent event) {
+        log.info("当前需要创建的线程名称为 {}", event.getChannelId());
+        threadPoolExecutor.execute(new Runnable() {
+            @Override
+            public void run() {
+                queryAnchorTrack(event);
+            }
+        });
+    }
+    /**
+     * 查询
+     */
+    private void queryAnchorTrack(AnchorTrackChannelAddEvent event)  {
+        List<BeidouTrackEntity> results = new ArrayList<>();
+        List<WarningRecordEntity> warningRecords = new ArrayList<>();
+        AttributeKey<String> key = AttributeKey.valueOf("anchor-track");
+        String dtoStr = event.getCtx().attr(key).get();
+        AnchorTrackQueryDTO dto = JSONObject.parseObject(dtoStr, AnchorTrackQueryDTO.class);
+        if (null != dto) {
+            QueryWrapper<WarningRecordEntity> wrapper = new QueryWrapper();
+            wrapper.eq("model_code", WarningCodeConstants.ANCHOR_WARNING);
+            wrapper.eq("start", "true");
+            if(StringUtils.hasText(dto.getDeviceId())){
+                wrapper.eq("device_id",dto.getDeviceId());
+            }
+            if(StringUtils.hasText(dto.getStartTime())&&StringUtils.hasText(dto.getEndTime())){
+                wrapper.between("warning_time",dto.getStartTime(),dto.getEndTime());
+            }
+            wrapper.orderByDesc("warning_time");
+            warningRecords = warningRecordService.list(wrapper);
+            if (warningRecords.size() > 0) {
+                for (WarningRecordEntity e : warningRecords)  {
+                    BeidouTrackEntity result = JSONObject.parseObject(e.getDynamicShip(),BeidouTrackEntity.class);
+                    results.add(result);
+                }
+            }
+        }
+        historyTrackHandler.sendMsgToSpecifyChannel(event.getChannelId(), JSONUtil.toJsonStr(results));
+    }
+
+}

+ 1 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/track/listener/HistoryTrackChannelListener.java

@@ -73,6 +73,7 @@ public class HistoryTrackChannelListener {
             if(StringUtils.hasText(dto.getStartTime())&&StringUtils.hasText(dto.getEndTime())){
                 wrapper.between("location_time",dto.getStartTime(),dto.getEndTime());
             }
+            wrapper.orderByAsc("location_time");
             results = beidouTrackService.list(wrapper);
         }
         historyTrackHandler.sendMsgToSpecifyChannel(event.getChannelId(), JSONUtil.toJsonStr(results));

+ 9 - 1
beidou-track-geomesa/src/main/java/cn/com/taiji/track/mapper/BeidouLocationMapper.java

@@ -1,8 +1,12 @@
 package cn.com.taiji.track.mapper;
 
+import cn.com.taiji.track.dto.BeidouShipLocationDTO;
 import cn.com.taiji.track.entity.BeidouLocationEntity;
 import com.baomidou.mybatisplus.core.mapper.BaseMapper;
 import org.apache.ibatis.annotations.Mapper;
+import org.apache.ibatis.annotations.Select;
+
+import java.util.List;
 
 /**
  * @author chenfangchao
@@ -13,6 +17,10 @@ import org.apache.ibatis.annotations.Mapper;
  */
 @Mapper
 public interface BeidouLocationMapper extends BaseMapper<BeidouLocationEntity> {
-
+    @Select("SELECT l.*,s.is_online,s.is_inport,s.is_anchor,d.dispose_type " +
+            "FROM ax_beidou_ship_location l\n" +
+            "LEFT JOIN ax_beidou_ship_status s on s.device_id = l.device_id\n" +
+            "LEFT JOIN ax_beidou_ship_dispose d on d.devide_no = l.device_id")
+    List<BeidouShipLocationDTO> listLocation();
 }
 

+ 25 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/track/schedule/LocationSchedule.java

@@ -0,0 +1,25 @@
+package cn.com.taiji.track.schedule;
+
+import cn.com.taiji.track.service.IBeidouLocationService;
+import cn.com.taiji.track.service.IBeidouTrackService;
+import cn.com.taiji.track.service.IShipStatusService;
+import cn.com.taiji.track.service.IWarningRecordService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+/**
+ * @author kok20
+ * 适应starrocks保存逻辑,通过定时器降低保存时间间隔
+ */
+@Component
+public class LocationSchedule {
+    @Autowired
+    private IBeidouLocationService beidouLocationService;
+
+    @Scheduled(cron = "*/3 * * * * ?")
+    private void listLocationToCache() {
+        beidouLocationService.listLocationToCache();
+    }
+}
+

+ 6 - 3
beidou-track-geomesa/src/main/java/cn/com/taiji/track/schedule/PersistenceSchedule.java

@@ -36,8 +36,11 @@ public class PersistenceSchedule {
         beidouLocationService.cacheBeidouToMySql();
     }
     @Scheduled(cron = "*/10 * * * * ?")
-    private void cacheStatusToMySql() {
-        shipStatusService.cacheToMySql();
-    }
+    private void cacheStatusToMySql() { shipStatusService.cacheToMySql();}
+    @Scheduled(cron = "*/10 * * * * ?")
+    private void anchorCacheToMySql() { shipStatusService.anchorCacheToMySql();}
+    @Scheduled(cron = "*/10 * * * * ?")
+    private void InportCacheToMySql() {shipStatusService.InportCacheToMySql();}
+
 }
 

+ 153 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/track/server/anchorTrackServer.java

@@ -0,0 +1,153 @@
+package cn.com.taiji.track.server;
+
+import cn.com.taiji.track.handler.AnchorTrackHandler;
+import cn.com.taiji.track.handler.HistoryTrackHandler;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.http.HttpObjectAggregator;
+import io.netty.handler.codec.http.HttpServerCodec;
+import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
+import io.netty.handler.codec.serialization.ObjectEncoder;
+import io.netty.handler.stream.ChunkedWriteHandler;
+import io.netty.handler.timeout.IdleStateHandler;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import java.net.InetSocketAddress;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * @author wudskq
+ */
+@Slf4j
+@Component
+public class anchorTrackServer {
+
+    /**
+     * webSocket协议名
+     */
+    private static final String WEBSOCKET_PROTOCOL = "WebSocket";
+
+    /**
+     * 端口号
+     */
+    @Value("${webSocket.anchor-track.port}")
+    private int port;
+
+    /**
+     * webSocket路径
+     */
+    @Value("${webSocket.anchor-track.path}")
+    private String webSocketPath;
+
+    /**
+     * 在Netty心跳检测中配置 - 读空闲超时时间设置
+     */
+    @Value("${webSocket.readerIdleTime}")
+    private long readerIdleTime;
+
+    /**
+     * 在Netty心跳检测中配置 - 写空闲超时时间设置
+     */
+    @Value("${webSocket.writerIdleTime}")
+    private long writerIdleTime;
+
+    /**
+     * 在Netty心跳检测中配置 - 读写空闲超时时间设置
+     */
+    @Value("${webSocket.allIdleTime}")
+    private long allIdleTime;
+
+    @Autowired
+    private AnchorTrackHandler anchorTrackHandler;
+
+    private EventLoopGroup bossGroup;
+
+    private EventLoopGroup workGroup;
+
+
+    private static final ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("thread-pool-netty-anchortrack").build();
+
+    /**
+     * 启动
+     * @throws InterruptedException
+     */
+    private void start() throws InterruptedException {
+        bossGroup = new NioEventLoopGroup();
+        workGroup = new NioEventLoopGroup();
+        ServerBootstrap bootstrap = new ServerBootstrap();
+        bootstrap.group(bossGroup,workGroup);
+        bootstrap.channel(NioServerSocketChannel.class);
+        // 设置监听端口
+        bootstrap.localAddress(new InetSocketAddress(port));
+        bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
+
+            @Override
+            protected void initChannel(SocketChannel ch) throws Exception {
+                ch.pipeline().addLast(new IdleStateHandler(readerIdleTime, writerIdleTime, allIdleTime, TimeUnit.MINUTES));
+                // webSocket协议本身是基于http协议的,使用http编解码器
+                ch.pipeline().addLast(new HttpServerCodec());
+                ch.pipeline().addLast(new ObjectEncoder());
+                ch.pipeline().addLast(new ChunkedWriteHandler());
+                ch.pipeline().addLast(new HttpObjectAggregator(8192));
+                ch.pipeline().addLast(new WebSocketServerProtocolHandler(webSocketPath, WEBSOCKET_PROTOCOL, true, 65536 * 10));
+                // 自定义的handler,处理业务逻辑
+                ch.pipeline().addLast(anchorTrackHandler);
+            }
+        });
+        ChannelFuture channelFuture = bootstrap.bind().sync();
+        log.info("Server started and listen on:{}",channelFuture.channel().localAddress());
+        channelFuture.channel().closeFuture().sync();
+    }
+
+    /**
+     * 释放资源
+     * @throws InterruptedException
+     */
+    @PreDestroy
+    public void destroy() throws InterruptedException {
+        if(bossGroup != null){
+            bossGroup.shutdownGracefully().sync();
+        }
+        if(workGroup != null){
+            workGroup.shutdownGracefully().sync();
+        }
+    }
+
+    /**
+     * 使用线程池初始化netty-websocket服务
+     */
+    ThreadPoolExecutor threadPoolExecutor  = new ThreadPoolExecutor(2,2,5,
+            TimeUnit.MINUTES,new ArrayBlockingQueue<>(10),threadFactory,new ThreadPoolExecutor.AbortPolicy());
+
+    /**
+     * 初始化(新线程开启)
+     */
+    @PostConstruct()
+    public void init() {
+        threadPoolExecutor.execute(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    start();
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+            }
+        });
+    }
+}

+ 2 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/track/service/IBeidouLocationService.java

@@ -16,4 +16,6 @@ public interface IBeidouLocationService extends IService<BeidouLocationEntity> {
     void beidouDynamicShipToCache(List<ConsumerRecord<?, ?>> records);
 
     void cacheBeidouToMySql();
+
+    void listLocationToCache();
 }

+ 1 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/track/service/IBeidouTrackService.java

@@ -1,5 +1,6 @@
 package cn.com.taiji.track.service;
 
+import cn.com.taiji.track.entity.BeidouLocationEntity;
 import cn.com.taiji.track.entity.BeidouTrackEntity;
 import com.baomidou.mybatisplus.extension.service.IService;
 import org.apache.kafka.clients.consumer.ConsumerRecord;

+ 4 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/track/service/IShipStatusService.java

@@ -19,4 +19,8 @@ public interface IShipStatusService extends IService<ShipStatusEntity> {
     void anchorCacheToMySql();
 
     void pushAnchorDeviceId(String deviceId, String start);
+
+    void pushinportDeviceId(String deviceId, String inportWarning);
+
+    void InportCacheToMySql();
 }

+ 96 - 1
beidou-track-geomesa/src/main/java/cn/com/taiji/track/service/impl/BeidouLocationServiceImpl.java

@@ -1,13 +1,31 @@
 package cn.com.taiji.track.service.impl;
 
+import cn.com.taiji.track.dto.BeidouShipLocationDTO;
+import cn.com.taiji.track.dto.IGeomesaTrackDTO;
 import cn.com.taiji.track.entity.BeidouLocationEntity;
 import cn.com.taiji.track.mapper.BeidouLocationMapper;
 import cn.com.taiji.track.service.IBeidouLocationService;
+import cn.com.taiji.track.service.IShipStatusService;
 import cn.com.taiji.track.utils.LatLngUtil;
+import cn.hutool.core.util.StrUtil;
 import com.alibaba.fastjson.JSONObject;
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.geotools.data.FeatureReader;
+import org.geotools.data.FeatureWriter;
+import org.geotools.data.Query;
+import org.geotools.data.Transaction;
+import org.geotools.factory.CommonFactoryFinder;
+import org.geotools.feature.simple.SimpleFeatureBuilder;
+import org.geotools.filter.identity.FeatureIdImpl;
+import org.geotools.util.factory.Hints;
+import org.locationtech.geomesa.redis.data.RedisDataStore;
+import org.opengis.feature.simple.SimpleFeature;
+import org.opengis.feature.simple.SimpleFeatureType;
+import org.opengis.filter.Filter;
+import org.opengis.filter.FilterFactory2;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
 import java.math.BigDecimal;
@@ -21,8 +39,18 @@ import java.util.*;
 @Service
 @Slf4j
 public class BeidouLocationServiceImpl extends ServiceImpl<BeidouLocationMapper, BeidouLocationEntity> implements IBeidouLocationService {
+    @Autowired
+    protected IShipStatusService shipStatusService;
+
+    private Map<String,BeidouShipLocationDTO> locationMaps= new HashMap<>();
     private Vector<BeidouLocationEntity> locationEntities= new Vector<>();
 
+    @Autowired
+    private SimpleFeatureType sftLocation;
+    @Autowired
+    private RedisDataStore redisDataStore;
+    @Autowired
+    private IGeomesaTrackDTO geomesaLocationDTO;
     @Override
     public void beidouDynamicShipToCache(List<ConsumerRecord<?, ?>> records){
         try {
@@ -31,12 +59,14 @@ public class BeidouLocationServiceImpl extends ServiceImpl<BeidouLocationMapper,
                 if (message.isPresent()) {
                     Object msg = message.get();
                     BeidouLocationEntity locationEntity = JSONObject.parseObject(msg.toString(), BeidouLocationEntity.class);
-                    locationEntity.setLocation(String.format("POINT(%s %s)",locationEntity.getLongitude(),locationEntity.getLatitude()));
+                    String location = StrUtil.format("POINT ({} {})", locationEntity.getLongitude(),locationEntity.getLatitude());
+                    locationEntity.setLocation(location);
                     BigDecimal longitudeDecimal = new BigDecimal(locationEntity.getLongitude());
                     locationEntity.setLongitude(LatLngUtil.latLng2Dfm(longitudeDecimal.doubleValue()));
                     BigDecimal latitudeDecimal = new BigDecimal(locationEntity.getLatitude());
                     locationEntity.setLatitude(LatLngUtil.latLng2Dfm(latitudeDecimal.doubleValue()));
                     locationEntities.add(locationEntity);
+                    shipStatusService.pushOnLineDeviceId(locationEntity.getDeviceId());
                 }
             }
         } catch (Exception e) {
@@ -54,4 +84,69 @@ public class BeidouLocationServiceImpl extends ServiceImpl<BeidouLocationMapper,
 //            log.info("位置数据写入数据库完成!耗时 {} 毫秒, 合计保存 {} 条记录", (new Date()).getTime() - begin.getTime(), saveList.size());
         }
     }
+
+    @Override
+    public void listLocationToCache() {
+        List<BeidouShipLocationDTO> results = baseMapper.listLocation();
+        if (results.size() > 0) {
+            FeatureReader<SimpleFeatureType, SimpleFeature> reader = null;
+            FeatureWriter<SimpleFeatureType, SimpleFeature> writer = null;
+            try {
+                writer = redisDataStore.getFeatureWriterAppend(sftLocation.getTypeName(), Transaction.AUTO_COMMIT);
+                SimpleFeatureBuilder builder = new SimpleFeatureBuilder(sftLocation);
+                for (IGeomesaTrackDTO dto : results) {
+                    dto.setKafkaConsumerGroup(geomesaLocationDTO.getKafkaConsumerGroup());
+                    dto.setExpirySeconds(geomesaLocationDTO.getExpirySeconds());
+                    if (!dto.checkPoint()) {
+                        continue;
+                    }
+                    String fid = dto.getFid();
+                    boolean append = true;
+                    FilterFactory2 filterFactory = CommonFactoryFinder.getFilterFactory2();
+                    Filter filter = null;
+                    if (null != fid && !"".equals(fid)) {
+                        filter = filterFactory.id(filterFactory.featureId(fid));
+                    } else {
+                        String id = UUID.randomUUID().toString();
+                        filter = filterFactory.id(filterFactory.featureId(id));
+                    }
+                    Query query = new Query(dto.getTypeName(), filter);
+                    reader = redisDataStore.getFeatureReader(query, Transaction.AUTO_COMMIT);
+                    if (reader.hasNext()) {
+                        append = false;
+                        String[] propList = dto.getPropList();
+                        Object[] valueList = dto.getValueList();
+                        redisDataStore.getFeatureSource(dto.getTypeName()).modifyFeatures(propList, valueList, filter);
+                    }
+                    reader.close();
+                    reader = null;
+                    if (append) {
+                        SimpleFeature toWrite = writer.next();
+                        builder.reset();
+                        SimpleFeature feature = dto.toSimpleFeature(builder, fid);
+                        toWrite.setAttributes(feature.getAttributes());
+                        toWrite.getUserData().put(Hints.USE_PROVIDED_FID, Boolean.TRUE);
+                        ((FeatureIdImpl) toWrite.getIdentifier()).setID(feature.getID());
+                        toWrite.getUserData().putAll(feature.getUserData());
+                        writer.write();
+                    }
+                }
+            } catch (Exception e) {
+                log.error("=======插入geomesa发生异常============");
+                e.printStackTrace();
+            } finally {
+                try {
+                    if (null != writer) {
+                        writer.close();
+                    }
+                    if (null != reader) {
+                        reader.close();
+                    }
+                } catch (Exception e) {
+                    log.error("=======close操作发生异常============");
+                    e.printStackTrace();
+                }
+            }
+        }
+    }
 }

+ 0 - 1
beidou-track-geomesa/src/main/java/cn/com/taiji/track/service/impl/BeidouTrackServiceImpl.java

@@ -131,7 +131,6 @@ public class BeidouTrackServiceImpl extends ServiceImpl<BeidouTrackMapper, Beido
         }
 //        log.info("轨迹数据写入图层完成!耗时 {} 毫秒, 从 消费组 {} 中合计消费 {} 条记录", (new Date()).getTime() - begain.getTime(), geomesaTrackDTO.getKafkaConsumerGroup(), records.size());
     }
-
     private  Vector<BeidouTrackEntity> trackEntities= new Vector<>();
 
     @Override

+ 62 - 1
beidou-track-geomesa/src/main/java/cn/com/taiji/track/service/impl/ShipStatusServiceImpl.java

@@ -1,8 +1,10 @@
 package cn.com.taiji.track.service.impl;
 
+import cn.com.taiji.track.constants.WarningCodeConstants;
 import cn.com.taiji.track.entity.ShipStatusEntity;
 import cn.com.taiji.track.mapper.ShipStatusMapper;
 import cn.com.taiji.track.service.IShipStatusService;
+import com.alibaba.druid.util.StringUtils;
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Service;
@@ -20,6 +22,7 @@ public class ShipStatusServiceImpl extends ServiceImpl<ShipStatusMapper, ShipSta
     private Vector<String> offLineDeviceIds= new Vector<>();
     private Vector<String> onLineDeviceIds= new Vector<>();
     private Map<String, String> anchorMap= new ConcurrentHashMap<>();
+    private Map<String, String> inportMap= new ConcurrentHashMap<>();
 
     @Override
     public void pushOffLineDeviceId(String deviceId){
@@ -37,6 +40,11 @@ public class ShipStatusServiceImpl extends ServiceImpl<ShipStatusMapper, ShipSta
     }
 
     @Override
+    public void pushinportDeviceId(String deviceId, String inportCode) {
+        inportMap.put(deviceId, inportCode);
+    }
+
+    @Override
     public void cacheToMySql() {
         Date begin = new Date();
         List<ShipStatusEntity> saveList = new ArrayList<>();
@@ -89,13 +97,15 @@ public class ShipStatusServiceImpl extends ServiceImpl<ShipStatusMapper, ShipSta
         if(anchorMap.size() > 0){
             Map<String,String> map = new HashMap<>(anchorMap);
             anchorMap.clear();
-            for (String id : anchorMap.keySet()) {
+            for (String id : map.keySet()) {
                 String start = map.get(id);
                 ShipStatusEntity entity = baseMapper.selectById(id);
                 ShipStatusEntity.AnchorStatus status = ShipStatusEntity.AnchorStatus.getStatus(start);
                 if (entity == null) {
                     entity = new ShipStatusEntity();
                     entity.setDeviceId(id);
+                    entity.setIsOnline(ShipStatusEntity.OnlineStatus.ONLINE);
+                    entity.setOnlineChangeTime(new Date());
                     entity.setIsAnchor(status);
                     entity.setAnchorChangeTime(new Date());
                     saveList.add(entity);
@@ -111,4 +121,55 @@ public class ShipStatusServiceImpl extends ServiceImpl<ShipStatusMapper, ShipSta
             log.info("抛锚状态写入数据库完成!耗时 {} 毫秒, 合计保存 {} 条记录", (new Date()).getTime() - begin.getTime(), saveList.size());
         }
     }
+    @Override
+    public void InportCacheToMySql() {
+        Date begin = new Date();
+        List<ShipStatusEntity> saveList = new ArrayList<>();
+        if (inportMap.size() > 0) {
+            Map<String, String> map = new HashMap<>(inportMap);
+            inportMap.clear();
+            for (String deviceId : map.keySet()) {
+                String inportCode = map.get(deviceId);
+                ShipStatusEntity entity = baseMapper.selectById(deviceId);
+                switch (inportCode) {
+                    case WarningCodeConstants.INPORT_WARNING:
+                        if (entity == null) {
+                            entity = new ShipStatusEntity();
+                            entity.setDeviceId(deviceId);
+                            entity.setIsOnline(ShipStatusEntity.OnlineStatus.ONLINE);
+                            entity.setOnlineChangeTime(new Date());
+                            entity.setIsInport(ShipStatusEntity.InportStatus.TRUE);
+                            entity.setInportChangeTime(new Date());
+                            saveList.add(entity);
+                        } else {
+                            entity.setIsInport(ShipStatusEntity.InportStatus.TRUE);
+                            entity.setInportChangeTime(new Date());
+                            saveList.add(entity);
+                        }
+                        break;
+                    case WarningCodeConstants.OUTPROT_WARNING:
+                        if (entity == null) {
+                            entity = new ShipStatusEntity();
+                            entity.setDeviceId(deviceId);
+                            entity.setIsOnline(ShipStatusEntity.OnlineStatus.ONLINE);
+                            entity.setOnlineChangeTime(new Date());
+                            entity.setIsInport(ShipStatusEntity.InportStatus.FALSE);
+                            entity.setInportChangeTime(new Date());
+                            saveList.add(entity);
+                        } else {
+                            entity.setIsInport(ShipStatusEntity.InportStatus.FALSE);
+                            entity.setInportChangeTime(new Date());
+                            saveList.add(entity);
+                        }
+                        break;
+                    default:
+                        break;
+                }
+            }
+            if (!saveList.isEmpty()) {
+                saveBatch(saveList);
+                log.info("进出港状态写入数据库完成!耗时 {} 毫秒, 合计保存 {} 条记录", (new Date()).getTime() - begin.getTime(), saveList.size());
+            }
+        }
+    }
 }

+ 7 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/track/service/impl/WarningRecordServiceImpl.java

@@ -6,6 +6,7 @@ import cn.com.taiji.track.entity.WarningRecordEntity;
 import cn.com.taiji.track.mapper.WarningRecordMapper;
 import cn.com.taiji.track.service.IShipStatusService;
 import cn.com.taiji.track.service.IWarningRecordService;
+import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONObject;
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
 import lombok.extern.slf4j.Slf4j;
@@ -50,6 +51,12 @@ public class WarningRecordServiceImpl extends ServiceImpl<WarningRecordMapper, W
                     if (WarningCodeConstants.ANCHOR_WARNING.equals(entity.getModelCode())) {
                         shipStatusService.pushAnchorDeviceId(entity.getDeviceId(),entity.getStart());
                     }
+                    if (WarningCodeConstants.INPORT_WARNING.equals(entity.getModelCode())) {
+                        shipStatusService.pushinportDeviceId(entity.getDeviceId(),WarningCodeConstants.INPORT_WARNING);
+                    }
+                    if (WarningCodeConstants.OUTPROT_WARNING.equals(entity.getModelCode())) {
+                        shipStatusService.pushinportDeviceId(entity.getDeviceId(),WarningCodeConstants.OUTPROT_WARNING);
+                    }
                 }
             }
         } catch (Exception e) {

+ 4 - 1
beidou-track-geomesa/src/main/resources/application-prod.yml

@@ -17,6 +17,9 @@ webSocket:
   focus-track:
     port: 7002
     path: /focustrack
+  anchor-track:
+    port: 7003
+    path: /anchortrack
 spring:
   datasource:
     type: com.alibaba.druid.pool.DruidDataSource
@@ -103,7 +106,7 @@ taiji:
       beidou:
         topic: taiji_ax_beidou_dynamic_ship
     consumer:
-      bootstrap-servers: 172.28.19.20:29092
+      bootstrap-servers: 10.112.89.239:9092
       beidou:
         enable: true
         expiry: 70