用java代码对kafka消息进行消费与发送,首先我们得引入相关jar包
maven:
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.10</artifactId> <version>0.8.2.1</version> </dependency>
gradle:
compile("org.apache.kafka:kafka_2.10:0.8.2.1")
在新版本的kafka中(具体版本记不清楚了),添加了java代码实现的producer,consumer目前还是Scala的,之前的producer和consumer均是Scala编写的,在这里则介绍java版本的producer。
另一点需要特别注意:
当发送消息时我们不指定key时,producer将消息分发到各partition的机制是:
Scala版本的producer:在你的producer启动的时候,随机获得一个partition,然后后面的消息都会发送到这个partition,也就是说,只要程序启动了,这个producer都会往同一个partition里发送消息
java版本的producer:会轮询每个partition,所以发送的会比较平均
所以当使用Scala版本的producer时,尽量传入key,保证消息在partition的平均性
下面是具体的代码:
package cn.qlt.study.kafka; import java.io.Serializable; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import org.apache.commons.lang.SerializationUtils; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import cn.qlt.study.domain.User; public class KafkaUtil { private static KafkaProducer<String, byte[]> producer=null; private static ConsumerConnector consumer=null; static{ //生产者配置文件,具体配置可参考ProducerConfig类源码,或者参考官网介绍 Map<String,Object> config=new HashMap<String, Object>(); //kafka服务器地址 config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.100.90:9092,192.168.100.91:9092"); //kafka消息序列化类 即将传入对象序列化为字节数组 config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); //kafka消息key序列化类 若传入key的值,则根据该key的值进行hash散列计算出在哪个partition上 config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); config.put(ProducerConfig.BATCH_SIZE_CONFIG, 1024*1024*5); //往kafka服务器提交消息间隔时间,0则立即提交不等待 config.put(ProducerConfig.LINGER_MS_CONFIG,0); //消费者配置文件 Properties props = new Properties(); //zookeeper地址 props.put("zookeeper.connect", "192.168.100.90:2181"); //组id props.put("group.id", "123"); //自动提交消费情况间隔时间 props.put("auto.commit.interval.ms", "1000"); ConsumerConfig consumerConfig=new ConsumerConfig(props); producer=new KafkaProducer<String,byte[]>(config); consumer=Consumer.createJavaConsumerConnector(consumerConfig); } /** *启动一个消费程序 * @param topic 要消费的topic名称 * @param handler 自己的处理逻辑的实现 * @param threadCount 消费线程数,该值应小于等于partition个数,多了也没用 */ public static <T extends Serializable>void startConsumer(String topic,final MqMessageHandler<T> handler,int threadCount) throws Exception{ if(threadCount<1) throw new Exception("处理消息线程数最少为1"); //设置处理消息线程数,线程数应小于等于partition数量,若线程数大于partition数量,则多余的线程则闲置,不会进行工作 //key:topic名称 value:线程数 Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(topic, new Integer(threadCount)); Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); //声明一个线程池,用于消费各个partition ExecutorService executor=Executors.newFixedThreadPool(threadCount); //获取对应topic的消息队列 List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); //为每一个partition分配一个线程去消费 for (final KafkaStream stream : streams) { executor.execute(new Runnable() { @Override public void run() { ConsumerIterator<byte[], byte[]> it = stream.iterator(); //有信息则消费,无信息将会阻塞 while (it.hasNext()){ T message=null; try { //将字节码反序列化成相应的对象 byte[] bytes=it.next().message(); message = (T) SerializationUtils.deserialize(bytes); } catch (Exception e) { e.printStackTrace(); return; } //调用自己的业务逻辑 try { handler.handle(message); } catch (Exception e) { e.printStackTrace(); } } } }); } } /** *发送消息,发送的对象必须是可序列化的 */ public static Future<RecordMetadata> send(String topic,Serializable value) throws Exception{ try { //将对象序列化称字节码 byte[] bytes=SerializationUtils.serialize(value); Future<RecordMetadata> future=producer.send(new ProducerRecord<String,byte[]>(topic,bytes)); return future; }catch(Exception e){ throw e; } } //内部抽象类 用于实现自己的处理逻辑 public static abstract class MqMessageHandler<T extends Serializable>{ public abstract void handle(T message); } public static void main(String[] args) throws Exception { //发送一个信息 send("test",new User("id","userName", "password")); //为test启动一个消费者,启动后每次有消息则打印对象信息 KafkaUtil.startConsumer("test", new MqMessageHandler<User>() { @Override public void handle(User user) { //实现自己的处理逻辑,这里只打印出消息 System.out.println(user.toString()); } },2); } }
相关配置解释:
producer:
1、producer的配置不需要zookeeper地址,会直接获取kafka的元数据,直接和broker进行通信
2、ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG即value.serializer,kafka生产者与broker之间数据是以byte进行传递的,所以这个参数的意思是把我们传入对象转换成byte[]的类,一般使用org.apache.kafka.common.serialization.ByteArraySerializer即可,我们自己把对象序列化为byte[]
3、ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG即key.serializer,首先说明下key值是干什么的,若我们指定了key的值,生产者则会根据该key进行hash散列计算出具体的partition。若不指定,则随机选择partition。一般情况下我们没必要指定该值。这个类与上面功能一样,即将key转换成byte[]
4、ProducerConfig.LINGER_MS_CONFIG即linger.ms,为了减少请求次数提高吞吐率,这个参数为每次提交间隔的次数,若设置了该值,如1000,则意味着我们的消息可能不会马上提交到kafka服务器,需要等上1秒中,才会进行批量提交。我们可以适当的配置该值。0为不等待立刻提交。
consumer:
1、zookeeper.connect:zookeeper的地址,多个之间用,分割
2、group.id:这个值可以随便写,但建议写点有意义的值,别随便写个123。kafka保证同一个组内的消息只会被消费一次,若需要重复消费消息,则可以配置不同的groupid。
3、auto.commit.interval.ms:consumer自己会记录消费的偏移量,并定时往zookeeper上提交,该值即为提交时间间隔,若该值设置太大可能会出现重复消费的情况,如我们停止了某个consumer,但该consumer还未往zookeeper提交某段时间的消费记录,这导致我们下次启动该消费者的时候,它会从上次提交的偏移量进行消费,这就导致了某些数据的重复消费。
注意:在杀死consumer进程后,应等一会儿再去重启,因为杀死consumer进程时,会删除zookeeper的一些临时节点,若我们马上重启的话,可能会在启动的时候那些节点还没删除掉,出现写不必要的错误
相关推荐
Simple application demonstrate kafka java springboot
Kafka-Simple-Producer-Consumer:使用Java 8的kafka的生产者和消费者的简单变化
Kafka 目前支持SSL、SASL/Kerberos、SASL/PLAIN三种认证机制 ,我拿第三种进行了 配置 。你可以直接下载 运行并测试
带有Rest URL的Kafka Producer和Consumer API的Spring Boot应用程序 生产者:将数据或消息发送到kafka服务器的应用程序 消息:一小段数据,即kafka的字节数组 使用者:数据的接收者,即从kafka服务器读取数据 Kafka...
2 : 更简单的 Java Consumer API,可以根据用户需要扩展其实现和功能。 3 : 用于监控消息系统的 Web 控制台。 如何使用 simple-kafka 创建生产者 使用 simple-kafka 创建一个 kafka 生产者是一个更简单的过程,...
Apache Kafka是一个开源流处理平台,由Scala和Java编写,由Apache软件基金会开发。它最初由LinkedIn公司开发,并于2011年初开源,2012年10月从Apache Incubator毕业。Kafka的目标是为处理实时数据提供一个统一、高...
使用java客户端, kafka-producer, kafka-consumer进行kafka的连接 注: 0.10 版本之后, 连接kafka只需要brokerip即可, 不需要zookeeper的信息
使用Kafka Consumer,Producer和Streaming API的基于Java的示例 该存储库中的示例演示了如何将Kafka Consumer,Producer和Streaming API与HDInsight集群上的Kafka一起使用。 此存储库中包含两个项目: 生产者-...
卡夫卡制片人-消费者Kafka 0.8.2.2的简单Java示例
produce启动的时候参数使用的是kafka的端口而consumer启动的时候使用的是zookeeper的端口; 单机连通性能测试 运行producer bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 运行...
首先介绍了Kafka的核心概念,如Broker、Topic、Producer、Consumer等,随后详细阐述了Kafka的安装步骤、配置过程以及其与Zookeeper的交互。通过对Kafka生产者和消费者的代码示例分析,文档展示了消息的发送和接收...
消费者生产者 这是一个监听的Kafka Spring... kafka-console-producer.sh --broker-list localhost:9092 --topic事件输入 样本数据 {“ bookId”:1,“ bookName”:“ Kafka”,“ bookAuthor”:“ Sameer”,“ is
之前一直在项目中使用kafka原生的Consumer和Producer API,但总是觉得有点不方便: 初始化加载的时候比较繁琐 使用Consumer的高级API无法真正实现批量处理(低级API超复杂又不会用T_T) 每次部署一个新topic都会进行很...
数据生产者,producer 的用法:《producer 的用法》、《producer 使用注意》 数据消费者,consumer 的用法:《consumer 的用法》 迓有些零碎的,关亍通信段的源码览读:《net 包源码览读》、《broker 配置》 扩展的...
Apache Kafka基础知识和在JAVA中实现的迷你项目(Twitter Sreaming_Producer和Consumer配置)的工作示例。 该项目是根据而开发的。 我强烈建议他在Udemy上他的课程,以帮助您了解Apache Kafka生态系统,体系结构,...
为了开始使用Apache Kafka,请从找到快速入门手册。 完成这些操作后,我们就可以开始一个一个地理解Apache Kafka的概念了。 首先,Apache Kafka是一个分布式排队系统,这意味着所产生的消息将被发送出去,并进入...
Apache Kafka入门demo,用来理解Apache Kafka的基本原理,附带必要代码注释。基于maven的用Java api编写的Producer和Consumer;
卡夫卡生产者-消费者卡夫卡生产者与消费者的测试代码简单的卡夫卡制作人阿夫罗·卡夫卡(Avro kafka)制片人简单的卡夫卡消费者阿夫罗·卡夫卡(Avro kafka)制片人
confluent-kafka-python提供了与所有兼容的高级Producer,Consumer和AdminClient 经纪人> = v0.8, 和。 客户是: 可靠-它是 (通过二进制车轮自动提供)的包装,已在各种生产场景中广泛部署。 它使用Java客户端,...
li-apache-kafka-clients介绍li-apache-kafka-clients是在香草Apache ... LiKafkaProducer和LiKafkaConsumer都实现了香草Kafka Producer和Consumer接口。 li-apache-kafka-clients也是高度可定制的,因此用户可以插