Spark Streaming with large messages: java.lang.OutOfMemoryError: Java heap space
I am using Spark Streaming 1.6.1 with Kafka 0.9.0.1 (the createStream API) on HDP 2.4.2. The use case sends large messages, ranging from 5 MB to 30 MB, to Kafka topics. In such cases Spark Streaming fails to complete the job and crashes with the exception below. I am doing a DataFrame operation and saving the result to HDFS in CSV format; the relevant code snippets follow.
Reading the Kafka topic:

    val lines = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicMap,
      StorageLevel.MEMORY_AND_DISK_SER_2 /* MEMORY_ONLY_SER_2 */
    ).map(_._2)

Writing to HDFS:

    val hdfsDF: DataFrame = getDF(sqlContext, eventDF, schema, topicName)
    hdfsDF.show
    hdfsDF.write
      .format("com.databricks.spark.csv")
      .option("header", "false")
      .save(hdfsPath + "/" + "out_" + System.currentTimeMillis().toString())

The failure:

    16/11/11 12:12:35 WARN ReceiverTracker: Error reported by receiver for stream 0: Error handling message; exiting - java.lang.OutOfMemoryError: Java heap space
        at java.lang.StringCoding$StringDecoder.decode(StringCoding.java:149)
        at java.lang.StringCoding.decode(StringCoding.java:193)
        at java.lang.String.<init>(String.java:426)
        at java.lang.String.<init>(String.java:491)
        at kafka.serializer.StringDecoder.fromBytes(Decoder.scala:50)
        at kafka.serializer.StringDecoder.fromBytes(Decoder.scala:42)
        at kafka.message.MessageAndMetadata.message(MessageAndMetadata.scala:32)
        at org.apache.spark.streaming.kafka.KafkaReceiver$MessageHandler.run(KafkaInputDStream.scala:137)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
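For context, here is a minimal, self-contained sketch of the reading side. The ZooKeeper address, group id, topic map, batch interval, and byte limits are placeholders, not my real settings; I call out fetch.message.max.bytes because the old consumer used by the receiver-based API defaults it to 1 MB, well below 5-30 MB messages:

    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    // All concrete values below are assumed placeholders.
    val ssc = new StreamingContext(new SparkConf().setAppName("LargeMessages"), Seconds(10))

    val kafkaParams = Map(
      "zookeeper.connect" -> "zkhost:2181",      // placeholder address
      "group.id"          -> "large-msg-group",  // placeholder group id
      // Old-consumer fetch limit defaults to 1 MB; it must at least cover the largest message.
      "fetch.message.max.bytes"     -> (32 * 1024 * 1024).toString,
      "socket.receive.buffer.bytes" -> (32 * 1024 * 1024).toString
    )
    val topicMap = Map("large-topic" -> 1)       // placeholder topic and thread count

    val lines = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK_SER_2
    ).map(_._2)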
Followed by:
    java.lang.Exception: Could not compute split, block input-0-1478610837000 not found
        at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
    Driver stacktrace:
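The "block not found" error appears to follow from the first failure: once the receiver's heap blows up, the stored block is lost with the crashed JVM, so the batch that referenced input-0-1478610837000 can no longer compute. For reference, a sketch of the heap and backpressure knobs that interact with this in Spark 1.6 (every value is illustrative, not a verified fix for this job):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // Illustrative tuning only; none of these values are from my actual job.
    val conf = new SparkConf()
      .setAppName("LargeMessageStreaming")                  // placeholder name
      .set("spark.executor.memory", "8g")                   // more heap per executor
      .set("spark.streaming.backpressure.enabled", "true")  // available since Spark 1.5
      .set("spark.streaming.receiver.maxRate", "10")        // cap records/sec per receiver
    val ssc = new StreamingContext(conf, Seconds(10))       // illustrative batch interval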