123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596 |
- package cn.com.taiji.consumer;
- import cn.com.taiji.model.HlxAisPackage;
- import cn.com.taiji.model.HlxHead;
- import cn.com.taiji.model.HlxRadarPackage;
- import cn.com.taiji.protobuf.Target;
- import cn.com.taiji.service.HlxAisService;
- import cn.com.taiji.utils.ByteArrayUtils;
- import com.alibaba.fastjson.JSON;
- import com.google.protobuf.InvalidProtocolBufferException;
- import com.google.protobuf.util.JsonFormat;
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.kafka.annotation.KafkaListener;
- import org.springframework.kafka.support.Acknowledgment;
- import org.springframework.stereotype.Component;
- import java.util.List;
- import java.util.Optional;
- /**
- * @author kok20
- * @date 2023/3/3
- */
- @Component
- public class HlxAisConsumer {
- @Autowired
- HlxAisService hlxAisService;
- @KafkaListener(topics = "hlx_ax_original_ais", groupId = "20230303yougi")
- public void aisConsumer(List<ConsumerRecord<byte[],byte[]>> records, Acknowledgment ack) {
- for (ConsumerRecord<byte[],byte[]> record : records) {
- Optional<byte[]> kafkaMessage = Optional.ofNullable(record.value());
- kafkaMessage.ifPresent(result -> {
- byte[] head = new byte[21];
- byte[] entry = new byte[result.length-21];
- // System.out.println("protobuf数据bytes[]:" + ByteArrayUtils.bytes2hexDisplayHex(result));
- // System.out.println("protobuf序列化大小: " + result.length);
- System.arraycopy(result,0,head,0,21);
- System.arraycopy(result,21,entry,0,result.length-21);
- Target.TargetAisList list = null;
- try {
- HlxHead hlxHead = hlxAisService.getHeadFromByte(head);
- // System.out.println("hlxHead:\n" + JSON.toJSONString(hlxHead));
- list = Target.TargetAisList.parseFrom(entry);
- String jsonObject = JsonFormat.printer().print(list);
- // System.out.println("Json格式化结果:\n" + jsonObject);
- // System.out.println("Json格式化数据大小: " + jsonObject.getBytes().length);
- HlxAisPackage pack = new HlxAisPackage();
- pack.setHead(JSON.toJSONString(hlxHead));
- pack.setAisList(jsonObject);
- hlxAisService.putAisList(pack);
- } catch (InvalidProtocolBufferException e) {
- // e.printStackTrace();
- }
- });
- }
- ack.acknowledge();
- }
- @KafkaListener(topics = "hlx_ax_original_radar", groupId = "20230303yougi")
- public void radarConsumer(List<ConsumerRecord<byte[],byte[]>> records, Acknowledgment ack) {
- for (ConsumerRecord<byte[],byte[]> record : records) {
- Optional<byte[]> kafkaMessage = Optional.ofNullable(record.value());
- kafkaMessage.ifPresent(result -> {
- byte[] head = new byte[21];
- byte[] entry = new byte[result.length-21];
- System.arraycopy(result,0,head,0,21);
- System.arraycopy(result,21,entry,0,result.length-21);
- System.out.println("protobuf数据bytes[]:" + ByteArrayUtils.bytes2hexDisplayHex(result));
- System.out.println("protobuf序列化大小: " + result.length);
- Target.TargetRadarList list = null;
- try {
- HlxHead hlxHead = hlxAisService.getHeadFromByte(head);
- list = Target.TargetRadarList.parseFrom(entry);
- String jsonObject = JsonFormat.printer().print(list);
- System.out.println("Json格式化结果:\n" + jsonObject);
- System.out.println("Json格式化数据大小: " + jsonObject.getBytes().length);
- HlxRadarPackage pack = new HlxRadarPackage();
- pack.setHead(JSON.toJSONString(hlxHead));
- pack.setRadarList(jsonObject);
- hlxAisService.putRadarList(pack);
- } catch (InvalidProtocolBufferException e) {
- e.printStackTrace();
- }
- });
- }
- ack.acknowledge();
- }
- }
|