package cn.com.taiji.consumer; import cn.com.taiji.model.HlxAisPackage; import cn.com.taiji.model.HlxHead; import cn.com.taiji.model.HlxRadarPackage; import cn.com.taiji.protobuf.Target; import cn.com.taiji.service.HlxAisService; import cn.com.taiji.utils.ByteArrayUtils; import com.alibaba.fastjson.JSON; import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.util.JsonFormat; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.Acknowledgment; import org.springframework.stereotype.Component; import java.util.List; import java.util.Optional; /** * @author kok20 * @date 2023/3/3 */ @Component public class HlxAisConsumer { @Autowired HlxAisService hlxAisService; @KafkaListener(topics = "hlx_ax_original_ais", groupId = "20230303yougi") public void aisConsumer(List> records, Acknowledgment ack) { for (ConsumerRecord record : records) { Optional kafkaMessage = Optional.ofNullable(record.value()); kafkaMessage.ifPresent(result -> { byte[] head = new byte[21]; byte[] entry = new byte[result.length-21]; // System.out.println("protobuf数据bytes[]:" + ByteArrayUtils.bytes2hexDisplayHex(result)); // System.out.println("protobuf序列化大小: " + result.length); System.arraycopy(result,0,head,0,21); System.arraycopy(result,21,entry,0,result.length-21); Target.TargetAisList list = null; try { HlxHead hlxHead = hlxAisService.getHeadFromByte(head); // System.out.println("hlxHead:\n" + JSON.toJSONString(hlxHead)); list = Target.TargetAisList.parseFrom(entry); String jsonObject = JsonFormat.printer().print(list); // System.out.println("Json格式化结果:\n" + jsonObject); // System.out.println("Json格式化数据大小: " + jsonObject.getBytes().length); HlxAisPackage pack = new HlxAisPackage(); pack.setHead(JSON.toJSONString(hlxHead)); pack.setAisList(jsonObject); hlxAisService.putAisList(pack); } catch (InvalidProtocolBufferException e) { // e.printStackTrace(); } }); } ack.acknowledge(); } @KafkaListener(topics = "hlx_ax_original_radar", groupId = "20230303yougi") public void radarConsumer(List> records, Acknowledgment ack) { for (ConsumerRecord record : records) { Optional kafkaMessage = Optional.ofNullable(record.value()); kafkaMessage.ifPresent(result -> { byte[] head = new byte[21]; byte[] entry = new byte[result.length-21]; System.arraycopy(result,0,head,0,21); System.arraycopy(result,21,entry,0,result.length-21); System.out.println("protobuf数据bytes[]:" + ByteArrayUtils.bytes2hexDisplayHex(result)); System.out.println("protobuf序列化大小: " + result.length); Target.TargetRadarList list = null; try { HlxHead hlxHead = hlxAisService.getHeadFromByte(head); list = Target.TargetRadarList.parseFrom(entry); String jsonObject = JsonFormat.printer().print(list); System.out.println("Json格式化结果:\n" + jsonObject); System.out.println("Json格式化数据大小: " + jsonObject.getBytes().length); HlxRadarPackage pack = new HlxRadarPackage(); pack.setHead(JSON.toJSONString(hlxHead)); pack.setRadarList(jsonObject); hlxAisService.putRadarList(pack); } catch (InvalidProtocolBufferException e) { e.printStackTrace(); } }); } ack.acknowledge(); } }