kafka connect到底会不会重写/丢失数据

发布网友 发布时间:2022-04-24 01:10

我来回答

1个回答

热心网友 时间:2022-04-10 14:17

要注意些注意事项于partitionconsumer
1. consumer比partition浪费kafka设计partition允许并发所consumer数要于partition数
2. consumer比partition少consumer应于partitions主要合理配consumer数partition数否则导致partition面数据取均匀
partiton数目consumer数目整数倍所partition数目重要比取24容易设定consumer数目
3. consumerpartition读数据保证数据间顺序性kafka保证partition数据序partition根据读顺序同
4. 增减consumerbrokerpartition导致rebalance所rebalanceconsumer应partition发变化
5. High-level接口获取数据候block
简单版
简单坑测试流程先proce些数据再用consumer读记加第句设置
初始offset默认非设置意思offset非何修offset默认largest即新所加配置读前proce数据且候再加smallest配置没用offset合再修需要手工或用工具改重置offset

Properties props = new Properties();
props.put("auto.offset.reset", "smallest"); //必须要加要读旧数据
props.put("zookeeper.connect", "localhost:2181");
props.put("group.id", "pv");
props.put("zookeeper.session.timeout.ms", "400");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");

ConsumerConfig conf = new ConsumerConfig(props);
ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(conf);
String topic = "page_visits";
Map topicCountMap = new HashMap();
topicCountMap.put(topic, new Integer(1));
Map<String, List<KafkaStream>> consumerMap = consumer.createMessageStreams(topicCountMap);
List<KafkaStream> streams = consumerMap.get(topic);

KafkaStream stream = streams.get(0);
ConsumerIterator it = stream.iterator();
while (it.hasNext()){
System.out.println("message: " + new String(it.next().message()));
}

if (consumer != null) consumer.shutdown(); //其实执行面hasNextblock

用high-levelconsumer两给力工具
1. bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group pv
看前group offset状况比看pv状况3partition
Group Topic Pid Offset logSize Lag Owner
pv page_visits 0 21 21 0 none
pv page_visits 1 19 19 0 none
pv page_visits 2 20 20 0 none
关键offsetlogSizeLag
前读完所offset=logSize并且Lag=0
2. bin/kafka-run-class.sh kafka.tools.UpdateOffsetsInZK earliest config/consumer.properties page_visits
3参数
[earliest | latest]表示offset置哪
consumer.properties 配置文件路径
topictopic名page_visits
我面pv group执行完操作再check group offset状况结
Group Topic Pid Offset logSize Lag Owner
pv page_visits 0 0 21 21 none
pv page_visits 1 0 19 19 none
pv page_visits 2 0 20 20 none
看offset已经清0Lag=logSize

底给原文线程consumer完整代码

import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ConsumerGroupExample {
private final ConsumerConnector consumer;
private final String topic;
private ExecutorService executor;

public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) {
consumer = kafka.consumer.Consumer.createJavaConsumerConnector( // 创建Connector注意面conf配置
createConsumerConfig(a_zookeeper, a_groupId));
this.topic = a_topic;
}

public void shutdown() {
if (consumer != null) consumer.shutdown();
if (executor != null) executor.shutdown();
}

public void run(int a_numThreads) { // 创建并发consumers
Map topicCountMap = new HashMap();
topicCountMap.put(topic, new Integer(a_numThreads)); // 描述读取哪topic需要几线程读
Map<String, List<KafkaStream>> consumerMap = consumer.createMessageStreams(topicCountMap); // 创建Streams
List<KafkaStream> streams = consumerMap.get(topic); // 每线程应于KafkaStream

// now launch all the threads
//
executor = Executors.newFixedThreadPool(a_numThreads);

// now create an object to consume the messages
//
int threadNumber = 0;
for (final KafkaStream stream : streams) {
executor.submit(new ConsumerTest(stream, threadNumber)); // 启consumer thread
threadNumber++;
}
}

private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
Properties props = new Properties();
props.put("zookeeper.connect", a_zookeeper);
props.put("group.id", a_groupId);
props.put("zookeeper.session.timeout.ms", "400");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");

return new ConsumerConfig(props);
}

public static void main(String[] args) {
String zooKeeper = args[0];
String groupId = args[1];
String topic = args[2];
int threads = Integer.parseInt(args[3]);

ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic);
example.run(threads);

try {
Thread.sleep(10000);
} catch (InterruptedException ie) {

}
example.shutdown();
}
}

SimpleConsumer
另种SimpleConsumer名字起简单接口其实low-level consumer更复杂接口
参考
候用接口?
Read a message multiple times
Consume only a subset of the partitions in a topic in a process
Manage transactions to make sure a message is processed once and only once

用接口代价即partition,broker,offset再透明需要自管理些并且要handle broker leader切换麻烦
所定要用别用
You must keep track of the offsets in your application to know where you left off consuming.
You must figure out which Broker is the lead Broker for a topic and partition
You must handle Broker leader changes
使用SimpleConsumer步骤:
Find an active Broker and find out which Broker is the leader for your topic and partition
Determine who the replica Brokers are for your topic and partition
Build the request defining what data you are interested in
Fetch the data
Identify and recover from leader changes
首先必须知道读哪topic哪partition
找负责该partitionbroker leader找存该partition副本broker
再者自写request并fetch数据
终要注意需要识别处理broker leader改变

声明声明:本网页内容为用户发布,旨在传播知识,不代表本网认同其观点,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。E-MAIL:11247931@qq.com