HlxAisConsumer.java 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596
package cn.com.taiji.consumer;

import cn.com.taiji.model.HlxAisPackage;
import cn.com.taiji.model.HlxHead;
import cn.com.taiji.model.HlxRadarPackage;
import cn.com.taiji.protobuf.Target;
import cn.com.taiji.service.HlxAisService;
import cn.com.taiji.utils.ByteArrayUtils;
import com.alibaba.fastjson.JSON;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
  18. /**
  19. * @author kok20
  20. * @date 2023/3/3
  21. */
  22. @Component
  23. public class HlxAisConsumer {
  24. @Autowired
  25. HlxAisService hlxAisService;
  26. @KafkaListener(topics = "hlx_ax_original_ais", groupId = "20230303yougi")
  27. public void aisConsumer(List<ConsumerRecord<byte[],byte[]>> records, Acknowledgment ack) {
  28. for (ConsumerRecord<byte[],byte[]> record : records) {
  29. Optional<byte[]> kafkaMessage = Optional.ofNullable(record.value());
  30. kafkaMessage.ifPresent(result -> {
  31. byte[] head = new byte[21];
  32. byte[] entry = new byte[result.length-21];
  33. // System.out.println("protobuf数据bytes[]:" + ByteArrayUtils.bytes2hexDisplayHex(result));
  34. // System.out.println("protobuf序列化大小: " + result.length);
  35. System.arraycopy(result,0,head,0,21);
  36. System.arraycopy(result,21,entry,0,result.length-21);
  37. Target.TargetAisList list = null;
  38. try {
  39. HlxHead hlxHead = hlxAisService.getHeadFromByte(head);
  40. // System.out.println("hlxHead:\n" + JSON.toJSONString(hlxHead));
  41. list = Target.TargetAisList.parseFrom(entry);
  42. String jsonObject = JsonFormat.printer().print(list);
  43. // System.out.println("Json格式化结果:\n" + jsonObject);
  44. // System.out.println("Json格式化数据大小: " + jsonObject.getBytes().length);
  45. HlxAisPackage pack = new HlxAisPackage();
  46. pack.setHead(JSON.toJSONString(hlxHead));
  47. pack.setAisList(jsonObject);
  48. hlxAisService.putAisList(pack);
  49. } catch (InvalidProtocolBufferException e) {
  50. // e.printStackTrace();
  51. }
  52. });
  53. }
  54. ack.acknowledge();
  55. }
  56. @KafkaListener(topics = "hlx_ax_original_radar", groupId = "20230303yougi")
  57. public void radarConsumer(List<ConsumerRecord<byte[],byte[]>> records, Acknowledgment ack) {
  58. for (ConsumerRecord<byte[],byte[]> record : records) {
  59. Optional<byte[]> kafkaMessage = Optional.ofNullable(record.value());
  60. kafkaMessage.ifPresent(result -> {
  61. byte[] head = new byte[21];
  62. byte[] entry = new byte[result.length-21];
  63. System.arraycopy(result,0,head,0,21);
  64. System.arraycopy(result,21,entry,0,result.length-21);
  65. System.out.println("protobuf数据bytes[]:" + ByteArrayUtils.bytes2hexDisplayHex(result));
  66. System.out.println("protobuf序列化大小: " + result.length);
  67. Target.TargetRadarList list = null;
  68. try {
  69. HlxHead hlxHead = hlxAisService.getHeadFromByte(head);
  70. list = Target.TargetRadarList.parseFrom(entry);
  71. String jsonObject = JsonFormat.printer().print(list);
  72. System.out.println("Json格式化结果:\n" + jsonObject);
  73. System.out.println("Json格式化数据大小: " + jsonObject.getBytes().length);
  74. HlxRadarPackage pack = new HlxRadarPackage();
  75. pack.setHead(JSON.toJSONString(hlxHead));
  76. pack.setRadarList(jsonObject);
  77. hlxAisService.putRadarList(pack);
  78. } catch (InvalidProtocolBufferException e) {
  79. e.printStackTrace();
  80. }
  81. });
  82. }
  83. ack.acknowledge();
  84. }
  85. }