MessageConsumer.java 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114
  1. package com.uas.ps.product;
  2. import org.apache.kafka.clients.consumer.ConsumerConfig;
  3. import org.apache.kafka.clients.consumer.ConsumerRecord;
  4. import org.apache.kafka.clients.consumer.ConsumerRecords;
  5. import org.apache.kafka.clients.consumer.KafkaConsumer;
  6. import org.slf4j.Logger;
  7. import org.slf4j.LoggerFactory;
  8. import org.springframework.beans.factory.annotation.Autowired;
  9. import org.springframework.beans.factory.annotation.Value;
  10. import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
  11. import org.springframework.context.annotation.Bean;
  12. import org.springframework.context.annotation.Configuration;
  13. import org.springframework.stereotype.Service;
  14. import java.util.ArrayList;
  15. import java.util.Collections;
  16. import java.util.List;
  17. import java.util.Properties;
  18. /**
  19. * @author sunyj
  20. * @since 2018/1/17 19:27
  21. */
  22. @Service
  23. @Configuration
  24. public class MessageConsumer {
  25. private static final List<String> topics = Collections.singletonList("RESPONSE");
  26. private final Logger logger = LoggerFactory.getLogger(getClass());
  27. @Autowired
  28. private KafkaProperties kafkaProperties;
  29. @Value("${kafka.consumer.max-poll-interval-millis}")
  30. private Long maxPollIntervalMillis;
  31. private List<String> batchCodes = new ArrayList<>();
  32. @Bean
  33. public Properties kafkaConfig() {
  34. Properties config = new Properties();
  35. // 配置kafka集群机器
  36. setProperty(config, ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
  37. KafkaProperties.Consumer kafkaPropertiesConsumer = kafkaProperties.getConsumer();
  38. // 消费者分组
  39. setProperty(config, ConsumerConfig.GROUP_ID_CONFIG, kafkaPropertiesConsumer.getGroupId());
  40. setProperty(config, ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, kafkaPropertiesConsumer.getKeyDeserializer());
  41. setProperty(config, ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, kafkaPropertiesConsumer.getValueDeserializer());
  42. // 消费者自动提交已消费消息的 offset
  43. setProperty(config, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaPropertiesConsumer.getEnableAutoCommit());
  44. // 自动提交的时间间隔
  45. setProperty(config, ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, kafkaPropertiesConsumer.getAutoCommitInterval());
  46. // 每次 poll 的最大数据个数
  47. setProperty(config, ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaPropertiesConsumer.getMaxPollRecords());
  48. // 设置使用最开始的 offset 偏移量为该 group.id的 最早
  49. // 如果不设置,则会是 latest 即该 topic 最新一个消息的 offset
  50. // 如果采用 latest,消费者只能得到其启动后,生产者生产的消息
  51. // 一般配置 earliest 或者 latest 值
  52. setProperty(config, ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaPropertiesConsumer.getAutoOffsetReset());
  53. return config;
  54. }
  55. private void setProperty(Properties properties, Object key, Object value) {
  56. if (value != null) {
  57. properties.put(key, value);
  58. }
  59. }
  60. /**
  61. * 如果groupId之前存在 , 则从之前提交的最后消费数据的offset处继续开始消费数据
  62. * 如果groupId之前不存在,则从当前分区的最后位置开始消费
  63. * <p>
  64. * 注意如果enable.auto.commit 设置为false,如果消费完数据没有提交已消费数据的offset,
  65. * 则会出现重复消费数据的情况
  66. */
  67. public void waitResponse(String batchCode) {
  68. logger.info("waiting batchCode: " + batchCode);
  69. if(batchCodes.contains(batchCode)){
  70. batchCodes.remove(batchCode);
  71. return;
  72. }
  73. final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaConfig());
  74. // 订阅topic,并实现 ConsumerRebalanceListener
  75. consumer.subscribe(topics);
  76. boolean processed = false;
  77. while (!processed) {
  78. try {
  79. ConsumerRecords<String, String> records = consumer.poll(maxPollIntervalMillis);
  80. logger.info("topic: " + topics + " pool return records size: " + records.count());
  81. for (ConsumerRecord<String, String> record : records) {
  82. String value = record.value();
  83. logger.info("receiving batchCode... " + value);
  84. if(value.equals(batchCode)){
  85. processed = true;
  86. } else{
  87. batchCodes.add(batchCode);
  88. }
  89. // 手动提交已消费数据的 offset
  90. Boolean enableAutoCommit = kafkaProperties.getConsumer().getEnableAutoCommit();
  91. if (enableAutoCommit != null && !enableAutoCommit) {
  92. consumer.commitSync();
  93. }
  94. }
  95. } catch (Exception e) {
  96. logger.error("消息处理出错", e);
  97. }
  98. }
  99. consumer.unsubscribe();
  100. consumer.close();
  101. }
  102. }