How to create new column with function in Spark Dataframe?

0 votes

I'm trying to figure out the new dataframe API in Spark. I am facing an issue here that I have a dataframe with 2 columns, "ID" and "Amount". As a generic example, say I want to return a new column called "code" that returns a code based on the value of "Amt". I can write a function something like this:

def coder(myAmt:Integer):String {
  if (myAmt > 100) "Little"
  else "Big"
}

When I try to use it like this:

val DF = sqlContext.parquetFile("hdfs://temp/file.parquet")

DF.withColumn("Code", coder(DF("Amt")))
I get type mismatch errors

found   : org.apache.spark.sql.Column
required: Integer

 

I've tried changing the input type on my function to org.apache.spark.sql.Column but I then I start getting errors with the function compiling because it wants a boolean in the if statement.

Am I doing this wrong? Is there another way to do this than using withColumn?

Thanks in advance.

Jun 26, 2018 in Apache Spark by coldcode
• 2,010 points
14,784 views

11 answers to this question.

0 votes

Let's say you have "Amt" column in your Schema:

import org.apache.spark.sql.functions._
val DF = sqlContext.parquetFile("hdfs://temp/file.parquet")
val coder: (Int => String) = (arg: Int) => {if (arg < 100) "little" else "big"}
val sqlfunc = udf(coder)
DF.withColumn("Code", sqlfunc(col("Amt")))

I guess withColumn is the right way to add a column.

answered Jun 26, 2018 by nitinrawat895
• 10,110 points
0 votes
df = sqlContext.createDataFrame(
    [(1, "a", 25.0), (2, "B", -25.0)], ("c1", "c2", "c3"))

from pyspark.sql.functions import udf
from pyspark.sql.types import *

def valueToCategory(value):
   if   value == 1: return 1
   elif value == 2: return 2
   ...
   else: return 0

# NOTE: it seems that calls to udf() must be after SparkContext() is called
udfValueToCategory = udf(valueToCategory, StringType())
df_with_cat = df.withColumn("category", udfValueToCategory("c1"))
answered Dec 7, 2018 by Rahul
0 votes
df.select('*', (df.column_name + 10).alias('new_column'))
answered Dec 7, 2018 by Raj
0 votes
new_col = []
for column in COLUMN_LIST:
    if column in df.columns:
        new_col.append(column)
    else:
        new_col.append(lit(None).cast(StringType()).alias('{0}'.format(column)))

df = df.select(new_col)
answered Dec 7, 2018 by Lakheer
0 votes

You can do it using udf:

a = F.udf(lambda :yourstring,StringType())
a.select(a().alias('new_column')
answered Dec 7, 2018 by Goutam
0 votes
creator = udf(
    lambda val: val,
    StringType()
)
df.withColumn('new_col_name', creator(df.old_col))
answered Dec 7, 2018 by Manoj
0 votes
DF.withColumn("new_col", DF.col("old_col") + 10)
answered Dec 7, 2018 by Vinod
0 votes
import org.apache.spark.sql.functions.lit
    val addColumn :(String)=>String=(data:String)=>{data}
    val ColUDF= udf(addColumn)
     val output = inputDataFrame.withColumn("Name",ColUDF(lit("abcde")))
answered Dec 7, 2018 by Ashok
0 votes
Dataset<Row>newDs = ds.withColumn("new_col",functions.lit(1));

Source:https://www.tutorialkart.com/apache-spark/spark-add-new-column-to-dataset-example/​

answered Dec 7, 2018 by Lalit
0 votes
val df2 = dataFrame .withColumn("F", lit("foo")) .select("F", "A", "B", "C", "D", "E")
answered Dec 7, 2018 by Suman
+2 votes
val coder: (Int => String) = v => if (v > 100) "Big" else "Small"
import org.apache.spark.sql.functions.udf
val coder_udf = udf(coder)
DF.withColumn("Code", coder_udf( DF.col("Amt")))
answered Apr 4 by anonymous

edited Apr 5 by Omkar

Related Questions In Apache Spark

0 votes
1 answer
0 votes
1 answer

Changing Column position in spark dataframe

Yes, you can reorder the dataframe elements. You need ...READ MORE

answered Apr 19, 2018 in Apache Spark by Ashish
• 2,630 points
3,436 views
0 votes
3 answers

How to transpose Spark DataFrame?

Please check the below mentioned links for ...READ MORE

answered Dec 31, 2018 in Apache Spark by anonymous
4,252 views
0 votes
2 answers

In a Spark DataFrame how can I flatten the struct?

// Collect data from input avro file ...READ MORE

answered Jul 4 in Apache Spark by Dhara dhruve
561 views
0 votes
4 answers

How to change the spark Session configuration in Pyspark?

You can dynamically load properties. First create ...READ MORE

answered Dec 10, 2018 in Apache Spark by Vini
10,547 views
0 votes
1 answer

Ways to create RDD in Apache Spark

There are two popular ways using which ...READ MORE

answered Jun 19, 2018 in Apache Spark by nitinrawat895
• 10,110 points
1,013 views
0 votes
0 answers
0 votes
1 answer

How to convert rdd object to dataframe in spark

SqlContext has a number of createDataFrame methods ...READ MORE

answered May 30, 2018 in Apache Spark by nitinrawat895
• 10,110 points
1,033 views
0 votes
6 answers
0 votes
1 answer

Different Spark Ecosystem

Spark has various components: Spark SQL (Shark)- for ...READ MORE

answered Jun 4, 2018 in Apache Spark by kurt_cobain
• 9,240 points
44 views