发布网友
共2个回答
热心网友
publicstaticvoidconsumer(){Propertiesprops=newProperties();props.put("zk.connect","hadoop-2:2181");props.put("zk.connectiontimeout.ms","1000000");props.put("groupid","fans_group");//CreatetheconnectiontotheclusterConsumerConfigconsumerConfig=newConsumerConfig(props);ConsumerConnectorconsumerConnector=Consumer.createJavaConsumerConnector(consumerConfig);Mapmap=newHashMap();map.put("fans",1);//create4partitionsofthestreamfortopic“test”,toallow4threadstoconsumeMap>>topicMessageStreams=consumerConnector.createMessageStreams(map);List>streams=topicMessageStreams.get("fans");//createlistof4threadstoconsumefromeachofthepartitionsExecutorServiceexecutor=Executors.newFixedThreadPool(1);longstartTime=System.currentTimeMillis();//consumethemessagesinthethreadsfor(finalKafkaStreamstream:streams){executor.submit(newRunnable(){publicvoidrun(){ConsumerIteratorit=stream.iterator();while(it.hasNext()){log.debug(byteBufferToString(it.next().message().payload()));}}});log.debug("usetime="+(System.currentTimeMillis()-startTime));}}追问没有
热心网友
配置上hosts,通过zk连接kafka要保证zk端口通,并且kafka也要通;同时zk中配置的kafka的主机名要与hosts的一致。
热心网友
publicstaticvoidconsumer(){Propertiesprops=newProperties();props.put("zk.connect","hadoop-2:2181");props.put("zk.connectiontimeout.ms","1000000");props.put("groupid","fans_group");//CreatetheconnectiontotheclusterConsumerConfigconsumerConfig=newConsumerConfig(props);ConsumerConnectorconsumerConnector=Consumer.createJavaConsumerConnector(consumerConfig);Mapmap=newHashMap();map.put("fans",1);//create4partitionsofthestreamfortopic“test”,toallow4threadstoconsumeMap>>topicMessageStreams=consumerConnector.createMessageStreams(map);List>streams=topicMessageStreams.get("fans");//createlistof4threadstoconsumefromeachofthepartitionsExecutorServiceexecutor=Executors.newFixedThreadPool(1);longstartTime=System.currentTimeMillis();//consumethemessagesinthethreadsfor(finalKafkaStreamstream:streams){executor.submit(newRunnable(){publicvoidrun(){ConsumerIteratorit=stream.iterator();while(it.hasNext()){log.debug(byteBufferToString(it.next().message().payload()));}}});log.debug("usetime="+(System.currentTimeMillis()-startTime));}}追问没有
热心网友
配置上hosts,通过zk连接kafka要保证zk端口通,并且kafka也要通;同时zk中配置的kafka的主机名要与hosts的一致。