import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
public class KafkaConsumerDemo {

    private final static String TOPIC = "test";
    private final static String BOOTSTRAP_SERVERS = "localhost:9092";
    // Build a consumer that reads Long keys and String values from the local broker.
    private static Consumer<Long, String> createConsumer() {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaExampleConsumer");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        final Consumer<Long, String> consumer = new KafkaConsumer<>(props);
        return consumer;
    }
    // Print the partition metadata for the topic and the latest offset of each partition.
    public static void getPartitionsForTopic() {
        final Consumer<Long, String> consumer = createConsumer();

        final Collection<PartitionInfo> partitionInfos = consumer.partitionsFor(TOPIC);
        System.out.println("Get the partition info as below:");

        final List<TopicPartition> tp = new ArrayList<>();
        partitionInfos.forEach(info -> {
            System.out.println("Partition Info:");
            System.out.println(info);
            tp.add(new TopicPartition(TOPIC, info.partition()));
        });

        // Assign all partitions at once, seek to the end, then report each position.
        consumer.assign(tp);
        consumer.seekToEnd(tp);
        tp.forEach(partition ->
                System.out.println("Partition " + partition.partition()
                        + "'s latest offset is " + consumer.position(partition)));

        consumer.close();
    }
    // Poll the topic until no records arrive for "giveUp" consecutive polls, then exit.
    public static void run() throws InterruptedException {
        final Consumer<Long, String> consumer = createConsumer();
        consumer.subscribe(Collections.singletonList(TOPIC));

        final int giveUp = 100;
        int noRecordsCount = 0;

        while (true) {
            // Poll with a one-second timeout.
            final ConsumerRecords<Long, String> consumerRecords = consumer.poll(1000);

            if (consumerRecords.count() == 0) {
                noRecordsCount++;
                if (noRecordsCount > giveUp) {
                    break;
                }
                continue;
            }

            consumerRecords.forEach(record ->
                    System.out.printf("Consumer Record:(%d, %s, %d, %d)\n",
                            record.key(), record.value(), record.partition(), record.offset()));

            // Commit the consumed offsets asynchronously after each non-empty batch.
            consumer.commitAsync();
        }

        consumer.close();
        System.out.println("Kafka Consumer Exited");
    }
}
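To run the demo end to end, a minimal driver along the lines of the sketch below could be used; it assumes a broker is reachable at localhost:9092 and that the "test" topic already exists. The class name KafkaDemoMain is made up for illustration and is not part of the original listing.

import org.apache.kafka.clients.consumer.KafkaConsumer;

public class KafkaDemoMain {

    public static void main(String[] args) throws InterruptedException {
        // First print partition metadata and latest offsets, then start the polling loop.
        KafkaConsumerDemo.getPartitionsForTopic();
        KafkaConsumerDemo.run();
    }
}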
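For the consumer to have something to read, the records must be written with serializers that mirror the LongDeserializer/StringDeserializer configured above. The producer below is a hypothetical companion sketch under that assumption (class name KafkaProducerDemo and the sample messages are made up), not part of the original example.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class KafkaProducerDemo {

    public static void main(String[] args) {
        final Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Serializers must match the deserializers used by KafkaConsumerDemo.
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (Producer<Long, String> producer = new KafkaProducer<>(props)) {
            for (long i = 0; i < 10; i++) {
                // Send ten sample records keyed by a counter to the "test" topic.
                producer.send(new ProducerRecord<>("test", i, "message-" + i));
            }
            producer.flush();
        }
    }
}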