转载

kafka-简单事例

开始创建项目,这里所用的工程结构是maven。

在pom.xml中添加kafka的依赖包,如下所示:

<dependency>

<groupId>org.apache.kafka</groupId>

<artifactId>kafka_2.9.1</artifactId>

<version>0.8.2.2</version>

</dependency>

  依赖配置参照  http://mvnrepository.com/search?q=kafka 中对应的kafka版本

下面开始编写代码:

1.ConfigureAPI

首先是一个配置结构类文件,配置Kafka的相关参数,代码如下所示:

package com.kafka;  public class ConfigureAPI {  public interface KafkaProperties{   public final static String ZK = "123.56.91.163:2181";         public final static String GROUP_ID = "test-consumer-group";         public final static String TOPIC = "my-topic";         public final static String BROKER_LIST = "123.56.91.163:9092";         public final static int BUFFER_SIZE = 64 * 1024;         public final static int TIMEOUT = 20000;         public final static int INTERVAL = 10000;  } }

2.Producer

生产消息程序,用于产生Kafka的消息供Consumer去消费,具体代码如下所示:

package com.kafka;  import java.util.Properties; import com.kafka.ConfigureAPI.KafkaProperties; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig;  public class TProducer extends Thread {   private Producer<Integer, String> producer;      private String topic;      private Properties props = new Properties();      private final int SLEEP = 1000 * 3;       public TProducer(String topic) {       //配置value序列化类          props.put("serializer.class", "kafka.serializer.StringEncoder");          //配置kafka端口          props.put("metadata.broker.list", KafkaProperties.BROKER_LIST);          props.put("request.required.acks", "1");          producer = new Producer<Integer, String>(new ProducerConfig(props));          this.topic = topic;      }       @Override      public void run() {          int offsetNo = 1;          while (true) {              String msg = new String("message_" + offsetNo);              System.out.println("Send-> " + msg + " ");              producer.send(new KeyedMessage<Integer, String>(topic, msg));              offsetNo++;              try {                  sleep(SLEEP);              } catch (Exception ex) {                  ex.printStackTrace();              }          }      } }

3.Consumer

消费程序,用于消费Kafka的消息,代码如下所示:

package com.kafka;  import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties;  import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector;  import com.kafka.ConfigureAPI.KafkaProperties;  public class TConsumer extends Thread {  private ConsumerConnector consumer;  private String topic;  private final int SLEEP = 1000 * 3;    public TConsumer(String topic) {         consumer = Consumer.createJavaConsumerConnector(this.consumerConfig());         this.topic = topic;     }    private ConsumerConfig consumerConfig() {         Properties props = new Properties();         props.put("zookeeper.connect", KafkaProperties.ZK);         props.put("group.id", KafkaProperties.GROUP_ID);         //zookeeper超时         props.put("zookeeper.session.timeout.ms", "40000");         props.put("zookeeper.sync.time.ms", "200");         props.put("auto.commit.interval.ms", "1000");         return new ConsumerConfig(props);     }    @Override     public void run() {         Map<String, Integer> topicCountMap = new HashMap<String, Integer>();         topicCountMap.put(topic, new Integer(1));         Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);         KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);         ConsumerIterator<byte[], byte[]> it = stream.iterator();         while (it.hasNext()) {             System.out.println("Receive->[" + new String(it.next().message()) + "]");             try {                 sleep(SLEEP);             } catch (Exception ex) {                 ex.printStackTrace();             }         }     } }

4.在开发完Consumer和Producer的代码后,我们来测试相关应用,下面是一个Client去测试Consumer和Producer,具体代码如下所示:

package com.kafka;  import com.kafka.ConfigureAPI.KafkaProperties;  public class KafkaClient {  public static void main(String[] args) {         TProducer pro = new TProducer(KafkaProperties.TOPIC);         pro.start();          TConsumer con = new TConsumer(KafkaProperties.TOPIC);         con.start();     } }

运行结果如下:

kafka-简单事例

注意:我在测试阶段遇到一个问题就是一直发送不成功,最后定位到kafka启动问题。

之前部署安装 http://www.cnblogs.com/yinchengzhe/p/5111672.html 时,在配置config/server.properties中配置了host.name参数,启动时

是按内网启动的,外网无法访问进去。如果你本地代码和服务器不是同一网络,需要把这个参数去掉

原文  http://www.cnblogs.com/yinchengzhe/p/5129974.html
正文到此结束
Loading...