
Feeding source data with a Kafka producer (kafka-0.8.X tutorial: Producer API example)



Version: kafka-0.8.X. Language: Java.

1. The Producer API

It supports sending messages one at a time as well as in batches:

/**
 * V: type of the message
 * K: type of the optional key associated with the message
 */
class kafka.javaapi.producer.Producer<K, V> {
    public Producer(ProducerConfig config);

    /**
     * Sends the data to a single topic, partitioned by key, using either the
     * synchronous or the asynchronous producer
     * @param message the producer data object that encapsulates the topic, key and message data
     */
    public void send(KeyedMessage<K, V> message);

    /**
     * Use this API to send data to multiple topics
     * @param messages list of producer data objects that encapsulate the topic, key and message data
     */
    public void send(List<KeyedMessage<K, V>> messages);

    /**
     * Close API to close the producer pool connections to all kafka brokers.
     */
    public void close();
}

2. Example

(1) Add the Maven dependency

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.9.2</artifactId>
    <version>0.8.1.1</version>
    <scope>compile</scope>
    <exclusions>
        <exclusion>
            <artifactId>jmxri</artifactId>
            <groupId>com.sun.jmx</groupId>
        </exclusion>
        <exclusion>
            <artifactId>jms</artifactId>
            <groupId>javax.jms</groupId>
        </exclusion>
        <exclusion>
            <artifactId>jmxtools</artifactId>
            <groupId>com.sun.jdmk</groupId>
        </exclusion>
    </exclusions>
</dependency>

(2) Define the producer configuration:

Properties props = new Properties();
props.put("metadata.broker.list", "broker1:9092,broker2:9092");
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("partitioner.class", "example.producer.SimplePartitioner");
props.put("request.required.acks", "1");
ProducerConfig config = new ProducerConfig(props);

The first property, "metadata.broker.list", tells the producer which brokers to contact to discover the cluster's partition leaders. You do not need to list every broker in the cluster here, but you should configure at least two so the producer can still connect if one of them is down.

The second property, "serializer.class", defines how messages are serialized. This example uses a simple string encoder. The key's serialization can be configured separately via "key.serializer.class"; if it is not set, it defaults to whatever "serializer.class" specifies.
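The fallback from "key.serializer.class" to "serializer.class" can be illustrated with plain java.util.Properties. This is only a sketch of the lookup behavior described above, not Kafka's actual internal code:

```java
import java.util.Properties;

public class SerializerFallbackDemo {
    // Mimics the documented fallback: use key.serializer.class if present,
    // otherwise fall back to serializer.class.
    public static String keySerializer(Properties props) {
        return props.getProperty("key.serializer.class",
                props.getProperty("serializer.class"));
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // key.serializer.class is not set, so the key serializer falls back:
        System.out.println(keySerializer(props)); // kafka.serializer.StringEncoder
    }
}
```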

The third property, "partitioner.class", defines the partitioning strategy that decides which partition a message is sent to. If you do not set this property, Kafka uses its default partitioner. If the key is null, the producer picks a partition at random.

(3) Create the Producer object

Producer is a generic class, parameterized by the key type and the message type.

Producer<String, String> producer = new Producer<String, String>(config);

(4) Build a message

Random rnd = new Random();
long runtime = new Date().getTime();
String ip = "192.168.2." + rnd.nextInt(255);
String msg = runtime + ",www.example.com," + ip;
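The payload is a simple CSV-style record: timestamp, a site name, and the synthetic IP. A self-contained sketch of the same string construction, with fixed values in place of Random and Date so the result is deterministic:

```java
public class MessageFormatDemo {
    // Builds the same comma-separated payload as the snippet above.
    public static String buildMsg(long runtime, int lastOctet) {
        String ip = "192.168.2." + lastOctet;
        return runtime + ",www.example.com," + ip;
    }

    public static void main(String[] args) {
        System.out.println(buildMsg(1400000000000L, 42));
        // 1400000000000,www.example.com,192.168.2.42
    }
}
```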

(5) Send the message

"page_visits"是Topic名称ip用来做分区的key,如果key为null,即使定义了分区器,Kafka仍然会使用随机分区器。

KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", ip, msg);
producer.send(data);

3. Complete code

import java.util.*;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class TestProducer {
    public static void main(String[] args) {
        long events = Long.parseLong(args[0]);
        Random rnd = new Random();

        Properties props = new Properties();
        props.put("metadata.broker.list", "broker1:9092,broker2:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("partitioner.class", "example.producer.SimplePartitioner");
        props.put("request.required.acks", "1");
        ProducerConfig config = new ProducerConfig(props);

        Producer<String, String> producer = new Producer<String, String>(config);

        for (long nEvents = 0; nEvents < events; nEvents++) {
            long runtime = new Date().getTime();
            String ip = "192.168.2." + rnd.nextInt(255);
            String msg = runtime + ",www.example.com," + ip;
            KeyedMessage<String, String> data =
                    new KeyedMessage<String, String>("page_visits", ip, msg);
            producer.send(data);
        }
        producer.close();
    }
}

The partitioner code:

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

public class SimplePartitioner implements Partitioner {
    public SimplePartitioner(VerifiableProperties props) {
    }

    public int partition(Object key, int a_numPartitions) {
        int partition = 0;
        String stringKey = (String) key;
        int offset = stringKey.lastIndexOf('.');
        if (offset > 0) {
            partition = Integer.parseInt(stringKey.substring(offset + 1)) % a_numPartitions;
        }
        return partition;
    }
}
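SimplePartitioner maps each key to a partition by taking the digits after the last '.' (the IP's last octet) modulo the partition count. That arithmetic can be checked in isolation, without any Kafka classes, assuming the 5 partitions created in the next section:

```java
public class PartitionArithmeticDemo {
    // Same arithmetic as SimplePartitioner.partition():
    // last octet of the IP key, modulo the number of partitions.
    public static int partitionFor(String key, int numPartitions) {
        int offset = key.lastIndexOf('.');
        if (offset > 0) {
            return Integer.parseInt(key.substring(offset + 1)) % numPartitions;
        }
        return 0;
    }

    public static void main(String[] args) {
        System.out.println(partitionFor("192.168.2.77", 5)); // 77 % 5 = 2
        System.out.println(partitionFor("192.168.2.10", 5)); // 10 % 5 = 0
    }
}
```

Note that all messages from the same IP land in the same partition, which preserves per-IP ordering.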

4. Running the example

(1) First, create the page_visits topic:

bin/kafka-create-topic.sh --topic page_visits --replica 3 --zookeeper localhost:2181 --partition 5

(2) Then consume the messages with the console consumer:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic page_visits --from-beginning

