Fetching source data with a Kafka producer (Kafka 0.8.x tutorial: Producer API example)
Version: Kafka 0.8.x. Language: Java.
1. Producer API
The Producer API supports sending messages one at a time or in batches.
/**
 * V: type of the message
 * K: type of the optional key associated with the message
 */
class kafka.javaapi.producer.Producer<K, V> {

    public Producer(ProducerConfig config);

    /**
     * Sends the data to a single topic, partitioned by key, using either the
     * synchronous or the asynchronous producer.
     * @param message the producer data object that encapsulates the topic, key and message data
     */
    public void send(KeyedMessage<K, V> message);

    /**
     * Use this API to send data to multiple topics.
     * @param messages list of producer data objects that encapsulate the topic, key and message data
     */
    public void send(List<KeyedMessage<K, V>> messages);

    /**
     * Close API to close the producer pool connections to all Kafka brokers.
     */
    public void close();
}
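The batch variant simply takes a list. As a minimal sketch (assuming the "page_visits" topic and the producer object built in the example below):

// Sketch of the batch send API: collect several KeyedMessage objects
// and hand them to the producer in a single call.
List<KeyedMessage<String, String>> batch = new ArrayList<KeyedMessage<String, String>>();
batch.add(new KeyedMessage<String, String>("page_visits", "192.168.2.10", "message 1"));
batch.add(new KeyedMessage<String, String>("page_visits", "192.168.2.11", "message 2"));
producer.send(batch);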
2. Example
(1) Add the Maven dependency
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.9.2</artifactId>
    <version>0.8.1.1</version>
    <scope>compile</scope>
    <exclusions>
        <exclusion>
            <artifactId>jmxri</artifactId>
            <groupId>com.sun.jmx</groupId>
        </exclusion>
        <exclusion>
            <artifactId>jms</artifactId>
            <groupId>javax.jms</groupId>
        </exclusion>
        <exclusion>
            <artifactId>jmxtools</artifactId>
            <groupId>com.sun.jdmk</groupId>
        </exclusion>
    </exclusions>
</dependency>
(2) Define the producer configuration:
Properties props = new Properties();

props.put("metadata.broker.list", "broker1:9092,broker2:9092");
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("partitioner.class", "example.producer.SimplePartitioner");
props.put("request.required.acks", "1");

ProducerConfig config = new ProducerConfig(props);
The first property, "metadata.broker.list", gives the Producer a list of brokers to contact in order to discover the cluster's partition leaders. You do not need to list every broker in the cluster here, but you should configure at least two so the Producer can still connect if one of them is down.

The second property, "serializer.class", defines how messages are serialized; our example uses a simple string encoder. The key's serializer can be set separately via the "key.serializer.class" property (a sketch of such an override follows below); if it is not configured, the key falls back to the serializer defined by "serializer.class".

The third property, "partitioner.class", defines the partitioning strategy, i.e. which partition each message is sent to. If you do not set this property, Kafka uses its default partitioner. If the key is null, the Producer uses a random partitioner regardless.
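For example, the key could keep the string encoder while the message body is sent as raw bytes. This is a sketch, not part of the original example; kafka.serializer.DefaultEncoder is the built-in byte[] pass-through encoder:

// Sketch: override the key serializer only.
// "serializer.class" applies to the message body,
// "key.serializer.class" applies to the key alone.
props.put("serializer.class", "kafka.serializer.DefaultEncoder");    // message: raw byte[]
props.put("key.serializer.class", "kafka.serializer.StringEncoder"); // key: String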
(3) Create the Producer object
Producer is a generic class, so you need to specify the types of the key and the message.
Producer<String, String> producer = new Producer<String, String>(config);
(4) Generate a message
Random rnd = new Random();
long runtime = new Date().getTime();
String ip = "192.168.2." + rnd.nextInt(255);
String msg = runtime + ",www.example.com," + ip;
(5) Send the message
"page_visits"是Topic名称ip用来做分区的key,如果key为null,即使定义了分区器,Kafka仍然会使用随机分区器。
KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", ip, msg);
producer.send(data);
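For comparison, a message constructed without a key (a sketch; KeyedMessage also offers a topic-and-message constructor) is assigned to a random partition:

// No key: SimplePartitioner is bypassed and a random partition is chosen.
KeyedMessage<String, String> unkeyed = new KeyedMessage<String, String>("page_visits", msg);
producer.send(unkeyed);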
3. Complete code
import java.util.*;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class TestProducer {
    public static void main(String[] args) {
        // Number of events to send, taken from the command line.
        long events = Long.parseLong(args[0]);
        Random rnd = new Random();

        Properties props = new Properties();
        props.put("metadata.broker.list", "broker1:9092,broker2:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("partitioner.class", "example.producer.SimplePartitioner");
        props.put("request.required.acks", "1");

        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);

        for (long nEvents = 0; nEvents < events; nEvents++) {
            // Fake a page-visit event: timestamp, site, visitor IP.
            long runtime = new Date().getTime();
            String ip = "192.168.2." + rnd.nextInt(255);
            String msg = runtime + ",www.example.com," + ip;
            KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", ip, msg);
            producer.send(data);
        }
        producer.close();
    }
}
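To compile and run the class, put the jars shipped with the Kafka distribution on the classpath. The invocation below is a sketch, not part of the original article: the libs/ path and the default-package layout are assumptions, so adjust them (and the package of SimplePartitioner, which the config references as example.producer.SimplePartitioner) to your setup. This sends 1000 events:

javac -cp "libs/*" TestProducer.java SimplePartitioner.java
java -cp ".:libs/*" TestProducer 1000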
Partitioner code:
import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

public class SimplePartitioner implements Partitioner {

    public SimplePartitioner(VerifiableProperties props) {
    }

    public int partition(Object key, int a_numPartitions) {
        int partition = 0;
        String stringKey = (String) key;
        // Partition on the last octet of the IP address, so all messages
        // from the same IP land in the same partition.
        int offset = stringKey.lastIndexOf('.');
        if (offset > 0) {
            partition = Integer.parseInt(stringKey.substring(offset + 1)) % a_numPartitions;
        }
        return partition;
    }
}
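For example, with the five partitions created below, the key "192.168.2.57" yields 57 % 5 = 2, so every message from that IP goes to partition 2.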
4. Running the example
(1) First, create the page_visits topic with the following command:
bin/kafka-create-topic.sh --topic page_visits --replica 3 --zookeeper localhost:2181 --partition 5
(2) Consume the messages with the console consumer:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic page_visits --from-beginning