Spark Streaming PySpark code not working


My PySpark streaming code below is not working. I am trying to consume a simple Kafka topic (called "test") as a stream in PySpark, but the code does not display any messages.

import os
import time
import sys
#import findspark
#findspark.init("/usr/lib/spark")
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
sc = SparkContext(appName="PythonSSKafka")
ssc = StreamingContext(sc,10)
print("ssc ================= {} {}");
kafkaStream = KafkaUtils.createStream(ssc, "test", "localhost:9092", {"imagetext":1})
print("contexts =================== {} {}");
lines = kafkaStream.map(lambda x: x[1])
lines.pprint()

ssc.start()
time.sleep(10)
#ssc.awaitTermination()
ssc.stop(stopSparkContext=True,stopGraceFully=True)

I get the error below. Please help.

19/01/21 07:49:46 ERROR scheduler.TaskSetManager: Task 0 in stage 18.0 failed 1 times; aborting job
19/01/21 07:49:46 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 18.0, whose tasks have all completed, from pool 
19/01/21 07:49:46 INFO scheduler.TaskSchedulerImpl: Cancelling stage 18
19/01/21 07:49:46 INFO scheduler.DAGScheduler: ResultStage 18 (start at NativeMethodAccessorImpl.java:-2) failed in 0.327 s due to Job aborted due to stage failure: Task 0 in stage 18.0 failed 1 times, most recent failure: Lost task 0.0 in stage 18.0 (TID 18, localhost, executor driver): kafka.common.InvalidConfigException: client.id localhost:9092 is illegal, contains a character other than ASCII alphanumerics, ".", "_" and "-"
at kafka.common.Config$class.validateChars(Config.scala:32)
at kafka.consumer.ConsumerConfig$.validateChars(ConsumerConfig.scala:25)
at kafka.consumer.ConsumerConfig$.validateClientId(ConsumerConfig.scala:65)
Jul 11, 2019 in Apache Spark by Rishi

1 answer to this question.

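The ZooKeeper address in your code is incorrect. KafkaUtils.createStream takes its arguments in the order (ssc, zkQuorum, groupId, topics), so your call passes "test" as the ZooKeeper address and the broker address localhost:9092 as the consumer group ID. The old Kafka consumer uses the group ID as its default client.id, and ":" is not a legal character there, which is why you get the InvalidConfigException. localhost:9092 is the broker address you use when creating a producer; the receiver-based stream connects through ZooKeeper instead. Change the ZooKeeper address in the code to localhost:2181 and use the same address when creating the topic. Also pass a plain group ID, and make sure the topic name in the dictionary is the topic you are consuming ("test", not "imagetext").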
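A minimal sketch of the corrected call, assuming ZooKeeper is listening on localhost:2181 and the topic is "test" as stated in the question. The group ID "pyspark-test-group" and the helper names build_stream_args and run_stream are made up for illustration. Note that pyspark.streaming.kafka is only available in Spark 1.x/2.x with the spark-streaming-kafka-0-8 package on the classpath.

```python
import re

# Kafka's old consumer allows only ASCII alphanumerics, ".", "_" and "-"
# in group.id / client.id (the rule quoted in the error message).
_LEGAL_ID = re.compile(r"[A-Za-z0-9._-]+")


def build_stream_args(zk_quorum, group_id, topic, partitions=1):
    """Return (zkQuorum, groupId, topics) in the order createStream expects."""
    if not _LEGAL_ID.fullmatch(group_id):
        raise ValueError("illegal group id: %r" % group_id)
    return zk_quorum, group_id, {topic: partitions}


def run_stream(timeout_seconds=60):
    # Imported lazily so the helper above can be used without Spark installed.
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils

    sc = SparkContext(appName="PythonSSKafka")
    ssc = StreamingContext(sc, 10)  # 10-second batches, as in the question

    # Assumed values: ZooKeeper on localhost:2181, a hypothetical group id,
    # and the topic "test" named in the question.
    zk_quorum, group_id, topics = build_stream_args(
        "localhost:2181", "pyspark-test-group", "test")
    kafka_stream = KafkaUtils.createStream(ssc, zk_quorum, group_id, topics)

    # Each record is a (key, value) pair; keep only the message value.
    kafka_stream.map(lambda kv: kv[1]).pprint()

    ssc.start()
    # Wait for several batches instead of sleeping for a single interval.
    ssc.awaitTerminationOrTimeout(timeout_seconds)
    ssc.stop(stopSparkContext=True, stopGraceFully=True)
```

Call run_stream() from the script you pass to spark-submit. The original code stopped the context after a single 10-second sleep, which can end the job before the first batch is printed; waiting with a longer timeout gives the stream a chance to show messages.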
answered Jul 11, 2019 by Shir
