Sfoglia il codice sorgente

提交轨迹处理与轨迹ws接口

liangjianf 2 anni fa
parent
commit
1bbc4e906b
47 ha cambiato i file con 3375 aggiunte e 0 eliminazioni
  1. 4 0
      beidou-track-geomesa/README.md
  2. 132 0
      beidou-track-geomesa/pom.xml
  3. 21 0
      beidou-track-geomesa/src/main/java/cn/com/taiji/beidou/GeoServerApplication.java
  4. 44 0
      beidou-track-geomesa/src/main/java/cn/com/taiji/beidou/config/AbstractRedisDataStoreConfig.java
  5. 56 0
      beidou-track-geomesa/src/main/java/cn/com/taiji/beidou/config/BeidouShipRedisDataStoreConfig.java
  6. 71 0
      beidou-track-geomesa/src/main/java/cn/com/taiji/beidou/config/NettyConfig.java
  7. 48 0
      beidou-track-geomesa/src/main/java/cn/com/taiji/beidou/config/RedisConfig.java
  8. 11 0
      beidou-track-geomesa/src/main/java/cn/com/taiji/beidou/constants/ConstantsUtil.java
  9. 15 0
      beidou-track-geomesa/src/main/java/cn/com/taiji/beidou/constants/TopicConstants.java
  10. 36 0
      beidou-track-geomesa/src/main/java/cn/com/taiji/beidou/consumer/BeidouShipTrackConsumer.java
  11. 26 0
      beidou-track-geomesa/src/main/java/cn/com/taiji/beidou/controller/BeidouController.java
  12. 178 0
      beidou-track-geomesa/src/main/java/cn/com/taiji/beidou/dto/BeidouShipTrackDTO.java
  13. 18 0
      beidou-track-geomesa/src/main/java/cn/com/taiji/beidou/dto/FocusTrackQueryDTO.java
  14. 27 0
      beidou-track-geomesa/src/main/java/cn/com/taiji/beidou/dto/HistoryTrackQueryDTO.java
  15. 36 0
      beidou-track-geomesa/src/main/java/cn/com/taiji/beidou/dto/IGeomesaTrackDTO.java
  16. 37 0
      beidou-track-geomesa/src/main/java/cn/com/taiji/beidou/entity/BaseEntity.java
  17. 37 0
      beidou-track-geomesa/src/main/java/cn/com/taiji/beidou/entity/BeidouTrackEntity.java
  18. 31 0
      beidou-track-geomesa/src/main/java/cn/com/taiji/beidou/entity/FocusShipEntity.java
  19. 77 0
      beidou-track-geomesa/src/main/java/cn/com/taiji/beidou/event/ChannelAddEvent.java
  20. 29 0
      beidou-track-geomesa/src/main/java/cn/com/taiji/beidou/event/ChannelRemoveEvent.java
  21. 31 0
      beidou-track-geomesa/src/main/java/cn/com/taiji/beidou/event/EventType.java
  22. 65 0
      beidou-track-geomesa/src/main/java/cn/com/taiji/beidou/event/FocusTrackChannelAddEvent.java
  23. 29 0
      beidou-track-geomesa/src/main/java/cn/com/taiji/beidou/event/FocusTrackChannelRemoveEvent.java
  24. 64 0
      beidou-track-geomesa/src/main/java/cn/com/taiji/beidou/event/HistoryTrackChannelAddEvent.java
  25. 29 0
      beidou-track-geomesa/src/main/java/cn/com/taiji/beidou/event/HistoryTrackChannelRemoveEvent.java
  26. 167 0
      beidou-track-geomesa/src/main/java/cn/com/taiji/beidou/handler/FocusTrackHandler.java
  27. 169 0
      beidou-track-geomesa/src/main/java/cn/com/taiji/beidou/handler/HistoryTrackHandler.java
  28. 209 0
      beidou-track-geomesa/src/main/java/cn/com/taiji/beidou/handler/WebSocketHandler.java
  29. 154 0
      beidou-track-geomesa/src/main/java/cn/com/taiji/beidou/listener/ChannelListener.java
  30. 164 0
      beidou-track-geomesa/src/main/java/cn/com/taiji/beidou/listener/FocusTrackChannelListener.java
  31. 98 0
      beidou-track-geomesa/src/main/java/cn/com/taiji/beidou/listener/HistoryTrackChannelListener.java
  32. 18 0
      beidou-track-geomesa/src/main/java/cn/com/taiji/beidou/mapper/BeidouTrackMapper.java
  33. 19 0
      beidou-track-geomesa/src/main/java/cn/com/taiji/beidou/mapper/FocusShipMapper.java
  34. 70 0
      beidou-track-geomesa/src/main/java/cn/com/taiji/beidou/producer/BeidouShipTestProducer.java
  35. 72 0
      beidou-track-geomesa/src/main/java/cn/com/taiji/beidou/serializer/FastJson2JsonRedisSerializer.java
  36. 152 0
      beidou-track-geomesa/src/main/java/cn/com/taiji/beidou/server/FocusTrackServer.java
  37. 153 0
      beidou-track-geomesa/src/main/java/cn/com/taiji/beidou/server/HistoryTrackServer.java
  38. 152 0
      beidou-track-geomesa/src/main/java/cn/com/taiji/beidou/server/NettyServer.java
  39. 19 0
      beidou-track-geomesa/src/main/java/cn/com/taiji/beidou/service/IBeidouTrackService.java
  40. 17 0
      beidou-track-geomesa/src/main/java/cn/com/taiji/beidou/service/IFocusShipService.java
  41. 151 0
      beidou-track-geomesa/src/main/java/cn/com/taiji/beidou/service/impl/BeidouTrackServiceImpl.java
  42. 42 0
      beidou-track-geomesa/src/main/java/cn/com/taiji/beidou/service/impl/FocusShipServiceImpl.java
  43. 19 0
      beidou-track-geomesa/src/main/java/cn/com/taiji/beidou/utils/RandomGroup.java
  44. 114 0
      beidou-track-geomesa/src/main/resources/application-dev.yml
  45. 246 0
      beidou-track-geomesa/src/main/resources/application-prod.yml
  46. 3 0
      beidou-track-geomesa/src/main/resources/application.yml
  47. 15 0
      beidou-track-geomesa/src/main/resources/log4j.properties

+ 4 - 0
beidou-track-geomesa/README.md

@@ -0,0 +1,4 @@
+GeoMesa Redis Quick-Start Tutorial
+==================================
+
+See the official GeoMesa [documentation](http://www.geomesa.org/documentation/tutorials/geomesa-quickstart-redis.html) for instructions.

+ 132 - 0
beidou-track-geomesa/pom.xml

@@ -0,0 +1,132 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <properties>
+        <geomesa.version>3.4.1</geomesa.version>
+        <gt.version>23.0</gt.version>
+        <scala.abi.version>2.11</scala.abi.version>
+        <hutool.version>5.8.5</hutool.version>
+        <fastjson.version>1.2.60</fastjson.version>
+    </properties>
+    <parent>
+        <artifactId>ax-beidou</artifactId>
+        <groupId>cn.com.taiji</groupId>
+        <version>1.0.0</version>
+    </parent>
+
+    <artifactId>beidou-track-geomesa</artifactId>
+    <name>GeoMesa Tutorials - Redis - Quick Start</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-web</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-all</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.kafka</groupId>
+            <artifactId>spring-kafka</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-data-redis</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+        </dependency>
+        <!-- Mysql数据库依赖,跟随项目版本:5.7 -->
+        <dependency>
+            <groupId>mysql</groupId>
+            <artifactId>mysql-connector-java</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>druid-spring-boot-starter</artifactId>
+            <version>1.2.6</version>
+        </dependency>
+        <dependency>
+            <groupId>org.mybatis.spring.boot</groupId>
+            <artifactId>mybatis-spring-boot-starter</artifactId>
+            <version>2.2.1</version>
+        </dependency>
+        <dependency>
+            <groupId>com.baomidou</groupId>
+            <artifactId>mybatis-plus-boot-starter</artifactId>
+            <version>3.5.1</version>
+        </dependency>
+        <dependency>
+            <groupId>cn.hutool</groupId>
+            <artifactId>hutool-all</artifactId>
+            <version>${hutool.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>fastjson</artifactId>
+            <version>${fastjson.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.mybatis.spring.boot</groupId>
+            <artifactId>mybatis-spring-boot-starter</artifactId>
+            <version>2.2.1</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.baomidou</groupId>
+            <artifactId>mybatis-plus-boot-starter</artifactId>
+            <version>3.5.1</version>
+        </dependency>
+        <!-- geomesa -->
+        <dependency>
+            <groupId>org.locationtech.geomesa</groupId>
+            <artifactId>geomesa-redis-datastore_${scala.abi.version}</artifactId>
+            <version>${geomesa.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.locationtech.geomesa</groupId>
+            <artifactId>geomesa-utils_${scala.abi.version}</artifactId>
+            <version>${geomesa.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.locationtech.geomesa</groupId>
+            <artifactId>geomesa-index-api_${scala.abi.version}</artifactId>
+            <version>${geomesa.version}</version>
+        </dependency>
+        <!--geotools的一揽子包-->
+        <dependency>
+            <groupId>org.geotools</groupId>
+            <artifactId>gt-epsg-wkt</artifactId>
+            <version>${gt.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.geotools</groupId>
+            <artifactId>gt-epsg-hsql</artifactId>
+            <version>${gt.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.geotools</groupId>
+            <artifactId>gt-opengis</artifactId>
+            <version>${gt.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.geotools</groupId>
+            <artifactId>gt-main</artifactId>
+            <version>${gt.version}</version>
+        </dependency>
+    </dependencies>
+
+    <repositories>
+        <repository>
+            <id>GeoSolutions</id>
+            <url>https://repo.osgeo.org/repository/release/</url>
+        </repository>
+    </repositories>
+</project>

+ 21 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/beidou/GeoServerApplication.java

@@ -0,0 +1,21 @@
+package cn.com.taiji.beidou;
+
+import org.mybatis.spring.annotation.MapperScan;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.scheduling.annotation.EnableAsync;
+import org.springframework.scheduling.annotation.EnableScheduling;
+
+/**
+ * @author chenfangchao
+ * @title: GeoServerApplication
+ * @projectName geoserver-project
+ * @description: TODO
+ * @date 2022/8/11 3:17 PM
+ */
+@SpringBootApplication
+public class GeoServerApplication {
+    public static void main(String[] args) {
+        SpringApplication.run(GeoServerApplication.class,args);
+    }
+}

+ 44 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/beidou/config/AbstractRedisDataStoreConfig.java

@@ -0,0 +1,44 @@
+package cn.com.taiji.beidou.config;
+
+import cn.com.taiji.beidou.dto.IGeomesaTrackDTO;
+import org.geotools.data.DataStoreFinder;
+import org.locationtech.geomesa.redis.data.RedisDataStore;
+import org.opengis.feature.simple.SimpleFeatureType;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import redis.clients.jedis.Jedis;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+public abstract class AbstractRedisDataStoreConfig {
+    @Value("#{'${taiji.geomesa.store.redis.config}'.split('###')}")
+    private String[] redisStoreConfig;
+
+    @Bean(name = "redisDataStore")
+    public RedisDataStore initRedisDataStore() throws IOException {
+        Map<String, Serializable> parameters = new HashMap<>();
+        for (int i=0; i<redisStoreConfig.length; i++) {
+            String param[] = redisStoreConfig[i].split("===");
+            parameters.put(param[0], param[1]);
+        }
+        return (RedisDataStore) DataStoreFinder.getDataStore(parameters);
+    }
+
+    @Bean(name = "redisClient")
+    public Jedis redisClient() {
+        Jedis client = null;
+        for (int i=0; i<redisStoreConfig.length; i++) {
+            String param[] = redisStoreConfig[i].split("===");
+            if ("redis.url".equals(param[0])) {
+                client = new Jedis(param[1]);
+                break;
+            }
+        }
+        return client;
+    }
+
+    abstract SimpleFeatureType sft(RedisDataStore redisDataStore, IGeomesaTrackDTO geomesaTrackDTO) throws IOException;
+}

+ 56 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/beidou/config/BeidouShipRedisDataStoreConfig.java

@@ -0,0 +1,56 @@
+package cn.com.taiji.beidou.config;
+
+import cn.com.taiji.beidou.dto.BeidouShipTrackDTO;
+import cn.com.taiji.beidou.dto.IGeomesaTrackDTO;
+import cn.hutool.core.util.StrUtil;
+import lombok.extern.slf4j.Slf4j;
+import org.locationtech.geomesa.redis.data.RedisDataStore;
+import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes;
+import org.opengis.feature.simple.SimpleFeatureType;
+import org.opengis.filter.Filter;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import java.io.IOException;
+
+/**
+ * @author kok20
+ */
+@Slf4j
+@Configuration
+@ConditionalOnProperty(name = "taiji.kafka.consumer.beidou.enable",havingValue = "true")
+public class BeidouShipRedisDataStoreConfig extends AbstractRedisDataStoreConfig {
+
+    @Value("${taiji.kafka.consumer.beidou.group}")
+    private String group;
+    @Value("${taiji.kafka.consumer.beidou.expiry}")
+    private Integer expiry;
+
+    @Override
+    @Bean(name = "sft")
+    public SimpleFeatureType sft(RedisDataStore redisDataStore, IGeomesaTrackDTO geomesaTrackDTO) throws IOException {
+        SimpleFeatureType sft = SimpleFeatureTypes.createType(geomesaTrackDTO.getTypeName(),  geomesaTrackDTO.getSimpleFeatureType().toString());
+        sft.getUserData().put("geomesa.feature.expiry", StrUtil.format("syncTime({} seconds)", expiry));
+
+        try {
+            redisDataStore.getFeatureSource(geomesaTrackDTO.getTypeName()).removeFeatures(Filter.INCLUDE);
+            redisDataStore.removeSchema(geomesaTrackDTO.getTypeName());
+        } catch (Exception e) {
+            log.error("可能属于首次构建,SCHEMA 并不存在");
+            e.printStackTrace();
+        }
+
+        redisDataStore.createSchema(sft);
+        return sft;
+    }
+
+    @Bean(name = "geomesaTrackDTO")
+    public BeidouShipTrackDTO geomesaTrackDTO() {
+        BeidouShipTrackDTO dto = new BeidouShipTrackDTO();
+        dto.setKafkaConsumerGroup(group);
+        dto.setExpirySeconds(expiry);
+        return dto;
+    }
+}

+ 71 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/beidou/config/NettyConfig.java

@@ -0,0 +1,71 @@
+package cn.com.taiji.beidou.config;
+
+import io.netty.channel.Channel;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.group.DefaultChannelGroup;
+import io.netty.util.AttributeKey;
+import io.netty.util.concurrent.GlobalEventExecutor;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+
+public class NettyConfig {
+
+    /**
+     * 定义一个channel组,管理所有的channel
+     */
+    private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
+
+    /**
+     * 存放用户与Chanel的对应信息,用于给指定用户发送消息
+     */
+    private static ConcurrentHashMap<String, Object> userChannelMap = new ConcurrentHashMap<>();
+
+    private NettyConfig() {}
+
+    /**
+     * 获取channel组
+     * @return
+     */
+    public static ChannelGroup getChannelGroup() {
+        return channelGroup;
+    }
+
+    /**
+     * 判断一个通道是否有用户在使用
+     * 可做信息转发时判断该通道是否合法
+     * @param channel
+     * @return
+     */
+    public static boolean hasUser(Channel channel) {
+        AttributeKey<String> key = AttributeKey.valueOf("user");
+        return (channel.hasAttr(key) || channel.attr(key).get() != null);
+    }
+
+
+    /**
+     * 根据用户id获取该用户的通道
+     * @param userId
+     * @return
+     */
+    public static Object getChannelByUserId(String userId) {
+        return userChannelMap.get(userId);
+    }
+
+    /**
+     * 判断一个用户是否在线
+     * @param userId
+     * @return
+     */
+    public static Boolean online(String userId) {
+        return userChannelMap.containsKey(userId) && userChannelMap.get(userId) != null;
+    }
+
+    /**
+     * 获取用户channel map
+     * @return
+     */
+    public static ConcurrentHashMap<String, Object> getUserChannelMap(){
+        return userChannelMap;
+    }
+}

+ 48 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/beidou/config/RedisConfig.java

@@ -0,0 +1,48 @@
+package cn.com.taiji.beidou.config;
+
+import cn.com.taiji.beidou.serializer.FastJson2JsonRedisSerializer;
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.PropertyAccessor;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.springframework.cache.annotation.CachingConfigurerSupport;
+import org.springframework.cache.annotation.EnableCaching;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.data.redis.connection.RedisConnectionFactory;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.data.redis.serializer.StringRedisSerializer;
+
+/**
+ * redis配置
+ * 
+ * @author seat
+ */
+@Configuration
+@EnableCaching
+public class RedisConfig extends CachingConfigurerSupport
+{
+    @Bean
+    @SuppressWarnings(value = { "unchecked", "rawtypes" })
+    public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory connectionFactory)
+    {
+        RedisTemplate<Object, Object> template = new RedisTemplate<>();
+        template.setConnectionFactory(connectionFactory);
+
+        FastJson2JsonRedisSerializer serializer = new FastJson2JsonRedisSerializer(Object.class);
+
+        ObjectMapper mapper = new ObjectMapper();
+        mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
+        serializer.setObjectMapper(mapper);
+
+        // 使用StringRedisSerializer来序列化和反序列化redis的key值
+        template.setKeySerializer(new StringRedisSerializer());
+        template.setValueSerializer(serializer);
+
+        // Hash的key也采用StringRedisSerializer的序列化方式
+        template.setHashKeySerializer(new StringRedisSerializer());
+        template.setHashValueSerializer(serializer);
+
+        template.afterPropertiesSet();
+        return template;
+    }
+}

+ 11 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/beidou/constants/ConstantsUtil.java

@@ -0,0 +1,11 @@
+package cn.com.taiji.beidou.constants;
+
+/**
+ * @author wudskq
+ */
+public class ConstantsUtil {
+    //构建Redis-GeomMesa查询CQL
+    public static final String BEIDOU_REDIS_QUERY_CQL = "deviceId = ";
+
+
+}

+ 15 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/beidou/constants/TopicConstants.java

@@ -0,0 +1,15 @@
+package cn.com.taiji.beidou.constants;
+
+/**
+ * @author chenfangchao
+ * @title: TopicConstants
+ * @projectName geomesa-tutorials
+ * @description: TODO
+ * @date 2022/9/5 10:07 AM
+ */
+public class TopicConstants {
+
+    public static final String BEIDOU_SHIP = "taiji_ax_beidou_dynamic_ship";
+
+    public static final String FUSION_SHIP = "taiji_ax_ship_dynamic_fusion";
+}

+ 36 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/beidou/consumer/BeidouShipTrackConsumer.java

@@ -0,0 +1,36 @@
+package cn.com.taiji.beidou.consumer;
+
+import cn.com.taiji.beidou.service.IBeidouTrackService;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.annotation.TopicPartition;
+import org.springframework.kafka.support.Acknowledgment;
+import org.springframework.stereotype.Component;
+
+import java.util.List;
+
+/**
+ * @author wudskq
+ * TODO 实时消费Kafka北斗船舶数据存入到redis图层
+ */
+@Component
+@Slf4j
+@ConditionalOnProperty(name = "taiji.kafka.consumer.beidou.enable",havingValue = "true")
+public class BeidouShipTrackConsumer {
+
+    @Autowired
+    private IBeidouTrackService beidouTrackService;
+
+    @KafkaListener(
+            groupId = "${taiji.kafka.consumer.beidou.group}",
+            topics = {"${taiji.kafka.consumer.beidou.topic}"}
+    )
+    public void dynamicTrack(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) {
+        beidouTrackService.beidouDynamicShipToRedis(records);
+//        beidouTrackService.beidouDynamicShipToMySql(records);
+        ack.acknowledge();
+    }
+}

+ 26 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/beidou/controller/BeidouController.java

@@ -0,0 +1,26 @@
+package cn.com.taiji.beidou.controller;
+
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import javax.annotation.Resource;
+
+
+/**
+ * @author kok20
+ */
+@RestController
+@RequestMapping("/beidou")
+public class BeidouController {
+
+
+    @Resource
+    private KafkaTemplate kafkaTemplate;
+
+    @GetMapping("/beidou")
+    public void  testBeidouShipData(){
+    }
+
+}

+ 178 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/beidou/dto/BeidouShipTrackDTO.java

@@ -0,0 +1,178 @@
+package cn.com.taiji.beidou.dto;
+
+import cn.hutool.core.util.StrUtil;
+import lombok.Data;
+import org.geotools.feature.simple.SimpleFeatureBuilder;
+import org.geotools.util.factory.Hints;
+import org.opengis.feature.simple.SimpleFeature;
+
+import java.io.Serializable;
+import java.util.Date;
+
+
+/**
+ * @author kok20
+ */
+@Data
+public class BeidouShipTrackDTO extends IGeomesaTrackDTO implements Serializable {
+
+    private static final long serialVersionUID = -8983779632713666636L;
+
+    private String time;
+    private Long trackId;
+    private String deviceId;
+    private Long shipType;
+    private Long workType;
+    private Integer workWay;
+    private String sendTime;
+    private String locationTime;
+    private Integer online;
+    private Integer shipLength;
+    private Integer shipWidth;
+    private Integer texture;
+    private Double longitude;
+    private Double latitude;
+    private Double direction;
+    private Double speed;
+    private Integer kwh;
+    private String shipName;
+    private String location;
+
+    @Override
+    public boolean checkPoint() {
+        if (longitude != null && latitude != null) {
+            location = StrUtil.format("POINT ({} {})", longitude, latitude);
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    public String getTypeName() { return "beidou-data"; }
+
+    @Override
+    public String getLayerType() {
+        return "geo_beidou_ship";
+    }
+
+    @Override
+    public String getWsLayerName() {
+        return "beidou-ship";
+    }
+
+    @Override
+    public String getFid() {
+        return deviceId;
+    }
+
+    @Override
+    public StringBuilder getSimpleFeatureType(){
+            StringBuilder attributes = new StringBuilder();
+            attributes.append("time:String,");
+            attributes.append("trackId:Long,");
+            attributes.append("deviceId:String:index=true,");
+            attributes.append("shipType:Long,");
+            attributes.append("workType:Long,");
+            attributes.append("workWay:Integer,");
+            attributes.append("sendTime:String,");
+            attributes.append("locationTime:String,");
+            attributes.append("online:Integer,");
+            attributes.append("shipLength:Integer,");
+            attributes.append("shipWidth:Integer,");
+            attributes.append("texture:Integer,");
+            attributes.append("longitude:Double,");
+            attributes.append("latitude:Double,");
+            attributes.append("direction:Double,");
+            attributes.append("speed:Double,");
+            attributes.append("kwh:Integer,");
+            attributes.append("shipName:String,");
+            attributes.append("*location:Point:srid=4326,");
+
+            attributes.append("layerType:String,");
+            attributes.append("syncTime:Date");
+            return attributes;
+    }
+
+    @Override
+    public SimpleFeature toSimpleFeature(SimpleFeatureBuilder builder, String fid) {
+        builder.set("time", time);
+        builder.set("trackId", trackId);
+        builder.set("deviceId", deviceId);
+        builder.set("shipType", shipType);
+        builder.set("workType", workType);
+        builder.set("workWay", workWay);
+        builder.set("sendTime", sendTime);
+        builder.set("locationTime", locationTime);
+        builder.set("online", online);
+        builder.set("shipLength", shipLength);
+        builder.set("shipWidth", shipWidth);
+        builder.set("texture", texture);
+        builder.set("longitude", longitude);
+        builder.set("latitude", latitude);
+        builder.set("direction", direction);
+        builder.set("speed", speed);
+        builder.set("kwh", kwh);
+        builder.set("shipName", shipName);
+
+        builder.set("location", location);
+
+        builder.set("layerType", getLayerType());
+        builder.set("syncTime", new Date());
+        builder.featureUserData(Hints.USE_PROVIDED_FID, Boolean.TRUE);
+        return builder.buildFeature(fid);
+    }
+
+    @Override
+    public String[] getPropList(){
+        return new String[] {
+                "time",
+                "trackId",
+//                "deviceId",
+                "shipType",
+                "workType",
+                "workWay",
+                "sendTime",
+                "locationTime",
+                "online",
+                "shipLength",
+                "shipWidth",
+                "texture",
+                "longitude",
+                "latitude",
+                "direction",
+                "speed",
+                "kwh",
+                "shipName",
+                "location",
+                "layerType",
+                "syncTime"
+        };
+    }
+
+    @Override
+    public Object[] getValueList() {
+        return new Object[] {
+                time,
+                trackId,
+//                deviceId,
+                shipType,
+                workType,
+                workWay,
+                sendTime,
+                locationTime,
+                online,
+                shipLength,
+                shipWidth,
+                texture,
+                longitude,
+                latitude,
+                direction,
+                speed,
+                kwh,
+                shipName,
+                location,
+                getLayerType(),
+                new Date()
+        };
+    }
+}

+ 18 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/beidou/dto/FocusTrackQueryDTO.java

@@ -0,0 +1,18 @@
+package cn.com.taiji.beidou.dto;
+
+import com.alibaba.fastjson.JSON;
+import lombok.Data;
+
+
+/**
+ * @author kok20
+ */
+@Data
+public class FocusTrackQueryDTO {
+    private String userId;
+
+    @Override
+    public String toString() {
+        return JSON.toJSONString(this);
+    }
+}

+ 27 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/beidou/dto/HistoryTrackQueryDTO.java

@@ -0,0 +1,27 @@
+package cn.com.taiji.beidou.dto;
+
+import cn.hutool.core.util.StrUtil;
+import com.alibaba.fastjson.JSON;
+import lombok.Data;
+import org.geotools.feature.simple.SimpleFeatureBuilder;
+import org.geotools.util.factory.Hints;
+import org.opengis.feature.simple.SimpleFeature;
+
+import java.io.Serializable;
+import java.util.Date;
+
+
+/**
+ * @author kok20
+ */
+@Data
+public class HistoryTrackQueryDTO {
+    private String shipId;
+    private String startTime;
+    private String endTime;
+
+    @Override
+    public String toString() {
+        return JSON.toJSONString(this);
+    }
+}

+ 36 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/beidou/dto/IGeomesaTrackDTO.java

@@ -0,0 +1,36 @@
+package cn.com.taiji.beidou.dto;
+
+import org.geotools.feature.simple.SimpleFeatureBuilder;
+import org.opengis.feature.simple.SimpleFeature;
+
+public abstract class IGeomesaTrackDTO {
+
+    public String group;
+
+    public Integer expirySeconds;
+
+    public void setKafkaConsumerGroup(String group) {
+        this.group = group;
+    }
+    public String getKafkaConsumerGroup() {
+        return this.group;
+    }
+
+    public void setExpirySeconds(Integer expirySeconds) {
+        this.expirySeconds = expirySeconds;
+    }
+
+    public Integer getExpirySeconds() {
+        return expirySeconds;
+    }
+
+    public abstract boolean checkPoint();
+    public abstract String getTypeName();
+    public abstract String getLayerType();
+    public abstract String getWsLayerName();
+    public abstract String getFid();
+    public abstract StringBuilder getSimpleFeatureType();
+    public abstract SimpleFeature toSimpleFeature(SimpleFeatureBuilder builder, String fid);
+    public abstract String[] getPropList();
+    public abstract Object[] getValueList();
+}

+ 37 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/beidou/entity/BaseEntity.java

@@ -0,0 +1,37 @@
+package cn.com.taiji.beidou.entity;
+
+import com.baomidou.mybatisplus.annotation.IdType;
+import com.baomidou.mybatisplus.annotation.TableField;
+import com.baomidou.mybatisplus.annotation.TableId;
+import com.fasterxml.jackson.annotation.JsonFormat;
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Entity基类
+ * 
+ * @author seat
+ */
+@Data
+public class BaseEntity implements Serializable
+{
+    private static final long serialVersionUID = 1L;
+
+
+    /** id */
+    @TableId(type = IdType.ASSIGN_ID)
+    private String id;
+
+    /** 创建时间 */
+    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone = "GMT+8")
+    private Date createTime;
+
+    /** 更新时间 */
+    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss",timezone = "GMT+8")
+    private Date updateTime;
+
+}

+ 37 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/beidou/entity/BeidouTrackEntity.java

@@ -0,0 +1,37 @@
+package cn.com.taiji.beidou.entity;
+
+import com.baomidou.mybatisplus.annotation.TableName;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.io.Serializable;
+
+/**
+ * @author wudskq
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+@TableName("ax_beidou_track")
+public class BeidouTrackEntity extends BaseEntity {
+
+    private String time;
+    private Long trackId;
+    private String deviceId;
+    private Long shipType;
+    private Long workType;
+    private Integer workWay;
+    private String sendTime;
+    private String locationTime;
+    private Integer online;
+    private Integer shipLength;
+    private Integer shipWidth;
+    private Integer texture;
+    private Double longitude;
+    private Double latitude;
+    private Double direction;
+    private Double speed;
+    private Integer kwh;
+    private String shipName;
+}

+ 31 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/beidou/entity/FocusShipEntity.java

@@ -0,0 +1,31 @@
+package cn.com.taiji.beidou.entity;
+
+import com.baomidou.mybatisplus.annotation.TableName;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.sql.Date;
+
+/**
+ * @Author CHEN
+ * @Date 2022/11/9 18:00
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+@TableName("ax_beidou_focus")
+public class FocusShipEntity extends BaseEntity{
+    /**
+     * 重点关注id
+     */
+    private Integer focusId;
+    /**
+     * 关注船舶id
+     */
+    private String shipId;
+    /**
+     * 关注人id
+     */
+    private String userId;
+}

+ 77 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/beidou/event/ChannelAddEvent.java

@@ -0,0 +1,77 @@
+package cn.com.taiji.beidou.event;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelId;
+import org.springframework.context.ApplicationEvent;
+
+/**
+ * @author chenfangchao
+ * @title: NettyEvent
+ * @projectName ax-geomesa-redis
+ * @description: TODO
+ * @date 2022/9/22 6:00 PM
+ */
+public class ChannelAddEvent extends ApplicationEvent {
+
+    private ChannelHandlerContext ctx;
+
+    private ChannelId channelId;
+
+    private String cql;
+
+    private String realCql;
+
+    public ChannelAddEvent(Object source, ChannelHandlerContext ctx, ChannelId channelId,String cql,String realCql) {
+        super(source);
+        this.ctx = ctx;
+        this.channelId = channelId;
+        if(null != cql){
+            this.cql = cql;
+        }
+        if(null != realCql){
+            this.realCql = realCql;
+        }
+    }
+
+    public ChannelHandlerContext getCtx() {
+        return ctx;
+    }
+
+    public void setCtx(ChannelHandlerContext ctx) {
+        this.ctx = ctx;
+    }
+
+    public ChannelId getChannelId() {
+        return channelId;
+    }
+
+    public void setChannelId(ChannelId channelId) {
+        this.channelId = channelId;
+    }
+
+    public String getCql() {
+        return cql;
+    }
+
+    public void setCql(String cql) {
+        this.cql = cql;
+    }
+
+    public String getRealCql() {
+        return realCql;
+    }
+
+    public void setRealCql(String realCql) {
+        this.realCql = realCql;
+    }
+
+    @Override
+    public String toString() {
+        return "ChannelAddEvent{" +
+                "ctx=" + ctx +
+                ", channelId=" + channelId +
+                ", cql='" + cql + '\'' +
+                ", realCql='" + realCql + '\'' +
+                '}';
+    }
+}

+ 29 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/beidou/event/ChannelRemoveEvent.java

@@ -0,0 +1,29 @@
+package cn.com.taiji.beidou.event;
+
+import io.netty.channel.ChannelId;
+import org.springframework.context.ApplicationEvent;
+
+/**
+ * @author chenfangchao
+ * @title: ChannelRemoveEvent
+ * @projectName ax-geomesa-redis
+ * @description: TODO
+ * @date 2022/9/22 6:00 PM
+ */
+public class ChannelRemoveEvent extends ApplicationEvent {
+
+    private ChannelId channelId;
+
+    public ChannelRemoveEvent(Object source, ChannelId channelId) {
+        super(source);
+        this.channelId = channelId;
+    }
+
+    public ChannelId getChannelId() {
+        return channelId;
+    }
+
+    public void setChannelId(ChannelId channelId) {
+        this.channelId = channelId;
+    }
+}

+ 31 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/beidou/event/EventType.java

@@ -0,0 +1,31 @@
+package cn.com.taiji.beidou.event;
+
+
+import java.io.Serializable;
+
+public class EventType<T> implements Serializable {
+    private T data;
+    private String type;
+
+
+    public EventType(T data, String type) {
+        this.data = data;
+        this.type = type;
+    }
+
+    public T getData() {
+        return data;
+    }
+
+    public void setData(T data) {
+        this.data = data;
+    }
+
+    public String getType() {
+        return type;
+    }
+
+    public void setType(String type) {
+        this.type = type;
+    }
+}

+ 65 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/beidou/event/FocusTrackChannelAddEvent.java

@@ -0,0 +1,65 @@
+package cn.com.taiji.beidou.event;
+
+import cn.com.taiji.beidou.dto.FocusTrackQueryDTO;
+import cn.com.taiji.beidou.dto.HistoryTrackQueryDTO;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelId;
+import org.springframework.context.ApplicationEvent;
+
+/**
+ * @author chenfangchao
+ * @title: NettyEvent
+ * @projectName ax-geomesa-redis
+ * @description: TODO
+ * @date 2022/9/22 6:00 PM
+ */
+public class FocusTrackChannelAddEvent extends ApplicationEvent {
+
+    private ChannelHandlerContext ctx;
+
+    private ChannelId channelId;
+
+    private FocusTrackQueryDTO dto;
+
+    public FocusTrackChannelAddEvent(Object source, ChannelHandlerContext ctx, ChannelId channelId, FocusTrackQueryDTO dto) {
+        super(source);
+        this.ctx = ctx;
+        this.channelId = channelId;
+        if(null != dto){
+            this.dto = dto;
+        }
+    }
+
+    public ChannelHandlerContext getCtx() {
+        return ctx;
+    }
+
+    public void setCtx(ChannelHandlerContext ctx) {
+        this.ctx = ctx;
+    }
+
+    public ChannelId getChannelId() {
+        return channelId;
+    }
+
+    public void setChannelId(ChannelId channelId) {
+        this.channelId = channelId;
+    }
+
+    public FocusTrackQueryDTO getDto() {
+        return dto;
+    }
+
+    public void setDto(FocusTrackQueryDTO dto) {
+        this.dto = dto;
+    }
+
+    @Override
+    public String toString() {
+        return "ChannelAddEvent{" +
+                "ctx=" + ctx +
+                ", channelId=" + channelId +
+                ", dto='" + dto.toString() + '\'' +
+                '}';
+    }
+}

+ 29 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/beidou/event/FocusTrackChannelRemoveEvent.java

@@ -0,0 +1,29 @@
+package cn.com.taiji.beidou.event;
+
+import io.netty.channel.ChannelId;
+import org.springframework.context.ApplicationEvent;
+
+/**
+ * @author chenfangchao
+ * @title: ChannelRemoveEvent
+ * @projectName ax-geomesa-redis
+ * @description: TODO
+ * @date 2022/9/22 6:00 PM
+ */
+public class FocusTrackChannelRemoveEvent extends ApplicationEvent {
+
+    private ChannelId channelId;
+
+    public FocusTrackChannelRemoveEvent(Object source, ChannelId channelId) {
+        super(source);
+        this.channelId = channelId;
+    }
+
+    public ChannelId getChannelId() {
+        return channelId;
+    }
+
+    public void setChannelId(ChannelId channelId) {
+        this.channelId = channelId;
+    }
+}

+ 64 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/beidou/event/HistoryTrackChannelAddEvent.java

@@ -0,0 +1,64 @@
+package cn.com.taiji.beidou.event;
+
+import cn.com.taiji.beidou.dto.HistoryTrackQueryDTO;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelId;
+import org.springframework.context.ApplicationEvent;
+
+/**
+ * @author chenfangchao
+ * @title: NettyEvent
+ * @projectName ax-geomesa-redis
+ * @description: TODO
+ * @date 2022/9/22 6:00 PM
+ */
+public class HistoryTrackChannelAddEvent extends ApplicationEvent {
+
+    private ChannelHandlerContext ctx;
+
+    private ChannelId channelId;
+
+    private HistoryTrackQueryDTO dto;
+
+    public HistoryTrackChannelAddEvent(Object source, ChannelHandlerContext ctx, ChannelId channelId, HistoryTrackQueryDTO dto) {
+        super(source);
+        this.ctx = ctx;
+        this.channelId = channelId;
+        if(null != dto){
+            this.dto = dto;
+        }
+    }
+
+    public ChannelHandlerContext getCtx() {
+        return ctx;
+    }
+
+    public void setCtx(ChannelHandlerContext ctx) {
+        this.ctx = ctx;
+    }
+
+    public ChannelId getChannelId() {
+        return channelId;
+    }
+
+    public void setChannelId(ChannelId channelId) {
+        this.channelId = channelId;
+    }
+
+    public HistoryTrackQueryDTO getDto() {
+        return dto;
+    }
+
+    public void setDto(HistoryTrackQueryDTO dto) {
+        this.dto = dto;
+    }
+
+    @Override
+    public String toString() {
+        return "ChannelAddEvent{" +
+                "ctx=" + ctx +
+                ", channelId=" + channelId +
+                ", dto='" + dto.toString() + '\'' +
+                '}';
+    }
+}

+ 29 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/beidou/event/HistoryTrackChannelRemoveEvent.java

@@ -0,0 +1,29 @@
+package cn.com.taiji.beidou.event;
+
+import io.netty.channel.ChannelId;
+import org.springframework.context.ApplicationEvent;
+
+/**
+ * @author chenfangchao
+ * @title: ChannelRemoveEvent
+ * @projectName ax-geomesa-redis
+ * @description: TODO
+ * @date 2022/9/22 6:00 PM
+ */
+public class HistoryTrackChannelRemoveEvent extends ApplicationEvent {
+
+    private ChannelId channelId;
+
+    public HistoryTrackChannelRemoveEvent(Object source, ChannelId channelId) {
+        super(source);
+        this.channelId = channelId;
+    }
+
+    public ChannelId getChannelId() {
+        return channelId;
+    }
+
+    public void setChannelId(ChannelId channelId) {
+        this.channelId = channelId;
+    }
+}

+ 167 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/beidou/handler/FocusTrackHandler.java

@@ -0,0 +1,167 @@
+package cn.com.taiji.beidou.handler;
+
+import cn.com.taiji.beidou.config.NettyConfig;
+import cn.com.taiji.beidou.dto.FocusTrackQueryDTO;
+import cn.com.taiji.beidou.dto.HistoryTrackQueryDTO;
+import cn.com.taiji.beidou.event.FocusTrackChannelAddEvent;
+import cn.com.taiji.beidou.event.FocusTrackChannelRemoveEvent;
+import cn.com.taiji.beidou.event.HistoryTrackChannelAddEvent;
+import cn.com.taiji.beidou.event.HistoryTrackChannelRemoveEvent;
+import com.alibaba.fastjson.JSONObject;
+import io.netty.channel.*;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
+import io.netty.handler.timeout.IdleState;
+import io.netty.handler.timeout.IdleStateEvent;
+import io.netty.util.AttributeKey;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.ApplicationEventPublisher;
+import org.springframework.stereotype.Component;
+
+import java.util.Objects;
+
+
+/**
+ * @author wudskq
+ */
+@Slf4j
+@Component
+@ChannelHandler.Sharable
+public class FocusTrackHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
+
+    Logger logger = LoggerFactory.getLogger(FocusTrackHandler.class);
+
+    @Autowired
+    private ApplicationEventPublisher applicationEventPublisher;
+
+    /**
+     * 一旦连接,第一个被执行
+     *
+     * @param ctx
+     * @throws Exception
+     */
+    @Override
+    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
+        log.info("handlerAdded 被调用" + ctx.channel().id().asLongText());
+        // 添加到channelGroup 通道组
+        NettyConfig.getChannelGroup().add(ctx.channel());
+    }
+
+    @Override
+    public void channelActive(ChannelHandlerContext ctx) throws Exception {
+        NettyConfig.getChannelGroup().add(ctx.channel());
+        log.info("与客户端建立连接,通道开启!信道ID为 " + ctx.channel().id());
+    }
+    //读就绪事件
+    @Override
+    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
+        FocusTrackQueryDTO dto = JSONObject.parseObject(msg.text(), FocusTrackQueryDTO.class);
+        log.info("本次会话(信道)查询条件为:" + msg.text());
+        if (!NettyConfig.getUserChannelMap().containsKey(dto)) {
+            NettyConfig.getUserChannelMap().put(ctx.channel().id().asShortText(),dto.toString());
+            // 将用户ID作为自定义属性加入到channel中,方便随时channel中获取用户ID
+            AttributeKey<String> key = AttributeKey.valueOf("focus-track");
+            ctx.channel().attr(key).setIfAbsent(dto.toString());
+            applicationEventPublisher.publishEvent(new FocusTrackChannelAddEvent(this,ctx,ctx.channel().id(), dto));
+            log.info("发送启动新线程event成功!");
+        } else {
+            // 前端定时请求,保持心跳连接,避免服务端误删通道
+            ctx.channel().writeAndFlush(new TextWebSocketFrame("keep alive success!"));
+        }
+        // 回复消息
+//        ctx.channel().writeAndFlush(new TextWebSocketFrame("此次会话CQL-ID为 "+ ctx.channel().id().asShortText() + " 查询的条件为:" + dto.toString() + "与服务器成功通信!"));
+    }
+
+    /**
+     * 心跳检测相关方法 - 会主动调用handlerRemoved
+     *
+     * @param ctx
+     * @param evt
+     * @throws Exception
+     */
+    @Override
+    public void userEventTriggered(final ChannelHandlerContext ctx, Object evt) throws Exception {
+        if (evt instanceof IdleStateEvent) {
+            IdleStateEvent event = (IdleStateEvent) evt;
+            if (event.state() == IdleState.ALL_IDLE) {
+                //清除超时会话
+                ChannelFuture writeAndFlush = ctx.writeAndFlush("you will close");
+                writeAndFlush.addListener(new ChannelFutureListener() {
+                    @Override
+                    public void operationComplete(ChannelFuture future) throws Exception {
+                        ctx.channel().close();
+                    }
+                });
+            }
+        } else {
+            super.userEventTriggered(ctx, evt);
+        }
+    }
+
+    /**
+     * 移除通道及关联用户
+     *
+     * @param ctx
+     * @throws Exception
+     */
+    @Override
+    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
+        log.info("handlerRemoved 被调用" + ctx.channel().id().asLongText());
+        // 删除通道
+        ChannelGroup channelGroup = NettyConfig.getChannelGroup();
+        if (Objects.nonNull(channelGroup)) {
+            channelGroup.remove(ctx.channel());
+            NettyConfig.getUserChannelMap().remove(ctx.channel().id().asShortText());
+            applicationEventPublisher.publishEvent(new FocusTrackChannelRemoveEvent(this, ctx.channel().id()));
+        }
+        removeUserId(ctx);
+    }
+
+    /**
+     * 删除用户与channel的对应关系
+     *
+     * @param ctx
+     */
+    private void removeUserId(ChannelHandlerContext ctx) {
+        AttributeKey<String> key = AttributeKey.valueOf("userId");
+        String userId = ctx.channel().attr(key).get();
+        if (StringUtils.isNotBlank(userId)) {
+            NettyConfig.getUserChannelMap().remove(userId);
+            log.info("删除用户与channel的对应关系,userId:{}", userId);
+        }
+    }
+    /**
+     * 异常处理
+     *
+     * @param ctx
+     * @param cause
+     * @throws Exception
+     */
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+        log.info("异常:{}", cause.getMessage());
+        // 删除通道
+        NettyConfig.getChannelGroup().remove(ctx.channel());
+        removeUserId(ctx);
+        ctx.close();
+    }
+
+    /**
+     * 推送消息给指定信道
+     *
+     * @param channelId
+     * @param msg
+     */
+    public void sendMsgToSpecifyChannel(ChannelId channelId, String msg) {
+        log.info("推送数据--{},到指定信道{}", msg, channelId);
+        ChannelGroup channelGroup = NettyConfig.getChannelGroup();
+        Channel channel = channelGroup.find(channelId);
+        if (channel != null) {
+            channel.writeAndFlush(new TextWebSocketFrame(msg));
+        }
+    }
+}

+ 169 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/beidou/handler/HistoryTrackHandler.java

@@ -0,0 +1,169 @@
+package cn.com.taiji.beidou.handler;
+
+import cn.com.taiji.beidou.config.NettyConfig;
+import cn.com.taiji.beidou.dto.HistoryTrackQueryDTO;
+import cn.com.taiji.beidou.entity.BeidouTrackEntity;
+import cn.com.taiji.beidou.event.ChannelAddEvent;
+import cn.com.taiji.beidou.event.ChannelRemoveEvent;
+import cn.com.taiji.beidou.event.HistoryTrackChannelAddEvent;
+import cn.com.taiji.beidou.event.HistoryTrackChannelRemoveEvent;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import io.netty.channel.*;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
+import io.netty.handler.timeout.IdleState;
+import io.netty.handler.timeout.IdleStateEvent;
+import io.netty.util.AttributeKey;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.ApplicationEventPublisher;
+import org.springframework.stereotype.Component;
+
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * @author wudskq
+ */
+@Slf4j
+@Component
+@ChannelHandler.Sharable
+public class HistoryTrackHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
+
+    Logger logger = LoggerFactory.getLogger(HistoryTrackHandler.class);
+
+    @Autowired
+    private ApplicationEventPublisher applicationEventPublisher;
+
+    /**
+     * 一旦连接,第一个被执行
+     *
+     * @param ctx
+     * @throws Exception
+     */
+    @Override
+    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
+        log.info("handlerAdded 被调用" + ctx.channel().id().asLongText());
+        // 添加到channelGroup 通道组
+        NettyConfig.getChannelGroup().add(ctx.channel());
+    }
+
+    @Override
+    public void channelActive(ChannelHandlerContext ctx) throws Exception {
+        NettyConfig.getChannelGroup().add(ctx.channel());
+        log.info("与客户端建立连接,通道开启!信道ID为 " + ctx.channel().id());
+    }
+    //读就绪事件
+    @Override
+    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
+        HistoryTrackQueryDTO dto = JSONObject.parseObject(msg.text(), HistoryTrackQueryDTO.class);
+        log.info("本次会话(信道)查询条件为:" + msg.text());
+        if (!NettyConfig.getUserChannelMap().containsKey(dto)) {
+            NettyConfig.getUserChannelMap().put(ctx.channel().id().asShortText(),dto.toString());
+            // 将用户ID作为自定义属性加入到channel中,方便随时channel中获取用户ID
+            AttributeKey<String> key = AttributeKey.valueOf("history-track");
+            ctx.channel().attr(key).setIfAbsent(dto.toString());
+            applicationEventPublisher.publishEvent(new HistoryTrackChannelAddEvent(this,ctx,ctx.channel().id(), dto));
+            log.info("发送启动新线程event成功!");
+        } else {
+            // 前端定时请求,保持心跳连接,避免服务端误删通道
+            ctx.channel().writeAndFlush(new TextWebSocketFrame("keep alive success!"));
+        }
+        // 回复消息
+//        ctx.channel().writeAndFlush(new TextWebSocketFrame("此次会话CQL-ID为 "+ ctx.channel().id().asShortText() + " 查询的条件为:" + dto.toString() + "与服务器成功通信!"));
+    }
+
+    /**
+     * 心跳检测相关方法 - 会主动调用handlerRemoved
+     *
+     * @param ctx
+     * @param evt
+     * @throws Exception
+     */
+    @Override
+    public void userEventTriggered(final ChannelHandlerContext ctx, Object evt) throws Exception {
+        if (evt instanceof IdleStateEvent) {
+            IdleStateEvent event = (IdleStateEvent) evt;
+            if (event.state() == IdleState.ALL_IDLE) {
+                //清除超时会话
+                ChannelFuture writeAndFlush = ctx.writeAndFlush("you will close");
+                writeAndFlush.addListener(new ChannelFutureListener() {
+                    @Override
+                    public void operationComplete(ChannelFuture future) throws Exception {
+                        ctx.channel().close();
+                    }
+                });
+            }
+        } else {
+            super.userEventTriggered(ctx, evt);
+        }
+    }
+
+    /**
+     * 移除通道及关联用户
+     *
+     * @param ctx
+     * @throws Exception
+     */
+    @Override
+    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
+        log.info("handlerRemoved 被调用" + ctx.channel().id().asLongText());
+        // 删除通道
+        ChannelGroup channelGroup = NettyConfig.getChannelGroup();
+        if (Objects.nonNull(channelGroup)) {
+            channelGroup.remove(ctx.channel());
+            NettyConfig.getUserChannelMap().remove(ctx.channel().id().asShortText());
+            applicationEventPublisher.publishEvent(new HistoryTrackChannelRemoveEvent(this, ctx.channel().id()));
+        }
+        removeUserId(ctx);
+    }
+
+    /**
+     * 删除用户与channel的对应关系
+     *
+     * @param ctx
+     */
+    private void removeUserId(ChannelHandlerContext ctx) {
+        AttributeKey<String> key = AttributeKey.valueOf("userId");
+        String userId = ctx.channel().attr(key).get();
+        if (StringUtils.isNotBlank(userId)) {
+            NettyConfig.getUserChannelMap().remove(userId);
+            log.info("删除用户与channel的对应关系,userId:{}", userId);
+        }
+    }
+    /**
+     * 异常处理
+     *
+     * @param ctx
+     * @param cause
+     * @throws Exception
+     */
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+        log.info("异常:{}", cause.getMessage());
+        // 删除通道
+        NettyConfig.getChannelGroup().remove(ctx.channel());
+        removeUserId(ctx);
+        ctx.close();
+    }
+
+    /**
+     * 推送消息给指定信道
+     *
+     * @param channelId
+     * @param msg
+     */
+    public void sendMsgToSpecifyChannel(ChannelId channelId, String msg) {
+        log.info("推送数据--{},到指定信道{}", msg, channelId);
+        ChannelGroup channelGroup = NettyConfig.getChannelGroup();
+        Channel channel = channelGroup.find(channelId);
+        if (channel != null) {
+            channel.writeAndFlush(new TextWebSocketFrame(msg));
+        }
+    }
+}

+ 209 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/beidou/handler/WebSocketHandler.java

@@ -0,0 +1,209 @@
+package cn.com.taiji.beidou.handler;
+
+import cn.com.taiji.beidou.config.NettyConfig;
+import cn.com.taiji.beidou.event.ChannelAddEvent;
+import cn.com.taiji.beidou.event.ChannelRemoveEvent;
+import com.alibaba.fastjson.JSONObject;
+import io.netty.channel.*;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
+import io.netty.handler.timeout.IdleState;
+import io.netty.handler.timeout.IdleStateEvent;
+import io.netty.util.AttributeKey;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.ApplicationEventPublisher;
+import org.springframework.stereotype.Component;
+
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * @author wudskq
+ */
+@Slf4j
+@Component
+@ChannelHandler.Sharable
+public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
+
+    Logger logger = LoggerFactory.getLogger(WebSocketHandler.class);
+
+    @Autowired
+    private ApplicationEventPublisher applicationEventPublisher;
+
+    /**
+     * 一旦连接,第一个被执行
+     *
+     * @param ctx
+     * @throws Exception
+     */
+    @Override
+    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
+        log.info("handlerAdded 被调用" + ctx.channel().id().asLongText());
+        // 添加到channelGroup 通道组
+        NettyConfig.getChannelGroup().add(ctx.channel());
+    }
+
+    @Override
+    public void channelActive(ChannelHandlerContext ctx) throws Exception {
+        NettyConfig.getChannelGroup().add(ctx.channel());
+        System.out.println("与客户端建立连接,通道开启!信道ID为 " + ctx.channel().id());
+    }
+
+    /**
+     * 读取客户端发送的身份标识,推送相关数据
+     */
+    @Override
+    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
+        // 获取用户ID,关联channel
+        JSONObject jsonObject = JSONObject.parseObject(msg.text());
+        String cql = jsonObject.getString("cql");
+        String realCql = jsonObject.getString("realCql");
+        log.info("本次会话(信道)查询cql为:" + cql);
+        log.info("本次会话(信道)查询的实时cql为 "+ realCql);
+        if ((null != cql && !NettyConfig.getUserChannelMap().containsValue(cql)) || (null != realCql && !NettyConfig.getUserChannelMap().containsValue(realCql))) {
+            if(null != cql && !"".equals(cql) ){
+                NettyConfig.getUserChannelMap().put(ctx.channel().id().asShortText(),cql);
+                // 将cql作为自定义属性加入到channel中,方便随时channel中获取cql
+                AttributeKey<String> key = AttributeKey.valueOf("cql");
+                //范围cql
+                ctx.channel().attr(key).set(cql);
+            }
+            if(null != realCql && !"".equals(realCql)){
+                NettyConfig.getUserChannelMap().put(ctx.channel().id().asShortText(),realCql);
+                AttributeKey<String> key1 = AttributeKey.valueOf("realCql");
+                //实时范围cql
+                ctx.channel().attr(key1).set(realCql);
+            }
+            applicationEventPublisher.publishEvent(new ChannelAddEvent(this,ctx,ctx.channel().id(), cql,realCql));
+            log.info("发送启动新线程event成功!");
+        } else {
+            // 前端定时请求,保持心跳连接,避免服务端误删通道
+            ctx.channel().writeAndFlush(new TextWebSocketFrame("keep alive success!"));
+        }
+        // 回复消息
+        ctx.channel().writeAndFlush(new TextWebSocketFrame("此次会话CQL-ID为 "+ ctx.channel().id().asShortText() + " 查询的CQL为" + cql + "与服务器成功通信!"));
+    }
+
+    /**
+     * 移除通道及关联用户
+     *
+     * @param ctx
+     * @throws Exception
+     */
+    @Override
+    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
+        log.info("handlerRemoved 被调用" + ctx.channel().id().asLongText());
+        // 删除通道
+        ChannelGroup channelGroup = NettyConfig.getChannelGroup();
+        if (Objects.nonNull(channelGroup)) {
+            channelGroup.remove(ctx.channel());
+            NettyConfig.getUserChannelMap().remove(ctx.channel().id().asShortText());
+            applicationEventPublisher.publishEvent(new ChannelRemoveEvent(this, ctx.channel().id()));
+        }
+        removeUserId(ctx);
+    }
+
+    /**
+     * 异常处理
+     *
+     * @param ctx
+     * @param cause
+     * @throws Exception
+     */
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+        log.info("异常:{}", cause.getMessage());
+        // 删除通道
+        NettyConfig.getChannelGroup().remove(ctx.channel());
+        removeUserId(ctx);
+        ctx.close();
+    }
+
+    /**
+     * 心跳检测相关方法 - 会主动调用handlerRemoved
+     *
+     * @param ctx
+     * @param evt
+     * @throws Exception
+     */
+    @Override
+    public void userEventTriggered(final ChannelHandlerContext ctx, Object evt) throws Exception {
+        if (evt instanceof IdleStateEvent) {
+            IdleStateEvent event = (IdleStateEvent) evt;
+            if (event.state() == IdleState.ALL_IDLE) {
+                //清除超时会话
+                ChannelFuture writeAndFlush = ctx.writeAndFlush("you will close");
+                writeAndFlush.addListener(new ChannelFutureListener() {
+                    @Override
+                    public void operationComplete(ChannelFuture future) throws Exception {
+                        ctx.channel().close();
+                    }
+                });
+            }
+        } else {
+            super.userEventTriggered(ctx, evt);
+        }
+    }
+
+    /**
+     * 删除用户与channel的对应关系
+     *
+     * @param ctx
+     */
+    private void removeUserId(ChannelHandlerContext ctx) {
+        AttributeKey<String> key = AttributeKey.valueOf("userId");
+        String userId = ctx.channel().attr(key).get();
+        if (StringUtils.isNotBlank(userId)) {
+            NettyConfig.getUserChannelMap().remove(userId);
+            log.info("删除用户与channel的对应关系,userId:{}", userId);
+        }
+    }
+
+    /**
+     * 推送消息给所有在线的客户端
+     *
+     * @param msg
+     */
+    public void sendMsgToAll(String msg) {
+        if (NettyConfig.getChannelGroup().size() < 1) {
+        }
+        NettyConfig.getChannelGroup().writeAndFlush(new TextWebSocketFrame(msg));
+    }
+
+    /**
+     * 推送消息给指定用户
+     *
+     * @param userId
+     * @param msg
+     */
+    public void sendMsgToSpecifyUser(String userId, String msg) {
+        log.info("推送数据--{},到指定用户{}", msg, userId);
+        ConcurrentHashMap<String, Object> userChannelMap = NettyConfig.getUserChannelMap();
+        Channel channel = (Channel) userChannelMap.get(userId);
+        if (channel != null) {
+            channel.writeAndFlush(new TextWebSocketFrame(msg));
+        }
+    }
+
+    /**
+     * 推送消息给指定信道
+     *
+     * @param channelId
+     * @param msg
+     */
+    public void sendMsgToSpecifyChannel(ChannelId channelId, String msg) {
+        log.info("推送数据--{},到指定信道{}", msg, channelId);
+        ChannelGroup channelGroup = NettyConfig.getChannelGroup();
+        Channel channel = channelGroup.find(channelId);
+        if (channel != null) {
+            channel.writeAndFlush(new TextWebSocketFrame(msg));
+        }
+    }
+
+
+}

+ 154 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/beidou/listener/ChannelListener.java

@@ -0,0 +1,154 @@
+package cn.com.taiji.beidou.listener;
+
+import cn.com.taiji.beidou.dto.IGeomesaTrackDTO;
+import cn.com.taiji.beidou.event.ChannelAddEvent;
+import cn.com.taiji.beidou.event.ChannelRemoveEvent;
+import cn.com.taiji.beidou.event.EventType;
+import cn.com.taiji.beidou.handler.WebSocketHandler;
+import cn.hutool.json.JSONArray;
+import cn.hutool.json.JSONObject;
+import cn.hutool.json.JSONUtil;
+import io.netty.util.AttributeKey;
+import lombok.extern.slf4j.Slf4j;
+import org.geotools.data.FeatureReader;
+import org.geotools.data.Query;
+import org.geotools.data.Transaction;
+import org.geotools.filter.text.cql2.CQLException;
+import org.geotools.filter.text.ecql.ECQL;
+import org.locationtech.geomesa.redis.data.RedisDataStore;
+import org.locationtech.jts.geom.Geometry;
+import org.opengis.feature.Property;
+import org.opengis.feature.simple.SimpleFeature;
+import org.opengis.feature.simple.SimpleFeatureType;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.event.EventListener;
+import org.springframework.stereotype.Component;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * @author chenfangchao
+ * @title: BeidouChannelListener
+ * @projectName ax-geomesa-redis
+ * @description: TODO
+ * @date 2022/9/22 6:00 PM
+ */
+@Component
+@Slf4j
+public class ChannelListener {
+
+    @Autowired
+    private RedisDataStore redisDataStore;
+
+    @Autowired
+    private WebSocketHandler webSocketHandler;
+
+    private final ConcurrentHashMap<String, Thread> concurrentHashMap = new ConcurrentHashMap<String, Thread>();
+
+    @Autowired
+    private IGeomesaTrackDTO geomesaTrackDTO;
+
+    /**
+     * 监听新增用户信道
+     *
+     * @param event
+     */
+    @EventListener
+    public void listenerNettyAddChannel(ChannelAddEvent event) {
+        log.info("当前需要创建的线程名称为 {}", event.getChannelId());
+
+        Thread thread = concurrentHashMap.get(event.getChannelId().asLongText());
+        if (null != thread) {
+            thread.interrupt();
+        }
+
+        thread = new Thread(() -> {
+            while (true) {
+                try {
+                    if (Thread.currentThread().isInterrupted()) {
+                        return;
+                    }
+                    JSONArray datas = new JSONArray();
+
+                    AttributeKey<String> key = AttributeKey.valueOf("cql");
+                    String cql = event.getCtx().attr(key).get();
+                    if (null != cql) {
+                        //查询是否存在
+                        Query query = new Query(geomesaTrackDTO.getTypeName(), ECQL.toFilter(cql));
+                        query.setMaxFeatures(Integer.parseInt("2000"));
+                        FeatureReader<SimpleFeatureType, SimpleFeature> reader = redisDataStore.getFeatureReader(query, Transaction.AUTO_COMMIT);
+                        //判断是否存在
+                        while (reader.hasNext()) {
+                            SimpleFeature feature = reader.next();
+                            Collection<Property> properties = feature.getProperties();
+                            JSONObject obj = new JSONObject();
+                            properties.forEach(property -> {
+                                Object value = property.getValue();
+                                if (value instanceof Geometry) {
+                                    Geometry geometry = (Geometry) value;
+                                    value = geometry.toText();
+                                }
+                                obj.put(property.getName().toString(), value);
+                            });
+                            datas.add(obj);
+                        }
+                    }
+
+                    AttributeKey<String> key1 = AttributeKey.valueOf("realCql");
+                    String realCql = event.getCtx().attr(key1).get();
+                    if (null != realCql) {
+                        Query realQuery = new Query(geomesaTrackDTO.getTypeName(), ECQL.toFilter(realCql));
+                        realQuery.setMaxFeatures(Integer.parseInt("2000"));
+                        FeatureReader<SimpleFeatureType, SimpleFeature> realReader = redisDataStore.getFeatureReader(realQuery, Transaction.AUTO_COMMIT);
+                        //判断是否存在
+                        while (realReader.hasNext()) {
+                            SimpleFeature feature = realReader.next();
+                            Collection<Property> properties = feature.getProperties();
+                            JSONObject obj = new JSONObject();
+                            properties.forEach(property -> {
+                                Object value = property.getValue();
+                                if (value instanceof Geometry) {
+                                    Geometry geometry = (Geometry) value;
+                                    value = geometry.toText();
+                                }
+                                obj.put(property.getName().toString(), value);
+                            });
+                            datas.add(obj);
+                        }
+                    }
+                    webSocketHandler.sendMsgToSpecifyChannel(event.getChannelId(), JSONUtil.toJsonStr(new EventType<>(datas, geomesaTrackDTO.getWsLayerName())));
+                    Thread.sleep(1000);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    e.printStackTrace();
+                    return;
+                } catch (CQLException e) {
+                    e.printStackTrace();
+                } catch (IOException e) {
+                    e.printStackTrace();
+                }
+            }
+        }, event.getChannelId().asLongText());
+
+        thread.start();
+        concurrentHashMap.put(event.getChannelId().asLongText(), thread);
+    }
+
+
+    /**
+     * 监听移除用户信道
+     *
+     * @param event
+     */
+    @EventListener
+    public void listenerNettyRemoveChannel(ChannelRemoveEvent event) {
+        log.info("当前需要移除的线程名称为 {}", event.getChannelId());
+        Thread thread = concurrentHashMap.get(event.getChannelId().asLongText());
+        thread.interrupt();
+        concurrentHashMap.remove(event.getChannelId().asLongText());
+    }
+
+
+}

+ 164 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/beidou/listener/FocusTrackChannelListener.java

@@ -0,0 +1,164 @@
+package cn.com.taiji.beidou.listener;
+
+import cn.com.taiji.beidou.dto.FocusTrackQueryDTO;
+import cn.com.taiji.beidou.dto.HistoryTrackQueryDTO;
+import cn.com.taiji.beidou.dto.IGeomesaTrackDTO;
+import cn.com.taiji.beidou.entity.BeidouTrackEntity;
+import cn.com.taiji.beidou.entity.FocusShipEntity;
+import cn.com.taiji.beidou.event.ChannelRemoveEvent;
+import cn.com.taiji.beidou.event.EventType;
+import cn.com.taiji.beidou.event.FocusTrackChannelAddEvent;
+import cn.com.taiji.beidou.event.HistoryTrackChannelAddEvent;
+import cn.com.taiji.beidou.handler.HistoryTrackHandler;
+import cn.com.taiji.beidou.service.IBeidouTrackService;
+import cn.com.taiji.beidou.service.IFocusShipService;
+import cn.hutool.json.JSONArray;
+import cn.hutool.json.JSONUtil;
+import com.alibaba.fastjson.JSONObject;
+import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.netty.util.AttributeKey;
+import lombok.extern.slf4j.Slf4j;
+import org.geotools.data.FeatureReader;
+import org.geotools.data.Query;
+import org.geotools.data.Transaction;
+import org.geotools.factory.CommonFactoryFinder;
+import org.geotools.filter.text.cql2.CQL;
+import org.geotools.filter.text.cql2.CQLException;
+import org.geotools.filter.text.ecql.ECQL;
+import org.locationtech.geomesa.redis.data.RedisDataStore;
+import org.locationtech.jts.geom.Geometry;
+import org.opengis.feature.Property;
+import org.opengis.feature.simple.SimpleFeature;
+import org.opengis.feature.simple.SimpleFeatureType;
+import org.opengis.filter.Filter;
+import org.opengis.filter.FilterFactory2;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.event.EventListener;
+import org.springframework.stereotype.Component;
+import org.springframework.util.StringUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.*;
+
+/**
+ * @author chenfangchao
+ * @title: BeidouChannelListener
+ * @projectName ax-geomesa-redis
+ * @description: TODO
+ * @date 2022/9/22 6:00 PM
+ */
+@Component
+@Slf4j
+public class FocusTrackChannelListener {
+    @Autowired
+    private HistoryTrackHandler historyTrackHandler;
+    @Autowired
+    private IBeidouTrackService beidouTrackService;
+    @Autowired
+    private IFocusShipService focusShipService;
+    @Autowired
+    private RedisDataStore redisDataStore;
+    @Autowired
+    private IGeomesaTrackDTO geomesaTrackDTO;
+
+    private final ConcurrentHashMap<String, Thread> concurrentHashMap = new ConcurrentHashMap<String, Thread>();
+    /**
+     * 监听新增用户信道
+     *
+     * @param event
+     */
+    @EventListener
+    public void listenerNettyAddChannel(FocusTrackChannelAddEvent event) {
+        {
+            log.info("当前需要创建的线程名称为 {}", event.getChannelId());
+
+            Thread thread = concurrentHashMap.get(event.getChannelId().asLongText());
+            if (null != thread) {
+                thread.interrupt();
+            }
+
+            thread = new Thread(() -> {
+                while (true) {
+                    try {
+                        if (Thread.currentThread().isInterrupted()) {
+                            return;
+                        }
+                        queryFocusTrack(event);
+                        Thread.sleep(1000);
+                    } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                        e.printStackTrace();
+                        return;
+                    } catch (CQLException e) {
+                        e.printStackTrace();
+                    } catch (IOException e) {
+                        e.printStackTrace();
+                    }
+                }
+            }, event.getChannelId().asLongText());
+
+            thread.start();
+            concurrentHashMap.put(event.getChannelId().asLongText(), thread);
+        }
+    }
+    /**
+     * 查询
+     */
+    private void queryFocusTrack(FocusTrackChannelAddEvent event) throws CQLException, IOException {
+        JSONArray datas = new JSONArray();
+        AttributeKey<String> key = AttributeKey.valueOf("focus-track");
+        String dtoStr = event.getCtx().attr(key).get();
+        FocusTrackQueryDTO dto = JSONObject.parseObject(dtoStr, FocusTrackQueryDTO.class);
+        if (null != dto) {
+            QueryWrapper<FocusShipEntity> wrapper = new QueryWrapper();
+            if(StringUtils.hasText(dto.getUserId())){
+                wrapper.eq("user_id",dto.getUserId());
+            }
+            List<FocusShipEntity> entities = focusShipService.list(wrapper);
+            FilterFactory2 ff = CommonFactoryFinder.getFilterFactory2();
+            List<Filter> filters = new ArrayList<>();
+            if(entities != null) {
+                for (FocusShipEntity input : entities) {
+                    Filter filter = CQL.toFilter("deviceId = " + input.getShipId());
+                    filters.add(filter);
+                }
+                Filter filter = ff.or(filters);
+                Query query = new Query(geomesaTrackDTO.getTypeName(), filter);
+                FeatureReader<SimpleFeatureType, SimpleFeature> focusReader = redisDataStore.getFeatureReader(query, Transaction.AUTO_COMMIT);
+                //判断是否存在
+                while (focusReader.hasNext()) {
+                    SimpleFeature feature = focusReader.next();
+                    Collection<Property> properties = feature.getProperties();
+                    cn.hutool.json.JSONObject obj = new cn.hutool.json.JSONObject();
+                    properties.forEach(property -> {
+                        Object value = property.getValue();
+                        if (value instanceof Geometry) {
+                            Geometry geometry = (Geometry) value;
+                            value = geometry.toText();
+                        }
+                        obj.put(property.getName().toString(), value);
+                    });
+                    datas.add(obj);
+                }
+            }
+        }
+        historyTrackHandler.sendMsgToSpecifyChannel(event.getChannelId(), JSONUtil.toJsonStr(new EventType<>(datas, geomesaTrackDTO.getWsLayerName())));
+    }
+
+    /**
+     * 监听移除用户信道
+     *
+     * @param event
+     */
+    @EventListener
+    public void listenerNettyRemoveChannel(ChannelRemoveEvent event) {
+        log.info("当前需要移除的线程名称为 {}", event.getChannelId());
+        Thread thread = concurrentHashMap.get(event.getChannelId().asLongText());
+        thread.interrupt();
+        concurrentHashMap.remove(event.getChannelId().asLongText());
+    }
+}

+ 98 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/beidou/listener/HistoryTrackChannelListener.java

@@ -0,0 +1,98 @@
+package cn.com.taiji.beidou.listener;
+
+import cn.com.taiji.beidou.dto.HistoryTrackQueryDTO;
+import cn.com.taiji.beidou.dto.IGeomesaTrackDTO;
+import cn.com.taiji.beidou.entity.BeidouTrackEntity;
+import cn.com.taiji.beidou.entity.FocusShipEntity;
+import cn.com.taiji.beidou.event.ChannelAddEvent;
+import cn.com.taiji.beidou.event.ChannelRemoveEvent;
+import cn.com.taiji.beidou.event.EventType;
+import cn.com.taiji.beidou.event.HistoryTrackChannelAddEvent;
+import cn.com.taiji.beidou.handler.HistoryTrackHandler;
+import cn.com.taiji.beidou.handler.WebSocketHandler;
+import cn.com.taiji.beidou.service.IBeidouTrackService;
+import cn.com.taiji.beidou.service.IFocusShipService;
+import cn.hutool.json.JSONArray;
+import cn.hutool.json.JSONUtil;
+import com.alibaba.fastjson.JSONObject;
+import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.netty.util.AttributeKey;
+import lombok.extern.slf4j.Slf4j;
+import org.geotools.data.FeatureReader;
+import org.geotools.data.Query;
+import org.geotools.data.Transaction;
+import org.geotools.filter.text.cql2.CQLException;
+import org.geotools.filter.text.ecql.ECQL;
+import org.locationtech.geomesa.redis.data.RedisDataStore;
+import org.locationtech.jts.geom.Geometry;
+import org.opengis.feature.Property;
+import org.opengis.feature.simple.SimpleFeature;
+import org.opengis.feature.simple.SimpleFeatureType;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.event.EventListener;
+import org.springframework.stereotype.Component;
+import org.springframework.util.StringUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.*;
+
+/**
+ * @author chenfangchao
+ * @title: BeidouChannelListener
+ * @projectName ax-geomesa-redis
+ * @description: TODO
+ * @date 2022/9/22 6:00 PM
+ */
+@Component
+@Slf4j
+public class HistoryTrackChannelListener {
+    @Autowired
+    private HistoryTrackHandler historyTrackHandler;
+    @Autowired
+    private IBeidouTrackService beidouTrackService;
+
+    private static final ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("thread-pool-channel-history-track").build();
+
+    ThreadPoolExecutor threadPoolExecutor  = new ThreadPoolExecutor(2,2,5,
+            TimeUnit.MINUTES,new ArrayBlockingQueue<>(10),threadFactory,new ThreadPoolExecutor.AbortPolicy());
+    /**
+     * 监听新增用户信道
+     *
+     * @param event
+     */
+    @EventListener
+    public void listenerNettyAddChannel(HistoryTrackChannelAddEvent event) {
+        log.info("当前需要创建的线程名称为 {}", event.getChannelId());
+        threadPoolExecutor.execute(new Runnable() {
+            @Override
+            public void run() {
+                queryHistoryTrack(event);
+            }
+        });
+    }
+    /**
+     * 查询
+     */
+    private void queryHistoryTrack(HistoryTrackChannelAddEvent event)  {
+        List<BeidouTrackEntity> results = new ArrayList<>();
+        AttributeKey<String> key = AttributeKey.valueOf("history-track");
+        String dtoStr = event.getCtx().attr(key).get();
+        HistoryTrackQueryDTO dto = JSONObject.parseObject(dtoStr, HistoryTrackQueryDTO.class);
+        if (null != dto) {
+            QueryWrapper<BeidouTrackEntity> wrapper = new QueryWrapper();
+            if(StringUtils.hasText(dto.getShipId())){
+                wrapper.eq("device_id",dto.getShipId());
+            }
+            if(StringUtils.hasText(dto.getStartTime())&&StringUtils.hasText(dto.getEndTime())){
+                wrapper.between("location_time",dto.getStartTime(),dto.getEndTime());
+            }
+            results = beidouTrackService.list(wrapper);
+        }
+        historyTrackHandler.sendMsgToSpecifyChannel(event.getChannelId(), JSONUtil.toJsonStr(results));
+    }
+
+}

+ 18 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/beidou/mapper/BeidouTrackMapper.java

@@ -0,0 +1,18 @@
+package cn.com.taiji.beidou.mapper;
+
+import cn.com.taiji.beidou.entity.BeidouTrackEntity;
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+import org.apache.ibatis.annotations.Mapper;
+
+/**
+ * @author chenfangchao
+ * @title: GeoServerMapper
+ * @projectName ax-geoserver-project
+ * @description: TODO
+ * @date 2022/8/16 7:54 PM
+ */
+@Mapper
+public interface BeidouTrackMapper extends BaseMapper<BeidouTrackEntity> {
+
+}
+

+ 19 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/beidou/mapper/FocusShipMapper.java

@@ -0,0 +1,19 @@
+package cn.com.taiji.beidou.mapper;
+
+import cn.com.taiji.beidou.entity.BeidouTrackEntity;
+import cn.com.taiji.beidou.entity.FocusShipEntity;
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+import org.apache.ibatis.annotations.Mapper;
+
+/**
+ * @author chenfangchao
+ * @title: GeoServerMapper
+ * @projectName ax-geoserver-project
+ * @description: TODO
+ * @date 2022/8/16 7:54 PM
+ */
+@Mapper
+public interface FocusShipMapper extends BaseMapper<FocusShipEntity> {
+
+}
+

+ 70 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/beidou/producer/BeidouShipTestProducer.java

@@ -0,0 +1,70 @@
+package cn.com.taiji.beidou.producer;
+
+import cn.com.taiji.beidou.dto.BeidouShipTrackDTO;
+import cn.hutool.core.date.DateUtil;
+import cn.hutool.json.JSONUtil;
+import com.alibaba.fastjson.JSONObject;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import javax.annotation.Resource;
+import java.util.UUID;
+
+/**
+ * @author chenfangchao
+ * @title: BeidouShipTestProducer
+ * @projectName ax-geoserver-project
+ * @description: TODO
+ * @date 2022/8/15 9:41 PM
+ */
+@RestController
+@RequestMapping("/producer")
+public class BeidouShipTestProducer {
+
+
+    @Resource
+    private KafkaTemplate kafkaTemplate;
+
+    @GetMapping("/beidou")
+    public void  testBeidouShipData(){
+
+        for (int i = 0; i < 100; i++) {
+            String time = String.valueOf(DateUtil.date());
+            Object msg = "{\n" +
+                    "  \"id\": \"00b72b21b7cb41feb360a0db59403712\",\n" +
+                    "  \"content\": \"167153587,15011858,255,255,0,2022-04-02 16:22:13,2022-04-02 16:22:12,1,39,12,255,110.636777,19.260833,0,0,3\",\n" +
+                    "  \"time\": \"2022-04-02 16:22:14\",\n" +
+                    "  \"curren_time\": \"2022-04-02 16:22:15\",\n" +
+                    "  \"trackId\": \"167153587\",\n" +
+                    "  \"deviceId\": \"15011858\",\n" +
+                    "  \"shipType\": \"255\",\n" +
+                    "  \"workType\": \"255\",\n" +
+                    "  \"workWay\": \"0\",\n" +
+                    "  \"sendTime\": \"+ " + time + " +\",\n" +
+                    "  \"locationTime\": \"2022-04-02 16:22:12\",\n" +
+                    "  \"online\": \"1\",\n" +
+                    "  \"shipLength\": \"39\",\n" +
+                    "  \"shipWidth\": \"12\",\n" +
+                    "  \"texture\": \"255\",\n" +
+                    "  \"longitude\": \"110.636777\",\n" +
+                    "  \"latitude\": \"19.260833\",\n" +
+                    "  \"direction\": \"0\",\n" +
+                    "  \"speed\": \"0\",\n" +
+                    "  \"kwh\": \"3\",\n" +
+                    "  \"kafka_xrsj\": \"2022-04-02 16:22:19.932\"\n" +
+                    "}\n";
+            BeidouShipTrackDTO beidou = JSONObject.parseObject(msg.toString(), BeidouShipTrackDTO.class);
+            beidou.setTrackId(1001L);
+            beidou.setDeviceId(String.valueOf(i));
+            beidou.setShipName(UUID.randomUUID().toString());
+            beidou.setLongitude(beidou.getLongitude());
+            beidou.setLatitude(beidou.getLatitude());
+
+            String data = JSONUtil.toJsonStr(beidou);
+            kafkaTemplate.send("sgAxBeidouTrack", data);
+        }
+    }
+
+}

+ 72 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/beidou/serializer/FastJson2JsonRedisSerializer.java

@@ -0,0 +1,72 @@
+package cn.com.taiji.beidou.serializer;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.parser.ParserConfig;
+import com.alibaba.fastjson.serializer.SerializerFeature;
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.type.TypeFactory;
+import org.springframework.data.redis.serializer.RedisSerializer;
+import org.springframework.data.redis.serializer.SerializationException;
+import org.springframework.util.Assert;
+
+import java.nio.charset.Charset;
+
+/**
+ * Redis使用FastJson序列化
+ * 
+ * @author seat
+ */
+public class FastJson2JsonRedisSerializer<T> implements RedisSerializer<T>
+{
+    @SuppressWarnings("unused")
+    private ObjectMapper objectMapper = new ObjectMapper();
+
+    public static final Charset DEFAULT_CHARSET = Charset.forName("UTF-8");
+
+    private Class<T> clazz;
+
+    static
+    {
+        ParserConfig.getGlobalInstance().setAutoTypeSupport(true);
+    }
+
+    public FastJson2JsonRedisSerializer(Class<T> clazz)
+    {
+        super();
+        this.clazz = clazz;
+    }
+
+    @Override
+    public byte[] serialize(T t) throws SerializationException
+    {
+        if (t == null)
+        {
+            return new byte[0];
+        }
+        return JSON.toJSONString(t, SerializerFeature.WriteClassName).getBytes(DEFAULT_CHARSET);
+    }
+
+    @Override
+    public T deserialize(byte[] bytes) throws SerializationException
+    {
+        if (bytes == null || bytes.length <= 0)
+        {
+            return null;
+        }
+        String str = new String(bytes, DEFAULT_CHARSET);
+
+        return JSON.parseObject(str, clazz);
+    }
+
+    public void setObjectMapper(ObjectMapper objectMapper)
+    {
+        Assert.notNull(objectMapper, "'objectMapper' must not be null");
+        this.objectMapper = objectMapper;
+    }
+
+    protected JavaType getJavaType(Class<?> clazz)
+    {
+        return TypeFactory.defaultInstance().constructType(clazz);
+    }
+}

+ 152 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/beidou/server/FocusTrackServer.java

@@ -0,0 +1,152 @@
+package cn.com.taiji.beidou.server;
+
+import cn.com.taiji.beidou.handler.FocusTrackHandler;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.http.HttpObjectAggregator;
+import io.netty.handler.codec.http.HttpServerCodec;
+import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
+import io.netty.handler.codec.serialization.ObjectEncoder;
+import io.netty.handler.stream.ChunkedWriteHandler;
+import io.netty.handler.timeout.IdleStateHandler;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import java.net.InetSocketAddress;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * @author wudskq
+ */
+@Slf4j
+@Component
+public class FocusTrackServer {
+
+    /**
+     * webSocket协议名
+     */
+    private static final String WEBSOCKET_PROTOCOL = "WebSocket";
+
+    /**
+     * 端口号
+     */
+    @Value("${webSocket.focus-track.port}")
+    private int port;
+
+    /**
+     * webSocket路径
+     */
+    @Value("${webSocket.focus-track.path}")
+    private String webSocketPath;
+
+    /**
+     * 在Netty心跳检测中配置 - 读空闲超时时间设置
+     */
+    @Value("${webSocket.readerIdleTime}")
+    private long readerIdleTime;
+
+    /**
+     * 在Netty心跳检测中配置 - 写空闲超时时间设置
+     */
+    @Value("${webSocket.writerIdleTime}")
+    private long writerIdleTime;
+
+    /**
+     * 在Netty心跳检测中配置 - 读写空闲超时时间设置
+     */
+    @Value("${webSocket.allIdleTime}")
+    private long allIdleTime;
+
+    @Autowired
+    private FocusTrackHandler focusTrackHandler;
+
+    private EventLoopGroup bossGroup;
+
+    private EventLoopGroup workGroup;
+
+
+    private static final ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("thread-pool-netty-historytrack").build();
+
+    /**
+     * 启动
+     * @throws InterruptedException
+     */
+    private void start() throws InterruptedException {
+        bossGroup = new NioEventLoopGroup();
+        workGroup = new NioEventLoopGroup();
+        ServerBootstrap bootstrap = new ServerBootstrap();
+        bootstrap.group(bossGroup,workGroup);
+        bootstrap.channel(NioServerSocketChannel.class);
+        // 设置监听端口
+        bootstrap.localAddress(new InetSocketAddress(port));
+        bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
+
+            @Override
+            protected void initChannel(SocketChannel ch) throws Exception {
+                ch.pipeline().addLast(new IdleStateHandler(readerIdleTime, writerIdleTime, allIdleTime, TimeUnit.MINUTES));
+                // webSocket协议本身是基于http协议的,使用http编解码器
+                ch.pipeline().addLast(new HttpServerCodec());
+                ch.pipeline().addLast(new ObjectEncoder());
+                ch.pipeline().addLast(new ChunkedWriteHandler());
+                ch.pipeline().addLast(new HttpObjectAggregator(8192));
+                ch.pipeline().addLast(new WebSocketServerProtocolHandler(webSocketPath, WEBSOCKET_PROTOCOL, true, 65536 * 10));
+                // 自定义的handler,处理业务逻辑
+                ch.pipeline().addLast(focusTrackHandler);
+            }
+        });
+        ChannelFuture channelFuture = bootstrap.bind().sync();
+        log.info("Server started and listen on:{}",channelFuture.channel().localAddress());
+        channelFuture.channel().closeFuture().sync();
+    }
+
+    /**
+     * 释放资源
+     * @throws InterruptedException
+     */
+    @PreDestroy
+    public void destroy() throws InterruptedException {
+        if(bossGroup != null){
+            bossGroup.shutdownGracefully().sync();
+        }
+        if(workGroup != null){
+            workGroup.shutdownGracefully().sync();
+        }
+    }
+
+    /**
+     * 使用线程池初始化netty-websocket服务
+     */
+    ThreadPoolExecutor threadPoolExecutor  = new ThreadPoolExecutor(2,2,5,
+            TimeUnit.MINUTES,new ArrayBlockingQueue<>(10),threadFactory,new ThreadPoolExecutor.AbortPolicy());
+
+    /**
+     * 初始化(新线程开启)
+     */
+    @PostConstruct()
+    public void init() {
+        threadPoolExecutor.execute(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    start();
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+            }
+        });
+    }
+}

+ 153 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/beidou/server/HistoryTrackServer.java

@@ -0,0 +1,153 @@
+package cn.com.taiji.beidou.server;
+
+import cn.com.taiji.beidou.handler.HistoryTrackHandler;
+import cn.com.taiji.beidou.handler.WebSocketHandler;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.http.HttpObjectAggregator;
+import io.netty.handler.codec.http.HttpServerCodec;
+import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
+import io.netty.handler.codec.serialization.ObjectEncoder;
+import io.netty.handler.stream.ChunkedWriteHandler;
+import io.netty.handler.timeout.IdleStateHandler;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import java.net.InetSocketAddress;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * @author wudskq
+ */
+@Slf4j
+@Component
+public class HistoryTrackServer {
+
+    /**
+     * webSocket协议名
+     */
+    private static final String WEBSOCKET_PROTOCOL = "WebSocket";
+
+    /**
+     * 端口号
+     */
+    @Value("${webSocket.history-track.port}")
+    private int port;
+
+    /**
+     * webSocket路径
+     */
+    @Value("${webSocket.history-track.path}")
+    private String webSocketPath;
+
+    /**
+     * 在Netty心跳检测中配置 - 读空闲超时时间设置
+     */
+    @Value("${webSocket.readerIdleTime}")
+    private long readerIdleTime;
+
+    /**
+     * 在Netty心跳检测中配置 - 写空闲超时时间设置
+     */
+    @Value("${webSocket.writerIdleTime}")
+    private long writerIdleTime;
+
+    /**
+     * 在Netty心跳检测中配置 - 读写空闲超时时间设置
+     */
+    @Value("${webSocket.allIdleTime}")
+    private long allIdleTime;
+
+    @Autowired
+    private HistoryTrackHandler historyTrackHandler;
+
+    private EventLoopGroup bossGroup;
+
+    private EventLoopGroup workGroup;
+
+
+    private static final ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("thread-pool-netty-historytrack").build();
+
+    /**
+     * 启动
+     * @throws InterruptedException
+     */
+    private void start() throws InterruptedException {
+        bossGroup = new NioEventLoopGroup();
+        workGroup = new NioEventLoopGroup();
+        ServerBootstrap bootstrap = new ServerBootstrap();
+        bootstrap.group(bossGroup,workGroup);
+        bootstrap.channel(NioServerSocketChannel.class);
+        // 设置监听端口
+        bootstrap.localAddress(new InetSocketAddress(port));
+        bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
+
+            @Override
+            protected void initChannel(SocketChannel ch) throws Exception {
+                ch.pipeline().addLast(new IdleStateHandler(readerIdleTime, writerIdleTime, allIdleTime, TimeUnit.MINUTES));
+                // webSocket协议本身是基于http协议的,使用http编解码器
+                ch.pipeline().addLast(new HttpServerCodec());
+                ch.pipeline().addLast(new ObjectEncoder());
+                ch.pipeline().addLast(new ChunkedWriteHandler());
+                ch.pipeline().addLast(new HttpObjectAggregator(8192));
+                ch.pipeline().addLast(new WebSocketServerProtocolHandler(webSocketPath, WEBSOCKET_PROTOCOL, true, 65536 * 10));
+                // 自定义的handler,处理业务逻辑
+                ch.pipeline().addLast(historyTrackHandler);
+            }
+        });
+        ChannelFuture channelFuture = bootstrap.bind().sync();
+        log.info("Server started and listen on:{}",channelFuture.channel().localAddress());
+        channelFuture.channel().closeFuture().sync();
+    }
+
+    /**
+     * 释放资源
+     * @throws InterruptedException
+     */
+    @PreDestroy
+    public void destroy() throws InterruptedException {
+        if(bossGroup != null){
+            bossGroup.shutdownGracefully().sync();
+        }
+        if(workGroup != null){
+            workGroup.shutdownGracefully().sync();
+        }
+    }
+
+    /**
+     * 使用线程池初始化netty-websocket服务
+     */
+    ThreadPoolExecutor threadPoolExecutor  = new ThreadPoolExecutor(2,2,5,
+            TimeUnit.MINUTES,new ArrayBlockingQueue<>(10),threadFactory,new ThreadPoolExecutor.AbortPolicy());
+
+    /**
+     * 初始化(新线程开启)
+     */
+    @PostConstruct()
+    public void init() {
+        threadPoolExecutor.execute(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    start();
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+            }
+        });
+    }
+}

+ 152 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/beidou/server/NettyServer.java

@@ -0,0 +1,152 @@
+package cn.com.taiji.beidou.server;
+
+import cn.com.taiji.beidou.handler.WebSocketHandler;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.http.HttpObjectAggregator;
+import io.netty.handler.codec.http.HttpServerCodec;
+import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
+import io.netty.handler.codec.serialization.ObjectEncoder;
+import io.netty.handler.stream.ChunkedWriteHandler;
+import io.netty.handler.timeout.IdleStateHandler;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import java.net.InetSocketAddress;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * @author wudskq
+ */
+@Slf4j
+@Component
+public class NettyServer {
+
+    /**
+     * webSocket协议名
+     */
+    private static final String WEBSOCKET_PROTOCOL = "WebSocket";
+
+    /**
+     * 端口号
+     */
+    @Value("${webSocket.netty.port}")
+    private int port;
+
+    /**
+     * webSocket路径
+     */
+    @Value("${webSocket.netty.path}")
+    private String webSocketPath;
+
+    /**
+     * 在Netty心跳检测中配置 - 读空闲超时时间设置
+     */
+    @Value("${webSocket.netty.readerIdleTime}")
+    private long readerIdleTime;
+
+    /**
+     * 在Netty心跳检测中配置 - 写空闲超时时间设置
+     */
+    @Value("${webSocket.netty.writerIdleTime}")
+    private long writerIdleTime;
+
+    /**
+     * 在Netty心跳检测中配置 - 读写空闲超时时间设置
+     */
+    @Value("${webSocket.netty.allIdleTime}")
+    private long allIdleTime;
+
+    @Autowired
+    private WebSocketHandler webSocketHandler;
+
+    private EventLoopGroup bossGroup;
+
+    private EventLoopGroup workGroup;
+
+
+    private static final ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("thread-pool-netty-ws1").build();
+
+    /**
+     * 启动
+     * @throws InterruptedException
+     */
+    private void start() throws InterruptedException {
+        bossGroup = new NioEventLoopGroup();
+        workGroup = new NioEventLoopGroup();
+        ServerBootstrap bootstrap = new ServerBootstrap();
+        bootstrap.group(bossGroup,workGroup);
+        bootstrap.channel(NioServerSocketChannel.class);
+        // 设置监听端口
+        bootstrap.localAddress(new InetSocketAddress(port));
+        bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
+
+            @Override
+            protected void initChannel(SocketChannel ch) throws Exception {
+                ch.pipeline().addLast(new IdleStateHandler(readerIdleTime, writerIdleTime, allIdleTime, TimeUnit.MINUTES));
+                // webSocket协议本身是基于http协议的,使用http编解码器
+                ch.pipeline().addLast(new HttpServerCodec());
+                ch.pipeline().addLast(new ObjectEncoder());
+                ch.pipeline().addLast(new ChunkedWriteHandler());
+                ch.pipeline().addLast(new HttpObjectAggregator(8192));
+                ch.pipeline().addLast(new WebSocketServerProtocolHandler(webSocketPath, WEBSOCKET_PROTOCOL, true, 65536 * 10));
+                // 自定义的handler,处理业务逻辑
+                ch.pipeline().addLast(webSocketHandler);
+            }
+        });
+        ChannelFuture channelFuture = bootstrap.bind().sync();
+        log.info("Server started and listen on:{}",channelFuture.channel().localAddress());
+        channelFuture.channel().closeFuture().sync();
+    }
+
+    /**
+     * 释放资源
+     * @throws InterruptedException
+     */
+    @PreDestroy
+    public void destroy() throws InterruptedException {
+        if(bossGroup != null){
+            bossGroup.shutdownGracefully().sync();
+        }
+        if(workGroup != null){
+            workGroup.shutdownGracefully().sync();
+        }
+    }
+
+    /**
+     * 使用线程池初始化netty-websocket服务
+     */
+    ThreadPoolExecutor threadPoolExecutor  = new ThreadPoolExecutor(2,2,5,
+            TimeUnit.MINUTES,new ArrayBlockingQueue<>(10),threadFactory,new ThreadPoolExecutor.AbortPolicy());
+
+    /**
+     * 初始化(新线程开启)
+     */
+    @PostConstruct()
+    public void init() {
+        threadPoolExecutor.execute(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    start();
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+            }
+        });
+    }
+}

+ 19 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/beidou/service/IBeidouTrackService.java

@@ -0,0 +1,19 @@
+package cn.com.taiji.beidou.service;
+
+import cn.com.taiji.beidou.entity.BeidouTrackEntity;
+import com.baomidou.mybatisplus.extension.service.IService;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import java.util.List;
+
+/**
+ * @Description:
+ * @Author: hujie@umisoft.cn
+ * @Date: 2021/11/30 9:58 下午
+ */
+public interface IBeidouTrackService extends IService<BeidouTrackEntity> {
+
+    void beidouDynamicShipToRedis(List<ConsumerRecord<?, ?>> records);
+
+    void beidouDynamicShipToMySql(List<ConsumerRecord<?, ?>> records);
+}

+ 17 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/beidou/service/IFocusShipService.java

@@ -0,0 +1,17 @@
+package cn.com.taiji.beidou.service;
+
+import cn.com.taiji.beidou.entity.BeidouTrackEntity;
+import cn.com.taiji.beidou.entity.FocusShipEntity;
+import com.baomidou.mybatisplus.extension.service.IService;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import java.util.List;
+
+/**
+ * @Description:
+ * @Author: hujie@umisoft.cn
+ * @Date: 2021/11/30 9:58 下午
+ */
+public interface IFocusShipService extends IService<FocusShipEntity> {
+
+}

+ 151 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/beidou/service/impl/BeidouTrackServiceImpl.java

@@ -0,0 +1,151 @@
+package cn.com.taiji.beidou.service.impl;
+
+import cn.com.taiji.beidou.dto.BeidouShipTrackDTO;
+import cn.com.taiji.beidou.dto.IGeomesaTrackDTO;
+import cn.com.taiji.beidou.entity.BeidouTrackEntity;
+import cn.com.taiji.beidou.mapper.BeidouTrackMapper;
+import cn.com.taiji.beidou.service.IBeidouTrackService;
+import com.alibaba.fastjson.JSONObject;
+import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.geotools.data.FeatureReader;
+import org.geotools.data.FeatureWriter;
+import org.geotools.data.Query;
+import org.geotools.data.Transaction;
+import org.geotools.factory.CommonFactoryFinder;
+import org.geotools.feature.simple.SimpleFeatureBuilder;
+import org.geotools.filter.identity.FeatureIdImpl;
+import org.geotools.util.factory.Hints;
+import org.locationtech.geomesa.redis.data.RedisDataStore;
+import org.opengis.feature.simple.SimpleFeature;
+import org.opengis.feature.simple.SimpleFeatureType;
+import org.opengis.filter.Filter;
+import org.opengis.filter.FilterFactory2;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.redis.core.StringRedisTemplate;
+import org.springframework.stereotype.Service;
+
+import java.util.*;
+
+/**
+ * @Description:
+ * @Author: hujie@umisoft.cn
+ * @Date: 2021/11/30 9:59 下午
+ */
+@Service
+@Slf4j
+public class BeidouTrackServiceImpl extends ServiceImpl<BeidouTrackMapper, BeidouTrackEntity> implements IBeidouTrackService {
+
+    @Autowired
+    protected StringRedisTemplate redisTemplate;
+    @Autowired
+    private SimpleFeatureType sft;
+    @Autowired
+    private RedisDataStore redisDataStore;
+    @Autowired
+    private IGeomesaTrackDTO geomesaTrackDTO;
+
+    /**
+     * 写入Redis
+     * @param records
+     */
+    @Override
+    public void beidouDynamicShipToRedis(List<ConsumerRecord<?, ?>> records){
+        Date begain = null;
+        try {
+            begain = new Date();
+            FeatureReader<SimpleFeatureType, SimpleFeature> reader = null;
+            FeatureWriter<SimpleFeatureType, SimpleFeature> writer = null;
+
+            try {
+                writer = redisDataStore.getFeatureWriterAppend(sft.getTypeName(), Transaction.AUTO_COMMIT);
+                SimpleFeatureBuilder builder = new SimpleFeatureBuilder(sft);
+                for (ConsumerRecord<?, ?> record : records) {
+                    Optional message = Optional.ofNullable(record.value());
+                    if (message.isPresent()) {
+                        Object msg = message.get();
+                        IGeomesaTrackDTO dto = JSONObject.parseObject(msg.toString(), geomesaTrackDTO.getClass());
+                        dto.setKafkaConsumerGroup(geomesaTrackDTO.getKafkaConsumerGroup());
+                        dto.setExpirySeconds(geomesaTrackDTO.getExpirySeconds());
+                        if (!dto.checkPoint()) {
+                            continue;
+                        }
+                        String fid = dto.getFid();
+                        boolean append = true;
+                        FilterFactory2 filterFactory = CommonFactoryFinder.getFilterFactory2();
+                        Filter filter = null;
+                        if(null != fid && !"".equals(fid)){
+                            filter = filterFactory.id(filterFactory.featureId(fid));
+                        }else {
+                            String id = UUID.randomUUID().toString();
+                            filter = filterFactory.id(filterFactory.featureId(id));
+                        }
+                        Query query = new Query(dto.getTypeName(), filter);
+                        reader = redisDataStore.getFeatureReader(query, Transaction.AUTO_COMMIT);
+                        if (reader.hasNext()) {
+                            append = false;
+                            String[] propList = dto.getPropList();
+                            Object[] valueList = dto.getValueList();
+                            redisDataStore.getFeatureSource(dto.getTypeName()).modifyFeatures(propList, valueList, filter);
+                        }
+                        reader.close();
+                        reader = null;
+                        if (append) {
+                            SimpleFeature toWrite = writer.next();
+                            builder.reset();
+                            SimpleFeature feature = dto.toSimpleFeature(builder, fid);
+                            toWrite.setAttributes(feature.getAttributes());
+                            toWrite.getUserData().put(Hints.USE_PROVIDED_FID, Boolean.TRUE);
+                            ((FeatureIdImpl) toWrite.getIdentifier()).setID(feature.getID());
+                            toWrite.getUserData().putAll(feature.getUserData());
+                            writer.write();
+                        }
+                    }
+                }
+            }  catch (Exception e) {
+                log.error("=======发生异常============");
+                e.printStackTrace();
+            } finally {
+                try {
+                    if (null != writer) {
+                        writer.close();
+                    }
+                    if (null != reader) {
+                        reader.close();
+                    }
+                } catch (Exception e) {
+                    log.error("=======close操作发生异常============");
+                    e.printStackTrace();
+                }
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        log.info("写入图层数据完成!耗时 {} 毫秒, 从 消费组 {} 中合计消费 {} 条记录", (new Date()).getTime() - begain.getTime(), geomesaTrackDTO.getKafkaConsumerGroup(), records.size());
+    }
+
+
+    @Override
+    public void beidouDynamicShipToMySql(List<ConsumerRecord<?, ?>> records){
+        Date begain = null;
+        List<BeidouTrackEntity> entities = new ArrayList<>();
+        try {
+            begain = new Date();
+            for (ConsumerRecord<?, ?> record : records) {
+                Optional message = Optional.ofNullable(record.value());
+                if (message.isPresent()) {
+                    Object msg = message.get();
+                    BeidouTrackEntity entity = JSONObject.parseObject(msg.toString(), BeidouTrackEntity.class);
+                    entity.setCreateTime(new Date());
+                    entity.setUpdateTime(new Date());
+                    entities.add(entity);
+                }
+            }
+            saveBatch(entities);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        log.info("写入数据库完成!耗时 {} 毫秒, 合计消费 {} 条记录", (new Date()).getTime() - begain.getTime(), records.size());
+    }
+}

+ 42 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/beidou/service/impl/FocusShipServiceImpl.java

@@ -0,0 +1,42 @@
+package cn.com.taiji.beidou.service.impl;
+
+import cn.com.taiji.beidou.dto.IGeomesaTrackDTO;
+import cn.com.taiji.beidou.entity.BeidouTrackEntity;
+import cn.com.taiji.beidou.entity.FocusShipEntity;
+import cn.com.taiji.beidou.mapper.BeidouTrackMapper;
+import cn.com.taiji.beidou.mapper.FocusShipMapper;
+import cn.com.taiji.beidou.service.IBeidouTrackService;
+import cn.com.taiji.beidou.service.IFocusShipService;
+import com.alibaba.fastjson.JSONObject;
+import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.geotools.data.FeatureReader;
+import org.geotools.data.FeatureWriter;
+import org.geotools.data.Query;
+import org.geotools.data.Transaction;
+import org.geotools.factory.CommonFactoryFinder;
+import org.geotools.feature.simple.SimpleFeatureBuilder;
+import org.geotools.filter.identity.FeatureIdImpl;
+import org.geotools.util.factory.Hints;
+import org.locationtech.geomesa.redis.data.RedisDataStore;
+import org.opengis.feature.simple.SimpleFeature;
+import org.opengis.feature.simple.SimpleFeatureType;
+import org.opengis.filter.Filter;
+import org.opengis.filter.FilterFactory2;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.redis.core.StringRedisTemplate;
+import org.springframework.stereotype.Service;
+
+import java.util.*;
+
+/**
+ * @Description:
+ * @Author: hujie@umisoft.cn
+ * @Date: 2021/11/30 9:59 下午
+ */
+@Service
+@Slf4j
+public class FocusShipServiceImpl extends ServiceImpl<FocusShipMapper, FocusShipEntity> implements IFocusShipService {
+
+}

+ 19 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/beidou/utils/RandomGroup.java

@@ -0,0 +1,19 @@
+package cn.com.taiji.beidou.utils;
+
+import cn.hutool.core.date.DateUtil;
+import cn.hutool.core.io.unit.DataUnit;
+
+/**
+ * @author chenfangchao
+ * @title: RandomGroup
+ * @projectName geomesa-tutorials
+ * @description: TODO
+ * @date 2022/9/5 9:48 AM
+ */
+public class RandomGroup {
+
+    public static final long getRandomDate(){
+        long current = DateUtil.current();
+        return current;
+    }
+}

+ 114 - 0
beidou-track-geomesa/src/main/resources/application-dev.yml

@@ -0,0 +1,114 @@
+server:
+  port: 8188
+#netty的配置信息
+webSocket:
+  readerIdleTime: 120  #读空闲超时时间设置(Netty心跳检测配置)
+  writerIdleTime: 120  #写空闲超时时间设置(Netty心跳检测配置)
+  allIdleTime: 120     #读写空闲超时时间设置(Netty心跳检测配置)
+  netty:
+    port: 6999
+    path: /webSocket
+    readerIdleTime: 120  #读空闲超时时间设置(Netty心跳检测配置)
+    writerIdleTime: 120  #写空闲超时时间设置(Netty心跳检测配置)
+    allIdleTime: 120     #读写空闲超时时间设置(Netty心跳检测配置)
+  history-track:
+    port: 6998
+    path: /historytrack
+  focus-track:
+    port: 6997
+    path: /focustrack
+spring:
+  datasource:
+    type: com.alibaba.druid.pool.DruidDataSource
+    driverClassName: com.mysql.cj.jdbc.Driver
+    url: jdbc:mysql://localhost:3306/ycm?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
+    username: root
+    password: root
+    # 初始连接数
+    initialSize: 5
+    # 最小连接池数量
+    minIdle: 10
+    # 最大连接池数量
+    maxActive: 20
+    # 配置获取连接等待超时的时间
+    maxWait: 60000
+    # 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒
+    timeBetweenEvictionRunsMillis: 60000
+    # 配置一个连接在池中最小生存的时间,单位是毫秒
+    minEvictableIdleTimeMillis: 300000
+    # 配置一个连接在池中最大生存的时间,单位是毫秒
+    maxEvictableIdleTimeMillis: 900000
+    # 配置检测连接是否有效
+    validationQuery: SELECT 1
+  kafka:
+    bootstrap-servers: 120.25.232.213:9092
+    producer:
+      # 发生错误后,消息重发的次数。
+      retries: 0
+      #当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。
+      batch-size: 16384
+      # 设置生产者内存缓冲区的大小。
+      buffer-memory: 33554432
+      # 键的序列化方式
+      key-serializer: org.apache.kafka.common.serialization.StringSerializer
+      # 值的序列化方式
+      value-serializer: org.apache.kafka.common.serialization.StringSerializer
+      # acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
+      # acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
+      # acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
+      acks: 1
+    consumer:
+      # 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
+      auto-commit-interval: 1S
+      # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
+      # latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)
+      # earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录
+      auto-offset-reset: latest
+      # 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
+      enable-auto-commit: false
+      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+    properties:
+      security:
+        protocol: SASL_PLAINTEXT
+      sasl:
+        mechanism: SCRAM-SHA-512
+        jaas:
+          config: 'org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="admin";'
+    listener:
+      # 在侦听器容器中运行的线程数。
+      concurrency: 5
+      #listner负责ack,每调用一次,就立即commit
+      ack-mode: manual_immediate
+      missing-topics-fatal: false
+      type: batch
+  # redis 配置
+  redis:
+    # 地址
+    host: localhost
+    port: 6379
+    # 数据库索引
+    database: 3
+    # 密码
+    password:
+    # 连接超时时间
+    timeout: 10s
+    lettuce:
+      pool:
+        # 连接池中的最小空闲连接
+        min-idle: 0
+        # 连接池中的最大空闲连接
+        max-idle: 8
+        # 连接池的最大数据库连接数
+        max-active: 8
+        # #连接池最大阻塞等待时间(使用负值表示没有限制)
+        max-wait: -1ms
+
+taiji:
+  geomesa.store.redis.config: redis.url===redis://localhost:6379###redis.catalog===geomesa###redis.connection.pool.size===100###redis.connection.pool.validate===true###geomesa.stats.enable===false###geomesa.stats.generate===false###geomesa.query.caching===false
+  kafka.consumer:
+    beidou:
+      enable: true
+      expiry: 70
+      topic: sgAxBeidouTrack
+      group: sgAxBeidouTrack—${random.uuid}

+ 246 - 0
beidou-track-geomesa/src/main/resources/application-prod.yml

@@ -0,0 +1,246 @@
+server:
+  port: 8786
+
+
+spring:
+  datasource:
+    druid:
+      master:
+        type: com.alibaba.druid.pool.DruidDataSource
+        url: jdbc:mysql://74.10.28.2:3389/geodb?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
+        driver-class-name: com.mysql.cj.jdbc.Driver
+        username: ax_tj_seat
+        password: Taiji@2022#seat
+      slave:
+        enabled: true
+        type: com.alibaba.druid.pool.DruidDataSource
+        driverClassName: org.postgresql.Driver
+        url: jdbc:postgresql://74.10.28.55:5432/structure-warning-db?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
+        username: postgres
+        password: postgres
+      slave1:
+        enabled: true
+        type: com.alibaba.druid.pool.DruidDataSource
+        driverClassName: org.postgresql.Driver
+        url: jdbc:postgresql://74.10.28.55:5432/structure-warning-db?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
+        username: postgres
+        password: postgres
+      # 初始连接数
+      initialSize: 5
+      # 最小连接池数量
+      minIdle: 10
+      # 最大连接池数量
+      maxActive: 3000
+      # 配置获取连接等待超时的时间
+      maxWait: 6000
+      # 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒
+      timeBetweenEvictionRunsMillis: 1000
+      # 配置一个连接在池中最小生存的时间,单位是毫秒
+      minEvictableIdleTimeMillis: 5000
+      # 配置一个连接在池中最大生存的时间,单位是毫秒
+      maxEvictableIdleTimeMillis: 9000
+      # 配置检测连接是否有效
+      validationQuery: SELECT 1
+      testWhileIdle: true
+      testOnBorrow: false
+      testOnReturn: false
+      webStatFilter:
+        enabled: true
+      statViewServlet:
+        enabled: true
+        # 设置白名单,不填则允许所有访问
+        allow:
+        url-pattern: /druid/*
+        # 控制台管理用户名和密码
+        login-username: admin
+        login-password: 123456
+      filter:
+        stat:
+          enabled: true
+          # 慢SQL记录
+          log-slow-sql: true
+          slow-sql-millis: 1000
+          merge-sql: true
+        wall:
+          config:
+            multi-statement-allow: true
+  # kafka
+  kafka:
+    bootstrap-servers: app2833:9094,app2834:9094,app2835:9094,app2836:9094,app2837:9094
+    consumer:
+      # 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
+      auto-commit-interval: 1S
+      # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
+      # latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)
+      # earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录
+      auto-offset-reset: latest
+      # 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
+      enable-auto-commit: false
+      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+    producer:
+      retries: 3
+      batch-size: 16384
+      buffer-memory: 33554432
+      key-serializer: org.apache.kafka.common.serialization.StringSerializer
+      value-serializer: org.apache.kafka.common.serialization.StringSerializer
+      acks: 1
+    properties:
+      security:
+        protocol: SASL_PLAINTEXT
+      sasl:
+        mechanism: SCRAM-SHA-512
+        jaas:
+          config: 'org.apache.kafka.common.security.scram.ScramLoginModule required username="user01" password="8b9dcf43";'
+    listener:
+      # 在侦听器容器中运行的线程数。
+      concurrency: 5
+      #listner负责ack,每调用一次,就立即commit
+      ack-mode: manual_immediate
+      missing-topics-fatal: false
+      type: batch
+
+  # redis 配置
+  redis:
+    # 地址
+    host: 74.10.28.106
+    port: 6379
+    # 数据库索引
+    database: 3
+    # 密码
+    password: 
+    # 连接超时时间
+    timeout: 10s
+    lettuce:
+      pool:
+        # 连接池中的最小空闲连接
+        min-idle: 0
+        # 连接池中的最大空闲连接
+        max-idle: 8
+        # 连接池的最大数据库连接数
+        max-active: 8
+        # #连接池最大阻塞等待时间(使用负值表示没有限制)
+        max-wait: -1ms
+
+geoserver:
+   redis:
+     url: redis://74.10.28.106:6379
+     catalog: geomesa
+   kafka:
+     brokers: 74.10.28.27:9092
+     zookeepers: 74.10.28.27:2181
+
+# other config
+taiji:
+  elasticsearch.rest:
+    uris: 74.10.28.65:9200,74.10.28.66:9200,74.10.28.67:9200,74.10.28.68:9200,74.10.28.69:9200
+    username: ax_seat       #如果你设置了基于x-pack的验证就要填写账号和密码
+    password: ax_seat       #没有则不用配置
+    connection-timeout: 100 #连接超时
+    max-connection: 100  #最大连接数
+  kafka.consumer:
+    geomesa-beidou-ship:
+      topic: geomesa_beidou_ship_topic
+      group: ${random.uuid}
+    geomesa-fusion-ship:
+      topic: geomesa_fusion_ship_topic
+      group: ${random.uuid}
+    beidou-dynamic-ship:
+      group: ${random.uuid}
+      topic: 'taiji_ax_beidou_dynamic_ship'
+      partitions0: 0
+      partitions1: 1
+      partitions2: 2
+      partitions3: 3
+      partitions4: 4
+    dynamic-ship-track:
+      topic: 'taiji_ax_ship_dynamic_fusion'
+      id-key: 'merge_target'
+      longitude-key: 'target_longitude'
+      latitude-key: 'target_latitude'
+      group: ${random.uuid}
+      partitions0: 0
+      partitions1: 1
+      partitions2: 2
+      partitions3: 3
+      partitions4: 4
+    ztpt_dynamic_ais:
+      group: ${random.uuid}
+      partitions0: 0
+      partitions1: 1
+      partitions2: 2
+      partitions3: 3
+      partitions4: 4
+      topic: 'taiji_ax_ztpt_dynamic_ais'
+      id-key: 'userid'
+      longitude-key: 'longitude'
+      latitude-key: 'latitude'
+    hlx_ax_dynamic_target_ais:
+      group: ${random.uuid}
+      topic: 'hlx_ax_dynamic_target_ais'
+      partitions0: 0
+      partitions1: 1
+      partitions2: 2
+      partitions3: 3
+      partitions4: 4
+    hlx_ax_dynamic_target_radar:
+      group: ${random.uuid}
+      topic: 'hlx_ax_dynamic_target_radar'
+      partitions0: 0
+      partitions1: 1
+      partitions2: 2
+      partitions3: 3
+      partitions4: 4
+    hlx_ax_dynamic_target_zyh_radar:
+      group: ${random.uuid}
+      topic: 'hlx_ax_dynamic_target_zyh_radar'
+      partitions0: 0
+      partitions1: 1
+      partitions2: 2
+      partitions3: 3
+      partitions4: 4
+    tianao_radar_fusion:
+      group: ${random.uuid}
+      topic: 'taiji_ax_tianao_radar_fusion'
+      latitude-key: 'latitude'
+      longitude-key: 'longitude'
+      id-key: 'fusionBatchNum'
+      partitions0: 0
+      partitions1: 1
+      partitions2: 2
+      partitions3: 3
+      partitions4: 4
+    prefix:
+      ship_borne_terminal_redis_key_prefix: ship_borne_terminal_118_
+      redis_scan_count: 1
+  async-thread-pool:
+    beidou-dynamic-ship:
+      core-pool-size: 200
+      max-pool-size: 200
+      queue-capacity: 200
+      name-prefix: beidou-dynamic-ship-
+
+# 开启雷达船过滤
+filter:
+  radar: false
+
+# 定时任务时间配置
+time:
+  radar: 0 0/2 * * * ?
+  other: 0 0/1 * * * ?
+  ais: 0 0/1 * * * ?
+  hlxais: 0 0/1 * * * ?
+  hlxdynamicradar: 0 0/1 * * * ?
+  zyhradar: 0 0/1 * * * ?
+  tianao: 0 0/1 * * * ?
+
+# 拆分船舶最后一次位置消费
+ship:
+  track:
+    beidou: true
+    fusion: false
+    hlxRadar: false
+    hlxAis: false
+    zyhRadar: false
+    tianaoRadar: false
+    ais: false

+ 3 - 0
beidou-track-geomesa/src/main/resources/application.yml

@@ -0,0 +1,3 @@
+spring:
+  profiles:
+    active: dev

+ 15 - 0
beidou-track-geomesa/src/main/resources/log4j.properties

@@ -0,0 +1,15 @@
+# log to stdout by default
+log4j.rootLogger=info, stdout
+
+# set logging levels to appropriate values
+log4j.logger.org.locationtech.geomesa=info
+log4j.logger.org.apache.zookeeper=warn
+log4j.logger.org.apache.curator=warn
+log4j.logger.org.apache.hadoop=warn
+log4j.logger.org.apache.redis=warn
+log4j.logger.hsqldb.db=warn
+
+# log to stderr by default instead of std out
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.Target=System.out