ソースを参照

修改实时轨迹数据源,由原本的kafka改为船舶定位表联查,并增加状态字段。

liangjianf 2 年 前
コミット
35dc7ed0a2

+ 2 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/track/config/AbstractRedisDataStoreConfig.java

@@ -41,4 +41,6 @@ public abstract class AbstractRedisDataStoreConfig {
     }
 
     abstract SimpleFeatureType sft(RedisDataStore redisDataStore, IGeomesaTrackDTO geomesaTrackDTO) throws IOException;
+
+    abstract SimpleFeatureType sftLocation(RedisDataStore redisDataStore, IGeomesaTrackDTO geomesaLocationDTO) throws IOException;
 }

+ 27 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/track/config/BeidouShipRedisDataStoreConfig.java

@@ -1,5 +1,6 @@
 package cn.com.taiji.track.config;
 
+import cn.com.taiji.track.dto.BeidouShipLocationDTO;
 import cn.com.taiji.track.dto.BeidouShipTrackDTO;
 import cn.com.taiji.track.dto.IGeomesaTrackDTO;
 import cn.hutool.core.util.StrUtil;
@@ -46,6 +47,24 @@ public class BeidouShipRedisDataStoreConfig extends AbstractRedisDataStoreConfig
         return sft;
     }
 
+    @Override
+    @Bean(name = "sftLocation")
+    public SimpleFeatureType sftLocation(RedisDataStore redisDataStore, IGeomesaTrackDTO geomesaLocationDTO) throws IOException {
+        SimpleFeatureType sft = SimpleFeatureTypes.createType(geomesaLocationDTO.getTypeName(),  geomesaLocationDTO.getSimpleFeatureType().toString());
+        sft.getUserData().put("geomesa.feature.expiry", StrUtil.format("syncTime({} seconds)", expiry));
+
+        try {
+            redisDataStore.getFeatureSource(geomesaLocationDTO.getTypeName()).removeFeatures(Filter.INCLUDE);
+            redisDataStore.removeSchema(geomesaLocationDTO.getTypeName());
+        } catch (Exception e) {
+            log.error("可能属于首次构建,SCHEMA 并不存在");
+            e.printStackTrace();
+        }
+
+        redisDataStore.createSchema(sft);
+        return sft;
+    }
+
     @Bean(name = "geomesaTrackDTO")
     public BeidouShipTrackDTO geomesaTrackDTO() {
         BeidouShipTrackDTO dto = new BeidouShipTrackDTO();
@@ -53,4 +72,12 @@ public class BeidouShipRedisDataStoreConfig extends AbstractRedisDataStoreConfig
         dto.setExpirySeconds(expiry);
         return dto;
     }
+
+    @Bean(name = "geomesaLocationDTO")
+    public BeidouShipLocationDTO geomesaLocationDTO() {
+        BeidouShipLocationDTO dto = new BeidouShipLocationDTO();
+        dto.setKafkaConsumerGroup(group);
+        dto.setExpirySeconds(expiry);
+        return dto;
+    }
 }

+ 1 - 1
beidou-track-geomesa/src/main/java/cn/com/taiji/track/consumer/BeidouShipTrackConsumer.java

@@ -32,7 +32,7 @@ public class BeidouShipTrackConsumer {
             topics = {"${taiji.kafka.consumer.beidou.topic}"}
     )
     public void dynamicTrack(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) {
-        beidouTrackService.beidouDynamicShipToRedis(records);
+//        beidouTrackService.beidouDynamicShipToRedis(records);
         beidouTrackService.beidouDynamicShipToCache(records);
         beidouTrackService.beidouDynamicShipToKafka(records);
         beidouLocationService.beidouDynamicShipToCache(records);

+ 198 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/track/dto/BeidouShipLocationDTO.java

@@ -0,0 +1,198 @@
+package cn.com.taiji.track.dto;
+
+import cn.com.taiji.track.entity.ShipStatusEntity;
+import cn.hutool.core.util.StrUtil;
+import com.fasterxml.jackson.annotation.JsonFormat;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.geotools.feature.simple.SimpleFeatureBuilder;
+import org.geotools.util.factory.Hints;
+import org.opengis.feature.simple.SimpleFeature;
+
+import java.io.Serializable;
+import java.util.Date;
+
+
+/**
+ * @author kok20
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class BeidouShipLocationDTO extends IGeomesaTrackDTO implements Serializable {
+
+    private static final long serialVersionUID = -8983779632713666636L;
+
+    private String deviceId;
+    private String location;
+    private String shipType;
+    private String workType;
+    private String workWay;
+    private String sendTime;
+    private String locationTime;
+    private String online;
+    private String shipLength;
+    private String shipWidth;
+    private String texture;
+    private String longitude;
+    private String latitude;
+    private String direction;
+    private String speed;
+    private String kwh;
+    private String shipName;
+    private String isOnline;
+    private String isInport;
+    private String isAnchor;
+    private String deviceStatus;
+
+    @Override
+    public boolean checkPoint() {
+        if (location != null) {
+            longitude = location.substring(location.indexOf("(")+1,location.indexOf(" ",location.indexOf("(")));
+            latitude = location.substring(location.indexOf(" ",location.indexOf("("))+1,location.indexOf(")"));
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    public String getTypeName() { return "beidou-data"; }
+
+    @Override
+    public String getLayerType() {
+        return "geo_beidou_ship";
+    }
+
+    @Override
+    public String getWsLayerName() {
+        return "beidou-ship";
+    }
+
+    @Override
+    public String getFid() {
+        return deviceId;
+    }
+
+    @Override
+    public StringBuilder getSimpleFeatureType(){
+            StringBuilder attributes = new StringBuilder();
+            attributes.append("deviceId:String:index=true,");
+            attributes.append("*location:Point:srid=4326,");
+            attributes.append("shipType:String,");
+            attributes.append("workType:String,");
+            attributes.append("workWay:String,");
+            attributes.append("sendTime:String,");
+            attributes.append("locationTime:String,");
+            attributes.append("online:String,");
+            attributes.append("shipLength:String,");
+            attributes.append("shipWidth:String,");
+            attributes.append("texture:String,");
+            attributes.append("longitude:String,");
+            attributes.append("latitude:String,");
+            attributes.append("direction:String,");
+            attributes.append("speed:String,");
+            attributes.append("kwh:String,");
+            attributes.append("shipName:String,");
+            attributes.append("isOnline:String,");
+            attributes.append("isInport:String,");
+            attributes.append("isAnchor:String,");
+            attributes.append("deviceStatus:String,");
+
+            attributes.append("layerType:String,");
+            attributes.append("syncTime:Date");
+            return attributes;
+    }
+
+    @Override
+    public SimpleFeature toSimpleFeature(SimpleFeatureBuilder builder, String fid) {
+        builder.set("deviceId", deviceId);
+        builder.set("location", location);
+        builder.set("shipType", shipType);
+        builder.set("workType", workType);
+        builder.set("workWay", workWay);
+        builder.set("sendTime", sendTime);
+        builder.set("locationTime", locationTime);
+        builder.set("online", online);
+        builder.set("shipLength", shipLength);
+        builder.set("shipWidth", shipWidth);
+        builder.set("texture", texture);
+        builder.set("longitude", longitude);
+        builder.set("latitude", latitude);
+        builder.set("direction", direction);
+        builder.set("speed", speed);
+        builder.set("kwh", kwh);
+        builder.set("shipName", shipName);
+        builder.set("isOnline", isOnline);
+        builder.set("isInport", isInport);
+        builder.set("isAnchor", isAnchor);
+        if(deviceStatus!=null){
+            builder.set("deviceStatus", deviceStatus);
+        }else{
+            builder.set("deviceStatus", 0);
+        }
+
+        builder.set("layerType", getLayerType());
+        builder.set("syncTime", new Date());
+        builder.featureUserData(Hints.USE_PROVIDED_FID, Boolean.TRUE);
+        return builder.buildFeature(fid);
+    }
+
+    @Override
+    public String[] getPropList(){
+        return new String[] {
+//                "deviceId",
+                "location",
+                "shipType",
+                "workType",
+                "workWay",
+                "sendTime",
+                "locationTime",
+                "online",
+                "shipLength",
+                "shipWidth",
+                "texture",
+                "longitude",
+                "latitude",
+                "direction",
+                "speed",
+                "kwh",
+                "shipName",
+                "isOnline",
+                "isInport",
+                "isAnchor",
+                "deviceStatus",
+                "layerType",
+                "syncTime"
+        };
+    }
+
+    @Override
+    public Object[] getValueList() {
+        return new Object[] {
+                deviceId,
+                location,
+                shipType,
+                workType,
+                workWay,
+                sendTime,
+                locationTime,
+                online,
+                shipLength,
+                shipWidth,
+                texture,
+                longitude,
+                latitude,
+                direction,
+                speed,
+                kwh,
+                shipName,
+                isOnline,
+                isInport,
+                isAnchor,
+                deviceStatus,
+                getLayerType(),
+                new Date()
+        };
+    }
+}

+ 14 - 2
beidou-track-geomesa/src/main/java/cn/com/taiji/track/entity/BeidouLocationEntity.java

@@ -1,11 +1,15 @@
 package cn.com.taiji.track.entity;
 
+import com.alibaba.fastjson.JSON;
 import com.baomidou.mybatisplus.annotation.TableId;
 import com.baomidou.mybatisplus.annotation.TableName;
+import com.fasterxml.jackson.annotation.JsonFormat;
 import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 
+import java.util.Date;
+
 /**
  * @author wudskq
  */
@@ -18,11 +22,19 @@ public class BeidouLocationEntity {
     @TableId
     private String deviceId;
     private String location;
+    private String shipType;
+    private String workType;
+    private String workWay;
+    private String sendTime;
     private String locationTime;
+    private String online;
+    private String shipLength;
+    private String shipWidth;
+    private String texture;
     private String longitude;
     private String latitude;
-    private String kwh;
-    private String shipName;
     private String direction;
     private String speed;
+    private String kwh;
+    private String shipName;
 }

+ 12 - 12
beidou-track-geomesa/src/main/java/cn/com/taiji/track/entity/BeidouTrackEntity.java

@@ -18,19 +18,19 @@ public class BeidouTrackEntity extends BaseEntity {
 
     private Long trackId;
     private String deviceId;
-    private Long shipType;
-    private Long workType;
-    private Integer workWay;
+    private String shipType;
+    private String workType;
+    private String workWay;
     private String sendTime;
     private String locationTime;
-    private Integer online;
-    private Integer shipLength;
-    private Integer shipWidth;
-    private Integer texture;
-    private Double longitude;
-    private Double latitude;
-    private Double direction;
-    private Double speed;
-    private Integer kwh;
+    private String online;
+    private String shipLength;
+    private String shipWidth;
+    private String texture;
+    private String longitude;
+    private String latitude;
+    private String direction;
+    private String speed;
+    private String kwh;
     private String shipName;
 }

+ 1 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/track/listener/HistoryTrackChannelListener.java

@@ -73,6 +73,7 @@ public class HistoryTrackChannelListener {
             if(StringUtils.hasText(dto.getStartTime())&&StringUtils.hasText(dto.getEndTime())){
                 wrapper.between("location_time",dto.getStartTime(),dto.getEndTime());
             }
+            wrapper.orderByAsc("location_time");
             results = beidouTrackService.list(wrapper);
         }
         historyTrackHandler.sendMsgToSpecifyChannel(event.getChannelId(), JSONUtil.toJsonStr(results));

+ 9 - 1
beidou-track-geomesa/src/main/java/cn/com/taiji/track/mapper/BeidouLocationMapper.java

@@ -1,8 +1,12 @@
 package cn.com.taiji.track.mapper;
 
+import cn.com.taiji.track.dto.BeidouShipLocationDTO;
 import cn.com.taiji.track.entity.BeidouLocationEntity;
 import com.baomidou.mybatisplus.core.mapper.BaseMapper;
 import org.apache.ibatis.annotations.Mapper;
+import org.apache.ibatis.annotations.Select;
+
+import java.util.List;
 
 /**
  * @author chenfangchao
@@ -13,6 +17,10 @@ import org.apache.ibatis.annotations.Mapper;
  */
 @Mapper
 public interface BeidouLocationMapper extends BaseMapper<BeidouLocationEntity> {
-
+    @Select("SELECT l.*,s.is_online,s.is_inport,s.is_anchor,d.dispose_type " +
+            "FROM ax_beidou_ship_location l\n" +
+            "LEFT JOIN ax_beidou_ship_status s on s.device_id = l.device_id\n" +
+            "LEFT JOIN ax_beidou_ship_dispose d on d.devide_no = l.device_id")
+    List<BeidouShipLocationDTO> listLocation();
 }
 

+ 3 - 3
beidou-track-geomesa/src/main/java/cn/com/taiji/track/schedule/LocationSchedule.java

@@ -17,9 +17,9 @@ public class LocationSchedule {
     @Autowired
     private IBeidouLocationService beidouLocationService;
 
-    @Scheduled(cron = "*/10 * * * * ?")
-    private void cacheLocationToMySql() {
-        beidouLocationService.queryLocationToRedis();
+    @Scheduled(cron = "*/3 * * * * ?")
+    private void listLocationToCache() {
+        beidouLocationService.listLocationToCache();
     }
 }
 

+ 1 - 1
beidou-track-geomesa/src/main/java/cn/com/taiji/track/service/IBeidouLocationService.java

@@ -17,5 +17,5 @@ public interface IBeidouLocationService extends IService<BeidouLocationEntity> {
 
     void cacheBeidouToMySql();
 
-    void queryLocationToRedis();
+    void listLocationToCache();
 }

+ 1 - 0
beidou-track-geomesa/src/main/java/cn/com/taiji/track/service/IBeidouTrackService.java

@@ -1,5 +1,6 @@
 package cn.com.taiji.track.service;
 
+import cn.com.taiji.track.entity.BeidouLocationEntity;
 import cn.com.taiji.track.entity.BeidouTrackEntity;
 import com.baomidou.mybatisplus.extension.service.IService;
 import org.apache.kafka.clients.consumer.ConsumerRecord;

+ 93 - 3
beidou-track-geomesa/src/main/java/cn/com/taiji/track/service/impl/BeidouLocationServiceImpl.java

@@ -1,13 +1,31 @@
 package cn.com.taiji.track.service.impl;
 
+import cn.com.taiji.track.dto.BeidouShipLocationDTO;
+import cn.com.taiji.track.dto.IGeomesaTrackDTO;
 import cn.com.taiji.track.entity.BeidouLocationEntity;
 import cn.com.taiji.track.mapper.BeidouLocationMapper;
 import cn.com.taiji.track.service.IBeidouLocationService;
+import cn.com.taiji.track.service.IShipStatusService;
 import cn.com.taiji.track.utils.LatLngUtil;
+import cn.hutool.core.util.StrUtil;
 import com.alibaba.fastjson.JSONObject;
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.geotools.data.FeatureReader;
+import org.geotools.data.FeatureWriter;
+import org.geotools.data.Query;
+import org.geotools.data.Transaction;
+import org.geotools.factory.CommonFactoryFinder;
+import org.geotools.feature.simple.SimpleFeatureBuilder;
+import org.geotools.filter.identity.FeatureIdImpl;
+import org.geotools.util.factory.Hints;
+import org.locationtech.geomesa.redis.data.RedisDataStore;
+import org.opengis.feature.simple.SimpleFeature;
+import org.opengis.feature.simple.SimpleFeatureType;
+import org.opengis.filter.Filter;
+import org.opengis.filter.FilterFactory2;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
 import java.math.BigDecimal;
@@ -21,8 +39,18 @@ import java.util.*;
 @Service
 @Slf4j
 public class BeidouLocationServiceImpl extends ServiceImpl<BeidouLocationMapper, BeidouLocationEntity> implements IBeidouLocationService {
+    @Autowired
+    protected IShipStatusService shipStatusService;
+
+    private Map<String,BeidouShipLocationDTO> locationMaps= new HashMap<>();
     private Vector<BeidouLocationEntity> locationEntities= new Vector<>();
 
+    @Autowired
+    private SimpleFeatureType sftLocation;
+    @Autowired
+    private RedisDataStore redisDataStore;
+    @Autowired
+    private IGeomesaTrackDTO geomesaLocationDTO;
     @Override
     public void beidouDynamicShipToCache(List<ConsumerRecord<?, ?>> records){
         try {
@@ -31,12 +59,14 @@ public class BeidouLocationServiceImpl extends ServiceImpl<BeidouLocationMapper,
                 if (message.isPresent()) {
                     Object msg = message.get();
                     BeidouLocationEntity locationEntity = JSONObject.parseObject(msg.toString(), BeidouLocationEntity.class);
-                    locationEntity.setLocation(String.format("POINT(%s %s)",locationEntity.getLongitude(),locationEntity.getLatitude()));
+                    String location = StrUtil.format("POINT ({} {})", locationEntity.getLongitude(),locationEntity.getLatitude());
+                    locationEntity.setLocation(location);
                     BigDecimal longitudeDecimal = new BigDecimal(locationEntity.getLongitude());
                     locationEntity.setLongitude(LatLngUtil.latLng2Dfm(longitudeDecimal.doubleValue()));
                     BigDecimal latitudeDecimal = new BigDecimal(locationEntity.getLatitude());
                     locationEntity.setLatitude(LatLngUtil.latLng2Dfm(latitudeDecimal.doubleValue()));
                     locationEntities.add(locationEntity);
+                    shipStatusService.pushOnLineDeviceId(locationEntity.getDeviceId());
                 }
             }
         } catch (Exception e) {
@@ -56,7 +86,67 @@ public class BeidouLocationServiceImpl extends ServiceImpl<BeidouLocationMapper,
     }
 
     @Override
-    public void queryLocationToRedis() {
-
+    public void listLocationToCache() {
+        List<BeidouShipLocationDTO> results = baseMapper.listLocation();
+        if (results.size() > 0) {
+            FeatureReader<SimpleFeatureType, SimpleFeature> reader = null;
+            FeatureWriter<SimpleFeatureType, SimpleFeature> writer = null;
+            try {
+                writer = redisDataStore.getFeatureWriterAppend(sftLocation.getTypeName(), Transaction.AUTO_COMMIT);
+                SimpleFeatureBuilder builder = new SimpleFeatureBuilder(sftLocation);
+                for (IGeomesaTrackDTO dto : results) {
+                    dto.setKafkaConsumerGroup(geomesaLocationDTO.getKafkaConsumerGroup());
+                    dto.setExpirySeconds(geomesaLocationDTO.getExpirySeconds());
+                    if (!dto.checkPoint()) {
+                        continue;
+                    }
+                    String fid = dto.getFid();
+                    boolean append = true;
+                    FilterFactory2 filterFactory = CommonFactoryFinder.getFilterFactory2();
+                    Filter filter = null;
+                    if (null != fid && !"".equals(fid)) {
+                        filter = filterFactory.id(filterFactory.featureId(fid));
+                    } else {
+                        String id = UUID.randomUUID().toString();
+                        filter = filterFactory.id(filterFactory.featureId(id));
+                    }
+                    Query query = new Query(dto.getTypeName(), filter);
+                    reader = redisDataStore.getFeatureReader(query, Transaction.AUTO_COMMIT);
+                    if (reader.hasNext()) {
+                        append = false;
+                        String[] propList = dto.getPropList();
+                        Object[] valueList = dto.getValueList();
+                        redisDataStore.getFeatureSource(dto.getTypeName()).modifyFeatures(propList, valueList, filter);
+                    }
+                    reader.close();
+                    reader = null;
+                    if (append) {
+                        SimpleFeature toWrite = writer.next();
+                        builder.reset();
+                        SimpleFeature feature = dto.toSimpleFeature(builder, fid);
+                        toWrite.setAttributes(feature.getAttributes());
+                        toWrite.getUserData().put(Hints.USE_PROVIDED_FID, Boolean.TRUE);
+                        ((FeatureIdImpl) toWrite.getIdentifier()).setID(feature.getID());
+                        toWrite.getUserData().putAll(feature.getUserData());
+                        writer.write();
+                    }
+                }
+            } catch (Exception e) {
+                log.error("=======插入geomesa发生异常============");
+                e.printStackTrace();
+            } finally {
+                try {
+                    if (null != writer) {
+                        writer.close();
+                    }
+                    if (null != reader) {
+                        reader.close();
+                    }
+                } catch (Exception e) {
+                    log.error("=======close操作发生异常============");
+                    e.printStackTrace();
+                }
+            }
+        }
     }
 }

+ 0 - 1
beidou-track-geomesa/src/main/java/cn/com/taiji/track/service/impl/BeidouTrackServiceImpl.java

@@ -131,7 +131,6 @@ public class BeidouTrackServiceImpl extends ServiceImpl<BeidouTrackMapper, Beido
         }
 //        log.info("轨迹数据写入图层完成!耗时 {} 毫秒, 从 消费组 {} 中合计消费 {} 条记录", (new Date()).getTime() - begain.getTime(), geomesaTrackDTO.getKafkaConsumerGroup(), records.size());
     }
-
     private  Vector<BeidouTrackEntity> trackEntities= new Vector<>();
 
     @Override

+ 4 - 1
beidou-track-geomesa/src/main/resources/application-prod.yml

@@ -17,6 +17,9 @@ webSocket:
   focus-track:
     port: 7002
     path: /focustrack
+  anchor-track:
+    port: 7003
+    path: /anchortrack
 spring:
   datasource:
     type: com.alibaba.druid.pool.DruidDataSource
@@ -103,7 +106,7 @@ taiji:
       beidou:
         topic: taiji_ax_beidou_dynamic_ship
     consumer:
-      bootstrap-servers: 172.28.19.20:29092
+      bootstrap-servers: 10.112.89.239:9092
       beidou:
         enable: true
         expiry: 70