Primary keys in Apache Spark

0 votes
I am having a JDBC connection with Apache Spark and PostgreSQL and I want to insert some data into my database. When I use append mode I need to specify id for each DataFrame.Row. Is there any way for Spark to create primary keys?
Aug 9 in Apache Spark by nitinrawat895
• 10,490 points
17 views

1 answer to this question.

0 votes

import sqlContext.implicits._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, LongType}

val df = sc.parallelize(Seq(
    ("a", -1.0), ("b", -2.0), ("c", -3.0))).toDF("foo", "bar")

Extract schema for further usage:

val schema = df.schema

Add id field:

val rows = df.rdd.zipWithUniqueId.map{
   case (r: Row, id: Long) => Row.fromSeq(id +: r.toSeq)}

Create DataFrame:

val dfWithPK = sqlContext.createDataFrame(
  rows, StructType(StructField("id", LongType, false) +: schema.fields))

The same thing in Python:

from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, LongType

row = Row("foo", "bar")
row_with_index = Row(*["id"] + df.columns)

df = sc.parallelize([row("a", -1.0), row("b", -2.0), row("c", -3.0)]).toDF()

def make_row(columns):
    def _make_row(row, uid):
        row_dict = row.asDict()
        return row_with_index(*[uid] + [row_dict.get(c) for c in columns])
    return _make_row

f = make_row(df.columns)

df_with_pk = (df.rdd
    .zipWithUniqueId()
    .map(lambda x: f(*x))
    .toDF(StructType([StructField("id", LongType(), False)] + df.schema.fields)))

If you prefer the consecutive number you can replace zipWithUniqueId with zipWithIndex but it is a little bit more expensive.

Directly with DataFrame API:

(universal Scala, Python, Java, R with pretty much the same syntax)

Previously I've missed monotonically increasing id function which should work just fine as long as you don't require consecutive numbers:

import org.apache.spark.sql.functions.monotonicallyIncreasingId

df.withColumn("id", monotonicallyIncreasingId).show()
// +---+----+-----------+
// |foo| bar|         id|
// +---+----+-----------+
// |  a|-1.0|17179869184|
// |  b|-2.0|42949672960|
// |  c|-3.0|60129542144|
// +---+----+-----------+

While useful monotonically increasing id is non-deterministic. Not only ids may be different from execution to execution but without additional tricks cannot be used to identify rows when subsequent operations contain filters.

Note:

It is also possible to use the rowNumber window function:

from pyspark.sql.window import Window
from pyspark.sql.functions import rowNumber

w = Window().orderBy()
df.withColumn("id", rowNumber().over(w)).show()

Unfortunately:

WARN Window: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


So unless you have a natural way to partition your data and ensure uniqueness is not particularly useful at this moment.

answered Aug 9 by ravikiran
• 4,200 points

Related Questions In Apache Spark

+5 votes
11 answers

Concatenate columns in apache spark dataframe

its late but this how you can ...READ MORE

answered Mar 21 in Apache Spark by anonymous
25,496 views
0 votes
1 answer

cache tables in apache spark sql

Caching the tables puts the whole table ...READ MORE

answered May 4, 2018 in Apache Spark by Data_Nerd
• 2,360 points
613 views
0 votes
1 answer

Ways to create RDD in Apache Spark

There are two popular ways using which ...READ MORE

answered Jun 19, 2018 in Apache Spark by nitinrawat895
• 10,490 points
1,215 views
0 votes
7 answers

How to print the contents of RDD in Apache Spark?

Simple and easy: line.foreach(println) READ MORE

answered Dec 10, 2018 in Apache Spark by Kuber
8,191 views
0 votes
1 answer

What do we exactly mean by “Hadoop” – the definition of Hadoop?

The official definition of Apache Hadoop given ...READ MORE

answered Mar 16, 2018 in Big Data Hadoop by Shubham
170 views
+1 vote
1 answer
0 votes
3 answers

Can we run Spark without using Hadoop?

No, you can run spark without hadoop. ...READ MORE

answered May 7 in Big Data Hadoop by pradeep
135 views
0 votes
1 answer

Joining Multiple Spark Dataframes

You can run the below code to ...READ MORE

answered Mar 26, 2018 in Big Data Hadoop by Bharani
• 4,550 points
288 views
0 votes
1 answer

How do I turn off INFO Logging in Spark?

Execute this command in the spark directory: cp ...READ MORE

answered Jul 12 in Apache Spark by ravikiran
• 4,200 points
52 views
0 votes
1 answer

How do I access the Map Task ID in Spark?

You can access task information using TaskContext: import org.apache.spark.TaskContext sc.parallelize(Seq[Int](), ...READ MORE

answered Jul 23 in Apache Spark by ravikiran
• 4,200 points
25 views