Kafka produce or consume msg failed from windows clients to remote Linux servers -


i both download kafka_2.10-0.10.0.1 windows , linux machine(i have cluster has 3 linux machines ,192.168.80.128/129/130).so , use windows machine kafka client , linux machines kafka servers. try produce msg windows remote kafka server, command , response below:

f:\kafka_2.10-0.10.0.1\kafka_2.10-0.10.0.1\bin\windows>kafka-console-pr oducer.bat --broker-list 192.168.80.128:9092 --topic wuchang dadfasdf asdfasf [2016-11-08 22:41:30,311] error error when sending message topic wuchang  key: null, value: 8 bytes error: (org.apache.kafka.clients.producer.intern als.errorloggingcallback) org.apache.kafka.common.errors.timeoutexception: batch containing 2 record(s) ex pired due timeout while requesting metadata brokers wuchang-0 [2016-11-08 22:41:30,313] error error when sending message topic wuchang  key: null, value: 7 bytes error: (org.apache.kafka.clients.producer.intern als.errorloggingcallback) org.apache.kafka.common.errors.timeoutexception: batch containing 2 record(s) ex pired due timeout while requesting metadata brokers wuchang-0 

i sure kafka cluster ok because run produce , consume command directly on linux server successfully.

ofcource , comsume msg remote kafka server failed:

f:\kafka_2.10-0.10.0.1\kafka_2.10-0.10.0.1\bin\windows>kafka-console-co nsumer.bat --bootstrap-server 192.168.80.128:9092 --topic wuchang --from-beginni ng --zookeeper 192.168.80.128:2181 [2016-11-08 22:56:43,486] warn fetching topic metadata correlation id 0  topics [set(wuchang)] broker [brokerendpoint(1,vm02,9092)] failed (kafka.c lient.clientutils$) java.nio.channels.closedchannelexception         @ kafka.network.blockingchannel.send(blockingchannel.scala:110)         @ kafka.producer.syncproducer.liftedtree1$1(syncproducer.scala:80)         @ kafka.producer.syncproducer.kafka$producer$syncproducer$$dosend(syncp roducer.scala:79)         @ kafka.producer.syncproducer.send(syncproducer.scala:124)         @ kafka.client.clientutils$.fetchtopicmetadata(clientutils.scala:59)         @ kafka.client.clientutils$.fetchtopicmetadata(clientutils.scala:94)         @ kafka.consumer.consumerfetchermanager$leaderfinderthread.dowork(consu merfetchermanager.scala:66)         @ kafka.utils.shutdownablethread.run(shutdownablethread.scala:63) 

also , want try kafka java api example on windows machine , failed without error msg,my java code :

package com.netease.ecom.data.connect.hdfs;   import com.twitter.bijection.injection; import com.twitter.bijection.avro.genericavrocodecs; import org.apache.avro.schema; import org.apache.avro.generic.genericdata; import org.apache.avro.generic.genericrecord; import org.apache.kafka.clients.producer.kafkaproducer; import org.apache.kafka.clients.producer.producerrecord;  import java.util.properties;  public class simpleavroproducer {      public static final string user_schema = "{"             + "\"type\":\"record\","             + "\"name\":\"myrecord\","             + "\"fields\":["             + "  { \"name\":\"str1\", \"type\":\"string\" },"             + "  { \"name\":\"str2\", \"type\":\"string\" },"             + "  { \"name\":\"int1\", \"type\":\"int\" }"             + "]}";      public static void main(string[] args) throws interruptedexception {         properties props = new properties();         props.put("bootstrap.servers", "192.168.80.128:9092");         props.put("key.serializer", "org.apache.kafka.common.serialization.stringserializer");         props.put("value.serializer", "org.apache.kafka.common.serialization.bytearrayserializer");          schema.parser parser = new schema.parser();         schema schema = parser.parse(user_schema);         injection<genericrecord, byte[]> recordinjection = genericavrocodecs.tobinary(schema);          kafkaproducer<string, byte[]> producer = new kafkaproducer<>(props);          (int = 0; < 1000; i++) {             genericdata.record avrorecord = new genericdata.record(schema);             avrorecord.put("str1", "str 1-" + i);             avrorecord.put("str2", "str 2-" + i);             avrorecord.put("int1", i);              byte[] bytes = recordinjection.apply(avrorecord);              producerrecord<string, byte[]> record = new producerrecord<>("mytopic", bytes);             producer.send(record);              thread.sleep(250);          }          producer.close();     } } 

yes , code want send avro data kafka , , failed without errors.

one of kafka server.properties on linux machines :


Comments

Popular posts from this blog

php - How to add and update images or image url in Volusion using Volusion API -

Laravel mail error `Swift_TransportException in StreamBuffer.php line 269: Connection could not be established with host smtp.gmail.com [ #0]` -

c# SetCompatibleTextRenderingDefault must be called before the first -