Type mismatch error in scala

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.functions.{col, lit, when}
import org.apache.spark.sql.types._
import org.apache.spark.storage.StorageLevel._
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.expressions.Window

object accounts_ver_creation  
def main(args: Array[String])  
val conf = new SparkConf()  
val sc=new SparkContext(conf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val high_date = "9999-12-31 00:00:00"
val df_source_main = sqlContext.sql("SELECT * FROM sqoop.accounts")
val df_source_aud = sqlContext.sql("SELECT * FROM sqoop.accounts_aud")
val df_u = df_source_aud.filter(col("action_code") === "U")
val df_d = df_source_aud.filter(col("action_code") === "D")
val raw_column_names = df_source_main.columns
val column_names = raw_column_names  ++ Array("is_current", "is_updated", "is_deleted", "start_date", "enddate","tot")
val res = ddf_source_main.withColumn("tot",df_source_main.groupBy("account_number").agg(count("account_number"))).withColumn("ver",df_source_aud.groupBy("account_number").agg(count("account_number"))).select(column_names.map(col): _*)

val df_target_main = df_source_main.withColumn("start_date", df_source_main("update_date")).withColumn("enddate", lit(high_date)).withColumn("is_current", lit(1)).withColumn("is_updated", lit(false)).withColumn("is_deleted", lit(false)).select(column_names.map(col): _*)
val df_target_u = df_d.withColumn("enddate", df_u("end_date")).withColumn("start_date", df_u("create_date")).withColumn("is_current", lit(0)).withColumn("is_deleted", lit(false)).select(column_names.map(col): _*)
val df_target_d = df_u.withColumn("enddate", df_d("end_update")).withColumn("start_date", df_d("update_date")).withColumn("is_current", lit(0)).withColumn("is_deleted", lit(true)).select(column_names.map(col): _*)
val df_merge = df_target_main.unionAll(df_target_u).unionAll(df_target_d)  
val df_merge_final = df_merge.withColumn("version_key", df_source_main.groupBy("account_number").agg(count("account_number").as("total")).join(df_source_aud.groupBy("account_number").agg(count("account_number").as("version_key")), Seq("account_number"), "left").na.fill(0).orderBy(col("version_key").desc))
sqlContext.sql("DROP TABLE IF EXISTS sqoop.accounts_ver")
df_merge_final.write.format("orc").option("path", "/user/hive/warehouse/sqoop.db/accounts_ver").mode("overwrite").saveAsTable("sqoop.accounts_ver")

Error Message:

<console>:240: error: type mismatch;
 found   : org.apache.spark.sql.DataFrame
 required: org.apache.spark.sql.Column
Which line are you getting this error on? The error tells at some point in the code you are trying to use the Dataframe but you have to use a Column

Your problem is here:

val df_merge_final = df_merge
.withColumn("version_key", df_source_main.groupBy("account_number").agg(count("account_number").as("total")).join(df_source_aud.groupBy("account_number").agg(count("account_number").as("version_key")), Seq("account_number"), "left").na.fill(0).orderBy(col("version_key").desc))

You are trying to put an entire dataframe into a column, this is not allowed. If you want to get last version key you need to process that dataframe and collect the last row and after that put the value into the column.Tell me if you want an example.

Also: Try to use aliases, your code it's going to look prettier 

Hope this helps!

