How to create new column with function in Spark Dataframe?

0 votes

I'm trying to figure out the new DataFrame API in Spark. I have a DataFrame with two columns, "ID" and "Amt". As a generic example, say I want to add a new column called "Code" whose value is derived from the value of "Amt". I can write a function like this:

def coder(myAmt: Integer): String = {
  if (myAmt > 100) "Big"
  else "Little"
}

When I try to use it like this:

val DF = sqlContext.parquetFile("hdfs://temp/file.parquet")

DF.withColumn("Code", coder(DF("Amt")))

I get type mismatch errors:

found   : org.apache.spark.sql.Column
required: Integer

 

I've tried changing the input type of my function to org.apache.spark.sql.Column, but then the function fails to compile because the if statement expects a Boolean.

Am I doing this wrong? Is there a way to do this other than using withColumn?

Thanks in advance.

Jun 26, 2018 in Apache Spark by coldcode
• 2,020 points
24,466 views

11 answers to this question.

0 votes

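Let's say you have an "Amt" column in your schema. A plain Scala function cannot be applied to a Column directly; wrap it with udf so that Spark applies it to each row's value:

import org.apache.spark.sql.functions._
val DF = sqlContext.parquetFile("hdfs://temp/file.parquet")
// Ordinary Scala function that maps an amount to a label
val coder: (Int => String) = (arg: Int) => { if (arg < 100) "little" else "big" }
// Wrap it as a UDF so it can be applied to a Column
val sqlfunc = udf(coder)
// withColumn returns a new DataFrame; DF itself is unchanged
val withCode = DF.withColumn("Code", sqlfunc(col("Amt")))

withColumn is the right way to add a column; the missing piece was the UDF wrapper.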

answered Jun 26, 2018 by nitinrawat895
• 10,690 points
0 votes
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

df = sqlContext.createDataFrame(
    [(1, "a", 25.0), (2, "B", -25.0)], ("c1", "c2", "c3"))

def valueToCategory(value):
    if   value == 1: return 1
    elif value == 2: return 2
    # ... (further cases elided in the original answer)
    else: return 0

# NOTE: it seems that calls to udf() must be after SparkContext() is called
udfValueToCategory = udf(valueToCategory, StringType())
df_with_cat = df.withColumn("category", udfValueToCategory("c1"))
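
For the categorisation asked about in the question, a minimal PySpark sketch along the same lines might look like this. The names coder and add_code_column are illustrative, and the "Amt" column and the threshold of 100 are taken from the question's example. The None check is an addition: a Python UDF receives a SQL NULL as None, and comparing None with a number raises a TypeError in Python 3.

```python
def coder(amt):
    """Map an amount to a label; None (a SQL NULL) is passed through as None."""
    if amt is None:
        return None
    return "little" if amt < 100 else "big"


def add_code_column(df, amount_col="Amt", new_col="Code"):
    """Return a new DataFrame with new_col computed from amount_col (hypothetical helper)."""
    # Imported here so that coder() can be unit-tested without a Spark install.
    from pyspark.sql.functions import col, udf
    from pyspark.sql.types import StringType

    coder_udf = udf(coder, StringType())
    # withColumn does not modify df; it returns a new DataFrame.
    return df.withColumn(new_col, coder_udf(col(amount_col)))
```

Usage would be something like df_with_code = add_code_column(DF), where DF has an "Amt" column. Keeping the logic in a plain function means it can be checked on ordinary values before it is wrapped as a UDF.
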
answered Dec 7, 2018 by Rahul
0 votes
df.select('*', (df.column_name + 10).alias('new_column'))
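
The same column-expression style also covers the conditional case from the question without a UDF, using the built-in when and otherwise functions from pyspark.sql.functions. This is only a sketch: add_code_column_builtin is a made-up helper name, and the "Amt" column and the threshold of 100 are taken from the question's example.

```python
def add_code_column_builtin(df, amount_col="Amt", new_col="Code"):
    """Return a new DataFrame with a label column built from column expressions only."""
    # Imported inside the function so the module loads even where Spark is absent.
    from pyspark.sql import functions as F

    amount = F.col(amount_col)
    # when(...) yields "little" where the condition is true; every other row,
    # including rows where the amount is NULL, gets the otherwise(...) value.
    label = F.when(amount < 100, "little").otherwise("big")
    return df.withColumn(new_col, label)
```

Because the expression is evaluated by Spark itself rather than by a Python function called per row, this form is usually preferable when the logic can be written with built-in functions.
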
answered Dec 7, 2018 by Raj
0 votes
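from pyspark.sql.functions import lit
from pyspark.sql.types import StringType

# COLUMN_LIST: the column names the result should contain (defined elsewhere)
new_col = []
for column in COLUMN_LIST:
    if column in df.columns:
        new_col.append(column)
    else:
        # Column is missing from df: add it as a null string column
        new_col.append(lit(None).cast(StringType()).alias(column))

df = df.select(new_col)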
answered Dec 7, 2018 by Lakheer
0 votes

You can do it using a udf:

from pyspark.sql import functions as F
from pyspark.sql.types import StringType
a = F.udf(lambda: yourstring, StringType())
df.select(a().alias('new_column'))
answered Dec 7, 2018 by Goutam
0 votes
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

creator = udf(
    lambda val: val,
    StringType()
)
df.withColumn('new_col_name', creator(df.old_col))
answered Dec 7, 2018 by Manoj
0 votes
DF.withColumn("new_col", DF.col("old_col") + 10)
answered Dec 7, 2018 by Vinod
0 votes
import org.apache.spark.sql.functions.{lit, udf}

val addColumn: String => String = (data: String) => data
val ColUDF = udf(addColumn)
val output = inputDataFrame.withColumn("Name", ColUDF(lit("abcde")))
answered Dec 7, 2018 by Ashok
0 votes
Dataset<Row> newDs = ds.withColumn("new_col", functions.lit(1));

Source: https://www.tutorialkart.com/apache-spark/spark-add-new-column-to-dataset-example/

answered Dec 7, 2018 by Lalit
0 votes
val df2 = dataFrame
  .withColumn("F", lit("foo"))
  .select("F", "A", "B", "C", "D", "E")
answered Dec 7, 2018 by Suman
+2 votes
import org.apache.spark.sql.functions.udf
val coder: (Int => String) = v => if (v > 100) "Big" else "Small"
val coder_udf = udf(coder)
DF.withColumn("Code", coder_udf(DF.col("Amt")))
answered Apr 4 by anonymous

edited Apr 5 by Omkar
