| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114 |
- package com.uas.ps.product;
- import org.apache.kafka.clients.consumer.ConsumerConfig;
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.apache.kafka.clients.consumer.ConsumerRecords;
- import org.apache.kafka.clients.consumer.KafkaConsumer;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.stereotype.Service;
- import java.util.ArrayList;
- import java.util.Collections;
- import java.util.List;
- import java.util.Properties;
- /**
- * @author sunyj
- * @since 2018/1/17 19:27
- */
- @Service
- @Configuration
- public class MessageConsumer {
- private static final List<String> topics = Collections.singletonList("RESPONSE");
- private final Logger logger = LoggerFactory.getLogger(getClass());
- @Autowired
- private KafkaProperties kafkaProperties;
- @Value("${kafka.consumer.max-poll-interval-millis}")
- private Long maxPollIntervalMillis;
- private List<String> batchCodes = new ArrayList<>();
- @Bean
- public Properties kafkaConfig() {
- Properties config = new Properties();
- // 配置kafka集群机器
- setProperty(config, ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
- KafkaProperties.Consumer kafkaPropertiesConsumer = kafkaProperties.getConsumer();
- // 消费者分组
- setProperty(config, ConsumerConfig.GROUP_ID_CONFIG, kafkaPropertiesConsumer.getGroupId());
- setProperty(config, ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, kafkaPropertiesConsumer.getKeyDeserializer());
- setProperty(config, ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, kafkaPropertiesConsumer.getValueDeserializer());
- // 消费者自动提交已消费消息的 offset
- setProperty(config, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaPropertiesConsumer.getEnableAutoCommit());
- // 自动提交的时间间隔
- setProperty(config, ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, kafkaPropertiesConsumer.getAutoCommitInterval());
- // 每次 poll 的最大数据个数
- setProperty(config, ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaPropertiesConsumer.getMaxPollRecords());
- // 设置使用最开始的 offset 偏移量为该 group.id的 最早
- // 如果不设置,则会是 latest 即该 topic 最新一个消息的 offset
- // 如果采用 latest,消费者只能得到其启动后,生产者生产的消息
- // 一般配置 earliest 或者 latest 值
- setProperty(config, ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaPropertiesConsumer.getAutoOffsetReset());
- return config;
- }
- private void setProperty(Properties properties, Object key, Object value) {
- if (value != null) {
- properties.put(key, value);
- }
- }
- /**
- * 如果groupId之前存在 , 则从之前提交的最后消费数据的offset处继续开始消费数据
- * 如果groupId之前不存在,则从当前分区的最后位置开始消费
- * <p>
- * 注意如果enable.auto.commit 设置为false,如果消费完数据没有提交已消费数据的offset,
- * 则会出现重复消费数据的情况
- */
- public void waitResponse(String batchCode) {
- logger.info("waiting batchCode: " + batchCode);
- if(batchCodes.contains(batchCode)){
- batchCodes.remove(batchCode);
- return;
- }
- final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaConfig());
- // 订阅topic,并实现 ConsumerRebalanceListener
- consumer.subscribe(topics);
- boolean processed = false;
- while (!processed) {
- try {
- ConsumerRecords<String, String> records = consumer.poll(maxPollIntervalMillis);
- logger.info("topic: " + topics + " pool return records size: " + records.count());
- for (ConsumerRecord<String, String> record : records) {
- String value = record.value();
- logger.info("receiving batchCode... " + value);
- if(value.equals(batchCode)){
- processed = true;
- } else{
- batchCodes.add(batchCode);
- }
- // 手动提交已消费数据的 offset
- Boolean enableAutoCommit = kafkaProperties.getConsumer().getEnableAutoCommit();
- if (enableAutoCommit != null && !enableAutoCommit) {
- consumer.commitSync();
- }
- }
- } catch (Exception e) {
- logger.error("消息处理出错", e);
- }
- }
- consumer.unsubscribe();
- consumer.close();
- }
- }
|