Primary keys in Apache Spark

+1 vote
I am having a JDBC connection with Apache Spark and PostgreSQL and I want to insert some data into my database. When I use append mode I need to specify id for each DataFrame.Row. Is there any way for Spark to create primary keys?
Aug 9, 2019 in Apache Spark by nitinrawat895
• 11,380 points

1 answer to this question.

0 votes

import sqlContext.implicits._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, LongType}

val df = sc.parallelize(Seq(
    ("a", -1.0), ("b", -2.0), ("c", -3.0))).toDF("foo", "bar")

Extract schema for further usage:

val schema = df.schema

Add id field:

val rows ={
   case (r: Row, id: Long) => Row.fromSeq(id +: r.toSeq)}

Create DataFrame:

val dfWithPK = sqlContext.createDataFrame(
  rows, StructType(StructField("id", LongType, false) +: schema.fields))

The same thing in Python:

from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, LongType

row = Row("foo", "bar")
row_with_index = Row(*["id"] + df.columns)

df = sc.parallelize([row("a", -1.0), row("b", -2.0), row("c", -3.0)]).toDF()

def make_row(columns):
    def _make_row(row, uid):
        row_dict = row.asDict()
        return row_with_index(*[uid] + [row_dict.get(c) for c in columns])
    return _make_row

f = make_row(df.columns)

df_with_pk = (df.rdd
    .map(lambda x: f(*x))
    .toDF(StructType([StructField("id", LongType(), False)] + df.schema.fields)))

If you prefer the consecutive number you can replace zipWithUniqueId with zipWithIndex but it is a little bit more expensive.

Directly with DataFrame API:

(universal Scala, Python, Java, R with pretty much the same syntax)

Previously I've missed monotonically increasing id function which should work just fine as long as you don't require consecutive numbers:

import org.apache.spark.sql.functions.monotonicallyIncreasingId

df.withColumn("id", monotonicallyIncreasingId).show()
// +---+----+-----------+
// |foo| bar|         id|
// +---+----+-----------+
// |  a|-1.0|17179869184|
// |  b|-2.0|42949672960|
// |  c|-3.0|60129542144|
// +---+----+-----------+

While useful monotonically increasing id is non-deterministic. Not only ids may be different from execution to execution but without additional tricks cannot be used to identify rows when subsequent operations contain filters.


It is also possible to use the rowNumber window function:

from pyspark.sql.window import Window
from pyspark.sql.functions import rowNumber

w = Window().orderBy()
df.withColumn("id", rowNumber().over(w)).show()


WARN Window: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.

So unless you have a natural way to partition your data and ensure uniqueness is not particularly useful at this moment.

To know more about SQL, It's recommended to join PySpark Course today.

answered Aug 9, 2019 by ravikiran
• 4,620 points

Related Questions In Apache Spark

+5 votes
11 answers

Concatenate columns in apache spark dataframe

its late but this how you can ...READ MORE

answered Mar 21, 2019 in Apache Spark by anonymous
0 votes
1 answer

cache tables in apache spark sql

Caching the tables puts the whole table ...READ MORE

answered May 4, 2018 in Apache Spark by Data_Nerd
• 2,390 points
0 votes
1 answer

Ways to create RDD in Apache Spark

There are two popular ways using which ...READ MORE

answered Jun 19, 2018 in Apache Spark by nitinrawat895
• 11,380 points
+1 vote
8 answers

How to print the contents of RDD in Apache Spark?

Save it to a text file: line.saveAsTextFile("alicia.txt") Print contains ...READ MORE

answered Dec 10, 2018 in Apache Spark by Akshay
0 votes
1 answer

What do we exactly mean by “Hadoop” – the definition of Hadoop?

The official definition of Apache Hadoop given ...READ MORE

answered Mar 16, 2018 in Big Data Hadoop by Shubham
+1 vote
1 answer
0 votes
3 answers

Can we run Spark without using Hadoop?

No, you can run spark without hadoop. ...READ MORE

answered May 7, 2019 in Big Data Hadoop by pradeep
0 votes
1 answer

Joining Multiple Spark Dataframes

You can run the below code to ...READ MORE

answered Mar 26, 2018 in Big Data Hadoop by Bharani
• 4,660 points
0 votes
1 answer

Primary keys in Apache Spark

I found the following solution to be ...READ MORE

answered Sep 11, 2019 in Apache Spark by ravikiran
• 4,620 points
+1 vote
1 answer

How do I turn off INFO Logging in Spark?

Hi, You need to edit one property in ...READ MORE

answered Jul 12, 2019 in Apache Spark by ravikiran
• 4,620 points

edited Dec 20, 2020 by MD 5,742 views
webinar_success Thank you for registering Join Edureka Meetup community for 100+ Free Webinars each month JOIN MEETUP GROUP