Java 操作Kafka 示例

Kafka

Posted by BY tuo on February 25, 2019

单线程操作

Maven 配置

	<dependency>
		<groupId>org.apache.kafka</groupId>
		<artifactId>kafka-clients</artifactId>
		<version>2.1.1</version>
	</dependency>

生产者代码:


import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerDemo2 {
	public static void main(String[] args) {
		Properties props = new Properties();
		// kafka的地址
		props.put("bootstrap.servers",
				"synhadoop100:9092,synhadoop101:9092,synhadoop102:9092,synhadoop103:9092,synhadoop104:9092");
		// acks:消息的确认机制,默认值是0
		// acks=0:如果设置为0,生产者不会等待kafka的响应。
		// acks=1:这个配置意味着kafka会把这条消息写到本地日志文件中,但是不会等待集群中其他机器的成功响应
		// acks=all:这个配置意味着leader会等待所有的follower同步完成。这个确保消息不会丢失,除非kafka集群中所有机器挂掉。这是最强的可用性保证
		props.put("acks", "all");
		// 配置为大于0的值的话,客户端会在消息发送失败时重新发送
		props.put("retries", 0);
		// 当多条消息需要发送到同一个分区时,生产者会尝试合并网络请求。这会提高client和生产者的效率。
		props.put("batch.size", 16384);
		// 在正常负载的情况下, 要想减少请求的数量. 加上一个认为的延迟: 不是立即发送消息, 而是延迟等待更多的消息一起批量发送. 类似TCP中的Nagle算法.
		// 当获得了batch.size的同一partition的消息会立即发送, 不管linger.ms的设置. 假如要发送的消息比较少,
		// 会等待指定的时间以获取更多的消息.
		props.put("linger.ms", 1000);
		// producer可以使用的最大内存来缓存等待发送到server端的消息.
		props.put("buffer.memory", 33554432);
		// 键序列化,默认org.apache.kafka.common.serialization.StringDeserializer
		props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		// 值序列化,默认org.apache.kafka.common.serialization.StringDeserializer
		props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
    // 主题
		String topic = "test";
		Producer<String, String> producer = new KafkaProducer<>(props);
		for (int i = 0; i < 100; i++) {
			producer.send(new ProducerRecord<String, String>(topic, Integer.toString(i), Integer.toString(i)));
		}
		producer.close();
	}
}

消费者代码:

import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class AutoCommitConsumerDemo {
	public static void main(String[] args) throws InterruptedException {
		Properties props = new Properties();
		// kafka的地址
		props.put("bootstrap.servers",
				"synhadoop100:9092,synhadoop101:9092,synhadoop102:9092,synhadoop103:9092,synhadoop104:9092");
		// 组名
		// 不同组名可以重复消费。例如你先使用了组名A消费了kafka的1000条数据,但是你还想再次进行消费这1000条数据,并且不想重新去产生,
		// 那么这里你只需要更改组名就可以重复消费了。
		props.put("group.id", "tes1et");
		// 是否自动提交,默认为true。
		props.put("enable.auto.commit", "true");
		// 一次最大拉取的条数。
		props.put("max.poll.records", "100");
		// 键序列化,默认org.apache.kafka.common.serialization.StringDeserializer
		props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		// 值序列化,默认org.apache.kafka.common.serialization.StringDeserializer。
		props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		@SuppressWarnings("resource")
		KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
		// 添加读取topic列表
		consumer.subscribe(Arrays.asList("test"));
		Duration duration = Duration.of(1000, ChronoUnit.MILLIS);
		while (true) {
			// timeout(ms): buffer 中的数据未就绪情况下,等待的最长时间,如果设置为0,立即返回 buffer 中已经就绪的数据
			ConsumerRecords<String, String> records = consumer.poll(duration);
			for (ConsumerRecord<String, String> record : records)
				System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
			System.out.println("~~~~~~~~~~~~~~~~~~~~~~");
			Thread.sleep(1000);
		}
	}
}

开多个 AutoCommitConsumerDemo 的main函数,他们只有一个能读取kafka中的数据因为其group.id 相同。如果将 group.id 设置为不同的值,则多个AutoCommitConsumerDemo 会读取到相同的值,但需要他们都在运行,而不是先运行一个,等第一个拿到数据之后,会想Kafka发送数据已经读取,所以第二再开始的时候数据已经没了,并不能读取到。他这个里面有一个确认机制,一旦开启非自动确认,在程序内部没有确认的情况下,数据不会被认为读取过,就不会从Kafka中移除。

多线程同时读取代码

Maven需要配置:

	<dependency>
		<groupId>org.apache.kafka</groupId>
		<artifactId>kafka_2.10</artifactId>
		<version>0.10.2.0</version>
	</dependency>

多线程同时读取我现在了解的有俩种方式,第一只就是多topic,这个比较明了,一个写、一个读,写多个topic每个线程维护自己的topic。第二种是在同一个topic下建立多个partition,每个线程维护自己对应的topic下的partition.文章主要实现第二种方式,第一种大家可以根据上面的单线程进行修改即可得。

生产者代码:


import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerDemo {
	public static void main(String[] args) {
		Properties props = new Properties();
		// kafka的地址
		props.put("bootstrap.servers",
				"synhadoop100:9092,synhadoop101:9092,synhadoop102:9092,synhadoop103:9092,synhadoop104:9092");
		// acks:消息的确认机制,默认值是0
		// acks=0:如果设置为0,生产者不会等待kafka的响应。
		// acks=1:这个配置意味着kafka会把这条消息写到本地日志文件中,但是不会等待集群中其他机器的成功响应
		// acks=all:这个配置意味着leader会等待所有的follower同步完成。这个确保消息不会丢失,除非kafka集群中所有机器挂掉。这是最强的可用性保证
		props.put("acks", "all");
		// 配置为大于0的值的话,客户端会在消息发送失败时重新发送
		props.put("retries", 0);
		// 当多条消息需要发送到同一个分区时,生产者会尝试合并网络请求。这会提高client和生产者的效率。
		props.put("batch.size", 16384);
		// 在正常负载的情况下, 要想减少请求的数量. 加上一个认为的延迟: 不是立即发送消息, 而是延迟等待更多的消息一起批量发送. 类似TCP中的Nagle算法.
		// 当获得了batch.size的同一partition的消息会立即发送, 不管linger.ms的设置. 假如要发送的消息比较少,
		// 会等待指定的时间以获取更多的消息.
		props.put("linger.ms", 1000);
		// producer可以使用的最大内存来缓存等待发送到server端的消息.
		props.put("buffer.memory", 33554432);
		// 键序列化,默认org.apache.kafka.common.serialization.StringDeserializer
		props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		// 值序列化,默认org.apache.kafka.common.serialization.StringDeserializer
		props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		String topic = "test";
		Producer<String, String> producer = new KafkaProducer<>(props);
		for (int j = 0; j < 3; j++) {
			// 创建partition ,要提前创,虽然producer.send函数说是会自动创建topic和partition。
			// 但是实际测试发现,其对于 partition 大于0的写入数据时时间会很漫长,而且没有正确send到kafka结果
			KafkaUtils.createTopic(topic, j, 0, props);
			for (int i = 0; i < 100; i++) {
				producer.send(new ProducerRecord<String, String>(topic, j, Integer.toString(i), Integer.toString(i)));
				System.out.println(j + "---------------------------");
			}
		}
		producer.close();
	}
}

多线程消费者线程类:

import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;

public class ThreadConsumerLoop implements Runnable {
	private final KafkaConsumer<String, String> consumer;
	private final List<String> topics;
	private final int id;

	public ThreadConsumerLoop(int id, String groupId, List<String> topics) {
		this.id = id;
		this.topics = topics;
		Properties props = new Properties();
		// kafka的地址
		props.put("bootstrap.servers",
				"synhadoop100:9092,synhadoop101:9092,synhadoop102:9092,synhadoop103:9092,synhadoop104:9092");
		// 组名
		// 不同组名可以重复消费。例如你先使用了组名A消费了kafka的1000条数据,但是你还想再次进行消费这1000条数据,并且不想重新去产生,
		// 那么这里你只需要更改组名就可以重复消费了。
		// 同一个GROUPid
		props.put("group.id", groupId);

		// 是否自动提交,默认为true。
		props.put("enable.auto.commit", "false");
		props.put("auto.offset.reset", "earliest");

		// 一次最大拉取的条数。
		props.put("max.poll.records", "10");
		// 键序列化,默认org.apache.kafka.common.serialization.StringDeserializer
		props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		// 值序列化,默认org.apache.kafka.common.serialization.StringDeserializer。
		props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		this.consumer = new KafkaConsumer<>(props);
	}

	@Override
	public void run() {
		try {
			consumer.subscribe(topics);
			Duration duration = Duration.of(1000, ChronoUnit.MILLIS);
			while (true) {
				ConsumerRecords<String, String> records = consumer.poll(duration);
				// 创建 topic 对应的partition 
				TopicPartition tp = new TopicPartition("test", id);
				// 读取对应的partition
				List<ConsumerRecord<String, String>> li = records.records(tp);
				for (ConsumerRecord<String, String> record : li)
					System.out.println(id + "___" + record);

			}
		} catch (WakeupException e) {
			// ignore for shutdown
		} finally {
			consumer.close();
		}
	}
	// 调用线程该方法则其会停止。
	public void shutdown() {
		consumer.wakeup();
	}
}

多线程消费主MAIN类:

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class MultiThreadConsumer {
	public static void main(String[] args) {
		int numConsumers = 3;
		String groupId = "test";
		List<String> topics = Arrays.asList("test");
		ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 100, 1000, TimeUnit.MILLISECONDS,
				new LinkedBlockingDeque<Runnable>(100));
		for (int i = 0; i < numConsumers; i++) {
			ThreadConsumerLoop consumer = new ThreadConsumerLoop(i, groupId + i, topics);
			executor.execute(consumer);
		}
		System.out.println("线程池中线程数目:" + executor.getPoolSize() + ",队列中等待执行的任务数目:" + executor.getQueue().size()
				+ ",已执行玩别的任务数目:" + executor.getCompletedTaskCount());
	}
}

对于关闭自动提交的例子下面我找了一个,供参考:

import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ManualCommitConsumerDemo {
	public static void main(String[] args) throws InterruptedException {
		Properties props = new Properties();
		// kafka的地址
		props.put("bootstrap.servers",
				"synhadoop100:9092,synhadoop101:9092,synhadoop102:9092,synhadoop103:9092,synhadoop104:9092");
		// 组名
		// 不同组名可以重复消费。例如你先使用了组名A消费了kafka的1000条数据,但是你还想再次进行消费这1000条数据,并且不想重新去产生,
		// 那么这里你只需要更改组名就可以重复消费了。
		props.put("group.id", "test");
		// 是否自动提交,默认为true。*******开启非自动提交
		props.put("enable.auto.commit", "false");
		// 一次最大拉取的条数。
		props.put("max.poll.records", "10");
		// 键序列化,默认org.apache.kafka.common.serialization.StringDeserializer
		props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		// 值序列化,默认org.apache.kafka.common.serialization.StringDeserializer。
		props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		@SuppressWarnings("resource")
		KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
		// 读取topic list
		consumer.subscribe(Arrays.asList("foo", "bar"));
		final int minBatchSize = 20;
		List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
		Duration duration = Duration.of(1000, ChronoUnit.MILLIS);

		while (true) {
			ConsumerRecords<String, String> records = consumer.poll(duration);
			for (ConsumerRecord<String, String> record : records) {
				buffer.add(record);
			}
			if (buffer.size() >= minBatchSize) {
				insertIntoDb(buffer);
				// 处理完在确认消费
				consumer.commitSync();
				buffer.clear();
			}
			Thread.sleep(2000);
		}

	}
	
	/**
	* 处理获取到的数据
	**/
	private static void insertIntoDb(List<ConsumerRecord<String, String>> buffer) {
		System.out.println(buffer.size());
	}
}