🍤 Code Examples and Spring Boot Integration with Kafka

The official Kafka documentation already contains plenty of code examples, so you can read the code there directly; the links above will take you straight to it. The third link points to this blog's own code, which you can grab if you need it.

1️⃣ Official Getting-Started Example

1. Consumer

    import java.time.Duration;
    import java.util.Arrays;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    private final static String TOPIC_NAME = "code-create-topic";
    private final static String CONSUMER_GROUP_NAME = "codeCreateGroup";
    private final static String BOOTSTRAP_SERVERS_IP = "192.168.230.133:39092,192.168.230.134:29092,192.168.230.135:19092";


    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", BOOTSTRAP_SERVERS_IP);
        props.setProperty("group.id", CONSUMER_GROUP_NAME);
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "1000");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(TOPIC_NAME));
        while (true) {
            // Long poll: pull messages from the broker in a loop; poll() returns as soon as records arrive or the timeout expires
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
        }
    }

2. Producer

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    private final static String TOPIC_NAME = "code-create-topic";
    private final static String BOOTSTRAP_SERVERS_IP = "192.168.230.133:39092,192.168.230.134:29092,192.168.230.135:19092";

    public static void main(String[] args) {
        Properties props = new Properties();
        // Point the client at the Kafka brokers; the client discovers the full cluster from these bootstrap servers (not ZooKeeper)
        props.put("bootstrap.servers", BOOTSTRAP_SERVERS_IP);
        props.put("acks", "all");
        props.put("linger.ms", 1);
        // Serializer for message keys
        props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        // Serializer for message values
        props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 100; i++) {
            producer.send(new ProducerRecord<String, String>(TOPIC_NAME, Integer.toString(i), Integer.toString(i)));
        }
        producer.close();
    }

The two follow much the same logic, each with a fixed pattern.

Producer:

  1. Specify the target Topic name and the cluster addresses
  2. Configure the Properties with the required parameters
  3. Build the KafkaProducer, the carrier that sends messages
  4. Call send() to send messages, then close the resources

Consumer:

  1. Configure the Properties with the required parameters
  2. Build the KafkaConsumer, the carrier that receives messages
  3. Subscribe to the target Topic and poll() messages from the carrier

2️⃣ Synchronous Send to a Specified Partition

1. Producer

Properties props = new Properties();
        // Bootstrap servers, serializers and the other settings are the same as above, omitted here
        // props.put(...);
        Producer<String, String> producer = new KafkaProducer<>(props);
        int msgCount = 5;
        for (int i = 0; i < msgCount; i++) {
            // Order object to send
            Order order = new Order(i, 100 + i, 1, 1000.00);
            // Build the record and explicitly target partition 0
            ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(TOPIC_NAME
                    , 0, order.getOrderId().toString(), JSON.toJSONString(order));
            // get() blocks synchronously until the send completes
            RecordMetadata recordMetadata = producer.send(producerRecord).get();

            log.info("sync send result:" + "topic:" + recordMetadata.topic() + "|partition:"
                    + recordMetadata.partition() + "|offset-" + recordMetadata.offset());
        }
        producer.close();

The consumer can stay the same as the official example above, so it is omitted.

Producer log output in IDEA:

14:44:16.463 [main] INFO  com.ralph.kafkaapi.KafkaMsgProducer2 - sync send result:topic:code-create-topic|partition:0|offset-100
14:44:16.463 [main] INFO  com.ralph.kafkaapi.KafkaMsgProducer2 - sync send result:topic:code-create-topic|partition:0|offset-101
14:44:16.463 [main] INFO  com.ralph.kafkaapi.KafkaMsgProducer2 - sync send result:topic:code-create-topic|partition:0|offset-102
14:44:16.463 [main] INFO  com.ralph.kafkaapi.KafkaMsgProducer2 - sync send result:topic:code-create-topic|partition:0|offset-103
14:44:16.463 [main] INFO  com.ralph.kafkaapi.KafkaMsgProducer2 - sync send result:topic:code-create-topic|partition:0|offset-104

We can also consume the messages from the command line:

# bin/kafka-console-consumer.sh --bootstrap-server 192.168.230.133:39092,192.168.230.134:29092,192.168.230.135:19092 --from-beginning --topic code-create-topic  

{"orderAmount":1000,"orderId":0,"productId":100,"productNum":1}
{"orderAmount":1000,"orderId":1,"productId":101,"productNum":1}
{"orderAmount":1000,"orderId":2,"productId":102,"productNum":1}
{"orderAmount":1000,"orderId":3,"productId":103,"productNum":1}
{"orderAmount":1000,"orderId":4,"productId":104,"productNum":1}

3️⃣ Asynchronous Send to a Specified Partition

1. Producer

Properties props = new Properties();
        // Bootstrap servers, serializers and the other settings are the same as above, omitted here
        // props.put(...);
        Producer<String, String> producer = new KafkaProducer<>(props);
        int msgCount = 5;
        final CountDownLatch countDownLatch = new CountDownLatch(msgCount);
        for (int i = 0; i < msgCount; i++) {
            // Order object to send
            Order order = new Order(i, 100 + i, 1, 1000.00);

            // Build the record and explicitly target partition 0
            ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(TOPIC_NAME
                    , 0, order.getOrderId().toString(), JSON.toJSONString(order));

            // Send asynchronously with a callback
            producer.send(producerRecord, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null) {
                        log.error("send failed:" + Arrays.toString(exception.getStackTrace()));
                    }
                    if (metadata != null) {
                        log.info("async send result:" + "topic:" + metadata.topic() + "|partition:"
                                + metadata.partition() + "|offset-" + metadata.offset());
                    }
                    countDownLatch.countDown();
                }
            });
            // Business logic can continue here without waiting for the send
        }
        // Wait for the callbacks before closing, otherwise the producer may shut down first
        countDownLatch.await(5, TimeUnit.SECONDS);
        producer.close();

Producer log output in IDEA:
14:58:16.052 [kafka-producer-network-thread | producer-1] INFO  com.ralph.kafkaapi.KafkaMsgProducer3 - async send result:topic:code-create-topic|partition:0|offset-105
14:58:16.052 [kafka-producer-network-thread | producer-1] INFO  com.ralph.kafkaapi.KafkaMsgProducer3 - async send result:topic:code-create-topic|partition:0|offset-106
14:58:16.052 [kafka-producer-network-thread | producer-1] INFO  com.ralph.kafkaapi.KafkaMsgProducer3 - async send result:topic:code-create-topic|partition:0|offset-107
14:58:16.052 [kafka-producer-network-thread | producer-1] INFO  com.ralph.kafkaapi.KafkaMsgProducer3 - async send result:topic:code-create-topic|partition:0|offset-108
14:58:16.052 [kafka-producer-network-thread | producer-1] INFO  com.ralph.kafkaapi.KafkaMsgProducer3 - async send result:topic:code-create-topic|partition:0|offset-109

4️⃣ When to Use Synchronous vs. Asynchronous Sends

If no further business processing is needed after a message is sent, synchronous sending works fine; but if business logic still has to run after the send, prefer asynchronous sending so that logic is not blocked waiting on the broker round trip.
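
The difference ultimately comes down to whether you block on the Future returned by send(). Below is a minimal sketch, assuming a producer configured as in the examples above; the SendModeSketch class name, demo method and 3-second timeout are illustrative:

    import java.util.concurrent.TimeUnit;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    public class SendModeSketch {
        static void demo(Producer<String, String> producer) throws Exception {
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("code-create-topic", "key", "value");

            // Synchronous: block until the broker acknowledges or the timeout fires
            RecordMetadata meta = producer.send(record).get(3, TimeUnit.SECONDS);
            System.out.println("acked at offset " + meta.offset());

            // Asynchronous: register a callback and return immediately,
            // so the business logic below never waits on the network round trip
            producer.send(record, (metadata, exception) -> {
                if (exception != null) {
                    // compensation / retry logic goes here
                }
            });
            // follow-up business logic runs here without blocking
        }
    }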

5️⃣ Sending Without Specifying a Partition

1. Producer

We covered synchronous and asynchronous sends to a specified partition above; the unspecified-partition case is not much different, so let's take the asynchronous one as the example:

// The props configuration from above is omitted here
Producer<String, String> producer = new KafkaProducer<>(props);
        int msgCount = 10;
        final CountDownLatch countDownLatch = new CountDownLatch(msgCount);
        for (int i = 0; i < msgCount; i++) {
            // Order object to send
            Order order = new Order(i, 10 + i, 1, 1000.00);

            // No partition specified; the target partition is computed as hash(key) % partitionNum
            ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(TOPIC_NAME
                    , order.getOrderId().toString(), JSON.toJSONString(order));

            // Send asynchronously with a callback
            producer.send(producerRecord, (metadata, exception) -> {
                if (exception != null) {
                    log.error("send failed:" + Arrays.toString(exception.getStackTrace()));
                }
                if (metadata != null) {
                    log.info("async send result:" + "topic:" + metadata.topic() + "|partition:"
                            + metadata.partition() + "|offset-" + metadata.offset());
                }
                countDownLatch.countDown();
            });
            // Business logic can continue here without waiting for the send
        }
        // Wait for the callbacks before closing the producer
        countDownLatch.await(5, TimeUnit.SECONDS);
        producer.close();

IDEA log output:

Partitions are chosen by the key-hash rule described above. Each partition maintains its own offset sequence: the first message to reach a new partition starts that partition's offsets, and across multiple runs the offsets keep incrementing from the previous value rather than resetting. (A sketch for checking per-partition offsets follows the logs below.)

async send result:topic:testr1p4|partition:3|offset-0
async send result:topic:testr1p4|partition:3|offset-1
async send result:topic:testr1p4|partition:3|offset-2
async send result:topic:testr1p4|partition:3|offset-3
async send result:topic:testr1p4|partition:3|offset-4
async send result:topic:testr1p4|partition:0|offset-0
15:02:48.008 [kafka-producer-network-thread | producer-1] DEBUG o.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Received PRODUCE response from node 1 for request with header RequestHeader(apiKey=PRODUCE, apiVersion=9, clientId=producer-1, correlationId=6): ProduceResponseData(responses=[TopicProduceResponse(name='testr1p4', partitionResponses=[PartitionProduceResponse(index=2, errorCode=0, baseOffset=0, logAppendTimeMs=-1, logStartOffset=0, recordErrors=[], errorMessage=null)])], throttleTimeMs=0)
async send result:topic:testr1p4|partition:2|offset-0
15:02:48.011 [kafka-producer-network-thread | producer-1] DEBUG o.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Received PRODUCE response from node 3 for request with header RequestHeader(apiKey=PRODUCE, apiVersion=9, clientId=producer-1, correlationId=7): ProduceResponseData(responses=[TopicProduceResponse(name='testr1p4', partitionResponses=[PartitionProduceResponse(index=4, errorCode=0, baseOffset=0, logAppendTimeMs=-1, logStartOffset=0, recordErrors=[], errorMessage=null), PartitionProduceResponse(index=1, errorCode=0, baseOffset=0, logAppendTimeMs=-1, logStartOffset=0, recordErrors=[], errorMessage=null)])], throttleTimeMs=0)
async send result:topic:testr1p4|partition:4|offset-0
async send result:topic:testr1p4|partition:4|offset-1
async send result:topic:testr1p4|partition:1|offset-0
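
To confirm that each partition keeps its own offset sequence across runs, here is a minimal sketch using the standard kafka-clients partitionsFor/endOffsets APIs (the EndOffsetCheck class name is illustrative) that prints the current end offset of every partition:

    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import java.util.stream.Collectors;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class EndOffsetCheck {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "192.168.230.133:39092,192.168.230.134:29092,192.168.230.135:19092");
            props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            // No group.id needed: we only query metadata and never join a consumer group
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                List<TopicPartition> partitions = consumer.partitionsFor("testr1p4").stream()
                        .map(p -> new TopicPartition(p.topic(), p.partition()))
                        .collect(Collectors.toList());
                Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);
                endOffsets.forEach((tp, offset) -> System.out.println(tp + " -> end offset " + offset));
            }
        }
    }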

Verify by consuming the messages:

# bin/kafka-console-consumer.sh --bootstrap-server 192.168.230.133:39092,192.168.230.134:29092,192.168.230.135:19092 --from-beginning --topic testr1p4
{"orderAmount":1000,"orderId":2,"productId":12,"productNum":1}
{"orderAmount":1000,"orderId":3,"productId":13,"productNum":1}
{"orderAmount":1000,"orderId":4,"productId":14,"productNum":1}
{"orderAmount":1000,"orderId":6,"productId":16,"productNum":1}
{"orderAmount":1000,"orderId":9,"productId":19,"productNum":1}
{"orderAmount":1000,"orderId":5,"productId":15,"productNum":1}
{"orderAmount":1000,"orderId":8,"productId":18,"productNum":1}
{"orderAmount":1000,"orderId":1,"productId":11,"productNum":1}
{"orderAmount":1000,"orderId":7,"productId":17,"productNum":1}
{"orderAmount":1000,"orderId":0,"productId":10,"productNum":1}

2. Source-Code Walkthrough of Partition Selection

So how exactly does Kafka choose the target partition based on the record key?

Source entry point: Producer.send() -> KafkaProducer.send() -> doSend()

int partition = partition(record, serializedKey, serializedValue, cluster);

  • record: the record being sent, i.e. the message carrier
  • serializedKey/serializedValue: the key and value after serialization

Note that the value is the real payload carrying our message, while the key is used for routing decisions and similar purposes. Stepping into the partition() method: if the record specifies a partition, that partition is used directly; otherwise the configured partitioner decides:

    private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
        Integer partition = record.partition();
        return partition != null ?
                partition :
                partitioner.partition(
                        record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
    }

With the default partitioner, Utils.murmur2(keyBytes) (a hash algorithm) is applied to the serialized key and taken modulo the total partition count to get the concrete partition number:

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster,
                         int numPartitions) {
        if (keyBytes == null) {
            return stickyPartitionCache.partition(topic, cluster);
        }
        // hash the keyBytes to choose a partition
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
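
We can reproduce this calculation with the same public utility methods. A minimal sketch (the PartitionCalc class name is illustrative; numPartitions is 5 because the testr1p4 logs above show partitions 0 through 4):

    import org.apache.kafka.common.utils.Utils;

    public class PartitionCalc {
        public static void main(String[] args) {
            int numPartitions = 5; // testr1p4 has partitions 0-4
            for (int orderId = 0; orderId < 10; orderId++) {
                byte[] keyBytes = Integer.toString(orderId).getBytes();
                // The same formula the default partitioner applies above
                int partition = Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
                System.out.println("key=" + orderId + " -> partition " + partition);
            }
        }
    }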

6️⃣ Spring Boot Integration with Kafka

1. The Standard Spring Boot Setup and Environment Preparation

As covered in the basic-usage post, you need to update the corresponding advertised.listeners in the server config file. If you are only running a VM learning environment or a company LAN that does not span machine rooms, you can skip it, but it is generally best to set it, since the official docs also recommend doing so:

advertised.listeners=PLAINTEXT://112.126.74.249:9092

Then the corresponding Spring Boot YAML configuration:

spring:
  kafka:
    bootstrap-servers: 192.168.230.133:39092,192.168.230.134:29092,192.168.230.135:19092
    producer: # producer
      retries: 3 # a value greater than 0 makes the client resend records whose send failed
      batch-size: 16384
      buffer-memory: 33554432
      acks: 1
      # Serialization for the message key and value
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: default-group
      enable-auto-commit: false
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      # RECORD: commit after each record is handled by the listener (ListenerConsumer)
      # BATCH: commit after each batch from poll() is handled by the listener
      # TIME: commit after a batch is handled, once more than TIME has passed since the last commit
      # COUNT: commit after a batch is handled, once at least COUNT records have been handled
      # COUNT_TIME: commit when either the TIME or the COUNT condition is met
      # MANUAL: commit after the batch is handled and Acknowledgment.acknowledge() has been called
      # MANUAL_IMMEDIATE: commit immediately when Acknowledgment.acknowledge() is called
      ack-mode: MANUAL_IMMEDIATE

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.0.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <groupId>com.ralph</groupId>
    <artifactId>SpringBootKafka</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>SpringBootKafka</name>
    <description>SpringBootKafka</description>

    <properties>
        <java.version>1.8</java.version>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.24</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.1.41</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <!-- version managed by spring-boot-starter-parent -->
        </dependency>
    </dependencies>
</project>

We can create the Topic ourselves ahead of time, or let the code create it for us, but a topic auto-created that way defaults to a single partition with no replicas. So we can add a configuration class to the project dedicated to initializing topics, as follows.

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class KafkaConfig {
    @Bean
    public NewTopic initialTopic() {
        // 5 partitions, replication factor 3
        return new NewTopic("Springboot-Create-Topic", 5, (short) 3);
    }
}
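
Alternatively, spring-kafka ships a fluent TopicBuilder that produces the same NewTopic; a sketch equivalent to the bean above (the initialTopicFluent method name is illustrative):

    import org.apache.kafka.clients.admin.NewTopic;
    import org.springframework.context.annotation.Bean;
    import org.springframework.kafka.config.TopicBuilder;

    @Bean
    public NewTopic initialTopicFluent() {
        // Same 5 partitions / 3 replicas as the constructor version above
        return TopicBuilder.name("Springboot-Create-Topic")
                .partitions(5)
                .replicas(3)
                .build();
    }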

2. Simple Producer Example

The example below creates a producer that sends messages to Springboot-Create-Topic.

    @Resource
    private KafkaTemplate<String, Object> kafkaTemplate;


    /**
     * Send a simple message through the producer
     * @param message message
     * @return a fixed acknowledgement string
     */
    @GetMapping("/api/send/{message}")
    public String sendMessage1(@PathVariable("message") String message) {
        kafkaTemplate.send("Springboot-Create-Topic", message);
        return "send message!";
    }
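
Assuming the application runs on the default port 8080 (adjust to your server.port), the endpoint can be triggered from the command line:

# curl http://localhost:8080/api/send/hello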

3. Simple Consumer Example

@Slf4j
@Component
public class KafkaConsumer {

    @KafkaListener(topics = {"Springboot-Create-Topic"})
    public void onMessage1(ConsumerRecord<String, String> record, Acknowledgment ack){
        // Log which topic and partition the message came from, plus its content
        log.info("simple consume [topic:" + record.topic() + " partition:" + record.partition() + " value:" + record.value() + "]");
        ack.acknowledge();
    }
}

The consumer listens on Springboot-Create-Topic for messages. The listener is annotated with @KafkaListener; topics names the topic(s) to listen to and supports several at once, separated by commas (see the sketch below). Start the project and call the endpoint with Postman to trigger the producer.
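
For example, a sketch of one listener covering two topics (the onMultiTopics method name and the second topic are illustrative):

    @KafkaListener(topics = {"Springboot-Create-Topic", "Another-Topic"}, groupId = "default-group")
    public void onMultiTopics(ConsumerRecord<String, String> record, Acknowledgment ack) {
        log.info("received from {}: {}", record.topic(), record.value());
        ack.acknowledge();
    }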

4. Producer with Callbacks

The ListenableFuture returned by kafkaTemplate.send() provides an addCallback method, where we can monitor whether the message was sent successfully and run compensation handling on failure. There are two styles:

    /**
     * Send a message with callback style 1
     * @param callbackMessage callbackMessage
     */
    @GetMapping("/api/sendCallback1/{message}")
    public void sendMessage2(@PathVariable("message") String callbackMessage) {
        kafkaTemplate.send("Springboot-Create-Topic", callbackMessage).addCallback(success -> {
            assert success != null;
            // Topic the message was sent to
            String topic = success.getRecordMetadata().topic();
            // Partition the message was sent to
            int partition = success.getRecordMetadata().partition();
            // Offset of the message within the partition
            long offset = success.getRecordMetadata().offset();
            log.info("send succeeded: " + topic + "-" + partition + "-" + offset);
        }, failure -> {
            log.error("send failed: " + failure.getMessage());
        });
    }

    /**
     * Send a message with callback style 2
     * @param callbackMessage callbackMessage
     */
    @GetMapping("/api/sendCallback2/{message}")
    public void sendMessage3(@PathVariable("message") String callbackMessage) {
        kafkaTemplate.send("Springboot-Create-Topic", callbackMessage).addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onFailure(Throwable ex) {
                log.error("send failed: " + ex.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, Object> result) {
                log.info("send succeeded: " + result.getRecordMetadata().topic() + ":"
                        + result.getRecordMetadata().partition() + ":" + result.getRecordMetadata().offset());
            }
        });
    }

5. Common Usage of the Consumer's @KafkaListener

Specify the Topic and group:

@KafkaListener(topics = "Springboot-topic",groupId = "AGroup")
@KafkaListener(topics = "Springboot",groupId = "BGroup")

You can also pin specific Topic partitions:

concurrency is the number of consumers in the same group, i.e. the number of concurrent consumers; it must be less than or equal to the total partition count.

@KafkaListener(groupId = "AGroup", topicPartitions = {@TopicPartition(topic = "topic1", partitions = {"0", "1"})},concurrency = "6")

You can also start consuming from a specified offset:

@KafkaListener(groupId = "AGroup", topicPartitions = {
@TopicPartition(topic = "topic1", partitions = {"0", "1"}),
@TopicPartition(topic = "topic2", partitions = "0",partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))},concurrency = "6")

Complete pseudo-code:

         @KafkaListener(groupId = "EGtestGroup", topicPartitions = {
            @TopicPartition(topic = "EGtopic1", partitions = {"0", "1"}),
            @TopicPartition(topic = "EGtopic2", partitions = "0",
            partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
            },concurrency = "6")
         public void EGtest(ConsumerRecord<String, String> record, Acknowledgment ack) {
             String value = record.value();
             System.out.println(value);
             System.out.println(record);
             // manually commit the offset
             //ack.acknowledge();
         }