Spark Streaming PySpark code not working


The PySpark streaming code below is not working. I am trying to consume a simple Kafka topic (called "test") as a stream in PySpark, but the code does not display any messages.

import os
import time
import sys
# import findspark
# findspark.init("/usr/lib/spark")
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(appName="PythonSSKafka")
ssc = StreamingContext(sc, 10)  # 10-second batch interval
print("ssc ================= {} {}")
kafkaStream = KafkaUtils.createStream(ssc, "test", "localhost:9092", {"imagetext": 1})
print("contexts =================== {} {}")
lines = kafkaStream.map(lambda x: x[1])  # keep only the message value
lines.pprint()

ssc.start()
time.sleep(10)
# ssc.awaitTermination()
ssc.stop(stopSparkContext=True, stopGraceFully=True)

I am getting the error below. Please help.

19/01/21 07:49:46 ERROR scheduler.TaskSetManager: Task 0 in stage 18.0 failed 1 times; aborting job
19/01/21 07:49:46 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 18.0, whose tasks have all completed, from pool 
19/01/21 07:49:46 INFO scheduler.TaskSchedulerImpl: Cancelling stage 18
19/01/21 07:49:46 INFO scheduler.DAGScheduler: ResultStage 18 (start at NativeMethodAccessorImpl.java:-2) failed in 0.327 s due to Job aborted due to stage failure: Task 0 in stage 18.0 failed 1 times, most recent failure: Lost task 0.0 in stage 18.0 (TID 18, localhost, executor driver): kafka.common.InvalidConfigException: client.id localhost:9092 is illegal, contains a character other than ASCII alphanumerics, ".", "_" and "-"
at kafka.common.Config$class.validateChars(Config.scala:32)
at kafka.consumer.ConsumerConfig$.validateChars(ConsumerConfig.scala:25)
at kafka.consumer.ConsumerConfig$.validateClientId(ConsumerConfig.scala:65)
asked Jul 11 in Apache Spark by Rishi

1 answer to this question.

The address you are using for Zookeeper is incorrect. KafkaUtils.createStream expects the Zookeeper quorum as its second argument and a consumer group id as its third, but your code passes the topic name "test" as the quorum and the broker address localhost:9092 as the group id. Kafka derives its client.id from the group id, which is why it rejects "localhost:9092" as containing illegal characters. Change the Zookeeper address in the code to localhost:2181, pass a plain ASCII string as the group id, and use the same Zookeeper address when you create the topic. Also note that the code subscribes to the topic "imagetext" while the question says the topic is called "test"; make sure the topic name in the dictionary matches the topic you produce to.
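Here is a minimal corrected sketch, assuming Zookeeper runs on localhost:2181 and the topic is named "test"; the group id "spark-streaming-consumer" is an arbitrary example name, not something required by Spark:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(appName="PythonSSKafka")
ssc = StreamingContext(sc, 10)  # 10-second batch interval

# createStream(ssc, zkQuorum, groupId, topics):
# the second argument is the Zookeeper quorum, the third is the consumer
# group id (an ASCII name, not a broker address), and the fourth maps
# topic name -> number of consumer threads.
kafkaStream = KafkaUtils.createStream(
    ssc,
    "localhost:2181",            # Zookeeper, not the Kafka broker on 9092
    "spark-streaming-consumer",  # example group id; any ASCII alphanumeric name works
    {"test": 1},                 # consume topic "test" with one thread
)
lines = kafkaStream.map(lambda x: x[1])  # records are (key, value) pairs
lines.pprint()

ssc.start()
ssc.awaitTermination()

With this change, messages produced to "test" should appear in the pprint() output once per batch interval.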
answered Jul 11 by Shir
