소스 검색

修改实时轨迹写入geomesa逻辑

liangjianf 2 년 전
부모
커밋
ff31b17da7
18개의 변경된 파일422개의 추가작업 그리고 417개의 파일을 삭제
  1. 1 1
      beidou-track-geomesa/src/main/java/cn/com/taiji/track/config/KafkaConfig.java
  2. 1 1
      beidou-track-geomesa/src/main/java/cn/com/taiji/track/config/AbstractRedisDataStoreConfig.java
  3. 24 25
      beidou-track-geomesa/src/main/java/cn/com/taiji/track/config/BeidouShipRedisDataStoreConfig.java
  4. 1 3
      beidou-track-geomesa/src/main/java/cn/com/taiji/track/consumer/BeidouShipTrackConsumer.java
  5. 1 2
      beidou-track-geomesa/src/main/java/cn/com/taiji/track/dto/BeidouShipLocationDTO.java
  6. 178 178
      beidou-track-geomesa/src/main/java/cn/com/taiji/track/dto/BeidouShipTrackDTO.java
  7. 84 0
      beidou-track-geomesa/src/main/java/cn/com/taiji/track/entity/BeidouShipArchives.java
  8. 1 1
      beidou-track-geomesa/src/main/java/cn/com/taiji/track/entity/BeidouTrackEntity.java
  9. 2 2
      beidou-track-geomesa/src/main/java/cn/com/taiji/track/listener/ChannelListener.java
  10. 17 8
      beidou-track-geomesa/src/main/java/cn/com/taiji/track/mapper/BeidouLocationMapper.java
  11. 19 0
      beidou-track-geomesa/src/main/java/cn/com/taiji/track/mapper/BeidouShipArchivesMapper.java
  12. 69 70
      beidou-track-geomesa/src/main/java/cn/com/taiji/track/producer/BeidouShipTestProducer.java
  13. 0 5
      beidou-track-geomesa/src/main/java/cn/com/taiji/track/service/IBeidouTrackService.java
  14. 18 0
      beidou-track-geomesa/src/main/java/cn/com/taiji/track/service/impl/BeidouLocationServiceImpl.java
  15. 0 109
      beidou-track-geomesa/src/main/java/cn/com/taiji/track/service/impl/BeidouTrackServiceImpl.java
  16. 4 1
      beidou-track-geomesa/src/main/java/cn/com/taiji/track/service/impl/ShipStatusServiceImpl.java
  17. 0 5
      beidou-track-geomesa/src/main/java/cn/com/taiji/track/service/impl/WarningRecordServiceImpl.java
  18. 2 6
      beidou-track-geomesa/src/main/resources/application-prod.yml

+ 1 - 1
beidou-track-geomesa/src/main/java/cn/com/taiji/track/config/KafkaConfig.java

@@ -1,4 +1,4 @@
-package cn.com.taiji.track.config;
+package cn.com.taiji.beidou.track.config;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.StringDeserializer;

+ 1 - 1
beidou-track-geomesa/src/main/java/cn/com/taiji/track/config/AbstractRedisDataStoreConfig.java

@@ -40,7 +40,7 @@ public abstract class AbstractRedisDataStoreConfig {
         return client;
     }
 
-    abstract SimpleFeatureType sft(RedisDataStore redisDataStore, IGeomesaTrackDTO geomesaTrackDTO) throws IOException;
+//    abstract SimpleFeatureType sft(RedisDataStore redisDataStore, IGeomesaTrackDTO geomesaTrackDTO) throws IOException;
 
     abstract SimpleFeatureType sftLocation(RedisDataStore redisDataStore, IGeomesaTrackDTO geomesaLocationDTO) throws IOException;
 }

+ 24 - 25
beidou-track-geomesa/src/main/java/cn/com/taiji/track/config/BeidouShipRedisDataStoreConfig.java

@@ -1,7 +1,6 @@
 package cn.com.taiji.track.config;
 
 import cn.com.taiji.track.dto.BeidouShipLocationDTO;
-import cn.com.taiji.track.dto.BeidouShipTrackDTO;
 import cn.com.taiji.track.dto.IGeomesaTrackDTO;
 import cn.hutool.core.util.StrUtil;
 import lombok.extern.slf4j.Slf4j;
@@ -29,23 +28,23 @@ public class BeidouShipRedisDataStoreConfig extends AbstractRedisDataStoreConfig
     @Value("${taiji.kafka.consumer.beidou.expiry}")
     private Integer expiry;
 
-    @Override
-    @Bean(name = "sft")
-    public SimpleFeatureType sft(RedisDataStore redisDataStore, IGeomesaTrackDTO geomesaTrackDTO) throws IOException {
-        SimpleFeatureType sft = SimpleFeatureTypes.createType(geomesaTrackDTO.getTypeName(),  geomesaTrackDTO.getSimpleFeatureType().toString());
-        sft.getUserData().put("geomesa.feature.expiry", StrUtil.format("syncTime({} seconds)", expiry));
-
-        try {
-            redisDataStore.getFeatureSource(geomesaTrackDTO.getTypeName()).removeFeatures(Filter.INCLUDE);
-            redisDataStore.removeSchema(geomesaTrackDTO.getTypeName());
-        } catch (Exception e) {
-            log.error("可能属于首次构建,SCHEMA 并不存在");
-            e.printStackTrace();
-        }
-
-        redisDataStore.createSchema(sft);
-        return sft;
-    }
+//    @Override
+//    @Bean(name = "sft")
+//    public SimpleFeatureType sft(RedisDataStore redisDataStore, IGeomesaTrackDTO geomesaTrackDTO) throws IOException {
+//        SimpleFeatureType sft = SimpleFeatureTypes.createType(geomesaTrackDTO.getTypeName(),  geomesaTrackDTO.getSimpleFeatureType().toString());
+//        sft.getUserData().put("geomesa.feature.expiry", StrUtil.format("syncTime({} seconds)", expiry));
+//
+//        try {
+//            redisDataStore.getFeatureSource(geomesaTrackDTO.getTypeName()).removeFeatures(Filter.INCLUDE);
+//            redisDataStore.removeSchema(geomesaTrackDTO.getTypeName());
+//        } catch (Exception e) {
+//            log.error("可能属于首次构建,SCHEMA 并不存在");
+//            e.printStackTrace();
+//        }
+//
+//        redisDataStore.createSchema(sft);
+//        return sft;
+//    }
 
     @Override
     @Bean(name = "sftLocation")
@@ -65,13 +64,13 @@ public class BeidouShipRedisDataStoreConfig extends AbstractRedisDataStoreConfig
         return sft;
     }
 
-    @Bean(name = "geomesaTrackDTO")
-    public BeidouShipTrackDTO geomesaTrackDTO() {
-        BeidouShipTrackDTO dto = new BeidouShipTrackDTO();
-        dto.setKafkaConsumerGroup(group);
-        dto.setExpirySeconds(expiry);
-        return dto;
-    }
+//    @Bean(name = "geomesaTrackDTO")
+//    public BeidouShipTrackDTO geomesaTrackDTO() {
+//        BeidouShipTrackDTO dto = new BeidouShipTrackDTO();
+//        dto.setKafkaConsumerGroup(group);
+//        dto.setExpirySeconds(expiry);
+//        return dto;
+//    }
 
     @Bean(name = "geomesaLocationDTO")
     public BeidouShipLocationDTO geomesaLocationDTO() {

+ 1 - 3
beidou-track-geomesa/src/main/java/cn/com/taiji/track/consumer/BeidouShipTrackConsumer.java

@@ -27,13 +27,11 @@ public class BeidouShipTrackConsumer {
     private IBeidouLocationService beidouLocationService;
 
     @KafkaListener(
-            containerFactory = "kafkaContainerFactory",
-//            groupId = "${taiji.kafka.consumer.beidou.group}", //测试时切换配置
+            groupId = "${taiji.kafka.consumer.beidou.group}",
             topics = {"${taiji.kafka.consumer.beidou.topic}"}
     )
     public void dynamicTrack(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) {
         beidouTrackService.beidouDynamicShipToCache(records);
-        beidouTrackService.beidouDynamicShipToKafka(records);
         beidouLocationService.beidouDynamicShipToCache(records);
         ack.acknowledge();
     }

+ 1 - 2
beidou-track-geomesa/src/main/java/cn/com/taiji/track/dto/BeidouShipLocationDTO.java

@@ -100,7 +100,6 @@ public class BeidouShipLocationDTO extends IGeomesaTrackDTO implements Serializa
             attributes.append("isAnchor:String,");
             attributes.append("deviceStatus:String,");
             attributes.append("xwDeptId:String,");
-
             attributes.append("layerType:String,");
             attributes.append("syncTime:Date");
             return attributes;
@@ -194,7 +193,7 @@ public class BeidouShipLocationDTO extends IGeomesaTrackDTO implements Serializa
                 isOnline,
                 isInport,
                 isAnchor,
-                deviceStatus,
+                deviceStatus==null ? 0 : deviceStatus,
                 xwDeptId,
                 getLayerType(),
                 new Date()

+ 178 - 178
beidou-track-geomesa/src/main/java/cn/com/taiji/track/dto/BeidouShipTrackDTO.java

@@ -1,178 +1,178 @@
-package cn.com.taiji.track.dto;
-
-import cn.hutool.core.util.StrUtil;
-import lombok.Data;
-import org.geotools.feature.simple.SimpleFeatureBuilder;
-import org.geotools.util.factory.Hints;
-import org.opengis.feature.simple.SimpleFeature;
-
-import java.io.Serializable;
-import java.util.Date;
-
-
-/**
- * @author kok20
- */
-@Data
-public class BeidouShipTrackDTO extends IGeomesaTrackDTO implements Serializable {
-
-    private static final long serialVersionUID = -8983779632713666636L;
-
-    private String time;
-    private Long trackId;
-    private String deviceId;
-    private Long shipType;
-    private Long workType;
-    private Integer workWay;
-    private String sendTime;
-    private String locationTime;
-    private Integer online;
-    private Integer shipLength;
-    private Integer shipWidth;
-    private Integer texture;
-    private Double longitude;
-    private Double latitude;
-    private Double direction;
-    private Double speed;
-    private Integer kwh;
-    private String shipName;
-    private String location;
-
-    @Override
-    public boolean checkPoint() {
-        if (longitude != null && latitude != null) {
-            location = StrUtil.format("POINT ({} {})", longitude, latitude);
-            return true;
-        }
-        return false;
-    }
-
-    @Override
-    public String getTypeName() { return "beidou-data"; }
-
-    @Override
-    public String getLayerType() {
-        return "geo_beidou_ship";
-    }
-
-    @Override
-    public String getWsLayerName() {
-        return "beidou-ship";
-    }
-
-    @Override
-    public String getFid() {
-        return deviceId;
-    }
-
-    @Override
-    public StringBuilder getSimpleFeatureType(){
-            StringBuilder attributes = new StringBuilder();
-            attributes.append("time:String,");
-            attributes.append("trackId:Long,");
-            attributes.append("deviceId:String:index=true,");
-            attributes.append("shipType:Long,");
-            attributes.append("workType:Long,");
-            attributes.append("workWay:Integer,");
-            attributes.append("sendTime:String,");
-            attributes.append("locationTime:String,");
-            attributes.append("online:Integer,");
-            attributes.append("shipLength:Integer,");
-            attributes.append("shipWidth:Integer,");
-            attributes.append("texture:Integer,");
-            attributes.append("longitude:Double,");
-            attributes.append("latitude:Double,");
-            attributes.append("direction:Double,");
-            attributes.append("speed:Double,");
-            attributes.append("kwh:Integer,");
-            attributes.append("shipName:String,");
-            attributes.append("*location:Point:srid=4326,");
-
-            attributes.append("layerType:String,");
-            attributes.append("syncTime:Date");
-            return attributes;
-    }
-
-    @Override
-    public SimpleFeature toSimpleFeature(SimpleFeatureBuilder builder, String fid) {
-        builder.set("time", time);
-        builder.set("trackId", trackId);
-        builder.set("deviceId", deviceId);
-        builder.set("shipType", shipType);
-        builder.set("workType", workType);
-        builder.set("workWay", workWay);
-        builder.set("sendTime", sendTime);
-        builder.set("locationTime", locationTime);
-        builder.set("online", online);
-        builder.set("shipLength", shipLength);
-        builder.set("shipWidth", shipWidth);
-        builder.set("texture", texture);
-        builder.set("longitude", longitude);
-        builder.set("latitude", latitude);
-        builder.set("direction", direction);
-        builder.set("speed", speed);
-        builder.set("kwh", kwh);
-        builder.set("shipName", shipName);
-
-        builder.set("location", location);
-
-        builder.set("layerType", getLayerType());
-        builder.set("syncTime", new Date());
-        builder.featureUserData(Hints.USE_PROVIDED_FID, Boolean.TRUE);
-        return builder.buildFeature(fid);
-    }
-
-    @Override
-    public String[] getPropList(){
-        return new String[] {
-                "time",
-                "trackId",
-//                "deviceId",
-                "shipType",
-                "workType",
-                "workWay",
-                "sendTime",
-                "locationTime",
-                "online",
-                "shipLength",
-                "shipWidth",
-                "texture",
-                "longitude",
-                "latitude",
-                "direction",
-                "speed",
-                "kwh",
-                "shipName",
-                "location",
-                "layerType",
-                "syncTime"
-        };
-    }
-
-    @Override
-    public Object[] getValueList() {
-        return new Object[] {
-                time,
-                trackId,
-//                deviceId,
-                shipType,
-                workType,
-                workWay,
-                sendTime,
-                locationTime,
-                online,
-                shipLength,
-                shipWidth,
-                texture,
-                longitude,
-                latitude,
-                direction,
-                speed,
-                kwh,
-                shipName,
-                location,
-                getLayerType(),
-                new Date()
-        };
-    }
-}
+//package cn.com.taiji.track.dto;
+//
+//import cn.hutool.core.util.StrUtil;
+//import lombok.Data;
+//import org.geotools.feature.simple.SimpleFeatureBuilder;
+//import org.geotools.util.factory.Hints;
+//import org.opengis.feature.simple.SimpleFeature;
+//
+//import java.io.Serializable;
+//import java.util.Date;
+//
+//
+///**
+// * @author kok20
+// */
+//@Data
+//public class BeidouShipTrackDTO extends IGeomesaTrackDTO implements Serializable {
+//
+//    private static final long serialVersionUID = -8983779632713666636L;
+//
+//    private String time;
+//    private Long trackId;
+//    private String deviceId;
+//    private Long shipType;
+//    private Long workType;
+//    private Integer workWay;
+//    private String sendTime;
+//    private String locationTime;
+//    private Integer online;
+//    private Integer shipLength;
+//    private Integer shipWidth;
+//    private Integer texture;
+//    private Double longitude;
+//    private Double latitude;
+//    private Double direction;
+//    private Double speed;
+//    private Integer kwh;
+//    private String shipName;
+//    private String location;
+//
+//    @Override
+//    public boolean checkPoint() {
+//        if (longitude != null && latitude != null) {
+//            location = StrUtil.format("POINT ({} {})", longitude, latitude);
+//            return true;
+//        }
+//        return false;
+//    }
+//
+//    @Override
+//    public String getTypeName() { return "beidou-data"; }
+//
+//    @Override
+//    public String getLayerType() {
+//        return "geo_beidou_ship";
+//    }
+//
+//    @Override
+//    public String getWsLayerName() {
+//        return "beidou-ship";
+//    }
+//
+//    @Override
+//    public String getFid() {
+//        return deviceId;
+//    }
+//
+//    @Override
+//    public StringBuilder getSimpleFeatureType(){
+//            StringBuilder attributes = new StringBuilder();
+//            attributes.append("time:String,");
+//            attributes.append("trackId:Long,");
+//            attributes.append("deviceId:String:index=true,");
+//            attributes.append("shipType:Long,");
+//            attributes.append("workType:Long,");
+//            attributes.append("workWay:Integer,");
+//            attributes.append("sendTime:String,");
+//            attributes.append("locationTime:String,");
+//            attributes.append("online:Integer,");
+//            attributes.append("shipLength:Integer,");
+//            attributes.append("shipWidth:Integer,");
+//            attributes.append("texture:Integer,");
+//            attributes.append("longitude:Double,");
+//            attributes.append("latitude:Double,");
+//            attributes.append("direction:Double,");
+//            attributes.append("speed:Double,");
+//            attributes.append("kwh:Integer,");
+//            attributes.append("shipName:String,");
+//            attributes.append("*location:Point:srid=4326,");
+//
+//            attributes.append("layerType:String,");
+//            attributes.append("syncTime:Date");
+//            return attributes;
+//    }
+//
+//    @Override
+//    public SimpleFeature toSimpleFeature(SimpleFeatureBuilder builder, String fid) {
+//        builder.set("time", time);
+//        builder.set("trackId", trackId);
+//        builder.set("deviceId", deviceId);
+//        builder.set("shipType", shipType);
+//        builder.set("workType", workType);
+//        builder.set("workWay", workWay);
+//        builder.set("sendTime", sendTime);
+//        builder.set("locationTime", locationTime);
+//        builder.set("online", online);
+//        builder.set("shipLength", shipLength);
+//        builder.set("shipWidth", shipWidth);
+//        builder.set("texture", texture);
+//        builder.set("longitude", longitude);
+//        builder.set("latitude", latitude);
+//        builder.set("direction", direction);
+//        builder.set("speed", speed);
+//        builder.set("kwh", kwh);
+//        builder.set("shipName", shipName);
+//
+//        builder.set("location", location);
+//
+//        builder.set("layerType", getLayerType());
+//        builder.set("syncTime", new Date());
+//        builder.featureUserData(Hints.USE_PROVIDED_FID, Boolean.TRUE);
+//        return builder.buildFeature(fid);
+//    }
+//
+//    @Override
+//    public String[] getPropList(){
+//        return new String[] {
+//                "time",
+//                "trackId",
+////                "deviceId",
+//                "shipType",
+//                "workType",
+//                "workWay",
+//                "sendTime",
+//                "locationTime",
+//                "online",
+//                "shipLength",
+//                "shipWidth",
+//                "texture",
+//                "longitude",
+//                "latitude",
+//                "direction",
+//                "speed",
+//                "kwh",
+//                "shipName",
+//                "location",
+//                "layerType",
+//                "syncTime"
+//        };
+//    }
+//
+//    @Override
+//    public Object[] getValueList() {
+//        return new Object[] {
+//                time,
+//                trackId,
+////                deviceId,
+//                shipType,
+//                workType,
+//                workWay,
+//                sendTime,
+//                locationTime,
+//                online,
+//                shipLength,
+//                shipWidth,
+//                texture,
+//                longitude,
+//                latitude,
+//                direction,
+//                speed,
+//                kwh,
+//                shipName,
+//                location,
+//                getLayerType(),
+//                new Date()
+//        };
+//    }
+//}

+ 84 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/track/entity/BeidouShipArchives.java

@@ -0,0 +1,84 @@
+package cn.com.taiji.track.entity;
+
+import com.baomidou.mybatisplus.annotation.TableId;
+import com.baomidou.mybatisplus.annotation.TableName;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * @Author CHEN
+ * @Date 2022/11/9 9:21
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+@TableName("ax_beidou_ship_archives")
+public class BeidouShipArchives {
+    /**
+     * 船舶id 如:儋0602365
+     */
+    @TableId
+    private String shipId;
+    /**
+     * 北斗设备号
+     */
+    private String devideNo;
+    /**
+     * 船舶类型
+     */
+    private String shipType;
+    /**
+     * 作业类型
+     */
+    private String jobType;
+    /**
+     * 作业方式
+     */
+    private String jobWay;
+    /**
+     * 船主
+     */
+    private String ownerName;
+    /**
+     * 船主电话
+     */
+    private String ownerTelNo;
+    /**
+     * 船长
+     */
+    private double shipLength;
+    /**
+     * 船宽
+     */
+    private double shipWidth;
+    /**
+     * 船舶材质
+     */
+    private String shipMaterial;
+    /**
+     * 所属管理单位
+     */
+    private String policeStationId;
+    /**
+     * 船主单位
+     */
+    private String ownerCpmpany;
+    /**
+     * 创建时间
+     */
+    private long createTime;
+    /**
+     * 修改时间
+     */
+    private long modifyTime;
+    /**
+     * 船主类型
+     */
+    private String ownerType;
+
+    /**
+     * 最后状态
+     */
+    private Integer lastStatus;
+}

+ 1 - 1
beidou-track-geomesa/src/main/java/cn/com/taiji/track/entity/BeidouTrackEntity.java

@@ -14,7 +14,7 @@ import java.io.Serializable;
 @NoArgsConstructor
 @AllArgsConstructor
 @TableName("ax_beidou_track")
-public class BeidouTrackEntity extends BaseEntity {
+public class BeidouTrackEntity {
 
     private Long trackId;
     private String deviceId;

+ 2 - 2
beidou-track-geomesa/src/main/java/cn/com/taiji/track/listener/ChannelListener.java

@@ -77,7 +77,7 @@ public class ChannelListener {
                     if (null != cql) {
                         //查询是否存在
                         Query query = new Query(geomesaTrackDTO.getTypeName(), ECQL.toFilter(cql));
-                        query.setMaxFeatures(Integer.parseInt("2000"));
+//                        query.setMaxFeatures(Integer.parseInt("2000"));
                         FeatureReader<SimpleFeatureType, SimpleFeature> reader = redisDataStore.getFeatureReader(query, Transaction.AUTO_COMMIT);
                         //判断是否存在
                         while (reader.hasNext()) {
@@ -100,7 +100,7 @@ public class ChannelListener {
                     String realCql = event.getCtx().attr(key1).get();
                     if (null != realCql) {
                         Query realQuery = new Query(geomesaTrackDTO.getTypeName(), ECQL.toFilter(realCql));
-                        realQuery.setMaxFeatures(Integer.parseInt("2000"));
+//                        realQuery.setMaxFeatures(Integer.parseInt("2000"));
                         FeatureReader<SimpleFeatureType, SimpleFeature> realReader = redisDataStore.getFeatureReader(realQuery, Transaction.AUTO_COMMIT);
                         //判断是否存在
                         while (realReader.hasNext()) {

+ 17 - 8
beidou-track-geomesa/src/main/java/cn/com/taiji/track/mapper/BeidouLocationMapper.java

@@ -17,14 +17,23 @@ import java.util.List;
  */
 @Mapper
 public interface BeidouLocationMapper extends BaseMapper<BeidouLocationEntity> {
-    @Select("SELECT l.*,s.is_online,s.is_inport,s.is_anchor,d.dispose_type,dept.xw_dept_id,\n" +
-            "IFNULL(l.ship_name,ship_id) AS ship_name\n" +
-            "FROM ax_beidou_ship_location l\n" +
-            "LEFT JOIN ax_beidou_ship_status s on s.device_id = l.device_id\n" +
-            "LEFT JOIN ax_beidou_ship_dispose d on d.devide_no = l.device_id\n" +
-            "LEFT JOIN ax_beidou_ship_archives a on a.devide_no = l.device_id\n" +
-            "LEFT JOIN ax_beidou_dept dept on dept.pscbh = a.police_station_id\n" +
-            "ORDER BY device_id,location_time")
+    @Select("SELECT\n" +
+            "\tl.*,\n" +
+            "\ts.is_online,\n" +
+            "\ts.is_inport,\n" +
+            "\ts.is_anchor,\n" +
+            "\tIFNULL( d.dispose_type, 0 ) AS dispose_type,\n" +
+            "\tIFNULL( dept.xw_dept_id, '' ) AS xw_dept_id,\n" +
+            "\tIFNULL( l.ship_name, ship_id ) AS ship_name \n" +
+            "FROM\n" +
+            "\tax_beidou_ship_location l\n" +
+            "\tLEFT JOIN ax_beidou_ship_status s ON s.device_id = l.device_id\n" +
+            "\tLEFT JOIN ax_beidou_ship_dispose d ON d.devide_no = l.device_id\n" +
+            "\tLEFT JOIN ax_beidou_ship_archives a ON a.devide_no = l.device_id\n" +
+            "\tLEFT JOIN ax_beidou_dept dept ON dept.pscbh = a.police_station_id \n" +
+            "ORDER BY\n" +
+            "\tdevice_id,\n" +
+            "\tlocation_time")
     List<BeidouShipLocationDTO> listLocation();
 }
 

+ 19 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/track/mapper/BeidouShipArchivesMapper.java

@@ -0,0 +1,19 @@
+package cn.com.taiji.track.mapper;
+
+import cn.com.taiji.track.entity.BeidouShipArchives;
+import cn.com.taiji.track.entity.BeidouTrackEntity;
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+import org.apache.ibatis.annotations.Mapper;
+
+/**
+ * @author chenfangchao
+ * @title: GeoServerMapper
+ * @projectName ax-geoserver-project
+ * @description: TODO
+ * @date 2022/8/16 7:54 PM
+ */
+@Mapper
+public interface BeidouShipArchivesMapper extends BaseMapper<BeidouShipArchives> {
+
+}
+

+ 69 - 70
beidou-track-geomesa/src/main/java/cn/com/taiji/track/producer/BeidouShipTestProducer.java

@@ -1,70 +1,69 @@
-package cn.com.taiji.track.producer;
-
-import cn.com.taiji.track.dto.BeidouShipTrackDTO;
-import cn.hutool.core.date.DateUtil;
-import cn.hutool.json.JSONUtil;
-import com.alibaba.fastjson.JSONObject;
-import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.web.bind.annotation.GetMapping;
-import org.springframework.web.bind.annotation.RequestMapping;
-import org.springframework.web.bind.annotation.RestController;
-
-import javax.annotation.Resource;
-import java.util.UUID;
-
-/**
- * @author chenfangchao
- * @title: BeidouShipTestProducer
- * @projectName ax-geoserver-project
- * @description: TODO
- * @date 2022/8/15 9:41 PM
- */
-@RestController
-@RequestMapping("/producer")
-public class BeidouShipTestProducer {
-
-
-    @Resource
-    private KafkaTemplate kafkaTemplate;
-
-    @GetMapping("/beidou")
-    public void  testBeidouShipData(){
-
-        for (int i = 0; i < 100; i++) {
-            String time = String.valueOf(DateUtil.date());
-            Object msg = "{\n" +
-                    "  \"id\": \"00b72b21b7cb41feb360a0db59403712\",\n" +
-                    "  \"content\": \"167153587,15011858,255,255,0,2022-04-02 16:22:13,2022-04-02 16:22:12,1,39,12,255,110.636777,19.260833,0,0,3\",\n" +
-                    "  \"time\": \"2022-04-02 16:22:14\",\n" +
-                    "  \"curren_time\": \"2022-04-02 16:22:15\",\n" +
-                    "  \"trackId\": \"167153587\",\n" +
-                    "  \"deviceId\": \"15011858\",\n" +
-                    "  \"shipType\": \"255\",\n" +
-                    "  \"workType\": \"255\",\n" +
-                    "  \"workWay\": \"0\",\n" +
-                    "  \"sendTime\": \"+ " + time + " +\",\n" +
-                    "  \"locationTime\": \"2022-04-02 16:22:12\",\n" +
-                    "  \"online\": \"1\",\n" +
-                    "  \"shipLength\": \"39\",\n" +
-                    "  \"shipWidth\": \"12\",\n" +
-                    "  \"texture\": \"255\",\n" +
-                    "  \"longitude\": \"110.636777\",\n" +
-                    "  \"latitude\": \"19.260833\",\n" +
-                    "  \"direction\": \"0\",\n" +
-                    "  \"speed\": \"0\",\n" +
-                    "  \"kwh\": \"3\",\n" +
-                    "  \"kafka_xrsj\": \"2022-04-02 16:22:19.932\"\n" +
-                    "}\n";
-            BeidouShipTrackDTO beidou = JSONObject.parseObject(msg.toString(), BeidouShipTrackDTO.class);
-            beidou.setTrackId(1001L);
-            beidou.setDeviceId(String.valueOf(i));
-            beidou.setShipName(UUID.randomUUID().toString());
-            beidou.setLongitude(beidou.getLongitude());
-            beidou.setLatitude(beidou.getLatitude());
-
-            String data = JSONUtil.toJsonStr(beidou);
-            kafkaTemplate.send("sgAxBeidouTrack", data);
-        }
-    }
-
-}
+//package cn.com.taiji.track.producer;
+//
+//import cn.hutool.core.date.DateUtil;
+//import cn.hutool.json.JSONUtil;
+//import com.alibaba.fastjson.JSONObject;
+//import org.springframework.kafka.core.KafkaTemplate;
+//import org.springframework.web.bind.annotation.GetMapping;
+//import org.springframework.web.bind.annotation.RequestMapping;
+//import org.springframework.web.bind.annotation.RestController;
+//
+//import javax.annotation.Resource;
+//import java.util.UUID;
+//
+///**
+// * @author chenfangchao
+// * @title: BeidouShipTestProducer
+// * @projectName ax-geoserver-project
+// * @description: TODO
+// * @date 2022/8/15 9:41 PM
+// */
+//@RestController
+//@RequestMapping("/producer")
+//public class BeidouShipTestProducer {
+//
+//
+//    @Resource
+//    private KafkaTemplate kafkaTemplate;
+//
+//    @GetMapping("/beidou")
+//    public void  testBeidouShipData(){
+//
+//        for (int i = 0; i < 100; i++) {
+//            String time = String.valueOf(DateUtil.date());
+//            Object msg = "{\n" +
+//                    "  \"id\": \"00b72b21b7cb41feb360a0db59403712\",\n" +
+//                    "  \"content\": \"167153587,15011858,255,255,0,2022-04-02 16:22:13,2022-04-02 16:22:12,1,39,12,255,110.636777,19.260833,0,0,3\",\n" +
+//                    "  \"time\": \"2022-04-02 16:22:14\",\n" +
+//                    "  \"curren_time\": \"2022-04-02 16:22:15\",\n" +
+//                    "  \"trackId\": \"167153587\",\n" +
+//                    "  \"deviceId\": \"15011858\",\n" +
+//                    "  \"shipType\": \"255\",\n" +
+//                    "  \"workType\": \"255\",\n" +
+//                    "  \"workWay\": \"0\",\n" +
+//                    "  \"sendTime\": \"+ " + time + " +\",\n" +
+//                    "  \"locationTime\": \"2022-04-02 16:22:12\",\n" +
+//                    "  \"online\": \"1\",\n" +
+//                    "  \"shipLength\": \"39\",\n" +
+//                    "  \"shipWidth\": \"12\",\n" +
+//                    "  \"texture\": \"255\",\n" +
+//                    "  \"longitude\": \"110.636777\",\n" +
+//                    "  \"latitude\": \"19.260833\",\n" +
+//                    "  \"direction\": \"0\",\n" +
+//                    "  \"speed\": \"0\",\n" +
+//                    "  \"kwh\": \"3\",\n" +
+//                    "  \"kafka_xrsj\": \"2022-04-02 16:22:19.932\"\n" +
+//                    "}\n";
+//            BeidouShipTrackDTO beidou = JSONObject.parseObject(msg.toString(), BeidouShipTrackDTO.class);
+//            beidou.setTrackId(1001L);
+//            beidou.setDeviceId(String.valueOf(i));
+//            beidou.setShipName(UUID.randomUUID().toString());
+//            beidou.setLongitude(beidou.getLongitude());
+//            beidou.setLatitude(beidou.getLatitude());
+//
+//            String data = JSONUtil.toJsonStr(beidou);
+//            kafkaTemplate.send("sgAxBeidouTrack", data);
+//        }
+//    }
+//
+//}

+ 0 - 5
beidou-track-geomesa/src/main/java/cn/com/taiji/track/service/IBeidouTrackService.java

@@ -13,11 +13,6 @@ import java.util.List;
  * @Date: 2021/11/30 9:58 下午
  */
 public interface IBeidouTrackService extends IService<BeidouTrackEntity> {
-
-    void beidouDynamicShipToRedis(List<ConsumerRecord<?, ?>> records);
-
     void beidouDynamicShipToCache(List<ConsumerRecord<?, ?>> records);
     void cacheBeidouToMySql();
-
-    void beidouDynamicShipToKafka(List<ConsumerRecord<?,?>> records);
 }

+ 18 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/track/service/impl/BeidouLocationServiceImpl.java

@@ -3,12 +3,18 @@ package cn.com.taiji.track.service.impl;
 import cn.com.taiji.track.dto.BeidouShipLocationDTO;
 import cn.com.taiji.track.dto.IGeomesaTrackDTO;
 import cn.com.taiji.track.entity.BeidouLocationEntity;
+import cn.com.taiji.track.entity.BeidouShipArchives;
+import cn.com.taiji.track.entity.BeidouTrackEntity;
 import cn.com.taiji.track.mapper.BeidouLocationMapper;
+import cn.com.taiji.track.mapper.BeidouShipArchivesMapper;
 import cn.com.taiji.track.service.IBeidouLocationService;
 import cn.com.taiji.track.service.IShipStatusService;
 import cn.com.taiji.track.utils.LatLngUtil;
 import cn.hutool.core.util.StrUtil;
+import com.alibaba.druid.util.StringUtils;
+import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONObject;
+import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -51,6 +57,8 @@ public class BeidouLocationServiceImpl extends ServiceImpl<BeidouLocationMapper,
     private RedisDataStore redisDataStore;
     @Autowired
     private IGeomesaTrackDTO geomesaLocationDTO;
+    @Autowired
+    private BeidouShipArchivesMapper shipArchivesMapper;
     @Override
     public void beidouDynamicShipToCache(List<ConsumerRecord<?, ?>> records){
         try {
@@ -80,6 +88,16 @@ public class BeidouLocationServiceImpl extends ServiceImpl<BeidouLocationMapper,
         if(locationEntities.size() > 0){
             List<BeidouLocationEntity> saveList = new ArrayList<>(locationEntities);
             locationEntities.clear();
+            for(BeidouLocationEntity e : saveList){
+                if(StringUtils.isEmpty(e.getShipName())){
+                    QueryWrapper<BeidouShipArchives> wrapper = new QueryWrapper();
+                    wrapper.eq("devide_no",e.getDeviceId());
+                    BeidouShipArchives shipArchives = shipArchivesMapper.selectOne(wrapper);
+                    if(shipArchives!=null){
+                        e.setShipName(shipArchives.getShipId());
+                    }
+                }
+            }
             saveBatch(saveList);
             log.info("位置数据写入数据库完成!耗时 {} 毫秒, 合计保存 {} 条记录", (new Date()).getTime() - begin.getTime(), saveList.size());
         }

+ 0 - 109
beidou-track-geomesa/src/main/java/cn/com/taiji/track/service/impl/BeidouTrackServiceImpl.java

@@ -41,96 +41,6 @@ import java.util.*;
 @Slf4j
 public class BeidouTrackServiceImpl extends ServiceImpl<BeidouTrackMapper, BeidouTrackEntity> implements IBeidouTrackService {
 
-    @Autowired
-    protected StringRedisTemplate redisTemplate;
-    @Autowired
-    private SimpleFeatureType sft;
-    @Autowired
-    private RedisDataStore redisDataStore;
-    @Autowired
-    private IGeomesaTrackDTO geomesaTrackDTO;
-    @Resource
-    private KafkaTemplate kafkaTemplate;
-    @Value("${taiji.kafka.productor.beidou.topic}")
-    private String kafkaTopic;
-    /**
-     * 写入Redis
-     * @param records
-     */
-    @Override
-    public void beidouDynamicShipToRedis(List<ConsumerRecord<?, ?>> records){
-        Date begain = null;
-        try {
-            begain = new Date();
-            FeatureReader<SimpleFeatureType, SimpleFeature> reader = null;
-            FeatureWriter<SimpleFeatureType, SimpleFeature> writer = null;
-
-            try {
-                writer = redisDataStore.getFeatureWriterAppend(sft.getTypeName(), Transaction.AUTO_COMMIT);
-                SimpleFeatureBuilder builder = new SimpleFeatureBuilder(sft);
-                for (ConsumerRecord<?, ?> record : records) {
-                    Optional message = Optional.ofNullable(record.value());
-                    if (message.isPresent()) {
-                        Object msg = message.get();
-                        IGeomesaTrackDTO dto = JSONObject.parseObject(msg.toString(), geomesaTrackDTO.getClass());
-                        dto.setKafkaConsumerGroup(geomesaTrackDTO.getKafkaConsumerGroup());
-                        dto.setExpirySeconds(geomesaTrackDTO.getExpirySeconds());
-                        if (!dto.checkPoint()) {
-                            continue;
-                        }
-                        String fid = dto.getFid();
-                        boolean append = true;
-                        FilterFactory2 filterFactory = CommonFactoryFinder.getFilterFactory2();
-                        Filter filter = null;
-                        if(null != fid && !"".equals(fid)){
-                            filter = filterFactory.id(filterFactory.featureId(fid));
-                        }else {
-                            String id = UUID.randomUUID().toString();
-                            filter = filterFactory.id(filterFactory.featureId(id));
-                        }
-                        Query query = new Query(dto.getTypeName(), filter);
-                        reader = redisDataStore.getFeatureReader(query, Transaction.AUTO_COMMIT);
-                        if (reader.hasNext()) {
-                            append = false;
-                            String[] propList = dto.getPropList();
-                            Object[] valueList = dto.getValueList();
-                            redisDataStore.getFeatureSource(dto.getTypeName()).modifyFeatures(propList, valueList, filter);
-                        }
-                        reader.close();
-                        reader = null;
-                        if (append) {
-                            SimpleFeature toWrite = writer.next();
-                            builder.reset();
-                            SimpleFeature feature = dto.toSimpleFeature(builder, fid);
-                            toWrite.setAttributes(feature.getAttributes());
-                            toWrite.getUserData().put(Hints.USE_PROVIDED_FID, Boolean.TRUE);
-                            ((FeatureIdImpl) toWrite.getIdentifier()).setID(feature.getID());
-                            toWrite.getUserData().putAll(feature.getUserData());
-                            writer.write();
-                        }
-                    }
-                }
-            }  catch (Exception e) {
-                log.error("=======发生异常============");
-                e.printStackTrace();
-            } finally {
-                try {
-                    if (null != writer) {
-                        writer.close();
-                    }
-                    if (null != reader) {
-                        reader.close();
-                    }
-                } catch (Exception e) {
-                    log.error("=======close操作发生异常============");
-                    e.printStackTrace();
-                }
-            }
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-//        log.info("轨迹数据写入图层完成!耗时 {} 毫秒, 从 消费组 {} 中合计消费 {} 条记录", (new Date()).getTime() - begain.getTime(), geomesaTrackDTO.getKafkaConsumerGroup(), records.size());
-    }
     private  Vector<BeidouTrackEntity> trackEntities= new Vector<>();
 
     @Override
@@ -141,8 +51,6 @@ public class BeidouTrackServiceImpl extends ServiceImpl<BeidouTrackMapper, Beido
                 if (message.isPresent()) {
                     Object msg = message.get();
                     BeidouTrackEntity entity = JSONObject.parseObject(msg.toString(), BeidouTrackEntity.class);
-                    entity.setCreateTime(new Date());
-                    entity.setUpdateTime(new Date());
                     trackEntities.add(entity);
                 }
             }
@@ -161,21 +69,4 @@ public class BeidouTrackServiceImpl extends ServiceImpl<BeidouTrackMapper, Beido
             log.info("轨迹数据写入数据库完成!耗时 {} 毫秒, 合计保存 {} 条记录", (new Date()).getTime() - begin.getTime(), saveList.size());
         }
     }
-
-    /**
-     * 写入Kafka
-     * @param records
-     */
-    @Override
-    public void beidouDynamicShipToKafka(List<ConsumerRecord<?, ?>> records) {
-        Date begin = new Date();
-        for (ConsumerRecord<?, ?> record : records) {
-            Optional message = Optional.ofNullable(record.value());
-            Object msg = message.get();
-            BeidouTrackEntity entity = JSONObject.parseObject(msg.toString(), BeidouTrackEntity.class);
-            String data = JSONUtil.toJsonStr(entity);
-            kafkaTemplate.send(kafkaTopic, entity.getDeviceId(),data);
-        }
-//        log.info("轨迹数据转发kafka完成!耗时 {} 毫秒, 合计转发 {} 条记录", (new Date()).getTime() - begin.getTime(), records.size());
-    }
 }

+ 4 - 1
beidou-track-geomesa/src/main/java/cn/com/taiji/track/service/impl/ShipStatusServiceImpl.java

@@ -51,6 +51,7 @@ public class ShipStatusServiceImpl extends ServiceImpl<ShipStatusMapper, ShipSta
                     ShipStatusEntity.AnchorStatus status = ShipStatusEntity.AnchorStatus.getStatus(start);
                     if(!status.equals(statusEntity.getIsAnchor())){
                         statusEntity.setIsAnchor(status);
+                        statusEntity.setAnchorChangeTime(new Date());
                         saveList.add(statusEntity);
                     }
                     map.remove(statusEntity.getDeviceId());
@@ -87,6 +88,7 @@ public class ShipStatusServiceImpl extends ServiceImpl<ShipStatusMapper, ShipSta
                     ShipStatusEntity.InportStatus status = ShipStatusEntity.InportStatus.getStatus(inportCode);
                     if(!status.equals(statusEntity.getIsAnchor())){
                         statusEntity.setIsInport(status);
+                        statusEntity.setInportChangeTime(new Date());
                         saveList.add(statusEntity);
                     }
                     map.remove(statusEntity.getDeviceId());
@@ -122,8 +124,9 @@ public class ShipStatusServiceImpl extends ServiceImpl<ShipStatusMapper, ShipSta
                 for(ShipStatusEntity statusEntity : statusEntityList){
                     String start = map.get(statusEntity.getDeviceId());
                     ShipStatusEntity.OnlineStatus status = ShipStatusEntity.OnlineStatus.getStatus(start);
-                    if(!status.equals(statusEntity.getIsOnline())){
+                    if(!ShipStatusEntity.OnlineStatus.MARK.equals(statusEntity.getIsOnline()) && !status.equals(statusEntity.getIsOnline())){
                         statusEntity.setIsOnline(status);
+                        statusEntity.setOnlineChangeTime(new Date());
                         saveList.add(statusEntity);
                     }
                     map.remove(statusEntity.getDeviceId());

+ 0 - 5
beidou-track-geomesa/src/main/java/cn/com/taiji/track/service/impl/WarningRecordServiceImpl.java

@@ -43,11 +43,6 @@ public class WarningRecordServiceImpl extends ServiceImpl<WarningRecordMapper, W
                     entities.add(entity);
                     if (WarningCodeConstants.OFFLINE_WARNING.equals(entity.getModelCode())) {
                         shipStatusService.pushOnLineDeviceId(entity.getDeviceId(),entity.getStart());
-//                        if("true".equals(entity.getStart())){
-//                            shipStatusService.pushOffLineDeviceId(entity.getDeviceId());
-//                        }else{
-//                            shipStatusService.pushOnLineDeviceId(entity.getDeviceId());
-//                        }
                     }
                     if (WarningCodeConstants.ANCHOR_WARNING.equals(entity.getModelCode())) {
                         shipStatusService.pushAnchorDeviceId(entity.getDeviceId(),entity.getStart());

+ 2 - 6
beidou-track-geomesa/src/main/resources/application-prod.yml

@@ -102,16 +102,12 @@ spring:
 taiji:
   geomesa.store.redis.config: redis.url===redis://10.112.89.239:6379###redis.catalog===geomesa###redis.connection.pool.size===100###redis.connection.pool.validate===true###geomesa.stats.enable===false###geomesa.stats.generate===false###geomesa.query.caching===false
   kafka:
-    productor:
-      beidou:
-        topic: taiji_ax_beidou_dynamic_ship
     consumer:
-      bootstrap-servers: 172.28.19.20:29092
       beidou:
         enable: true
         expiry: 3600
-        topic: sgAxBeidouTrack
-        group: sgAxBeidouTrack—${random.uuid}
+        topic: taiji_ax_beidou_dynamic_ship
+        group: taiji_ax_beidou_dynamic_ship—${random.uuid}
       warning:
         topic: taiji_ax_ship_warning_record
         group: taiji_ax_ship_warning_record—${random.uuid}