Selaa lähdekoodia

增加数据入库逻辑

liangjianf 2 vuotta sitten
vanhempi
commit
0454c3bf1f

+ 5 - 1
protobuf_hlx/src/main/java/cn/com/taiji/ProtobufHlxApplication.java

@@ -2,10 +2,14 @@ package cn.com.taiji;
 
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.scheduling.annotation.EnableScheduling;
 
+/**
+ * @author kok20
+ */
+@EnableScheduling
 @SpringBootApplication
 public class ProtobufHlxApplication {
-
     public static void main(String[] args) {
         SpringApplication.run(ProtobufHlxApplication.class, args);
     }

+ 4 - 32
protobuf_hlx/src/main/java/cn/com/taiji/consumer/HlxAisConsumer.java

@@ -42,7 +42,10 @@ public class HlxAisConsumer {
 
                 Target.TargetAisList list = null;
                 try {
-                    HlxHead hlxHead = hlxAisService.getHeadFromByte(head);
+                    HlxHead hlxHead = HlxHead.getHeadFromByte(head);
+                    if(3==hlxHead.getType()){
+                        return;
+                    }
 //                    System.out.println("hlxHead:\n" + JSON.toJSONString(hlxHead));
                     list = Target.TargetAisList.parseFrom(entry);
                     String jsonObject = JsonFormat.printer().print(list);
@@ -61,35 +64,4 @@ public class HlxAisConsumer {
         ack.acknowledge();
     }
 
-    @KafkaListener(topics = "hlx_ax_original_radar", groupId = "20230303yougi")
-    public void radarConsumer(List<ConsumerRecord<byte[],byte[]>> records, Acknowledgment ack) {
-        for (ConsumerRecord<byte[],byte[]> record : records) {
-            Optional<byte[]> kafkaMessage = Optional.ofNullable(record.value());
-            kafkaMessage.ifPresent(result -> {
-                byte[] head = new byte[21];
-                byte[] entry = new byte[result.length-21];
-                System.arraycopy(result,0,head,0,21);
-                System.arraycopy(result,21,entry,0,result.length-21);
-                System.out.println("protobuf数据bytes[]:" + ByteArrayUtils.bytes2hexDisplayHex(result));
-                System.out.println("protobuf序列化大小: " + result.length);
-
-                Target.TargetRadarList list = null;
-                try {
-                    HlxHead hlxHead = hlxAisService.getHeadFromByte(head);
-                    list = Target.TargetRadarList.parseFrom(entry);
-                    String jsonObject = JsonFormat.printer().print(list);
-                    System.out.println("Json格式化结果:\n" + jsonObject);
-                    System.out.println("Json格式化数据大小: " + jsonObject.getBytes().length);
-                    HlxRadarPackage pack = new HlxRadarPackage();
-                    pack.setHead(JSON.toJSONString(hlxHead));
-                    pack.setRadarList(jsonObject);
-                    hlxAisService.putRadarList(pack);
-                } catch (InvalidProtocolBufferException e) {
-                    e.printStackTrace();
-                }
-            });
-
-        }
-        ack.acknowledge();
-    }
 }

+ 65 - 0
protobuf_hlx/src/main/java/cn/com/taiji/consumer/HlxRadarConsumer.java

@@ -0,0 +1,65 @@
+package cn.com.taiji.consumer;
+
+import cn.com.taiji.model.HlxAisPackage;
+import cn.com.taiji.model.HlxHead;
+import cn.com.taiji.model.HlxRadarPackage;
+import cn.com.taiji.protobuf.Target;
+import cn.com.taiji.service.HlxAisService;
+import cn.com.taiji.service.HlxRadarService;
+import com.alibaba.fastjson.JSON;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.util.JsonFormat;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.support.Acknowledgment;
+import org.springframework.stereotype.Component;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * @author kok20
+ * @date 2023/3/3
+ */
+@Component
+public class HlxRadarConsumer {
+
+    @Autowired
+    HlxRadarService hlxRadarService;
+
+    @KafkaListener(topics = "hlx_ax_original_radar", groupId = "20230303yougi")
+    public void radarConsumer(List<ConsumerRecord<byte[],byte[]>> records, Acknowledgment ack) {
+        for (ConsumerRecord<byte[],byte[]> record : records) {
+            Optional<byte[]> kafkaMessage = Optional.ofNullable(record.value());
+            kafkaMessage.ifPresent(result -> {
+                byte[] head = new byte[21];
+                byte[] entry = new byte[result.length-21];
+                System.arraycopy(result,0,head,0,21);
+                System.arraycopy(result,21,entry,0,result.length-21);
+//                System.out.println("protobuf数据bytes[]:" + ByteArrayUtils.bytes2hexDisplayHex(result));
+//                System.out.println("protobuf序列化大小: " + result.length);
+
+                Target.TargetRadarList list = null;
+                try {
+                    HlxHead hlxHead = HlxHead.getHeadFromByte(head);
+                    if(3==hlxHead.getType()){
+                        return;
+                    }
+                    list = Target.TargetRadarList.parseFrom(entry);
+                    String jsonObject = JsonFormat.printer().print(list);
+//                    System.out.println("Json格式化结果:\n" + jsonObject);
+//                    System.out.println("Json格式化数据大小: " + jsonObject.getBytes().length);
+                    HlxRadarPackage pack = new HlxRadarPackage();
+                    pack.setHead(JSON.toJSONString(hlxHead));
+                    pack.setRadarList(jsonObject);
+                    hlxRadarService.putRadarList(pack);
+                } catch (InvalidProtocolBufferException e) {
+//                    e.printStackTrace();
+                }
+            });
+
+        }
+        ack.acknowledge();
+    }
+}

+ 4 - 1
protobuf_hlx/src/main/java/cn/com/taiji/controller/protobufController.java

@@ -4,6 +4,7 @@ import cn.com.taiji.model.HlxAisPackage;
 import cn.com.taiji.model.HlxRadarPackage;
 import cn.com.taiji.protobuf.Target;
 import cn.com.taiji.service.HlxAisService;
+import cn.com.taiji.service.HlxRadarService;
 import com.alibaba.excel.EasyExcel;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.web.bind.annotation.GetMapping;
@@ -26,6 +27,8 @@ import java.util.List;
 public class protobufController {
     @Autowired
     HlxAisService hlxAisService;
+    @Autowired
+    HlxRadarService hlxRadarService;
 
     @PostMapping("/hlxais/excel")
     public void hlxAisExcel(HttpServletResponse response) throws IOException, ParseException {
@@ -47,7 +50,7 @@ public class protobufController {
     @PostMapping("/hlxradar/excel")
     public void hlxRadarExcel(HttpServletResponse response) throws IOException, ParseException {
         try {
-            List<HlxRadarPackage> list = hlxAisService.getRadarList();
+            List<HlxRadarPackage> list = hlxRadarService.getRadarList();
             System.out.println("导出海兰信雷达原始数据");
             response.setContentType("application/vnd.openxmlformats-officedocument.spreadsheetml.sheet");
             response.setCharacterEncoding("utf-8");

+ 26 - 3
protobuf_hlx/src/main/java/cn/com/taiji/enetity/HlxAis.java

@@ -4,17 +4,40 @@ import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 import com.baomidou.mybatisplus.annotation.TableName;
+
+import java.util.Date;
+
 /**
  * @author kok20
  * @date 2023/3/3
  */
 @TableName("hlx_ax_original_ais")
 public class HlxAis {
-    private byte[] value;
-    public byte[] getValue() {
+    private String header;
+    private String value;
+    private Date createTime;
+
+    public String getHeader() {
+        return header;
+    }
+
+    public void setHeader(String header) {
+        this.header = header;
+    }
+
+    public String getValue() {
         return value;
     }
-    public void setValue(byte[] value) {
+
+    public void setValue(String value) {
         this.value = value;
     }
+
+    public Date getCreateTime() {
+        return createTime;
+    }
+
+    public void setCreateTime(Date createTime) {
+        this.createTime = createTime;
+    }
 }

+ 40 - 0
protobuf_hlx/src/main/java/cn/com/taiji/enetity/HlxRadar.java

@@ -0,0 +1,40 @@
+package cn.com.taiji.enetity;
+
+import com.baomidou.mybatisplus.annotation.TableName;
+
+import java.util.Date;
+
+/**
+ * @author kok20
+ * @date 2023/3/3
+ */
+@TableName("hlx_ax_original_radar")
+public class HlxRadar {
+    private String header;
+    private String value;
+    private Date createTime;
+
+    public String getHeader() {
+        return header;
+    }
+
+    public void setHeader(String header) {
+        this.header = header;
+    }
+
+    public String getValue() {
+        return value;
+    }
+
+    public void setValue(String value) {
+        this.value = value;
+    }
+
+    public Date getCreateTime() {
+        return createTime;
+    }
+
+    public void setCreateTime(Date createTime) {
+        this.createTime = createTime;
+    }
+}

+ 14 - 0
protobuf_hlx/src/main/java/cn/com/taiji/mapper/HlxRadarMapper.java

@@ -0,0 +1,14 @@
+package cn.com.taiji.mapper;
+
+import cn.com.taiji.enetity.HlxAis;
+import cn.com.taiji.enetity.HlxRadar;
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+import org.apache.ibatis.annotations.Mapper;
+
+/**
+ * @author kok20
+ * @date 2023/3/3
+ */
+@Mapper
+public interface HlxRadarMapper extends BaseMapper<HlxRadar> {
+}

+ 23 - 0
protobuf_hlx/src/main/java/cn/com/taiji/model/HlxHead.java

@@ -1,5 +1,7 @@
 package cn.com.taiji.model;
 
+import cn.com.taiji.utils.ByteArrayUtils;
+
 /**
  * @author kok20
  * @date 2023/3/4
@@ -41,4 +43,25 @@ public class HlxHead {
     public void setNum(Long num) {
         this.num = num;
     }
+
+    public static HlxHead getHeadFromByte(byte[] head) {
+        HlxHead hlxHead = new HlxHead();
+        if(head.length == 21){
+            byte[] num = new byte[8];
+            System.arraycopy(head,0,num,0,8);
+            hlxHead.setNum(ByteArrayUtils.byteArray2Long_Little_Endian(num));
+            byte[] timestamp = new byte[8];
+            System.arraycopy(head,8,timestamp,0,8);
+            Long stamp = ByteArrayUtils.byteArray2Long_Little_Endian(timestamp);
+            hlxHead.setTimestamp(String.valueOf(stamp));
+            byte[] type = new byte[1];
+            System.arraycopy(head,16,type,0,1);
+            int t = type[0]&255;
+            hlxHead.setType(t);
+            byte[] length = new byte[4];
+            System.arraycopy(head,17,length,0,4);
+            hlxHead.setLength(ByteArrayUtils.byteArray2Int_Little_Endian(length));
+        }
+        return hlxHead;
+    }
 }

+ 2 - 6
protobuf_hlx/src/main/java/cn/com/taiji/service/HlxAisService.java

@@ -14,13 +14,9 @@ import java.util.List;
  * @date 2023/3/3
  */
 public interface HlxAisService extends IService<HlxAis> {
+    void saveRadarList();
+
     void putAisList(HlxAisPackage p);
 
     List<HlxAisPackage> getAisList();
-
-    void putRadarList(HlxRadarPackage p);
-
-    List<HlxRadarPackage> getRadarList();
-
-    HlxHead getHeadFromByte(byte[] head);
 }

+ 22 - 0
protobuf_hlx/src/main/java/cn/com/taiji/service/HlxRadarService.java

@@ -0,0 +1,22 @@
+package cn.com.taiji.service;
+
+import cn.com.taiji.enetity.HlxAis;
+import cn.com.taiji.enetity.HlxRadar;
+import cn.com.taiji.model.HlxAisPackage;
+import cn.com.taiji.model.HlxHead;
+import cn.com.taiji.model.HlxRadarPackage;
+import com.baomidou.mybatisplus.extension.service.IService;
+
+import java.util.List;
+
+/**
+ * @author kok20
+ * @date 2023/3/3
+ */
+public interface HlxRadarService extends IService<HlxRadar> {
+    void putRadarList(HlxRadarPackage p);
+
+    List<HlxRadarPackage> getRadarList();
+
+    void saveRadarList();
+}

+ 15 - 41
protobuf_hlx/src/main/java/cn/com/taiji/service/impl/HlxAisServiceImpl.java

@@ -3,15 +3,12 @@ package cn.com.taiji.service.impl;
 import cn.com.taiji.enetity.HlxAis;
 import cn.com.taiji.mapper.HlxAisMapper;
 import cn.com.taiji.model.HlxAisPackage;
-import cn.com.taiji.model.HlxHead;
-import cn.com.taiji.model.HlxRadarPackage;
 import cn.com.taiji.service.HlxAisService;
 import cn.com.taiji.utils.ByteArrayUtils;
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Service;
 
-import java.text.SimpleDateFormat;
 import java.util.*;
 
 /**
@@ -21,7 +18,21 @@ import java.util.*;
 @Slf4j
 public class HlxAisServiceImpl extends ServiceImpl<HlxAisMapper, HlxAis> implements HlxAisService {
     private Vector<HlxAisPackage> targetAis = new Vector<>();
-    private Vector<HlxRadarPackage> targetRadar = new Vector<>();
+
+    @Override
+    public void saveRadarList() {
+        List<HlxAisPackage> result = new ArrayList<> (targetAis);
+        List<HlxAis> saveList = new ArrayList<> ();
+        targetAis = new Vector<>();
+        for (HlxAisPackage p : result) {
+            HlxAis ais = new HlxAis();
+            ais.setHeader(p.getHead());
+            ais.setValue(p.getAisList());
+            ais.setCreateTime(new Date());
+            saveList.add(ais);
+        }
+        saveBatch(saveList);
+    }
 
     @Override
     public void putAisList(HlxAisPackage p) {
@@ -35,43 +46,6 @@ public class HlxAisServiceImpl extends ServiceImpl<HlxAisMapper, HlxAis> impleme
         return result;
      }
 
-    @Override
-    public void putRadarList(HlxRadarPackage p) {
-        targetRadar.add(p);
-    }
-
-    @Override
-    public List<HlxRadarPackage> getRadarList() {
-        List<HlxRadarPackage> result = new ArrayList<> (targetRadar);
-        targetRadar.clear();
-        return result;
-    }
-
-    @Override
-    public HlxHead getHeadFromByte(byte[] head) {
-        HlxHead hlxHead = new HlxHead();
-        if(head.length == 21){
-            byte[] num = new byte[8];
-            System.arraycopy(head,0,num,0,8);
-            hlxHead.setNum(ByteArrayUtils.byteArray2Long_Little_Endian(num));
-            byte[] timestamp = new byte[8];
-            System.arraycopy(head,8,timestamp,0,8);
-            Long stamp = ByteArrayUtils.byteArray2Long_Little_Endian(timestamp);
-            Date time = new Date();
-            time.setTime(stamp);
-            hlxHead.setTimestamp(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
-                    .format(time));
-            byte[] type = new byte[1];
-            System.arraycopy(head,16,type,0,1);
-            int t = type[0]&255;
-            hlxHead.setType(t);
-            byte[] length = new byte[4];
-            System.arraycopy(head,17,length,0,4);
-            hlxHead.setLength(ByteArrayUtils.byteArray2Int_Little_Endian(length));
-        }
-        return hlxHead;
-    }
-
     public static void main(String[] args) {
 
         byte[] result = new byte[21];

+ 55 - 0
protobuf_hlx/src/main/java/cn/com/taiji/service/impl/HlxRadarServiceImpl.java

@@ -0,0 +1,55 @@
+package cn.com.taiji.service.impl;
+
+import cn.com.taiji.enetity.HlxRadar;
+import cn.com.taiji.mapper.HlxRadarMapper;
+import cn.com.taiji.model.HlxRadarPackage;
+import cn.com.taiji.service.HlxRadarService;
+import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Service;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Vector;
+
+/**
+ * @author kok20
+ * @date 2023/3/3
+ */
+@Service
+public class HlxRadarServiceImpl extends ServiceImpl<HlxRadarMapper, HlxRadar> implements HlxRadarService {
+    private Vector<HlxRadarPackage> targetRadar = new Vector<>();
+
+    @Override
+    public void putRadarList(HlxRadarPackage p) {
+        targetRadar.add(p);
+    }
+
+    @Override
+    public List<HlxRadarPackage> getRadarList() {
+        List<HlxRadarPackage> result = new ArrayList<> (targetRadar);
+        targetRadar.clear();
+        return result;
+    }
+    @Override
+    @Scheduled(initialDelay = 5 * 1000, fixedRate = 30 * 1000)
+    public void saveRadarList() {
+        if (targetRadar.isEmpty()){
+            return;
+        }
+        List<HlxRadarPackage> result = new ArrayList<> (targetRadar);
+        List<HlxRadar> saveList = new ArrayList<> ();
+        targetRadar.clear();
+        for (HlxRadarPackage p : result) {
+            HlxRadar radar = new HlxRadar();
+            radar.setHeader(p.getHead());
+            radar.setValue(p.getRadarList());
+            radar.setCreateTime(new Date());
+            saveList.add(radar);
+        }
+        saveBatch(saveList);
+        System.out.println("保存原始雷达数据: " + saveList.size()+"条");
+    }
+
+}

+ 3 - 3
protobuf_hlx/src/main/resources/application-prod.yml

@@ -4,9 +4,9 @@ spring:
   datasource:
     type: com.alibaba.druid.pool.DruidDataSource
     driverClassName: com.mysql.cj.jdbc.Driver
-    url: jdbc:mysql://127.0.0.1:3306/hlx?hlx=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8&rewriteBatchedStatements=true
-    username: root
-    password: root
+    url: jdbc:mysql://74.10.28.2:3389/ax_seat_1?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8&rewriteBatchedStatements=true
+    username: ax_tj_lh
+    password: Taiji@2022#lh
     # 初始连接数
     initialSize: 5
     # 最小连接池数量