Spark Streaming Pyspark code not working

0 votes

My PySpark streaming code below is not working. I am trying to consume a simple Kafka topic (called "test") as a stream in PySpark, but the code does not display the messages.

import os
import time
import sys
#import findspark
#findspark.init("/usr/lib/spark")
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
sc = SparkContext(appName="PythonSSKafka")
ssc = StreamingContext(sc,10)
print("ssc ================= {} {}");
kafkaStream = KafkaUtils.createStream(ssc, "test", "localhost:9092", {"imagetext":1})
print("contexts =================== {} {}");
lines = kafkaStream.map(lambda x: x[1])
lines.pprint()

ssc.start()
time.sleep(10)
#ssc.awaitTermination()
ssc.stop(stopSparkContext=True,stopGraceFully=True)

I get the error below. Please help.

19/01/21 07:49:46 ERROR scheduler.TaskSetManager: Task 0 in stage 18.0 failed 1 times; aborting job
19/01/21 07:49:46 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 18.0, whose tasks have all completed, from pool 
19/01/21 07:49:46 INFO scheduler.TaskSchedulerImpl: Cancelling stage 18
19/01/21 07:49:46 INFO scheduler.DAGScheduler: ResultStage 18 (start at NativeMethodAccessorImpl.java:-2) failed in 0.327 s due to Job aborted due to stage failure: Task 0 in stage 18.0 failed 1 times, most recent failure: Lost task 0.0 in stage 18.0 (TID 18, localhost, executor driver): kafka.common.InvalidConfigException: client.id localhost:9092 is illegal, contains a character other than ASCII alphanumerics, ".", "_" and "-"
at kafka.common.Config$class.validateChars(Config.scala:32)
at kafka.consumer.ConsumerConfig$.validateChars(ConsumerConfig.scala:25)
at kafka.consumer.ConsumerConfig$.validateClientId(ConsumerConfig.scala:65)
Jul 11, 2019 in Apache Spark by Rishi
1,387 views

1 answer to this question.

0 votes

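The ZooKeeper address in your code is wrong. KafkaUtils.createStream takes its arguments in the order (ssc, zkQuorum, groupId, topics), so your call passes "test" as the ZooKeeper quorum and the broker address localhost:9092 as the consumer group ID. The group ID is also used as the client.id, which is why the error says that client.id localhost:9092 is illegal. Change the ZooKeeper address in the code to localhost:2181, pass a group ID that contains only ASCII alphanumerics, ".", "_" and "-", and use the same ZooKeeper address when you create the topic. The broker address localhost:9092 is only needed when you create the producer. Also check the topics dictionary: the question mentions the topic "test", but the code subscribes to "imagetext".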
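
As a rough sketch of the corrected call, something like the following should be close. The group ID "test-group" and the helper names create_stream_args and run are made up for illustration. localhost:2181 assumes ZooKeeper is listening on its default port. This also assumes a Spark 2.x installation with the Kafka 0.8 receiver package on the classpath, since pyspark.streaming.kafka was removed in Spark 3.0.

```python
def create_stream_args(zk_quorum="localhost:2181", group_id="test-group",
                       topic="test", threads=1):
    """Return the arguments that follow ssc in KafkaUtils.createStream.

    The order is zkQuorum, groupId, topics. The default values are
    assumptions for illustration, not values taken from a real cluster.
    """
    return (zk_quorum, group_id, {topic: threads})


def run(batch_seconds=10, run_seconds=30):
    """Hypothetical driver: print message values from the topic for a while."""
    # Imported here so the helper above can be used without Spark installed.
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils

    sc = SparkContext(appName="PythonSSKafka")
    ssc = StreamingContext(sc, batch_seconds)

    # ZooKeeper address first, then the consumer group ID, then the topics.
    kafka_stream = KafkaUtils.createStream(ssc, *create_stream_args())

    # Each record is a (key, value) pair; keep only the value.
    kafka_stream.map(lambda kv: kv[1]).pprint()

    ssc.start()
    # Wait for a few batches instead of sleeping for exactly one interval.
    ssc.awaitTerminationOrTimeout(run_seconds)
    ssc.stop(stopSparkContext=True, stopGraceFully=True)
```

Calling run() from a script submitted with spark-submit would start the stream. Waiting longer than a single batch interval gives pprint() a chance to show messages that arrive after the first batch.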

After that it will work.
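
If the same InvalidConfigException comes back, it can help to check the group ID before handing it to Spark. The sketch below mirrors the rule quoted in the error message (ASCII alphanumerics, ".", "_" and "-"). The function name is_legal_client_id is hypothetical, and rejecting the empty string is an assumption of this sketch rather than something the error message states.

```python
import re

# Characters the error message lists as legal: ASCII letters, digits, ".", "_", "-".
_LEGAL_ID = re.compile(r"[A-Za-z0-9._-]+")


def is_legal_client_id(value):
    """Return True if value uses only the characters named in the error message.

    Empty strings are rejected; that is a choice made for this sketch.
    """
    return _LEGAL_ID.fullmatch(value) is not None


if __name__ == "__main__":
    for candidate in ("localhost:9092", "test-group"):
        print(candidate, is_legal_client_id(candidate))
```

The broker address from the question fails this check because of the colon, while a plain name such as "test-group" passes.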

Thanks.

answered Jul 11, 2019 by Shir