- Producer
- Consumer
- GroupConsumer
1、MqProducer.java
package com.neohope.kafka.test;
import java.util.*;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
/**
 * Minimal example that publishes a batch of string messages to the
 * {@code neoTopic} topic using the legacy Kafka producer API.
 */
public class MqProducer {
    /**
     * Sends 100 keyed messages ({@code key0/value0} .. {@code key99/value99})
     * followed by a {@code -=END=-} marker message, then closes the producer.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        // Broker list the producer connects to.
        props.put("metadata.broker.list", "localhost:9092");
        // Keys and values are plain strings.
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // Custom partitioner; it only inspects the text after the last '.' of the key.
        props.put("partitioner.class", "com.neohope.kafka.test.SimplePartitioner");
        props.put("request.required.acks", "1");

        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);
        try {
            for (int i = 0; i < 100; i++) {
                // NOTE: these keys contain no '.', so a partitioner keyed on the
                // suffix after the last '.' will send every message to partition 0.
                producer.send(new KeyedMessage<String, String>("neoTopic", "key" + i, "value" + i));
            }
            // Marker message so a consumer can tell the batch is complete.
            producer.send(new KeyedMessage<String, String>("neoTopic", "-=END=-", "-=END=-"));
        } finally {
            // Always release the producer's resources, even when a send fails.
            producer.close();
        }
    }
}
2、SimplePartitioner.java
package com.neohope.kafka.test;
import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;
/**
 * Partitioner that picks the partition from the numeric suffix following the
 * last {@code '.'} in the message key (for example {@code "192.168.2.17"} maps
 * to {@code 17 % numPartitions}). Keys without a usable numeric suffix go to
 * partition 0.
 */
public class SimplePartitioner implements Partitioner {
    /**
     * Creates the partitioner. The constructor accepts the producer properties
     * but this implementation does not read them.
     *
     * @param props producer configuration (unused)
     */
    public SimplePartitioner(VerifiableProperties props) {
    }

    /**
     * Computes the partition for a message key.
     *
     * @param key             the message key; may be {@code null} or a non-String object
     * @param a_numPartitions number of partitions available for the topic
     * @return a partition index in {@code [0, a_numPartitions)}, or 0 when the key is
     *         {@code null}, has no {@code '.'} after its first character, has a
     *         non-numeric suffix, or when {@code a_numPartitions} is not positive
     */
    public int partition(Object key, int a_numPartitions) {
        // Guard against null keys and a non-positive divisor (would divide by zero).
        if (key == null || a_numPartitions <= 0) {
            return 0;
        }

        // toString() instead of a cast so non-String keys do not throw ClassCastException.
        String stringKey = key.toString();
        int offset = stringKey.lastIndexOf('.');
        // A missing dot, or a dot at index 0, is treated as "no suffix".
        if (offset <= 0) {
            return 0;
        }

        int suffix;
        try {
            suffix = Integer.parseInt(stringKey.substring(offset + 1));
        } catch (NumberFormatException e) {
            // Empty, non-numeric or out-of-range suffix: fall back to the same
            // default partition used for keys without a suffix.
            return 0;
        }

        // Java's % keeps the sign of the dividend; normalise so a negative
        // suffix can never produce a negative (invalid) partition index.
        int remainder = suffix % a_numPartitions;
        return remainder < 0 ? remainder + a_numPartitions : remainder;
    }
}
3、测试
# 开启zookeeper
bin\windows\zookeeper-server-start.bat config\zookeeper.properties
# 开启kafka-server
bin\windows\kafka-server-start.bat config\server.properties
# 开启一个consumer
bin\windows\kafka-console-consumer.bat --zookeeper localhost:2181 --topic neoTopic --from-beginning
# 运行MqProducer.main即可