python - Kafka consumer: AttributeError: 'list' object has no attribute 'map'
I want to read messages from a Kafka queue in Python. For example, in Scala it's quite easy to do:
```scala
val ssc = new StreamingContext(conf, Seconds(20))

// Divide topic into partitions
val topicMessages = "myKafkaTopic"
val topicMessagesMap = topicMessages.split(",").map((_, kafkaNumThreads)).toMap
val messages = KafkaUtils.createStream(ssc, zkQuorum, group, topicMessagesMap).map(_._2)
messages.foreachRDD { rdd =>
  //...
}
```
I want the same in Python. My current Python code:
```python
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

ssc = StreamingContext(sc, 20)
topicMessages = "myKafkaTopic"
topicMessagesMap = topicMessages.split(",").map((_, kafkaNumThreads)).toMap
messages = KafkaUtils.createStream(ssc, zkQuorum, group, topicMessagesMap)
```
However, I get an error at the line `topicMessagesMap = topicMessages.split(",").map((_, kafkaNumThreads)).toMap`:

```
AttributeError: 'list' object has no attribute 'map'
```
How can I make this code work?
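For reference, Python lists have no `.map` method, so the Scala expression cannot be copied verbatim; the equivalent topic-to-threads map is a plain `dict`, which `KafkaUtils.createStream` accepts. A minimal sketch of the translation (assuming `kafkaNumThreads` is an integer, as in the Scala version):

```python
# The Scala line topicMessages.split(",").map((_, kafkaNumThreads)).toMap
# becomes a dict comprehension in Python: one entry per topic name.
kafkaNumThreads = 1  # example value; in the real job this comes from config
topicMessages = "myKafkaTopic"
topicMessagesMap = {topic: kafkaNumThreads for topic in topicMessages.split(",")}
print(topicMessagesMap)  # {'myKafkaTopic': 1}
```

This `topicMessagesMap` dict can then be passed to `KafkaUtils.createStream(ssc, zkQuorum, group, topicMessagesMap)` in place of the failing expression.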
Update:
If I run this code in a Jupyter Notebook, I get the error shown below:
```python
messages = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {inputKafkaTopic: list})
```
```
Spark Streaming's Kafka libraries not found in class path. Try one of the following.

1. Include the Kafka library and its dependencies in the spark-submit command as

   $ bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8:2.0.0 ...

2. Download the JAR of the artifact from Maven Central http://search.maven.org/,
   Group Id = org.apache.spark, Artifact Id = spark-streaming-kafka-0-8-assembly,
   Version = 2.0.0. Then, include the jar in the spark-submit command as

   $ bin/spark-submit --jars ...
```
Do I understand correctly that the only way to make this work is to use spark-submit, and that it's impossible to run this code from Jupyter/IPython?
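For what it's worth, spark-submit is not the only option: the `--packages` flag can also be supplied to a notebook session through the `PYSPARK_SUBMIT_ARGS` environment variable, as long as it is set before the `SparkContext` is created. A sketch, assuming Spark 2.0.0 with Scala 2.11 (adjust the coordinates to your build):

```python
import os

# Must be set before the SparkContext/StreamingContext is constructed;
# the trailing "pyspark-shell" token is required by PySpark's launcher.
os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.0 pyspark-shell"
)
```

With this in the first notebook cell (or in the environment that launches Jupyter), the Kafka classes should be on the classpath without leaving the notebook.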