Java通过zk连接kafka,程序未报错,但是取不到数据。将程序在另一台主机

发布网友

我来回答

2个回答

热心网友

publicstaticvoidconsumer(){Propertiesprops=newProperties();props.put("zk.connect","hadoop-2:2181");props.put("zk.connectiontimeout.ms","1000000");props.put("groupid","fans_group");//CreatetheconnectiontotheclusterConsumerConfigconsumerConfig=newConsumerConfig(props);ConsumerConnectorconsumerConnector=Consumer.createJavaConsumerConnector(consumerConfig);Mapmap=newHashMap();map.put("fans",1);//create4partitionsofthestreamfortopic“test”,toallow4threadstoconsumeMap>>topicMessageStreams=consumerConnector.createMessageStreams(map);List>streams=topicMessageStreams.get("fans");//createlistof4threadstoconsumefromeachofthepartitionsExecutorServiceexecutor=Executors.newFixedThreadPool(1);longstartTime=System.currentTimeMillis();//consumethemessagesinthethreadsfor(finalKafkaStreamstream:streams){executor.submit(newRunnable(){publicvoidrun(){ConsumerIteratorit=stream.iterator();while(it.hasNext()){log.debug(byteBufferToString(it.next().message().payload()));}}});log.debug("usetime="+(System.currentTimeMillis()-startTime));}}追问没有

热心网友

配置上hosts,通过zk连接kafka要保证zk端口通,并且kafka也要通;同时zk中配置的kafka的主机名要与hosts的一致。

热心网友

publicstaticvoidconsumer(){Propertiesprops=newProperties();props.put("zk.connect","hadoop-2:2181");props.put("zk.connectiontimeout.ms","1000000");props.put("groupid","fans_group");//CreatetheconnectiontotheclusterConsumerConfigconsumerConfig=newConsumerConfig(props);ConsumerConnectorconsumerConnector=Consumer.createJavaConsumerConnector(consumerConfig);Mapmap=newHashMap();map.put("fans",1);//create4partitionsofthestreamfortopic“test”,toallow4threadstoconsumeMap>>topicMessageStreams=consumerConnector.createMessageStreams(map);List>streams=topicMessageStreams.get("fans");//createlistof4threadstoconsumefromeachofthepartitionsExecutorServiceexecutor=Executors.newFixedThreadPool(1);longstartTime=System.currentTimeMillis();//consumethemessagesinthethreadsfor(finalKafkaStreamstream:streams){executor.submit(newRunnable(){publicvoidrun(){ConsumerIteratorit=stream.iterator();while(it.hasNext()){log.debug(byteBufferToString(it.next().message().payload()));}}});log.debug("usetime="+(System.currentTimeMillis()-startTime));}}追问没有

热心网友

配置上hosts,通过zk连接kafka要保证zk端口通,并且kafka也要通;同时zk中配置的kafka的主机名要与hosts的一致。

声明声明:本网页内容为用户发布,旨在传播知识,不代表本网认同其观点,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。E-MAIL:11247931@qq.com