Pārlūkot izejas kodu

提交预警解析逻辑与船舶位置状态逻辑

liangjianf 2 gadi atpakaļ
vecāks
revīzija
394ecd7481
35 mainītis faili ar 1113 papildinājumiem un 267 dzēšanām
  1. 25 1
      beidou-track-geomesa/pom.xml
  2. 2 0
      beidou-track-geomesa/src/main/java/cn/com/taiji/track/GeoServerApplication.java
  3. 52 0
      beidou-track-geomesa/src/main/java/cn/com/taiji/track/config/KafkaConfig.java
  4. 15 0
      beidou-track-geomesa/src/main/java/cn/com/taiji/track/constants/WarningCodeConstants.java
  5. 8 2
      beidou-track-geomesa/src/main/java/cn/com/taiji/track/consumer/BeidouShipTrackConsumer.java
  6. 35 0
      beidou-track-geomesa/src/main/java/cn/com/taiji/track/consumer/BeidouWarningRecordConsumer.java
  7. 0 26
      beidou-track-geomesa/src/main/java/cn/com/taiji/track/controller/BeidouController.java
  8. 18 0
      beidou-track-geomesa/src/main/java/cn/com/taiji/track/controller/BeidouTrackController.java
  9. 2 0
      beidou-track-geomesa/src/main/java/cn/com/taiji/track/dto/FocusTrackQueryDTO.java
  10. 0 7
      beidou-track-geomesa/src/main/java/cn/com/taiji/track/dto/HistoryTrackQueryDTO.java
  11. 26 0
      beidou-track-geomesa/src/main/java/cn/com/taiji/track/entity/BeidouLocationEntity.java
  12. 0 1
      beidou-track-geomesa/src/main/java/cn/com/taiji/track/entity/BeidouTrackEntity.java
  13. 7 4
      beidou-track-geomesa/src/main/java/cn/com/taiji/track/entity/FocusShipEntity.java
  14. 61 0
      beidou-track-geomesa/src/main/java/cn/com/taiji/track/entity/ShipStatusEntity.java
  15. 47 0
      beidou-track-geomesa/src/main/java/cn/com/taiji/track/entity/WarningRecordEntity.java
  16. 1 1
      beidou-track-geomesa/src/main/java/cn/com/taiji/track/handler/FocusTrackHandler.java
  17. 11 0
      beidou-track-geomesa/src/main/java/cn/com/taiji/track/listener/FocusTrackChannelListener.java
  18. 4 1
      beidou-track-geomesa/src/main/java/cn/com/taiji/track/listener/HistoryTrackChannelListener.java
  19. 18 0
      beidou-track-geomesa/src/main/java/cn/com/taiji/track/mapper/BeidouLocationMapper.java
  20. 15 0
      beidou-track-geomesa/src/main/java/cn/com/taiji/track/mapper/ShipStatusMapper.java
  21. 16 0
      beidou-track-geomesa/src/main/java/cn/com/taiji/track/mapper/WarningRecordMapper.java
  22. 43 0
      beidou-track-geomesa/src/main/java/cn/com/taiji/track/schedule/PersistenceSchedule.java
  23. 19 0
      beidou-track-geomesa/src/main/java/cn/com/taiji/track/service/IBeidouLocationService.java
  24. 4 1
      beidou-track-geomesa/src/main/java/cn/com/taiji/track/service/IBeidouTrackService.java
  25. 16 0
      beidou-track-geomesa/src/main/java/cn/com/taiji/track/service/IShipStatusService.java
  26. 18 0
      beidou-track-geomesa/src/main/java/cn/com/taiji/track/service/IWarningRecordService.java
  27. 57 0
      beidou-track-geomesa/src/main/java/cn/com/taiji/track/service/impl/BeidouLocationServiceImpl.java
  28. 41 9
      beidou-track-geomesa/src/main/java/cn/com/taiji/track/service/impl/BeidouTrackServiceImpl.java
  29. 77 0
      beidou-track-geomesa/src/main/java/cn/com/taiji/track/service/impl/ShipStatusServiceImpl.java
  30. 67 0
      beidou-track-geomesa/src/main/java/cn/com/taiji/track/service/impl/WarningRecordServiceImpl.java
  31. 138 0
      beidou-track-geomesa/src/main/java/cn/com/taiji/track/utils/LatLngUtil.java
  32. 180 0
      beidou-track-geomesa/src/main/java/cn/com/taiji/track/utils/StarRocksStreamLoad.java
  33. 17 9
      beidou-track-geomesa/src/main/resources/application-dev.yml
  34. 72 204
      beidou-track-geomesa/src/main/resources/application-prod.yml
  35. 1 1
      beidou-track-geomesa/src/main/resources/application.yml

+ 25 - 1
beidou-track-geomesa/pom.xml

@@ -16,7 +16,7 @@
     </parent>
 
     <artifactId>beidou-track-geomesa</artifactId>
-    <name>GeoMesa Tutorials - Redis - Quick Start</name>
+    <name>beidou-track-geomesa</name>
 
     <dependencies>
         <dependency>
@@ -125,8 +125,32 @@
 
     <repositories>
         <repository>
+            <id>Aliyun</id>
+            <url>https://maven.aliyun.com/repository/public</url>
+        </repository>
+        <repository>
             <id>GeoSolutions</id>
             <url>https://repo.osgeo.org/repository/release/</url>
         </repository>
     </repositories>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.springframework.boot</groupId>
+                <artifactId>spring-boot-maven-plugin</artifactId>
+                <version>2.2.6.RELEASE</version>
+                <configuration>
+                    <classifier>exec</classifier>
+                </configuration>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>repackage</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
 </project>

+ 2 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/track/GeoServerApplication.java

@@ -2,6 +2,7 @@ package cn.com.taiji.track;
 
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.scheduling.annotation.EnableScheduling;
 
 /**
  * @author chenfangchao
@@ -11,6 +12,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
  * @date 2022/8/11 3:17 PM
  */
 @SpringBootApplication
+@EnableScheduling
 public class GeoServerApplication {
     public static void main(String[] args) {
         SpringApplication.run(GeoServerApplication.class,args);

+ 52 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/track/config/KafkaConfig.java

@@ -0,0 +1,52 @@
+package cn.com.taiji.track.config;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+import org.springframework.kafka.config.KafkaListenerContainerFactory;
+import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
+import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
+import org.springframework.kafka.listener.ContainerProperties;
+
+import java.util.HashMap;
+import java.util.Map;
+
+@Configuration
+@ConditionalOnProperty(name = "taiji.kafka.consumer.beidou.enable",havingValue = "true")
+public class KafkaConfig {
+
+    @Value("${taiji.kafka.consumer.bootstrap-servers}")
+    private String bootstrapServers;
+    @Value("${taiji.kafka.consumer.beidou.group}")
+    private String groupId;
+
+    @Bean
+    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaContainerFactory() {
+        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
+        factory.setConsumerFactory(consumerFactory());
+        factory.setConcurrency(3);
+        factory.getContainerProperties().setPollTimeout(3000);
+        factory.setBatchListener(true);
+        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
+        return factory;
+    }
+
+    public ConsumerFactory<Integer, String> consumerFactory() {
+        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
+    }
+
+    private Map<String, Object> consumerConfigs() {
+        Map<String, Object> props = new HashMap<>();
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        return props;
+    }
+}

+ 15 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/track/constants/WarningCodeConstants.java

@@ -0,0 +1,15 @@
+package cn.com.taiji.track.constants;
+
+/**
+ * @author chenfangchao
+ * @title: TopicConstants
+ * @projectName geomesa-tutorials
+ * @description: TODO
+ * @date 2022/9/5 10:07 AM
+ */
+public class WarningCodeConstants {
+
+    public static final String OFFLINE_WARNING = "TJ_CB_10_02";
+
+    public static final String FUSION_SHIP = "taiji_ax_ship_dynamic_fusion";
+}

+ 8 - 2
beidou-track-geomesa/src/main/java/cn/com/taiji/track/consumer/BeidouShipTrackConsumer.java

@@ -1,5 +1,6 @@
 package cn.com.taiji.track.consumer;
 
+import cn.com.taiji.track.service.IBeidouLocationService;
 import cn.com.taiji.track.service.IBeidouTrackService;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -22,14 +23,19 @@ public class BeidouShipTrackConsumer {
 
     @Autowired
     private IBeidouTrackService beidouTrackService;
+    @Autowired
+    private IBeidouLocationService beidouLocationService;
 
     @KafkaListener(
-            groupId = "${taiji.kafka.consumer.beidou.group}",
+//            containerFactory = "kafkaContainerFactory",
+            groupId = "${taiji.kafka.consumer.beidou.group}", //测试时切换配置
             topics = {"${taiji.kafka.consumer.beidou.topic}"}
     )
     public void dynamicTrack(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) {
         beidouTrackService.beidouDynamicShipToRedis(records);
-//        beidouTrackService.beidouDynamicShipToMySql(records);
+        beidouTrackService.beidouDynamicShipToCache(records);
+        beidouTrackService.beidouDynamicShipToKafka(records);
+        beidouLocationService.beidouDynamicShipToCache(records);
         ack.acknowledge();
     }
 }

+ 35 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/track/consumer/BeidouWarningRecordConsumer.java

@@ -0,0 +1,35 @@
+package cn.com.taiji.track.consumer;
+
+import cn.com.taiji.track.service.IBeidouTrackService;
+import cn.com.taiji.track.service.IWarningRecordService;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.support.Acknowledgment;
+import org.springframework.stereotype.Component;
+
+import java.util.List;
+
+/**
+ * 消费预警记录
+ * @author kok20
+ */
+@Component
+@Slf4j
+@ConditionalOnProperty(name = "taiji.kafka.consumer.beidou.enable",havingValue = "true")
+public class BeidouWarningRecordConsumer {
+
+    @Autowired
+    private IWarningRecordService warningRecordService;
+
+    @KafkaListener(
+            groupId = "${taiji.kafka.consumer.warning.group}",
+            topics = {"${taiji.kafka.consumer.warning.topic}"}
+    )
+    public void consumer(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) {
+        warningRecordService.warningRecordToCache(records);
+        ack.acknowledge();
+    }
+}

+ 0 - 26
beidou-track-geomesa/src/main/java/cn/com/taiji/track/controller/BeidouController.java

@@ -1,26 +0,0 @@
-package cn.com.taiji.track.controller;
-
-import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.web.bind.annotation.GetMapping;
-import org.springframework.web.bind.annotation.RequestMapping;
-import org.springframework.web.bind.annotation.RestController;
-
-import javax.annotation.Resource;
-
-
-/**
- * @author kok20
- */
-@RestController
-@RequestMapping("/beidou")
-public class BeidouController {
-
-
-    @Resource
-    private KafkaTemplate kafkaTemplate;
-
-    @GetMapping("/beidou")
-    public void  testBeidouShipData(){
-    }
-
-}

+ 18 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/track/controller/BeidouTrackController.java

@@ -0,0 +1,18 @@
+package cn.com.taiji.track.controller;
+
+import cn.com.taiji.track.service.IBeidouTrackService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+
+/**
+ * @author kok20
+ */
+@RestController
+@RequestMapping("/beidouTrack")
+public class BeidouTrackController {
+    @Autowired
+    private IBeidouTrackService beidouTrackService;
+
+}

+ 2 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/track/dto/FocusTrackQueryDTO.java

@@ -10,6 +10,8 @@ import lombok.Data;
 @Data
 public class FocusTrackQueryDTO {
     private String userId;
+    private String cql;
+    private String realCql;
 
     @Override
     public String toString() {

+ 0 - 7
beidou-track-geomesa/src/main/java/cn/com/taiji/track/dto/HistoryTrackQueryDTO.java

@@ -1,14 +1,7 @@
 package cn.com.taiji.track.dto;
 
-import cn.hutool.core.util.StrUtil;
 import com.alibaba.fastjson.JSON;
 import lombok.Data;
-import org.geotools.feature.simple.SimpleFeatureBuilder;
-import org.geotools.util.factory.Hints;
-import org.opengis.feature.simple.SimpleFeature;
-
-import java.io.Serializable;
-import java.util.Date;
 
 
 /**

+ 26 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/track/entity/BeidouLocationEntity.java

@@ -0,0 +1,26 @@
+package cn.com.taiji.track.entity;
+
+import com.baomidou.mybatisplus.annotation.TableId;
+import com.baomidou.mybatisplus.annotation.TableName;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * @author wudskq
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+@TableName("ax_beidou_ship_location")
+public class BeidouLocationEntity {
+
+    @TableId
+    private String deviceId;
+    private String location;
+    private String locationTime;
+    private String longitude;
+    private String latitude;
+    private String kwh;
+    private String shipName;
+}

+ 0 - 1
beidou-track-geomesa/src/main/java/cn/com/taiji/track/entity/BeidouTrackEntity.java

@@ -16,7 +16,6 @@ import java.io.Serializable;
 @TableName("ax_beidou_track")
 public class BeidouTrackEntity extends BaseEntity {
 
-    private String time;
     private Long trackId;
     private String deviceId;
     private Long shipType;

+ 7 - 4
beidou-track-geomesa/src/main/java/cn/com/taiji/track/entity/FocusShipEntity.java

@@ -1,11 +1,12 @@
 package cn.com.taiji.track.entity;
 
 import com.baomidou.mybatisplus.annotation.TableName;
+import com.fasterxml.jackson.annotation.JsonFormat;
 import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 
-import java.sql.Date;
+import java.util.Date;
 
 /**
  * @Author CHEN
@@ -14,12 +15,12 @@ import java.sql.Date;
 @Data
 @NoArgsConstructor
 @AllArgsConstructor
-@TableName("ax_beidou_focus")
-public class FocusShipEntity extends BaseEntity{
+@TableName("ax_beidou_focus_ship")
+public class FocusShipEntity {
     /**
      * 重点关注id
      */
-    private Integer focusId;
+    private String focusId;
     /**
      * 关注船舶id
      */
@@ -28,4 +29,6 @@ public class FocusShipEntity extends BaseEntity{
      * 关注人id
      */
     private String userId;
+    /** 创建时间 */
+    private String createTime;
 }

+ 61 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/track/entity/ShipStatusEntity.java

@@ -0,0 +1,61 @@
+package cn.com.taiji.track.entity;
+
+import com.baomidou.mybatisplus.annotation.EnumValue;
+import com.baomidou.mybatisplus.annotation.TableId;
+import com.baomidou.mybatisplus.annotation.TableName;
+import com.fasterxml.jackson.annotation.JsonFormat;
+import com.fasterxml.jackson.annotation.JsonValue;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.Date;
+
+/**
+ * @Author CHEN
+ * @Date 2022/11/9 18:00
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+@TableName("ax_beidou_ship_status")
+public class ShipStatusEntity{
+    /**
+     * 重点关注id
+     */
+    @TableId
+    private String deviceId;
+    /**
+     *  在线状态
+     */
+    private OnlineStatus isOnline;
+    /**
+     * 在线状态变更时间
+     */
+    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone = "GMT+8")
+    private Date onlineChangeTime;
+    /**  0否 1是 */
+    private String isInport;
+    /** 在港状态变更时间 */
+    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone = "GMT+8")
+    private Date inportChangeTime;
+
+    public enum OnlineStatus {
+        ONLINE("1","在线"),
+        OFFLINE("2","离线"),
+        MARK("3","标记");
+        @EnumValue
+        private final String value;
+        @JsonValue    //需要在前端展示哪个值就在哪个属性上加上该注解
+        private String text;
+
+        public String getValue() {
+            return value;
+        }
+
+        private OnlineStatus(String value,String text) {
+            this.value = value;
+            this.text = text;
+        }
+    }
+}

+ 47 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/track/entity/WarningRecordEntity.java

@@ -0,0 +1,47 @@
+package cn.com.taiji.track.entity;
+
+import com.baomidou.mybatisplus.annotation.TableName;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * @Author CHEN
+ * @Date 2022/11/9 18:00
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+@TableName("ax_beidou_warning_record")
+public class WarningRecordEntity extends BaseEntity{
+    /**
+     * 规则id
+     */
+    private String ruleId;
+    /**
+     * 模型船舶id
+     */
+    private String modelId;
+    /**
+     * 模型船舶编号
+     */
+    private String modelCode;
+    /**
+     * 设备ID
+     */
+    private String deviceId;
+    /**
+     * 预警区域ID
+     */
+    private String areaId;
+    /** 起始预警信号 */
+    private String start;
+    /** 处置状态 */
+    private String status;
+    /** 轨迹信息 */
+    private String dynamicShip;
+    /** 预警时间 */
+    private String warningTime;
+
+
+}

+ 1 - 1
beidou-track-geomesa/src/main/java/cn/com/taiji/track/handler/FocusTrackHandler.java

@@ -70,7 +70,7 @@ public class FocusTrackHandler extends SimpleChannelInboundHandler<TextWebSocket
             ctx.channel().writeAndFlush(new TextWebSocketFrame("keep alive success!"));
         }
         // 回复消息
-//        ctx.channel().writeAndFlush(new TextWebSocketFrame("此次会话CQL-ID为 "+ ctx.channel().id().asShortText() + " 查询的条件为:" + dto.toString() + "与服务器成功通信!"));
+        ctx.channel().writeAndFlush(new TextWebSocketFrame("此次会话ID为 "+ ctx.channel().id().asShortText() + " 查询的条件为:" + dto.toString() + "与服务器成功通信!"));
     }
 
     /**

+ 11 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/track/listener/FocusTrackChannelListener.java

@@ -21,6 +21,7 @@ import org.geotools.data.Transaction;
 import org.geotools.factory.CommonFactoryFinder;
 import org.geotools.filter.text.cql2.CQL;
 import org.geotools.filter.text.cql2.CQLException;
+import org.geotools.filter.text.ecql.ECQL;
 import org.locationtech.geomesa.redis.data.RedisDataStore;
 import org.locationtech.jts.geom.Geometry;
 import org.opengis.feature.Property;
@@ -122,7 +123,17 @@ public class FocusTrackChannelListener {
                     filters.add(filter);
                 }
                 Filter filter = ff.or(filters);
+                String cql = dto.getCql();
+                String realCql = dto.getRealCql();
+                if (StringUtils.hasText(cql)) {
+                    Filter cqlFilter = ECQL.toFilter(cql);
+                    filter = ff.and(filter,cqlFilter);
+                }if (StringUtils.hasText(realCql)) {
+                    Filter realCqlFilter = ECQL.toFilter(realCql);
+                    filter = ff.and(filter,realCqlFilter);
+                }
                 Query query = new Query(geomesaTrackDTO.getTypeName(), filter);
+                query.setMaxFeatures(Integer.parseInt("2000"));
                 FeatureReader<SimpleFeatureType, SimpleFeature> focusReader = redisDataStore.getFeatureReader(query, Transaction.AUTO_COMMIT);
                 //判断是否存在
                 while (focusReader.hasNext()) {

+ 4 - 1
beidou-track-geomesa/src/main/java/cn/com/taiji/track/listener/HistoryTrackChannelListener.java

@@ -18,7 +18,10 @@ import org.springframework.util.StringUtils;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.*;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 /**
  * @author chenfangchao

+ 18 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/track/mapper/BeidouLocationMapper.java

@@ -0,0 +1,18 @@
+package cn.com.taiji.track.mapper;
+
+import cn.com.taiji.track.entity.BeidouLocationEntity;
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+import org.apache.ibatis.annotations.Mapper;
+
+/**
+ * @author chenfangchao
+ * @title: GeoServerMapper
+ * @projectName ax-geoserver-project
+ * @description: TODO
+ * @date 2022/8/16 7:54 PM
+ */
+@Mapper
+public interface BeidouLocationMapper extends BaseMapper<BeidouLocationEntity> {
+
+}
+

+ 15 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/track/mapper/ShipStatusMapper.java

@@ -0,0 +1,15 @@
+package cn.com.taiji.track.mapper;
+
+import cn.com.taiji.track.entity.ShipStatusEntity;
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+import org.apache.ibatis.annotations.Mapper;
+
+
+/**
+ * @author kok20
+ */
+@Mapper
+public interface ShipStatusMapper extends BaseMapper<ShipStatusEntity> {
+
+}
+

+ 16 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/track/mapper/WarningRecordMapper.java

@@ -0,0 +1,16 @@
+package cn.com.taiji.track.mapper;
+
+import cn.com.taiji.track.entity.FocusShipEntity;
+import cn.com.taiji.track.entity.WarningRecordEntity;
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+import org.apache.ibatis.annotations.Mapper;
+
+
+/**
+ * @author kok20
+ */
+@Mapper
+public interface WarningRecordMapper extends BaseMapper<WarningRecordEntity> {
+
+}
+

+ 43 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/track/schedule/PersistenceSchedule.java

@@ -0,0 +1,43 @@
+package cn.com.taiji.track.schedule;
+
+import cn.com.taiji.track.service.IBeidouLocationService;
+import cn.com.taiji.track.service.IBeidouTrackService;
+import cn.com.taiji.track.service.IShipStatusService;
+import cn.com.taiji.track.service.IWarningRecordService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+/**
+ * @author kok20
+ * 适应starrocks保存逻辑,通过定时器降低保存时间间隔
+ */
+@Component
+public class PersistenceSchedule {
+    @Autowired
+    private IBeidouTrackService beidouTrackService;
+    @Autowired
+    private IWarningRecordService warningRecordService;
+    @Autowired
+    private IBeidouLocationService beidouLocationService;
+    @Autowired
+    private IShipStatusService shipStatusService;
+
+    @Scheduled(cron = "*/10 * * * * ?")
+    private void cacheBeidouToMySql() {
+        beidouTrackService.cacheBeidouToMySql();
+    }
+    @Scheduled(cron = "*/10 * * * * ?")
+    private void cacheWarningToMySql() {
+        warningRecordService.cacheToMySql();
+    }
+    @Scheduled(cron = "*/10 * * * * ?")
+    private void cacheLocationToMySql() {
+        beidouLocationService.cacheBeidouToMySql();
+    }
+    @Scheduled(cron = "*/10 * * * * ?")
+    private void cacheStatusToMySql() {
+        shipStatusService.cacheToMySql();
+    }
+}
+

+ 19 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/track/service/IBeidouLocationService.java

@@ -0,0 +1,19 @@
+package cn.com.taiji.track.service;
+
+import cn.com.taiji.track.entity.BeidouLocationEntity;
+import com.baomidou.mybatisplus.extension.service.IService;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import java.util.List;
+
+/**
+ * @Description:
+ * @Author: hujie@umisoft.cn
+ * @Date: 2021/11/30 9:58 下午
+ */
+public interface IBeidouLocationService extends IService<BeidouLocationEntity> {
+
+    void beidouDynamicShipToCache(List<ConsumerRecord<?, ?>> records);
+
+    void cacheBeidouToMySql();
+}

+ 4 - 1
beidou-track-geomesa/src/main/java/cn/com/taiji/track/service/IBeidouTrackService.java

@@ -15,5 +15,8 @@ public interface IBeidouTrackService extends IService<BeidouTrackEntity> {
 
     void beidouDynamicShipToRedis(List<ConsumerRecord<?, ?>> records);
 
-    void beidouDynamicShipToMySql(List<ConsumerRecord<?, ?>> records);
+    void beidouDynamicShipToCache(List<ConsumerRecord<?, ?>> records);
+    void cacheBeidouToMySql();
+
+    void beidouDynamicShipToKafka(List<ConsumerRecord<?,?>> records);
 }

+ 16 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/track/service/IShipStatusService.java

@@ -0,0 +1,16 @@
+package cn.com.taiji.track.service;
+
+import cn.com.taiji.track.entity.ShipStatusEntity;
+import com.baomidou.mybatisplus.extension.service.IService;
+
+/**
+ * @author kok20
+ */
+public interface IShipStatusService extends IService<ShipStatusEntity> {
+
+    void pushOffLineDeviceId(String deviceId);
+
+    void cacheToMySql();
+
+    void pushOnLineDeviceId(String deviceId);
+}

+ 18 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/track/service/IWarningRecordService.java

@@ -0,0 +1,18 @@
+package cn.com.taiji.track.service;
+
+import cn.com.taiji.track.entity.BeidouTrackEntity;
+import cn.com.taiji.track.entity.WarningRecordEntity;
+import com.baomidou.mybatisplus.extension.service.IService;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import java.util.List;
+
+/**
+ * @author kok20
+ */
+public interface IWarningRecordService extends IService<WarningRecordEntity> {
+
+    void warningRecordToCache(List<ConsumerRecord<?, ?>> records);
+
+    void cacheToMySql();
+}

+ 57 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/track/service/impl/BeidouLocationServiceImpl.java

@@ -0,0 +1,57 @@
+package cn.com.taiji.track.service.impl;
+
+import cn.com.taiji.track.entity.BeidouLocationEntity;
+import cn.com.taiji.track.mapper.BeidouLocationMapper;
+import cn.com.taiji.track.service.IBeidouLocationService;
+import cn.com.taiji.track.utils.LatLngUtil;
+import com.alibaba.fastjson.JSONObject;
+import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.springframework.stereotype.Service;
+
+import java.math.BigDecimal;
+import java.util.*;
+
+/**
+ * @Description:
+ * @Author: hujie@umisoft.cn
+ * @Date: 2021/11/30 9:59 下午
+ */
+@Service
+@Slf4j
+public class BeidouLocationServiceImpl extends ServiceImpl<BeidouLocationMapper, BeidouLocationEntity> implements IBeidouLocationService {
+    private Vector<BeidouLocationEntity> locationEntities= new Vector<>();
+
+    @Override
+    public void beidouDynamicShipToCache(List<ConsumerRecord<?, ?>> records){
+        try {
+            for (ConsumerRecord<?, ?> record : records) {
+                Optional message = Optional.ofNullable(record.value());
+                if (message.isPresent()) {
+                    Object msg = message.get();
+                    BeidouLocationEntity locationEntity = JSONObject.parseObject(msg.toString(), BeidouLocationEntity.class);
+                    locationEntity.setLocation(String.format("POINT(%s %s)",locationEntity.getLongitude(),locationEntity.getLatitude()));
+                    BigDecimal longitudeDecimal = new BigDecimal(locationEntity.getLongitude());
+                    locationEntity.setLongitude(LatLngUtil.latLng2Dfm(longitudeDecimal.doubleValue()));
+                    BigDecimal latitudeDecimal = new BigDecimal(locationEntity.getLatitude());
+                    locationEntity.setLatitude(LatLngUtil.latLng2Dfm(latitudeDecimal.doubleValue()));
+                    locationEntities.add(locationEntity);
+                }
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    @Override
+    public void cacheBeidouToMySql() {
+        Date begin = new Date();
+        if(locationEntities.size() > 0){
+            List<BeidouLocationEntity> saveList = new ArrayList<>(locationEntities);
+            locationEntities.clear();
+            saveBatch(saveList);
+//            log.info("位置数据写入数据库完成!耗时 {} 毫秒, 合计保存 {} 条记录", (new Date()).getTime() - begin.getTime(), saveList.size());
+        }
+    }
+}

+ 41 - 9
beidou-track-geomesa/src/main/java/cn/com/taiji/track/service/impl/BeidouTrackServiceImpl.java

@@ -1,9 +1,11 @@
 package cn.com.taiji.track.service.impl;
 
 import cn.com.taiji.track.dto.IGeomesaTrackDTO;
+import cn.com.taiji.track.entity.BeidouLocationEntity;
 import cn.com.taiji.track.entity.BeidouTrackEntity;
 import cn.com.taiji.track.mapper.BeidouTrackMapper;
 import cn.com.taiji.track.service.IBeidouTrackService;
+import cn.hutool.json.JSONUtil;
 import com.alibaba.fastjson.JSONObject;
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
 import lombok.extern.slf4j.Slf4j;
@@ -22,9 +24,12 @@ import org.opengis.feature.simple.SimpleFeatureType;
 import org.opengis.filter.Filter;
 import org.opengis.filter.FilterFactory2;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.data.redis.core.StringRedisTemplate;
+import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.stereotype.Service;
 
+import javax.annotation.Resource;
 import java.util.*;
 
 /**
@@ -44,7 +49,10 @@ public class BeidouTrackServiceImpl extends ServiceImpl<BeidouTrackMapper, Beido
     private RedisDataStore redisDataStore;
     @Autowired
     private IGeomesaTrackDTO geomesaTrackDTO;
-
+    @Resource
+    private KafkaTemplate kafkaTemplate;
+    @Value("${taiji.kafka.productor.beidou.topic}")
+    private String kafkaTopic;
     /**
      * 写入Redis
      * @param records
@@ -121,16 +129,14 @@ public class BeidouTrackServiceImpl extends ServiceImpl<BeidouTrackMapper, Beido
         } catch (Exception e) {
             e.printStackTrace();
         }
-        log.info("写入图层数据完成!耗时 {} 毫秒, 从 消费组 {} 中合计消费 {} 条记录", (new Date()).getTime() - begain.getTime(), geomesaTrackDTO.getKafkaConsumerGroup(), records.size());
+//        log.info("轨迹数据写入图层完成!耗时 {} 毫秒, 从 消费组 {} 中合计消费 {} 条记录", (new Date()).getTime() - begain.getTime(), geomesaTrackDTO.getKafkaConsumerGroup(), records.size());
     }
 
+    private  Vector<BeidouTrackEntity> trackEntities= new Vector<>();
 
     @Override
-    public void beidouDynamicShipToMySql(List<ConsumerRecord<?, ?>> records){
-        Date begain = null;
-        List<BeidouTrackEntity> entities = new ArrayList<>();
+    public void beidouDynamicShipToCache(List<ConsumerRecord<?, ?>> records){
         try {
-            begain = new Date();
             for (ConsumerRecord<?, ?> record : records) {
                 Optional message = Optional.ofNullable(record.value());
                 if (message.isPresent()) {
@@ -138,13 +144,39 @@ public class BeidouTrackServiceImpl extends ServiceImpl<BeidouTrackMapper, Beido
                     BeidouTrackEntity entity = JSONObject.parseObject(msg.toString(), BeidouTrackEntity.class);
                     entity.setCreateTime(new Date());
                     entity.setUpdateTime(new Date());
-                    entities.add(entity);
+                    trackEntities.add(entity);
                 }
             }
-            saveBatch(entities);
         } catch (Exception e) {
             e.printStackTrace();
         }
-        log.info("写入数据库完成!耗时 {} 毫秒, 合计消费 {} 条记录", (new Date()).getTime() - begain.getTime(), records.size());
+    }
+
+    @Override
+    public void cacheBeidouToMySql() {
+        Date begin = new Date();
+        if(trackEntities.size() > 0){
+            List<BeidouTrackEntity> saveList = new ArrayList<>(trackEntities);
+            trackEntities.clear();
+            saveBatch(saveList);
+            log.info("轨迹数据写入数据库完成!耗时 {} 毫秒, 合计保存 {} 条记录", (new Date()).getTime() - begin.getTime(), saveList.size());
+        }
+    }
+
+    /**
+     * 写入Kafka
+     * @param records
+     */
+    @Override
+    public void beidouDynamicShipToKafka(List<ConsumerRecord<?, ?>> records) {
+        Date begin = new Date();
+        for (ConsumerRecord<?, ?> record : records) {
+            Optional message = Optional.ofNullable(record.value());
+            Object msg = message.get();
+            BeidouTrackEntity entity = JSONObject.parseObject(msg.toString(), BeidouTrackEntity.class);
+            String data = JSONUtil.toJsonStr(entity);
+            kafkaTemplate.send(kafkaTopic, entity.getDeviceId(),data);
+        }
+//        log.info("轨迹数据转发kafka完成!耗时 {} 毫秒, 合计转发 {} 条记录", (new Date()).getTime() - begin.getTime(), records.size());
     }
 }

+ 77 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/track/service/impl/ShipStatusServiceImpl.java

@@ -0,0 +1,77 @@
+package cn.com.taiji.track.service.impl;
+
+import cn.com.taiji.track.entity.ShipStatusEntity;
+import cn.com.taiji.track.mapper.ShipStatusMapper;
+import cn.com.taiji.track.service.IShipStatusService;
+import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Service;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Vector;
+
+/**
+ * @author kok20
+ */
+@Service
+@Slf4j
+public class ShipStatusServiceImpl extends ServiceImpl<ShipStatusMapper, ShipStatusEntity> implements IShipStatusService {
+
+    private Vector<ShipStatusEntity> entities= new Vector<>();
+    private Vector<String> offLineDeviceIds= new Vector<>();
+    private Vector<String> onLineDeviceIds= new Vector<>();
+
+    @Override
+    public void pushOffLineDeviceId(String deviceId){
+        offLineDeviceIds.add(deviceId);
+    }
+
+    @Override
+    public void pushOnLineDeviceId(String deviceId) {
+        onLineDeviceIds.add(deviceId);
+    }
+    @Override
+    public void cacheToMySql() {
+        Date begin = new Date();
+        if(offLineDeviceIds.size() > 0){
+            List<String> idList = new ArrayList<>(offLineDeviceIds);
+            offLineDeviceIds.clear();
+            for (String id : idList) {
+                ShipStatusEntity entity = baseMapper.selectById(id);
+                if (entity == null) {
+                    entity = new ShipStatusEntity();
+                    entity.setDeviceId(id);
+                    entity.setIsOnline(ShipStatusEntity.OnlineStatus.OFFLINE);
+                    entity.setOnlineChangeTime(new Date());
+                    entities.add(entity);
+                }else if (!ShipStatusEntity.OnlineStatus.OFFLINE.equals(entity.getIsOnline())) {
+                    entity.setIsOnline(ShipStatusEntity.OnlineStatus.OFFLINE);
+                    entity.setOnlineChangeTime(new Date());
+                    entities.add(entity);
+                }
+            }
+        }
+        if(onLineDeviceIds.size() > 0){
+            List<String> idList = new ArrayList<>(onLineDeviceIds);
+            onLineDeviceIds.clear();
+            for (String id : idList) {
+                ShipStatusEntity entity = baseMapper.selectById(id);
+                if (entity == null) {
+                    entity = new ShipStatusEntity();
+                    entity.setDeviceId(id);
+                    entity.setIsOnline(ShipStatusEntity.OnlineStatus.ONLINE);
+                    entity.setOnlineChangeTime(new Date());
+                    entities.add(entity);
+                }else if (!ShipStatusEntity.OnlineStatus.ONLINE.equals(entity.getIsOnline())) {
+                    entity.setIsOnline(ShipStatusEntity.OnlineStatus.ONLINE);
+                    entity.setOnlineChangeTime(new Date());
+                    entities.add(entity);
+                }
+            }
+        }
+        saveBatch(entities);
+        log.info("船舶状态写入数据库完成!耗时 {} 毫秒, 合计保存 {} 条记录", (new Date()).getTime() - begin.getTime(), entities.size());
+    }
+}

+ 67 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/track/service/impl/WarningRecordServiceImpl.java

@@ -0,0 +1,67 @@
+package cn.com.taiji.track.service.impl;
+
+import cn.com.taiji.track.constants.WarningCodeConstants;
+import cn.com.taiji.track.entity.BeidouTrackEntity;
+import cn.com.taiji.track.entity.WarningRecordEntity;
+import cn.com.taiji.track.mapper.WarningRecordMapper;
+import cn.com.taiji.track.service.IShipStatusService;
+import cn.com.taiji.track.service.IWarningRecordService;
+import com.alibaba.fastjson.JSONObject;
+import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.*;
+
+/**
+ * @author kok20
+ */
+@Service
+@Slf4j
+public class WarningRecordServiceImpl extends ServiceImpl<WarningRecordMapper, WarningRecordEntity> implements IWarningRecordService {
+
+    @Autowired
+    protected IShipStatusService shipStatusService;
+
+    private  Vector<WarningRecordEntity> entities= new Vector<>();
+
+    @Override
+    public void warningRecordToCache(List<ConsumerRecord<?, ?>> records){
+        try {
+            for (ConsumerRecord<?, ?> record : records) {
+                Optional message = Optional.ofNullable(record.value());
+                if (message.isPresent()) {
+                    Object msg = message.get();
+                    WarningRecordEntity entity = JSONObject.parseObject(msg.toString(), WarningRecordEntity.class);
+                    BeidouTrackEntity track = JSONObject.parseObject(entity.getDynamicShip(), BeidouTrackEntity.class);
+                    entity.setDeviceId(track.getDeviceId());
+                    entity.setCreateTime(new Date());
+                    entity.setUpdateTime(new Date());
+                    entities.add(entity);
+                    if (WarningCodeConstants.OFFLINE_WARNING.equals(entity.getModelCode())) {
+                        if("true".equals(entity.getStart())){
+                            shipStatusService.pushOffLineDeviceId(entity.getDeviceId());
+                        }else{
+                            shipStatusService.pushOnLineDeviceId(entity.getDeviceId());
+                        }
+                    }
+                }
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    @Override
+    public void cacheToMySql() {
+        Date begin = new Date();
+        if(entities.size() > 0){
+            List<WarningRecordEntity> saveList = new ArrayList<>(entities);
+            entities.clear();
+            saveBatch(saveList);
+            log.info("预警数据写入数据库完成!耗时 {} 毫秒, 合计保存 {} 条记录", (new Date()).getTime() - begin.getTime(), saveList.size());
+        }
+    }
+}

+ 138 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/track/utils/LatLngUtil.java

@@ -0,0 +1,138 @@
+package cn.com.taiji.track.utils;
+
+
+/*************************************
+ *Class Name: LatLngUtil
+ *Description: <度分秒,度分,与经纬度互转工具类>
+ *@author: Seminar
+ *@create: 2021/5/24
+ *@since 1.0.0
+ *************************************/
+public class LatLngUtil {
+
+    /**
+     * 将经纬度转换为度分秒格式
+     *
+     * @param du 116.41884740.0897315
+     * @return 116°25'7.85"       40°5'23.03"
+     */
+    public static String latLng2Dfm(double du) {
+        int du1 = (int) du;
+        double tp = (du - du1) * 60;
+        int fen = (int) tp;
+        String miao = String.format("%.0f", Math.abs(((tp - fen) * 60)));
+        return du1 + "°" + Math.abs(fen) + "'" + miao + "\"";
+    }
+    /**
+     * 度分秒转经纬度
+     *
+     * @param dms 116°25'7.85"
+     * @return 116.418847
+     */
+    public static double dfm2LatLng(String dms) {
+        if (dms == null) {
+            return 0;
+        }
+        try {
+            dms = dms.replace(" ", "");
+            String[] str2 = dms.split("°");
+            if (str2.length < 2) {
+                return 0;
+            }
+            int d = Integer.parseInt(str2[0]);
+            String[] str3 = str2[1].split("\'");
+            if (str3.length < 2) {
+                return 0;
+            }
+            int f = Integer.parseInt(str3[0]);
+            String str4 = str3[1].substring(0, str3[1].length() - 1);
+            double m = Double.parseDouble(str4);
+
+            double fen = f + (m / 60);
+            double du = (fen / 60) + Math.abs(d);
+            if (d < 0) {
+                du = -du;
+            }
+            return Double.parseDouble(String.format("%.7f", du));
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        return 0;
+    }
+
+    /**
+     * 将经纬度转换为度分格式
+     *
+     * @param du 116.418847      40.0897315
+     * @return 116°25'  40°5'
+     */
+    private static String latLng2Df(double du) {
+        int du1 = (int) du;
+        double tp = (du - du1) * 60;
+        int fen = (int) tp;
+        return du1 + "°" + Math.abs(fen) + "'";
+    }
+
+    /**
+     * 度分转经纬度
+     * 全球经纬度的取值范围为:纬度-90~90,经度-180~180
+     * 度分转换: 将度分单位数据转换为度单位数据,公式:度=度+分/60
+     * 例如: 经度 = 116°20.12',纬度 = 39°12.34'
+     * 经度 = 116 + 20.12 / 60 = 116.33533°
+     * 纬度 = 39 + 12.34 / 60 = 39.20567°
+     *
+     * @param dm 4005.38389 ddmm.mmmmm
+     * @return 40.0897315
+     * 11616.02846 dddmm.mmmmm
+     * 116.267141
+     */
+    public static double df2LatLng(String dm) {
+        if (dm == null) {
+            return 0;
+        }
+        try {
+            dm = dm.replace(" ", "");
+            int d = parseInteger(dm.substring(0, dm.lastIndexOf(".") - 2));
+            // 兼容经纬度的转换
+            double fen = Double.parseDouble(dm.substring(String.valueOf(d).length()));
+
+            double lat = (fen / 60) + Math.abs(d);
+            if (lat < 0) {
+                lat = -lat;
+            }
+            return Double.parseDouble(String.format("%.7f", lat));
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        return 0;
+    }
+
+    /**
+     * dms转经纬度
+     * 41951158     14371952
+     * 116.5309944  39.9220889
+     *
+     * @param dms dms 经纬度
+     * @return
+     */
+    public static double dfm2Du(Double dms) {
+        int degree = (int) (dms / 360000);
+        int minute = (int) ((dms - degree * 360000) / 6000);
+        double second = (double) ((dms - degree * 360000 - minute * 6000) / 100.00);
+        double digitalDegree = 0.0;
+        double num = 60;
+        digitalDegree += degree;
+        digitalDegree += minute / num;
+        digitalDegree += (second / (num * num));
+
+        return Double.parseDouble(String.format("%.7f", digitalDegree));
+    }
+
+    public static Integer parseInteger(String str) {
+        if (str == null || str.isEmpty()) {
+            return 0;
+        }
+        return Integer.valueOf(str);
+    }
+
+}

+ 180 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/track/utils/StarRocksStreamLoad.java

@@ -0,0 +1,180 @@
+//package cn.com.taiji.track.utils;// Copyright (c) 2021 Beijing Dingshi Zongheng Technology Co., Ltd. All rights reserved.
+////
+//// Licensed to the Apache Software Foundation (ASF) under one
+//// or more contributor license agreements.  See the NOTICE file
+//// distributed with this work for additional information
+//// regarding copyright ownership.  The ASF licenses this file
+//// to you under the Apache License, Version 2.0 (the
+//// "License"); you may not use this file except in compliance
+//// with the License.  You may obtain a copy of the License at
+////
+////   http://www.apache.org/licenses/LICENSE-2.0
+////
+//// Unless required by applicable law or agreed to in writing,
+//// software distributed under the License is distributed on an
+//// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+//// KIND, either express or implied.  See the License for the
+//// specific language governing permissions and limitations
+//// under the License.
+//
+//import org.apache.commons.codec.binary.Base64;
+//import org.apache.http.HttpHeaders;
+//import org.apache.http.client.methods.CloseableHttpResponse;
+//import org.apache.http.client.methods.HttpPut;
+//import org.apache.http.entity.StringEntity;
+//import org.apache.http.impl.client.CloseableHttpClient;
+//import org.apache.http.impl.client.DefaultRedirectStrategy;
+//import org.apache.http.impl.client.HttpClientBuilder;
+//import org.apache.http.impl.client.HttpClients;
+//import org.apache.http.util.EntityUtils;
+//
+//import java.io.IOException;
+//import java.nio.charset.StandardCharsets;
+///**
+// * This class is a java demo for starrocks stream load
+// *
+// * The pom.xml dependency:
+// *
+// *         <dependency>
+// *             <groupId>org.apache.httpcomponents</groupId>
+// *             <artifactId>httpclient</artifactId>
+// *             <version>4.5.3</version>
+// *         </dependency>
+// *
+// * How to use:
+// *
+// * 1 create a table in starrocks with any mysql client
+// *
+// * CREATE TABLE `stream_test` (
+// *   `id` bigint(20) COMMENT "",
+// *   `id2` bigint(20) COMMENT "",
+// *   `username` varchar(32) COMMENT ""
+// * ) ENGINE=OLAP
+// * DUPLICATE KEY(`id`)
+// * DISTRIBUTED BY HASH(`id`) BUCKETS 20;
+// *
+// *
+// * 2 change the StarRocks cluster, db, user config in this class
+// *
+// * 3 run this class, you should see the following output:
+// *
+// * {
+// *     "TxnId": 27,
+// *     "Label": "39c25a5c-7000-496e-a98e-348a264c81de",
+// *     "Status": "Success",
+// *     "Message": "OK",
+// *     "NumberTotalRows": 10,
+// *     "NumberLoadedRows": 10,
+// *     "NumberFilteredRows": 0,
+// *     "NumberUnselectedRows": 0,
+// *     "LoadBytes": 50,
+// *     "LoadTimeMs": 151
+// * }
+// *
+// * Attention:
+// *
+// * 1 wrong dependency version(such as 4.4) of httpclient may cause shaded.org.apache.http.ProtocolException
+// *   Caused by: shaded.org.apache.http.ProtocolException: Content-Length header already present
+// *     at shaded.org.apache.http.protocol.RequestContent.process(RequestContent.java:96)
+// *     at shaded.org.apache.http.protocol.ImmutableHttpProcessor.process(ImmutableHttpProcessor.java:132)
+// *     at shaded.org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:182)
+// *     at shaded.org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:88)
+// *     at shaded.org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:110)
+// *     at shaded.org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:184)
+// *
+// *2 run this class more than once, the status code for http response is still ok, and you will see
+// *  the following output:
+// *
+// * {
+// *     "TxnId": -1,
+// *     "Label": "39c25a5c-7000-496e-a98e-348a264c81de",
+// *     "Status": "Label Already Exists",
+// *     "ExistingJobStatus": "FINISHED",
+// *     "Message": "Label [39c25a5c-7000-496e-a98e-348a264c81de"] has already been used.",
+// *     "NumberTotalRows": 0,
+// *     "NumberLoadedRows": 0,
+// *     "NumberFilteredRows": 0,
+// *     "NumberUnselectedRows": 0,
+// *     "LoadBytes": 0,
+// *     "LoadTimeMs": 0
+// * }
+// * 3 when the response statusCode is 200, that doesn't mean your stream load is ok, there may be still
+// *   some stream problem unless you see the output with 'ok' message
+// */
+//public class StarRocksStreamLoad {
+//    private final static String STARROCKS_HOST = "xxx.com";
+//    private final static String STARROCKS_DB = "test";
+//    private final static String STARROCKS_TABLE = "stream_test";
+//    private final static String STARROCKS_USER = "root";
+//    private final static String STARROCKS_PASSWORD = "xxx";
+//    private final static int STARROCKS_HTTP_PORT = 8030;
+//
+//    private void sendData(String content) throws Exception {
+//        final String loadUrl = String.format("http://%s:%s/api/%s/%s/_stream_load",
+//                STARROCKS_HOST,
+//                STARROCKS_HTTP_PORT,
+//                STARROCKS_DB,
+//                STARROCKS_TABLE);
+//
+//        final HttpClientBuilder httpClientBuilder = HttpClients
+//                .custom()
+//                .setRedirectStrategy(new DefaultRedirectStrategy() {
+//                    @Override
+//                    protected boolean isRedirectable(String method) {
+//                        return true;
+//                    }
+//                });
+//
+//        try (CloseableHttpClient client = httpClientBuilder.build()) {
+//            HttpPut put = new HttpPut(loadUrl);
+//            StringEntity entity = new StringEntity(content, "UTF-8");
+//            put.setHeader(HttpHeaders.EXPECT, "100-continue");
+//            put.setHeader(HttpHeaders.AUTHORIZATION, basicAuthHeader(STARROCKS_USER, STARROCKS_PASSWORD));
+//            // the label header is optional, not necessary
+//            // use label header can ensure at most once semantics
+//            put.setHeader("label", "39c25a5c-7000-496e-a98e-348a264c81de");
+//            put.setEntity(entity);
+//
+//            try (CloseableHttpResponse response = client.execute(put)) {
+//                String loadResult = "";
+//                if (response.getEntity() != null) {
+//                    loadResult = EntityUtils.toString(response.getEntity());
+//                }
+//                final int statusCode = response.getStatusLine().getStatusCode();
+//                // statusCode 200 just indicates that starrocks be service is ok, not stream load
+//                // you should see the output content to find whether stream load is success
+//                if (statusCode != 200) {
+//                    throw new IOException(
+//                            String.format("Stream load failed, statusCode=%s load result=%s", statusCode, loadResult));
+//                }
+//
+//                System.out.println(loadResult);
+//            }
+//        }
+//    }
+//
+//    private String basicAuthHeader(String username, String password) {
+//        final String tobeEncode = username + ":" + password;
+//        byte[] encoded = Base64.encodeBase64(tobeEncode.getBytes(StandardCharsets.UTF_8));
+//        return "Basic " + new String(encoded);
+//    }
+//
+//    public static void main(String[] args) throws Exception {
+//        int id1 = 1;
+//        int id2 = 10;
+//        String id3 = "Simon";
+//        int rowNumber = 10;
+//        String oneRow = id1 + "\t" + id2 + "\t" + id3 + "\n";
+//
+//        StringBuilder stringBuilder = new StringBuilder();
+//        for (int i = 0; i < rowNumber; i++) {
+//            stringBuilder.append(oneRow);
+//        }
+//
+//        stringBuilder.deleteCharAt(stringBuilder.length() - 1);
+//
+//        String loadData = stringBuilder.toString();
+//        StarRocksStreamLoad starrocksStreamLoad = new StarRocksStreamLoad();
+//        starrocksStreamLoad.sendData(loadData);
+//    }
+//}

+ 17 - 9
beidou-track-geomesa/src/main/resources/application-dev.yml

@@ -6,16 +6,16 @@ webSocket:
   writerIdleTime: 120  #写空闲超时时间设置(Netty心跳检测配置)
   allIdleTime: 120     #读写空闲超时时间设置(Netty心跳检测配置)
   netty:
-    port: 6999
+    port: 7000
     path: /webSocket
     readerIdleTime: 120  #读空闲超时时间设置(Netty心跳检测配置)
     writerIdleTime: 120  #写空闲超时时间设置(Netty心跳检测配置)
     allIdleTime: 120     #读写空闲超时时间设置(Netty心跳检测配置)
   history-track:
-    port: 6998
+    port: 7001
     path: /historytrack
   focus-track:
-    port: 6997
+    port: 7002
     path: /focustrack
 spring:
   datasource:
@@ -106,9 +106,17 @@ spring:
 
 taiji:
   geomesa.store.redis.config: redis.url===redis://localhost:6379###redis.catalog===geomesa###redis.connection.pool.size===100###redis.connection.pool.validate===true###geomesa.stats.enable===false###geomesa.stats.generate===false###geomesa.query.caching===false
-  kafka.consumer:
-    beidou:
-      enable: true
-      expiry: 70
-      topic: sgAxBeidouTrack
-      group: sgAxBeidouTrack—${random.uuid}
+  kafka:
+    productor:
+      beidou:
+        topic: taiji_ax_beidou_dynamic_ship
+    consumer:
+      bootstrap-servers: 120.25.232.213:9092
+      beidou:
+        enable: true
+        expiry: 70
+        topic: sgAxBeidouTrack
+        group: sgAxBeidouTrack—${random.uuid}
+      warning:
+        topic: taiji_ax_ship_warning_record
+        group: taiji_ax_ship_warning_record—${random.uuid}

+ 72 - 204
beidou-track-geomesa/src/main/resources/application-prod.yml

@@ -1,72 +1,62 @@
 server:
-  port: 8786
-
-
+  port: 7007
+#netty的配置信息
+webSocket:
+  readerIdleTime: 120  #读空闲超时时间设置(Netty心跳检测配置)
+  writerIdleTime: 120  #写空闲超时时间设置(Netty心跳检测配置)
+  allIdleTime: 120     #读写空闲超时时间设置(Netty心跳检测配置)
+  netty:
+    port: 7000
+    path: /webSocket
+    readerIdleTime: 120  #读空闲超时时间设置(Netty心跳检测配置)
+    writerIdleTime: 120  #写空闲超时时间设置(Netty心跳检测配置)
+    allIdleTime: 120     #读写空闲超时时间设置(Netty心跳检测配置)
+  history-track:
+    port: 7001
+    path: /historytrack
+  focus-track:
+    port: 7002
+    path: /focustrack
 spring:
   datasource:
-    druid:
-      master:
-        type: com.alibaba.druid.pool.DruidDataSource
-        url: jdbc:mysql://74.10.28.2:3389/geodb?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
-        driver-class-name: com.mysql.cj.jdbc.Driver
-        username: ax_tj_seat
-        password: Taiji@2022#seat
-      slave:
-        enabled: true
-        type: com.alibaba.druid.pool.DruidDataSource
-        driverClassName: org.postgresql.Driver
-        url: jdbc:postgresql://74.10.28.55:5432/structure-warning-db?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
-        username: postgres
-        password: postgres
-      slave1:
-        enabled: true
-        type: com.alibaba.druid.pool.DruidDataSource
-        driverClassName: org.postgresql.Driver
-        url: jdbc:postgresql://74.10.28.55:5432/structure-warning-db?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
-        username: postgres
-        password: postgres
-      # 初始连接数
-      initialSize: 5
-      # 最小连接池数量
-      minIdle: 10
-      # 最大连接池数量
-      maxActive: 3000
-      # 配置获取连接等待超时的时间
-      maxWait: 6000
-      # 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒
-      timeBetweenEvictionRunsMillis: 1000
-      # 配置一个连接在池中最小生存的时间,单位是毫秒
-      minEvictableIdleTimeMillis: 5000
-      # 配置一个连接在池中最大生存的时间,单位是毫秒
-      maxEvictableIdleTimeMillis: 9000
-      # 配置检测连接是否有效
-      validationQuery: SELECT 1
-      testWhileIdle: true
-      testOnBorrow: false
-      testOnReturn: false
-      webStatFilter:
-        enabled: true
-      statViewServlet:
-        enabled: true
-        # 设置白名单,不填则允许所有访问
-        allow:
-        url-pattern: /druid/*
-        # 控制台管理用户名和密码
-        login-username: admin
-        login-password: 123456
-      filter:
-        stat:
-          enabled: true
-          # 慢SQL记录
-          log-slow-sql: true
-          slow-sql-millis: 1000
-          merge-sql: true
-        wall:
-          config:
-            multi-statement-allow: true
-  # kafka
+    type: com.alibaba.druid.pool.DruidDataSource
+    driverClassName: com.mysql.cj.jdbc.Driver
+    url: jdbc:mysql://10.112.89.101:9030/ax_beidou?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8&rewriteBatchedStatements=true
+    username: root
+    password: ROOT@taiji2022!
+    # 初始连接数
+    initialSize: 5
+    # 最小连接池数量
+    minIdle: 10
+    # 最大连接池数量
+    maxActive: 20
+    # 配置获取连接等待超时的时间
+    maxWait: 60000
+    # 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒
+    timeBetweenEvictionRunsMillis: 60000
+    # 配置一个连接在池中最小生存的时间,单位是毫秒
+    minEvictableIdleTimeMillis: 300000
+    # 配置一个连接在池中最大生存的时间,单位是毫秒
+    maxEvictableIdleTimeMillis: 900000
+    # 配置检测连接是否有效
+    validationQuery: SELECT 1
   kafka:
-    bootstrap-servers: app2833:9094,app2834:9094,app2835:9094,app2836:9094,app2837:9094
+    bootstrap-servers: 10.112.89.239:9092
+    producer:
+      # 发生错误后,消息重发的次数。
+      retries: 0
+      #当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。
+      batch-size: 16384
+      # 设置生产者内存缓冲区的大小。
+      buffer-memory: 33554432
+      # 键的序列化方式
+      key-serializer: org.apache.kafka.common.serialization.StringSerializer
+      # 值的序列化方式
+      value-serializer: org.apache.kafka.common.serialization.StringSerializer
+      # acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
+      # acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
+      # acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
+      acks: 1
     consumer:
       # 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
       auto-commit-interval: 1S
@@ -78,20 +68,6 @@ spring:
       enable-auto-commit: false
       key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
       value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
-    producer:
-      retries: 3
-      batch-size: 16384
-      buffer-memory: 33554432
-      key-serializer: org.apache.kafka.common.serialization.StringSerializer
-      value-serializer: org.apache.kafka.common.serialization.StringSerializer
-      acks: 1
-    properties:
-      security:
-        protocol: SASL_PLAINTEXT
-      sasl:
-        mechanism: SCRAM-SHA-512
-        jaas:
-          config: 'org.apache.kafka.common.security.scram.ScramLoginModule required username="user01" password="8b9dcf43";'
     listener:
       # 在侦听器容器中运行的线程数。
       concurrency: 5
@@ -99,16 +75,15 @@ spring:
       ack-mode: manual_immediate
       missing-topics-fatal: false
       type: batch
-
   # redis 配置
   redis:
     # 地址
-    host: 74.10.28.106
+    host: 10.112.89.239
     port: 6379
     # 数据库索引
     database: 3
     # 密码
-    password: 
+    password:
     # 连接超时时间
     timeout: 10s
     lettuce:
@@ -121,126 +96,19 @@ spring:
         max-active: 8
         # #连接池最大阻塞等待时间(使用负值表示没有限制)
         max-wait: -1ms
-
-geoserver:
-   redis:
-     url: redis://74.10.28.106:6379
-     catalog: geomesa
-   kafka:
-     brokers: 74.10.28.27:9092
-     zookeepers: 74.10.28.27:2181
-
-# other config
 taiji:
-  elasticsearch.rest:
-    uris: 74.10.28.65:9200,74.10.28.66:9200,74.10.28.67:9200,74.10.28.68:9200,74.10.28.69:9200
-    username: ax_seat       #如果你设置了基于x-pack的验证就要填写账号和密码
-    password: ax_seat       #没有则不用配置
-    connection-timeout: 100 #连接超时
-    max-connection: 100  #最大连接数
-  kafka.consumer:
-    geomesa-beidou-ship:
-      topic: geomesa_beidou_ship_topic
-      group: ${random.uuid}
-    geomesa-fusion-ship:
-      topic: geomesa_fusion_ship_topic
-      group: ${random.uuid}
-    beidou-dynamic-ship:
-      group: ${random.uuid}
-      topic: 'taiji_ax_beidou_dynamic_ship'
-      partitions0: 0
-      partitions1: 1
-      partitions2: 2
-      partitions3: 3
-      partitions4: 4
-    dynamic-ship-track:
-      topic: 'taiji_ax_ship_dynamic_fusion'
-      id-key: 'merge_target'
-      longitude-key: 'target_longitude'
-      latitude-key: 'target_latitude'
-      group: ${random.uuid}
-      partitions0: 0
-      partitions1: 1
-      partitions2: 2
-      partitions3: 3
-      partitions4: 4
-    ztpt_dynamic_ais:
-      group: ${random.uuid}
-      partitions0: 0
-      partitions1: 1
-      partitions2: 2
-      partitions3: 3
-      partitions4: 4
-      topic: 'taiji_ax_ztpt_dynamic_ais'
-      id-key: 'userid'
-      longitude-key: 'longitude'
-      latitude-key: 'latitude'
-    hlx_ax_dynamic_target_ais:
-      group: ${random.uuid}
-      topic: 'hlx_ax_dynamic_target_ais'
-      partitions0: 0
-      partitions1: 1
-      partitions2: 2
-      partitions3: 3
-      partitions4: 4
-    hlx_ax_dynamic_target_radar:
-      group: ${random.uuid}
-      topic: 'hlx_ax_dynamic_target_radar'
-      partitions0: 0
-      partitions1: 1
-      partitions2: 2
-      partitions3: 3
-      partitions4: 4
-    hlx_ax_dynamic_target_zyh_radar:
-      group: ${random.uuid}
-      topic: 'hlx_ax_dynamic_target_zyh_radar'
-      partitions0: 0
-      partitions1: 1
-      partitions2: 2
-      partitions3: 3
-      partitions4: 4
-    tianao_radar_fusion:
-      group: ${random.uuid}
-      topic: 'taiji_ax_tianao_radar_fusion'
-      latitude-key: 'latitude'
-      longitude-key: 'longitude'
-      id-key: 'fusionBatchNum'
-      partitions0: 0
-      partitions1: 1
-      partitions2: 2
-      partitions3: 3
-      partitions4: 4
-    prefix:
-      ship_borne_terminal_redis_key_prefix: ship_borne_terminal_118_
-      redis_scan_count: 1
-  async-thread-pool:
-    beidou-dynamic-ship:
-      core-pool-size: 200
-      max-pool-size: 200
-      queue-capacity: 200
-      name-prefix: beidou-dynamic-ship-
-
-# 开启雷达船过滤
-filter:
-  radar: false
-
-# 定时任务时间配置
-time:
-  radar: 0 0/2 * * * ?
-  other: 0 0/1 * * * ?
-  ais: 0 0/1 * * * ?
-  hlxais: 0 0/1 * * * ?
-  hlxdynamicradar: 0 0/1 * * * ?
-  zyhradar: 0 0/1 * * * ?
-  tianao: 0 0/1 * * * ?
-
-# 拆分船舶最后一次位置消费
-ship:
-  track:
-    beidou: true
-    fusion: false
-    hlxRadar: false
-    hlxAis: false
-    zyhRadar: false
-    tianaoRadar: false
-    ais: false
+  geomesa.store.redis.config: redis.url===redis://10.112.89.239:6379###redis.catalog===geomesa###redis.connection.pool.size===100###redis.connection.pool.validate===true###geomesa.stats.enable===false###geomesa.stats.generate===false###geomesa.query.caching===false
+  kafka:
+    productor:
+      beidou:
+        topic: taiji_ax_beidou_dynamic_ship
+    consumer:
+      bootstrap-servers: 172.28.19.20:29092 #废弃
+      beidou:
+        enable: true
+        expiry: 70
+        topic: sgAxBeidouTrack
+        group: sgAxBeidouTrack—${random.uuid}
+      warning:
+        topic: taiji_ax_ship_warning_record
+        group: taiji_ax_ship_warning_record—${random.uuid}

+ 1 - 1
beidou-track-geomesa/src/main/resources/application.yml

@@ -1,3 +1,3 @@
 spring:
   profiles:
-    active: dev
+    active: prod