How to call the Debug Mode in PySpark

0 votes

I am using IntelliJ IDEA set up with Apache Spark 1.4. I want to add debug points to my Spark Python scripts for easy debugging

I am currently running this bit of Python to initialise the spark process

proc = subprocess.Popen([SPARK_SUBMIT_PATH, scriptFile, inputFile], shell=SHELL_OUTPUT, stdout=subprocess.PIPE)

if VERBOSE:
    print proc.stdout.read()
    print proc.stderr.read()

When spark-submit calls myFirstSparkScript.py, the debug mode is not getting started instead it executes as normal. Editing the Apache Spark source code and running a customised copy is not a solution.

Please help me with this one.

Jul 26, 2019 in Apache Spark by nitinrawat895
• 11,380 points
5,510 views

1 answer to this question.

0 votes

As far as I understand your intentions what you want is not directly possible given Spark architecture. Even without subprocess call the only part of your program that is accessible directly on a driver is a SparkContext. From the rest you're effectively isolated by different layers of communication, including at least one (in the local mode) JVM instance. To illustrate that lets use a diagram from PySpark Internals documentation.

enter image description here

What is in the left box is the part that is accessible locally and could be used to attach a debugger. Since it is most limited to JVM calls there is really nothing there that should of interest for you, unless you're actually modifying PySpark itself.

What is on the right happens remotely and depending on a cluster manager you use is pretty much a black-box from an user perspective. Moreover there are many situations when Python code on the right does nothing more than calling JVM API.

This is was the bad part. The good part is that the most of the time there should be no need for remote debugging. Excluding accessing objects like TaskContext, which can be easily mocked, every part of your code should be easily runnable / testable locally without using Spark instance whatsoever.

Functions you pass to actions / transformations take standard and predictable Python objects and are expected to return standard Python objects as well. What is also important these should be side effects free

So at the end of the day you have to parts of your program - a thin layer that can be accessed interactively and tested based purely on inputs / outputs and "computational core" which doesn't require Spark for testing / debugging.

Other options

That being said you're not completely out of options here.

Local mode

(passively attach debugger to a running interpreter)

Both plain GDB and PySpark debugger can be attached to a running process. This can be done only, once PySpark daemon and /or worker processes have been started. In local mode you can force it by executing a dummy action, for example:

sc.parallelize([], n).count()

where n is a number of "cores" available in the local mode (local[n]). Example procedure step-by-step on Unix-like systems:

  • Start PySpark shell:

    $SPARK_HOME/bin/pyspark 
  • Use pgrep to check there is no daemon process running:

    ➜  spark-2.1.0-bin-hadoop2.7$ pgrep -f pyspark.daemon
    ➜  spark-2.1.0-bin-hadoop2.7$
  • The same thing can be determined in PyCharm by:

    alt+shift+a and choosing Attach to Local Process:

    enter image description here

    or Run -> Attach to Local Process.

    At this point you should see only PySpark shell (and possibly some unrelated processes).

    enter image description here

  • Execute dummy action:

    sc.parallelize([], 1).count()

  • Now you should see both daemon and worker (here only one):

    ➜  spark-2.1.0-bin-hadoop2.7$ pgrep -f pyspark.daemon
    13990
    14046
    ➜  spark-2.1.0-bin-hadoop2.7$

    and

    enter image description here

    The process with lower pid is a daemon, the one with higher pid is (possibly) ephemeral worker.

  • At this point you can attach debugger to a process of interest:

    • In PyCharm by choosing the process to connect.
    • With plain GDB by calling:

      gdb python <pid of running process>

The biggest disadvantage of this approach is that you have find the right interpreter at the right moment.

Distributed mode

(Using active component which connects to debugger server)

With PyCharm

PyCharm provides Python Debug Server which can be used with PySpark jobs.

First of all you should add a configuration for remote debugger:

  • alt+shift+a and choose Edit Configurations or Run -> Edit Configurations.
  • Click on Add new configuration (green plus) and choose Python Remote Debug.
  • Configure host and port according to your own configuration (make sure that port and be reached from a remote machine)

    enter image description here

  • Start debug server:

    shift+F9

    You should see debugger console:

    enter image description here

  • Make sure that pyddev is accessible on the worker nodes, either by installing it or distributing the egg file.

  • pydevd uses an active component which has to be included in your code:

    import pydevd
    pydevd.settrace(<host name>, port=<port number>)

    The tricky part is to find the right place to include it and unless you debug batch operations (like functions passed to mapPartitions) it may require patching PySpark source itself, for example pyspark.daemon.worker or RDD methods like RDD.mapPartitions. Let's say we are interested in debugging worker behavior. Possible patch can look like this:

    diff --git a/python/pyspark/daemon.py b/python/pyspark/daemon.py
    index 7f06d4288c..6cff353795 100644
    --- a/python/pyspark/daemon.py
    +++ b/python/pyspark/daemon.py
    @@ -44,6 +44,9 @@ def worker(sock):
         """
         Called by a worker process after the fork().
         """
    +    import pydevd
    +    pydevd.settrace('foobar', port=9999, stdoutToServer=True, stderrToServer=True)
    +
         signal.signal(SIGHUP, SIG_DFL)
         signal.signal(SIGCHLD, SIG_DFL)
         signal.signal(SIGTERM, SIG_DFL)

    If you decide to patch Spark source be sure to use patched source not packaged version which is located in $SPARK_HOME/python/lib.

  • Execute PySpark code. Go back to the debugger console and have fun:

    enter image description here

Other tools

There is a number of tools, including python-manhole or pyrasite which can be used, with some effort, to work with PySpark.

Hope this helps!

Join Pyspark training online today to know more about Pyspark.

Thanks.

answered Jul 26, 2019 by ravikiran
• 4,620 points

check out this tool pyspark_xray which solves exact problem of local debugging pyspark code that runs on worker nodes.  pyspark_xray is a diagnostic tool, in the form of Python library, for pyspark developers to debug and troubleshoot PySpark applications locally, specifically it enables local debugging of PySpark RDD transformation functions.

The purpose of developing pyspark_xray is to create a development framework that enables PySpark application developers to debug and troubleshoot locally and do production runs remotely using the same code base of a pyspark application. For the part of debugging Spark application code locally, pyspark_xray specifically provides capability of locally debugging Spark application code that runs on slave nodes, the missing of this capability is an unfilled gap for Spark application developers right now.

Related Questions In Apache Spark

0 votes
5 answers

How to change the spark Session configuration in Pyspark?

You aren't actually overwriting anything with this ...READ MORE

answered Dec 14, 2020 in Apache Spark by Gitika
• 65,910 points
121,589 views
0 votes
1 answer
0 votes
1 answer

How to get the number of elements in partition?

rdd.mapPartitions(iter => Array(iter.size).iterator, true) This command will ...READ MORE

answered May 8, 2018 in Apache Spark by kurt_cobain
• 9,390 points
1,931 views
0 votes
1 answer
+1 vote
2 answers
+1 vote
1 answer

Hadoop Mapreduce word count Program

Firstly you need to understand the concept ...READ MORE

answered Mar 16, 2018 in Data Analytics by nitinrawat895
• 11,380 points
10,558 views
0 votes
1 answer

hadoop.mapred vs hadoop.mapreduce?

org.apache.hadoop.mapred is the Old API  org.apache.hadoop.mapreduce is the ...READ MORE

answered Mar 16, 2018 in Data Analytics by nitinrawat895
• 11,380 points
2,185 views
+2 votes
11 answers

hadoop fs -put command?

Hi, You can create one directory in HDFS ...READ MORE

answered Mar 16, 2018 in Big Data Hadoop by nitinrawat895
• 11,380 points
104,209 views
0 votes
1 answer

How do I access the Map Task ID in Spark?

You can access task information using TaskContext: import org.apache.spark.TaskContext sc.parallelize(Seq[Int](), ...READ MORE

answered Jul 23, 2019 in Apache Spark by ravikiran
• 4,620 points
1,205 views
0 votes
1 answer

How do I connect to a HIVE Meta store through a program in SparkSQL?

In spark 2.0.+ it should look something ...READ MORE

answered Sep 5, 2019 in Apache Spark by ravikiran
• 4,620 points
4,064 views
webinar REGISTER FOR FREE WEBINAR X
REGISTER NOW
webinar_success Thank you for registering Join Edureka Meetup community for 100+ Free Webinars each month JOIN MEETUP GROUP