 package cn.idea360.assistant.dev.kafka;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 
 import java.time.Duration;
 import java.util.Collections;
 import java.util.Properties;
 
 
 
 
 
 /**
  * Minimal Kafka demo client: static factories for a String/String producer
  * and consumer with hard-coded settings, plus a {@link #main(String[])} that
  * sends one record and then consumes the topic in an endless poll loop.
  */
 @Slf4j
 public class SimpleKafkaClient {

     /** Broker address shared by the producer and the consumer. */
     private static final String BOOTSTRAP_SERVERS = "10.10.10.205:9092";

     /** Consumer group id used by {@link #createConsumer()}. */
     private static final String CONSUMER_GROUP_ID = "your-consumer-group";

     /** Kept public so existing callers that instantiate this class still compile. */
     public SimpleKafkaClient() {
     }

     /**
      * Builds a new producer with String key/value serializers.
      *
      * <p>The caller owns the returned producer and must close it.
      *
      * @return a freshly constructed {@link KafkaProducer}
      */
     public static KafkaProducer<String, String> createProducer() {
         Properties props = new Properties();

         // Connection and serialization.
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

         // Delivery settings: acks=all, 3 retries, idempotence enabled.
         props.put(ProducerConfig.ACKS_CONFIG, "all");
         props.put(ProducerConfig.RETRIES_CONFIG, 3);
         props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

         // Batching, buffering and compression.
         props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
         props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
         props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
         props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60000);
         props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");

         return new KafkaProducer<>(props);
     }

     /**
      * Builds a new consumer with String key/value deserializers. Auto-commit
      * is disabled, so the caller is responsible for committing offsets.
      *
      * <p>The caller owns the returned consumer and must close it.
      *
      * @return a freshly constructed {@link KafkaConsumer}
      */
     public static KafkaConsumer<String, String> createConsumer() {
         Properties props = new Properties();

         // Connection, group membership and deserialization.
         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
         props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_ID);
         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

         // Offset handling: reset to "earliest", auto-commit off.
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

         // Session, heartbeat and fetch tuning.
         props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
         props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000);
         props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
         props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1);
         props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);

         return new KafkaConsumer<>(props);
     }

     /**
      * Demo entry point: sends a single record to the {@code test_data} topic,
      * then subscribes to that topic and prints every record it receives,
      * committing offsets synchronously after each poll. The poll loop has no
      * exit condition; it runs until the process is stopped or an exception
      * propagates out of it.
      *
      * @param args unused
      */
     public static void main(String[] args) {
         String topic = "test_data";
         String key = "key";
         String value = "idea360.cn";

         // try-with-resources guarantees the producer is closed even when send() throws.
         try (KafkaProducer<String, String> producer = SimpleKafkaClient.createProducer()) {
             ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, key, value);
             producer.send(producerRecord, (metadata, exception) -> {
                 if (exception == null) {
                     System.out.printf("Sent record [%s] with key [%s] to partition [%s] with offset [%s]%n", value, key, metadata.partition(), metadata.offset());
                 } else {
                     // Say which record failed before dumping the stack trace.
                     System.err.printf("Failed to send record with key [%s] to topic [%s]%n", key, topic);
                     exception.printStackTrace();
                 }
             });
         }

         try (KafkaConsumer<String, String> consumer = SimpleKafkaClient.createConsumer()) {
             consumer.subscribe(Collections.singletonList(topic));
             while (true) {
                 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                 for (ConsumerRecord<String, String> record : records) {
                     System.out.printf("Consumed record with key [%s] and value [%s] from partition [%d] with offset [%d]%n",
                             record.key(), record.value(), record.partition(), record.offset());
                 }
                 // Auto-commit is disabled in createConsumer(), so commit after each processed batch.
                 consumer.commitSync();
             }
         }
     }
 }
 