单线程操作
Maven 配置
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.1.1</version>
</dependency>
生产者代码:
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class ProducerDemo2 {
public static void main(String[] args) {
Properties props = new Properties();
// kafka的地址
props.put("bootstrap.servers",
"synhadoop100:9092,synhadoop101:9092,synhadoop102:9092,synhadoop103:9092,synhadoop104:9092");
// acks:消息的确认机制,默认值是0
// acks=0:如果设置为0,生产者不会等待kafka的响应。
// acks=1:这个配置意味着kafka会把这条消息写到本地日志文件中,但是不会等待集群中其他机器的成功响应
// acks=all:这个配置意味着leader会等待所有的follower同步完成。这个确保消息不会丢失,除非kafka集群中所有机器挂掉。这是最强的可用性保证
props.put("acks", "all");
// 配置为大于0的值的话,客户端会在消息发送失败时重新发送
props.put("retries", 0);
// 当多条消息需要发送到同一个分区时,生产者会尝试合并网络请求。这会提高client和生产者的效率。
props.put("batch.size", 16384);
// 在正常负载的情况下, 要想减少请求的数量. 加上一个认为的延迟: 不是立即发送消息, 而是延迟等待更多的消息一起批量发送. 类似TCP中的Nagle算法.
// 当获得了batch.size的同一partition的消息会立即发送, 不管linger.ms的设置. 假如要发送的消息比较少,
// 会等待指定的时间以获取更多的消息.
props.put("linger.ms", 1000);
// producer可以使用的最大内存来缓存等待发送到server端的消息.
props.put("buffer.memory", 33554432);
// 键序列化,默认org.apache.kafka.common.serialization.StringDeserializer
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 值序列化,默认org.apache.kafka.common.serialization.StringDeserializer
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 主题
String topic = "test";
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 100; i++) {
producer.send(new ProducerRecord<String, String>(topic, Integer.toString(i), Integer.toString(i)));
}
producer.close();
}
}
消费者代码:
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class AutoCommitConsumerDemo {
public static void main(String[] args) throws InterruptedException {
Properties props = new Properties();
// kafka的地址
props.put("bootstrap.servers",
"synhadoop100:9092,synhadoop101:9092,synhadoop102:9092,synhadoop103:9092,synhadoop104:9092");
// 组名
// 不同组名可以重复消费。例如你先使用了组名A消费了kafka的1000条数据,但是你还想再次进行消费这1000条数据,并且不想重新去产生,
// 那么这里你只需要更改组名就可以重复消费了。
props.put("group.id", "tes1et");
// 是否自动提交,默认为true。
props.put("enable.auto.commit", "true");
// 一次最大拉取的条数。
props.put("max.poll.records", "100");
// 键序列化,默认org.apache.kafka.common.serialization.StringDeserializer
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// 值序列化,默认org.apache.kafka.common.serialization.StringDeserializer。
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
@SuppressWarnings("resource")
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 添加读取topic列表
consumer.subscribe(Arrays.asList("test"));
Duration duration = Duration.of(1000, ChronoUnit.MILLIS);
while (true) {
// timeout(ms): buffer 中的数据未就绪情况下,等待的最长时间,如果设置为0,立即返回 buffer 中已经就绪的数据
ConsumerRecords<String, String> records = consumer.poll(duration);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
System.out.println("~~~~~~~~~~~~~~~~~~~~~~");
Thread.sleep(1000);
}
}
}
开多个 AutoCommitConsumerDemo 的main函数,他们只有一个能读取kafka中的数据因为其group.id 相同。如果将 group.id 设置为不同的值,则多个AutoCommitConsumerDemo 会读取到相同的值,但需要他们都在运行,而不是先运行一个,等第一个拿到数据之后,会想Kafka发送数据已经读取,所以第二再开始的时候数据已经没了,并不能读取到。他这个里面有一个确认机制,一旦开启非自动确认,在程序内部没有确认的情况下,数据不会被认为读取过,就不会从Kafka中移除。
多线程同时读取代码
Maven需要配置:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.10.2.0</version>
</dependency>
多线程同时读取我现在了解的有俩种方式,第一只就是多topic,这个比较明了,一个写、一个读,写多个topic每个线程维护自己的topic。第二种是在同一个topic下建立多个partition,每个线程维护自己对应的topic下的partition.文章主要实现第二种方式,第一种大家可以根据上面的单线程进行修改即可得。
生产者代码:
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class ProducerDemo {
public static void main(String[] args) {
Properties props = new Properties();
// kafka的地址
props.put("bootstrap.servers",
"synhadoop100:9092,synhadoop101:9092,synhadoop102:9092,synhadoop103:9092,synhadoop104:9092");
// acks:消息的确认机制,默认值是0
// acks=0:如果设置为0,生产者不会等待kafka的响应。
// acks=1:这个配置意味着kafka会把这条消息写到本地日志文件中,但是不会等待集群中其他机器的成功响应
// acks=all:这个配置意味着leader会等待所有的follower同步完成。这个确保消息不会丢失,除非kafka集群中所有机器挂掉。这是最强的可用性保证
props.put("acks", "all");
// 配置为大于0的值的话,客户端会在消息发送失败时重新发送
props.put("retries", 0);
// 当多条消息需要发送到同一个分区时,生产者会尝试合并网络请求。这会提高client和生产者的效率。
props.put("batch.size", 16384);
// 在正常负载的情况下, 要想减少请求的数量. 加上一个认为的延迟: 不是立即发送消息, 而是延迟等待更多的消息一起批量发送. 类似TCP中的Nagle算法.
// 当获得了batch.size的同一partition的消息会立即发送, 不管linger.ms的设置. 假如要发送的消息比较少,
// 会等待指定的时间以获取更多的消息.
props.put("linger.ms", 1000);
// producer可以使用的最大内存来缓存等待发送到server端的消息.
props.put("buffer.memory", 33554432);
// 键序列化,默认org.apache.kafka.common.serialization.StringDeserializer
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 值序列化,默认org.apache.kafka.common.serialization.StringDeserializer
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
String topic = "test";
Producer<String, String> producer = new KafkaProducer<>(props);
for (int j = 0; j < 3; j++) {
// 创建partition ,要提前创,虽然producer.send函数说是会自动创建topic和partition。
// 但是实际测试发现,其对于 partition 大于0的写入数据时时间会很漫长,而且没有正确send到kafka结果
KafkaUtils.createTopic(topic, j, 0, props);
for (int i = 0; i < 100; i++) {
producer.send(new ProducerRecord<String, String>(topic, j, Integer.toString(i), Integer.toString(i)));
System.out.println(j + "---------------------------");
}
}
producer.close();
}
}
多线程消费者线程类:
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
public class ThreadConsumerLoop implements Runnable {
private final KafkaConsumer<String, String> consumer;
private final List<String> topics;
private final int id;
public ThreadConsumerLoop(int id, String groupId, List<String> topics) {
this.id = id;
this.topics = topics;
Properties props = new Properties();
// kafka的地址
props.put("bootstrap.servers",
"synhadoop100:9092,synhadoop101:9092,synhadoop102:9092,synhadoop103:9092,synhadoop104:9092");
// 组名
// 不同组名可以重复消费。例如你先使用了组名A消费了kafka的1000条数据,但是你还想再次进行消费这1000条数据,并且不想重新去产生,
// 那么这里你只需要更改组名就可以重复消费了。
// 同一个GROUPid
props.put("group.id", groupId);
// 是否自动提交,默认为true。
props.put("enable.auto.commit", "false");
props.put("auto.offset.reset", "earliest");
// 一次最大拉取的条数。
props.put("max.poll.records", "10");
// 键序列化,默认org.apache.kafka.common.serialization.StringDeserializer
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// 值序列化,默认org.apache.kafka.common.serialization.StringDeserializer。
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
this.consumer = new KafkaConsumer<>(props);
}
@Override
public void run() {
try {
consumer.subscribe(topics);
Duration duration = Duration.of(1000, ChronoUnit.MILLIS);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(duration);
// 创建 topic 对应的partition
TopicPartition tp = new TopicPartition("test", id);
// 读取对应的partition
List<ConsumerRecord<String, String>> li = records.records(tp);
for (ConsumerRecord<String, String> record : li)
System.out.println(id + "___" + record);
}
} catch (WakeupException e) {
// ignore for shutdown
} finally {
consumer.close();
}
}
// 调用线程该方法则其会停止。
public void shutdown() {
consumer.wakeup();
}
}
多线程消费主MAIN类:
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class MultiThreadConsumer {
public static void main(String[] args) {
int numConsumers = 3;
String groupId = "test";
List<String> topics = Arrays.asList("test");
ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 100, 1000, TimeUnit.MILLISECONDS,
new LinkedBlockingDeque<Runnable>(100));
for (int i = 0; i < numConsumers; i++) {
ThreadConsumerLoop consumer = new ThreadConsumerLoop(i, groupId + i, topics);
executor.execute(consumer);
}
System.out.println("线程池中线程数目:" + executor.getPoolSize() + ",队列中等待执行的任务数目:" + executor.getQueue().size()
+ ",已执行玩别的任务数目:" + executor.getCompletedTaskCount());
}
}
对于关闭自动提交的例子下面我找了一个,供参考:
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class ManualCommitConsumerDemo {
public static void main(String[] args) throws InterruptedException {
Properties props = new Properties();
// kafka的地址
props.put("bootstrap.servers",
"synhadoop100:9092,synhadoop101:9092,synhadoop102:9092,synhadoop103:9092,synhadoop104:9092");
// 组名
// 不同组名可以重复消费。例如你先使用了组名A消费了kafka的1000条数据,但是你还想再次进行消费这1000条数据,并且不想重新去产生,
// 那么这里你只需要更改组名就可以重复消费了。
props.put("group.id", "test");
// 是否自动提交,默认为true。*******开启非自动提交
props.put("enable.auto.commit", "false");
// 一次最大拉取的条数。
props.put("max.poll.records", "10");
// 键序列化,默认org.apache.kafka.common.serialization.StringDeserializer
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// 值序列化,默认org.apache.kafka.common.serialization.StringDeserializer。
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
@SuppressWarnings("resource")
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 读取topic list
consumer.subscribe(Arrays.asList("foo", "bar"));
final int minBatchSize = 20;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
Duration duration = Duration.of(1000, ChronoUnit.MILLIS);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(duration);
for (ConsumerRecord<String, String> record : records) {
buffer.add(record);
}
if (buffer.size() >= minBatchSize) {
insertIntoDb(buffer);
// 处理完在确认消费
consumer.commitSync();
buffer.clear();
}
Thread.sleep(2000);
}
}
/**
* 处理获取到的数据
**/
private static void insertIntoDb(List<ConsumerRecord<String, String>> buffer) {
System.out.println(buffer.size());
}
}