Producing or consuming Kafka messages fails from a Windows client to remote Linux servers
I downloaded kafka_2.10-0.10.0.1 on both a Windows machine and my Linux machines (I have a cluster of 3 Linux machines, 192.168.80.128/129/130). So I use the Windows machine as the Kafka client and the Linux machines as the Kafka servers. When I try to produce messages from Windows to the remote Kafka server, the command and its response are as below:
F:\kafka_2.10-0.10.0.1\kafka_2.10-0.10.0.1\bin\windows>kafka-console-producer.bat --broker-list 192.168.80.128:9092 --topic wuchang
dadfasdf
asdfasf
[2016-11-08 22:41:30,311] ERROR Error when sending message to topic wuchang with key: null, value: 8 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Batch containing 2 record(s) expired due to timeout while requesting metadata from brokers for wuchang-0
[2016-11-08 22:41:30,313] ERROR Error when sending message to topic wuchang with key: null, value: 7 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Batch containing 2 record(s) expired due to timeout while requesting metadata from brokers for wuchang-0
I am sure the Kafka cluster is OK, because I can run the produce and consume commands directly on the Linux server successfully.
Of course, consuming messages from the remote Kafka server failed as well:
F:\kafka_2.10-0.10.0.1\kafka_2.10-0.10.0.1\bin\windows>kafka-console-consumer.bat --bootstrap-server 192.168.80.128:9092 --topic wuchang --from-beginning --zookeeper 192.168.80.128:2181
[2016-11-08 22:56:43,486] WARN Fetching topic metadata with correlation id 0 for topics [Set(wuchang)] from broker [BrokerEndPoint(1,vm02,9092)] failed (kafka.client.ClientUtils$)
java.nio.channels.ClosedChannelException
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
        at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
        at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:124)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
        at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
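The consumer warning above names the broker as BrokerEndPoint(1,vm02,9092), that is, by host name rather than by IP address. One possible explanation, not confirmed here, is that the Windows client cannot resolve or reach vm02. The following is a minimal Java sketch for checking this from the client machine using only the JDK. The class name BrokerReachability and its two helper methods are hypothetical names chosen for illustration; the hosts and the port are the ones that appear in the commands and log above.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.UnknownHostException;

// Hypothetical helper, not part of Kafka: checks name resolution and TCP reachability.
final class BrokerReachability {

    /** Returns true if this machine can resolve the host name (or parse the IP literal). */
    static boolean resolves(String host) {
        try {
            InetAddress.getByName(host);
            return true;
        } catch (UnknownHostException e) {
            return false;
        }
    }

    /** Returns true if a TCP connection to host:port succeeds within timeoutMs milliseconds. */
    static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Covers unresolved hosts, refused connections and timeouts alike.
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumption: 9092 is the broker port, as used in the commands above.
        int port = 9092;
        // The bootstrap address from the command line and the host name from the warning.
        String[] hosts = {"192.168.80.128", "vm02"};
        for (String host : hosts) {
            System.out.println(host + " resolves: " + resolves(host)
                    + ", connects on " + port + ": " + canConnect(host, port, 2000));
        }
    }
}
```

If the IP address connects but vm02 does not resolve on Windows, the host name mapping on the client or the address the brokers advertise would be the next things to look at. This is only a diagnostic aid and does not prove the cause.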
I also wanted to try a Kafka Java API example on the Windows machine, but it failed too, without any error message. My Java code:
package com.netease.ecom.data.connect.hdfs;

import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class SimpleAvroProducer {

    // Avro schema of the records to send: two strings and one int.
    public static final String USER_SCHEMA = "{"
            + "\"type\":\"record\","
            + "\"name\":\"myrecord\","
            + "\"fields\":["
            + "  { \"name\":\"str1\", \"type\":\"string\" },"
            + "  { \"name\":\"str2\", \"type\":\"string\" },"
            + "  { \"name\":\"int1\", \"type\":\"int\" }"
            + "]}";

    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.80.128:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(USER_SCHEMA);
        // Converts a GenericRecord to its Avro binary encoding.
        Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);

        KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);

        for (int i = 0; i < 1000; i++) {
            GenericData.Record avroRecord = new GenericData.Record(schema);
            avroRecord.put("str1", "str 1-" + i);
            avroRecord.put("str2", "str 2-" + i);
            avroRecord.put("int1", i);

            byte[] bytes = recordInjection.apply(avroRecord);

            ProducerRecord<String, byte[]> record = new ProducerRecord<>("mytopic", bytes);
            // send() is asynchronous; with no callback and no get() on the returned Future, failures are not reported here.
            producer.send(record);

            Thread.sleep(250);
        }

        producer.close();
    }
}
Yes, this code is meant to send Avro data to Kafka, and it also failed without any errors.
The server.properties of one of the Kafka servers on the Linux machines: