Type mismatch error in Scala

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.functions.{col, count, lit, when}
import org.apache.spark.sql.types._
import org.apache.spark.storage.StorageLevel._
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.expressions.Window

object accounts_ver_creation {
  def main(args: Array[String]) {
    val conf = new SparkConf()
    val sc = new SparkContext(conf)
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    val high_date = "9999-12-31 00:00:00"
    val df_source_main = sqlContext.sql("SELECT * FROM sqoop.accounts")
    val df_source_aud = sqlContext.sql("SELECT * FROM sqoop.accounts_aud")
    val df_u = df_source_aud.filter(col("action_code") === "U")
    val df_d = df_source_aud.filter(col("action_code") === "D")
    val raw_column_names = df_source_main.columns
    val column_names = raw_column_names ++ Array("is_current", "is_updated", "is_deleted", "start_date", "enddate", "tot")
    val res = df_source_main
      .withColumn("tot", df_source_main.groupBy("account_number").agg(count("account_number")))
      .withColumn("ver", df_source_aud.groupBy("account_number").agg(count("account_number")))
      .select(column_names.map(col): _*)

    val df_target_main = df_source_main
      .withColumn("start_date", df_source_main("update_date"))
      .withColumn("enddate", lit(high_date))
      .withColumn("is_current", lit(1))
      .withColumn("is_updated", lit(false))
      .withColumn("is_deleted", lit(false))
      .select(column_names.map(col): _*)
    val df_target_u = df_d
      .withColumn("enddate", df_u("end_date"))
      .withColumn("start_date", df_u("create_date"))
      .withColumn("is_current", lit(0))
      .withColumn("is_deleted", lit(false))
      .select(column_names.map(col): _*)
    val df_target_d = df_u
      .withColumn("enddate", df_d("end_update"))
      .withColumn("start_date", df_d("update_date"))
      .withColumn("is_current", lit(0))
      .withColumn("is_deleted", lit(true))
      .select(column_names.map(col): _*)
    val df_merge = df_target_main.unionAll(df_target_u).unionAll(df_target_d)
    val df_merge_final = df_merge
      .withColumn("version_key", df_source_main.groupBy("account_number").agg(count("account_number").as("total")).join(df_source_aud.groupBy("account_number").agg(count("account_number").as("version_key")), Seq("account_number"), "left").na.fill(0).orderBy(col("version_key").desc))
    sqlContext.sql("DROP TABLE IF EXISTS sqoop.accounts_ver")
    df_merge_final.write.format("orc").option("path", "/user/hive/warehouse/sqoop.db/accounts_ver").mode("overwrite").saveAsTable("sqoop.accounts_ver")
  }
}

Error Message:

<console>:240: error: type mismatch;
 found   : org.apache.spark.sql.DataFrame
 required: org.apache.spark.sql.Column
Aug 16, 2019 in Apache Spark by anonymous
Which line are you getting this error on? The error means that somewhere in the code you are passing a DataFrame where a Column is required.

1 answer to this question.


Hello,

Your problem is here:

val df_merge_final = df_merge
.withColumn("version_key", df_source_main.groupBy("account_number").agg(count("account_number").as("total")).join(df_source_aud.groupBy("account_number").agg(count("account_number").as("version_key")), Seq("account_number"), "left").na.fill(0).orderBy(col("version_key").desc))

You are trying to put an entire DataFrame into a column, which is not allowed: withColumn expects a Column expression, but the groupBy(...).agg(...).join(...) chain returns a DataFrame. If you want the latest version key, you need to process that DataFrame separately, collect the relevant value, and then put it into the column. Tell me if you want an example.
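For instance, here is a minimal sketch under the assumption that version_key is meant to be the per-account count of audit rows: compute each count as its own DataFrame, then join them back onto df_merge, so withColumn is never handed a DataFrame. Names (df_source_main, df_source_aud, df_merge, account_number) are taken from the question's code.

```scala
import org.apache.spark.sql.functions.count

// Per-account row counts, each built as a standalone DataFrame.
val df_tot = df_source_main.groupBy("account_number")
  .agg(count("account_number").as("total"))
val df_ver = df_source_aud.groupBy("account_number")
  .agg(count("account_number").as("version_key"))

// Join the counts onto the merged frame instead of passing
// a DataFrame to withColumn; fill 0 for accounts with no audit rows.
val df_merge_final = df_merge
  .join(df_tot, Seq("account_number"), "left")
  .join(df_ver, Seq("account_number"), "left")
  .na.fill(0, Seq("total", "version_key"))
```

If the intent was instead a sequential version number per account, row_number over a Window partitioned by account_number (Window is already imported in the question) would be the idiomatic alternative.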

Also: try to use aliases; your code will look prettier.
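For example (a hypothetical illustration, not the poster's code), aliasing the two source tables keeps join conditions short and disambiguates columns that exist on both sides:

```scala
// Alias each table once, then qualify columns through the alias.
// Column names are taken from the question's schema.
val m = df_source_main.as("m")
val a = df_source_aud.as("a")
val joined = m.join(a, col("m.account_number") === col("a.account_number"), "left")
  .select(col("m.account_number"), col("a.action_code"), col("a.update_date"))
```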

answered Dec 13, 2019 by Alexandru
