🍤 Code Examples and Spring Boot Integration with Kafka
The official Kafka documentation already contains plenty of code examples, so you can read the code there directly; the links above will take you to it. The third link points to this blog's own code, which you can grab if you need it.
1️⃣ Official Getting-Started Example
1. Consumer
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

private final static String TOPIC_NAME = "code-create-topic";
private final static String CONSUMER_GROUP_NAME = "codeCreateGroup";
private final static String BOOTSTRAP_SERVERS_IP = "192.168.230.133:39092,192.168.230.134:29092,192.168.230.135:19092";

public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", BOOTSTRAP_SERVERS_IP);
    props.setProperty("group.id", CONSUMER_GROUP_NAME);
    props.setProperty("enable.auto.commit", "true");
    props.setProperty("auto.commit.interval.ms", "1000");
    props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList(TOPIC_NAME));
    while (true) {
        // Long polling: keep pulling messages from the broker in a loop; poll() returns as soon as records arrive (or the timeout expires)
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
    }
}
2. Producer
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

private final static String TOPIC_NAME = "code-create-topic";
private final static String BOOTSTRAP_SERVERS_IP = "192.168.230.133:39092,192.168.230.134:29092,192.168.230.135:19092";

public static void main(String[] args) {
    Properties props = new Properties();
    // Addresses of the Kafka brokers for the initial connection; the client talks to the brokers directly, not to ZooKeeper
    props.put("bootstrap.servers", BOOTSTRAP_SERVERS_IP);
    props.put("acks", "all");
    props.put("linger.ms", 1);
    // serializer for the key
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    // serializer for the value
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    Producer<String, String> producer = new KafkaProducer<>(props);
    for (int i = 0; i < 100; i++) {
        producer.send(new ProducerRecord<String, String>(TOPIC_NAME, Integer.toString(i), Integer.toString(i)));
    }
    producer.close();
}
The two follow much the same fixed pattern.

Producer:

- Specify the target topic name and the cluster's bootstrap addresses
- Build a `Properties` object and assemble the corresponding configuration
- Construct the sending client, `KafkaProducer`
- Call `send()` to publish messages, then close the resource

Consumer:

- Build a `Properties` object and assemble the corresponding configuration
- Construct the receiving client, `KafkaConsumer`
- Subscribe to the topic and `poll()` messages from it
2️⃣ Synchronous Send to a Specified Partition
1. Producer
Properties props = new Properties();
// props.put(...): configuration omitted, same as the examples above
Producer<String, String> producer = new KafkaProducer<>(props);
int msgCount = 5;
for (int i = 0; i < msgCount; i++) {
    // order object
    Order order = new Order(i, 100 + i, 1, 1000.00);
    // build the record carrying the message, explicitly targeting partition 0
    ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(TOPIC_NAME
            , 0, order.getOrderId().toString(), JSON.toJSONString(order));
    // get() blocks until the send completes, making this a synchronous send
    RecordMetadata recordMetadata = producer.send(producerRecord).get();
    log.info("Sync send result:" + "topic:" + recordMetadata.topic() + "|partition:"
            + recordMetadata.partition() + "|offset-" + recordMetadata.offset());
}
producer.close();
The consumer can be the same as the official example above, omitted here.
Producer log output in IDEA:
14:44:16.463 [main] INFO com.ralph.kafkaapi.KafkaMsgProducer2 - Sync send result:topic:code-create-topic|partition:0|offset-100
14:44:16.463 [main] INFO com.ralph.kafkaapi.KafkaMsgProducer2 - Sync send result:topic:code-create-topic|partition:0|offset-101
14:44:16.463 [main] INFO com.ralph.kafkaapi.KafkaMsgProducer2 - Sync send result:topic:code-create-topic|partition:0|offset-102
14:44:16.463 [main] INFO com.ralph.kafkaapi.KafkaMsgProducer2 - Sync send result:topic:code-create-topic|partition:0|offset-103
14:44:16.463 [main] INFO com.ralph.kafkaapi.KafkaMsgProducer2 - Sync send result:topic:code-create-topic|partition:0|offset-104
We can also consume the messages from the command line:
# bin/kafka-console-consumer.sh --bootstrap-server 192.168.230.133:39092,192.168.230.134:29092,192.168.230.135:19092 --from-beginning --topic code-create-topic
{"orderAmount":1000,"orderId":0,"productId":100,"productNum":1}
{"orderAmount":1000,"orderId":1,"productId":101,"productNum":1}
{"orderAmount":1000,"orderId":2,"productId":102,"productNum":1}
{"orderAmount":1000,"orderId":3,"productId":103,"productNum":1}
{"orderAmount":1000,"orderId":4,"productId":104,"productNum":1}
3️⃣ Asynchronous Send to a Specified Partition
1. Producer
Properties props = new Properties();
// props.put(...): configuration omitted, same as the examples above
Producer<String, String> producer = new KafkaProducer<>(props);
int msgCount = 5;
final CountDownLatch countDownLatch = new CountDownLatch(msgCount);
for (int i = 0; i < msgCount; i++) {
    // order object
    Order order = new Order(i, 100 + i, 1, 1000.00);
    // build the record carrying the message, explicitly targeting partition 0
    ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(TOPIC_NAME
            , 0, order.getOrderId().toString(), JSON.toJSONString(order));
    // send asynchronously with a callback
    producer.send(producerRecord, new Callback() {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception != null) {
                log.error("Failed to send message: " + Arrays.toString(exception.getStackTrace()));
            }
            if (metadata != null) {
                log.info("Async send result:" + "topic:" + metadata.topic() + "|partition:"
                        + metadata.partition() + "|offset-" + metadata.offset());
            }
            countDownLatch.countDown();
        }
    });
    // business logic can continue here without waiting for the send
}
// wait (up to 5s) for all callbacks to fire before closing the producer
countDownLatch.await(5, TimeUnit.SECONDS);
producer.close();
14:58:16.052 [kafka-producer-network-thread | producer-1] INFO com.ralph.kafkaapi.KafkaMsgProducer3 - Async send result:topic:code-create-topic|partition:0|offset-105
14:58:16.052 [kafka-producer-network-thread | producer-1] INFO com.ralph.kafkaapi.KafkaMsgProducer3 - Async send result:topic:code-create-topic|partition:0|offset-106
14:58:16.052 [kafka-producer-network-thread | producer-1] INFO com.ralph.kafkaapi.KafkaMsgProducer3 - Async send result:topic:code-create-topic|partition:0|offset-107
14:58:16.052 [kafka-producer-network-thread | producer-1] INFO com.ralph.kafkaapi.KafkaMsgProducer3 - Async send result:topic:code-create-topic|partition:0|offset-108
14:58:16.052 [kafka-producer-network-thread | producer-1] INFO com.ralph.kafkaapi.KafkaMsgProducer3 - Async send result:topic:code-create-topic|partition:0|offset-109
4️⃣ When to Use Synchronous vs. Asynchronous Sending
If there is no further business processing to do after a message is sent, the synchronous approach is fine; but if you still need to run business logic after sending, use the asynchronous approach wherever possible so the send does not block the calling thread.
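To make the contrast concrete, here is a minimal sketch of both styles side by side (it assumes a `producer`, a `record`, and a `log` already set up as in the examples above):

```java
// Synchronous: get() blocks the calling thread until the broker acknowledges the write
RecordMetadata meta = producer.send(record).get();
log.info("sent to partition {} at offset {}", meta.partition(), meta.offset());

// Asynchronous: the callback runs later on the producer's network thread,
// so the calling thread can move straight on to its business logic
producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        log.error("send failed", exception); // compensation / retry logic would go here
    } else {
        log.info("sent to partition {} at offset {}", metadata.partition(), metadata.offset());
    }
});
```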
5️⃣ Sending Without Specifying a Partition
1. Producer
We covered synchronous and asynchronous sends to a specified partition above; sending without specifying a partition is not much different, so let's take the asynchronous variant as the example:
// props.put(...): configuration omitted, same as the examples above
Producer<String, String> producer = new KafkaProducer<>(props);
int msgCount = 10;
final CountDownLatch countDownLatch = new CountDownLatch(msgCount);
for (int i = 0; i < msgCount; i++) {
    // order object
    Order order = new Order(i, 10 + i, 1, 1000.00);
    // no partition specified; the target partition is computed as hash(key) % partitionNum
    ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(TOPIC_NAME
            , order.getOrderId().toString(), JSON.toJSONString(order));
    // send asynchronously with a callback
    producer.send(producerRecord, (metadata, exception) -> {
        if (exception != null) {
            log.error("Failed to send message: " + Arrays.toString(exception.getStackTrace()));
        }
        if (metadata != null) {
            log.info("Async send result:" + "topic:" + metadata.topic() + "|partition:"
                    + metadata.partition() + "|offset-" + metadata.offset());
        }
        countDownLatch.countDown();
    });
    // business logic can continue here without waiting for the send
}
// wait (up to 5s) for all callbacks to fire before closing the producer
countDownLatch.await(5, TimeUnit.SECONDS);
producer.close();
Log output in IDEA:
The partition is chosen from the key by the hashing rule. Offsets are tracked per partition, so the count starts over the first time a message lands on a new partition; across multiple runs, each partition's offset keeps incrementing from the previous run's last value.
Async send result:topic:testr1p4|partition:3|offset-0
Async send result:topic:testr1p4|partition:3|offset-1
Async send result:topic:testr1p4|partition:3|offset-2
Async send result:topic:testr1p4|partition:3|offset-3
Async send result:topic:testr1p4|partition:3|offset-4
Async send result:topic:testr1p4|partition:0|offset-0
15:02:48.008 [kafka-producer-network-thread | producer-1] DEBUG o.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Received PRODUCE response from node 1 for request with header RequestHeader(apiKey=PRODUCE, apiVersion=9, clientId=producer-1, correlationId=6): ProduceResponseData(responses=[TopicProduceResponse(name='testr1p4', partitionResponses=[PartitionProduceResponse(index=2, errorCode=0, baseOffset=0, logAppendTimeMs=-1, logStartOffset=0, recordErrors=[], errorMessage=null)])], throttleTimeMs=0)
Async send result:topic:testr1p4|partition:2|offset-0
15:02:48.011 [kafka-producer-network-thread | producer-1] DEBUG o.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Received PRODUCE response from node 3 for request with header RequestHeader(apiKey=PRODUCE, apiVersion=9, clientId=producer-1, correlationId=7): ProduceResponseData(responses=[TopicProduceResponse(name='testr1p4', partitionResponses=[PartitionProduceResponse(index=4, errorCode=0, baseOffset=0, logAppendTimeMs=-1, logStartOffset=0, recordErrors=[], errorMessage=null), PartitionProduceResponse(index=1, errorCode=0, baseOffset=0, logAppendTimeMs=-1, logStartOffset=0, recordErrors=[], errorMessage=null)])], throttleTimeMs=0)
Async send result:topic:testr1p4|partition:4|offset-0
Async send result:topic:testr1p4|partition:4|offset-1
Async send result:topic:testr1p4|partition:1|offset-0
Verify by consuming the messages:
# bin/kafka-console-consumer.sh --bootstrap-server 192.168.230.133:39092,192.168.230.134:29092,192.168.230.135:19092 --from-beginning --topic testr1p4
{"orderAmount":1000,"orderId":2,"productId":12,"productNum":1}
{"orderAmount":1000,"orderId":3,"productId":13,"productNum":1}
{"orderAmount":1000,"orderId":4,"productId":14,"productNum":1}
{"orderAmount":1000,"orderId":6,"productId":16,"productNum":1}
{"orderAmount":1000,"orderId":9,"productId":19,"productNum":1}
{"orderAmount":1000,"orderId":5,"productId":15,"productNum":1}
{"orderAmount":1000,"orderId":8,"productId":18,"productNum":1}
{"orderAmount":1000,"orderId":1,"productId":11,"productNum":1}
{"orderAmount":1000,"orderId":7,"productId":17,"productNum":1}
{"orderAmount":1000,"orderId":0,"productId":10,"productNum":1}
2. Source-Code Walkthrough of Partition Selection
So how exactly does Kafka pick the target partition from the record key?
Source entry point: Producer.send() -> KafkaProducer.send() -> doSend()
int partition = partition(record, serializedKey, serializedValue, cluster);
- `record`: the record being sent, i.e. the carrier of the message
- `serializedKey` / `serializedValue`: the key and the value after serialization

Note that the value is the real payload carrying our message, while the key is used for routing decisions and other purposes. Stepping into the partition() method: if the record specifies a partition, that partition is used as-is; otherwise the partitioner decides:
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
    Integer partition = record.partition();
    return partition != null ?
            partition :
            partitioner.partition(
                    record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}
With no partition specified, the default strategy is used: Utils.murmur2(keyBytes), a hash algorithm, modulo the total number of partitions yields the concrete partition number:
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster,
                     int numPartitions) {
    if (keyBytes == null) {
        return stickyPartitionCache.partition(topic, cluster);
    }
    // hash the keyBytes to choose a partition
    return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
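As a quick sanity check, the same formula can be run standalone to predict where a given key will land. A minimal sketch, assuming kafka-clients is on the classpath (`Utils.murmur2` and `Utils.toPositive` are public utility methods there) and an example partition count of 5:

```java
import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.utils.Utils;

public class PartitionPredictor {
    public static void main(String[] args) {
        int numPartitions = 5; // example partition count; use your topic's actual value
        for (String key : new String[]{"0", "1", "2", "3", "4"}) {
            byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
            // same computation as the default partitioner above
            int partition = Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
            System.out.println("key=" + key + " -> partition " + partition);
        }
    }
}
```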
6️⃣ Spring Boot Integration with Kafka
1. The Usual Spring Boot Setup and Environment Preparation
The basic-usage post already covered that you need to modify advertised.listeners in the broker's server configuration file. If you are only running a VM learning environment, or everything sits on one company LAN that does not cross machine rooms, you can leave it unset; still, it is generally best to configure it, as the official docs also recommend:

advertised.listeners=PLAINTEXT://112.126.74.249:9092

Then the corresponding Spring Boot YAML configuration:
spring:
  kafka:
    bootstrap-servers: 192.168.230.133:39092,192.168.230.134:29092,192.168.230.135:19092
    producer: # producer
      retries: 3 # with a value greater than 0, the client re-sends records whose send failed
      batch-size: 16384
      buffer-memory: 33554432
      acks: 1
      # serializers for the message key and the message body
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: default-group
      enable-auto-commit: false
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      # RECORD: commit after each record has been processed by the listener (ListenerConsumer)
      # BATCH: commit after each batch returned by poll() has been processed by the listener
      # TIME: commit after a batch has been processed and more than TIME has elapsed since the last commit
      # COUNT: commit after a batch has been processed and at least COUNT records have been processed
      # COUNT_TIME: commit when either the TIME or the COUNT condition is met
      # MANUAL: commit after a batch has been processed and Acknowledgment.acknowledge() has been called
      # MANUAL_IMMEDIATE: commit immediately when Acknowledgment.acknowledge() is called
      ack-mode: MANUAL_IMMEDIATE
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.0.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.ralph</groupId>
    <artifactId>SpringBootKafka</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>SpringBootKafka</name>
    <description>SpringBootKafka</description>
    <properties>
        <java.version>1.8</java.version>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.24</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.1.41</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <!-- version managed by the Spring Boot parent -->
        </dependency>
    </dependencies>
</project>
We can create the `Topic` ourselves ahead of time, or let the code create it for us; a topic auto-created that way, however, defaults to a single partition with a replication factor of 1. So we can add a dedicated configuration class to the project to initialize topics, as follows.
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class KafkaConfig {
    @Bean
    public NewTopic initialTopic() {
        // 5 partitions, replication factor 3
        return new NewTopic("Springboot-Create-Topic", 5, (short) 3);
    }
}
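If you are on spring-kafka 2.3 or later (the Boot version used here already pulls in a newer release), the fluent TopicBuilder is an equivalent, slightly more readable alternative. A minimal sketch; the class and bean names are illustrative:

```java
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class KafkaTopicConfig {
    @Bean
    public NewTopic initialTopicViaBuilder() {
        // same topic shape as above: 5 partitions, replication factor 3
        return TopicBuilder.name("Springboot-Create-Topic")
                .partitions(5)
                .replicas(3)
                .build();
    }
}
```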
2. Simple Producer Example
The example below creates a producer and sends messages to Springboot-Create-Topic.
@Resource
private KafkaTemplate<String, Object> kafkaTemplate;

/**
 * Send a simple message via the producer.
 * @param message message
 * @return confirmation string
 */
@GetMapping("/api/send/{message}")
public String sendMessage1(@PathVariable("message") String message) {
    kafkaTemplate.send("Springboot-Create-Topic", message);
    return "send message!";
}
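Assuming the application runs locally on the default port 8080 (adjust to your setup), the endpoint can be exercised like this:

```
curl http://localhost:8080/api/send/hello-kafka
# -> send message!
```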
3. Simple Consumer Example
@Slf4j
@Component
public class KafkaConsumer {
    @KafkaListener(topics = {"Springboot-Create-Topic"})
    public void onMessage1(ConsumerRecord<String, String> record, Acknowledgment ack) {
        // log which topic and partition the record came from, along with its content
        log.info("Simple consumption [topic: " + record.topic() + ", partition: " + record.partition() + ", value: " + record.value() + "]");
        ack.acknowledge();
    }
}
The consumer listens on `Springboot-Create-Topic` and consumes its messages. The listener method is annotated with @KafkaListener; topics specifies which topic(s) to listen to, and several topics can be listened to at once, separated by commas. Start the project and call the endpoint with Postman (or curl, as above) to trigger the producer.
4. Producer with a Callback
kafkaTemplate.send() returns a future that provides an addCallback method; in the callback we can check whether the message was sent successfully and run compensation logic on failure. There are two ways to write it:
/**
 * Send a message with callback style 1.
 * @param callbackMessage callbackMessage
 */
@GetMapping("/api/sendCallback1/{message}")
public void sendMessage2(@PathVariable("message") String callbackMessage) {
    kafkaTemplate.send("Springboot-Create-Topic", callbackMessage).addCallback(success -> {
        assert success != null;
        // topic the message was sent to
        String topic = success.getRecordMetadata().topic();
        // partition the message was sent to
        int partition = success.getRecordMetadata().partition();
        // offset of the message within the partition
        long offset = success.getRecordMetadata().offset();
        log.info("Message sent successfully: " + topic + "-" + partition + "-" + offset);
    }, failure -> {
        log.error("Failed to send message: " + failure.getMessage());
    });
}

/**
 * Send a message with callback style 2.
 * @param callbackMessage callbackMessage
 */
@GetMapping("/api/sendCallback2/{message}")
public void sendMessage3(@PathVariable("message") String callbackMessage) {
    kafkaTemplate.send("Springboot-Create-Topic", callbackMessage).addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
        @Override
        public void onFailure(Throwable ex) {
            log.error("Failed to send message: " + ex.getMessage());
        }

        @Override
        public void onSuccess(SendResult<String, Object> result) {
            log.info("Message sent successfully: " + result.getRecordMetadata().topic() + ":"
                    + result.getRecordMetadata().partition() + ":" + result.getRecordMetadata().offset());
        }
    });
}
5. Common @KafkaListener Usage on the Consumer
Specify the topic and the group:
@KafkaListener(topics = "Springboot-topic", groupId = "AGroup")
@KafkaListener(topics = "Springboot", groupId = "BGroup")
You can also pin the listener to specific topic partitions. `concurrency` is the number of consumers in the same group, i.e. the number of concurrent consumers; it must be less than or equal to the total number of partitions being consumed (here two partitions, so concurrency is capped at 2):

@KafkaListener(groupId = "AGroup", topicPartitions = {@TopicPartition(topic = "topic1", partitions = {"0", "1"})}, concurrency = "2")
You can also specify the offset from which to start consuming:
@KafkaListener(groupId = "AGroup", topicPartitions = {
        @TopicPartition(topic = "topic1", partitions = {"0", "1"}),
        @TopicPartition(topic = "topic2", partitions = "0", partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))}, concurrency = "4")
Complete pseudo-code:
@KafkaListener(groupId = "EGtestGroup", topicPartitions = {
        @TopicPartition(topic = "EGtopic1", partitions = {"0", "1"}),
        @TopicPartition(topic = "EGtopic2", partitions = "0",
                partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
}, concurrency = "4")
public void EGtest(ConsumerRecord<String, String> record, Acknowledgment ack) {
    String value = record.value();
    System.out.println(value);
    System.out.println(record);
    // manually commit the offset
    //ack.acknowledge();
}