You have to install IntelliJ IDEA with the Scala plugin. When it prompts you, add the Scala SDK and set the JDK path.
File --> New --> Project
Click Next
Give the project a name and create it
Create a Scala file, copy-paste the code below into it, and save
Right-click on the Scala file ==> Run "file name"
As of now, we do not have a proper document for exporting a jar from IntelliJ, but you can follow the link below for more information.
https://medium.com/@mrpowers/creating-a-spark-project-with-sbt-intellij-sbt-spark-package-and-friends-cc9108751c28
You can refer to the attached screenshots to see how we have done it using Eclipse.
Code
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.apache.spark.sql.SQLContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.Window
//import org.apache.spark.sql.hive.HiveContext
case class EmpHeader(Emp_Id:Int, First_Name:String, Last_Name:String, Dept_Id:Int, Salary: Int)
case class DeptHeader(Dept_Id:Int, Dept_Name:String)
object sql1 {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("Simple Application").setMaster("local[2]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
//val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
//val hc = new HiveContext(sc)
import sqlContext.implicits._
val EmpFile = sc.textFile("file:///home/edureka/Desktop/all-files/datsets/emp.txt")
val DeptFile = sc.textFile("file:///home/edureka/Desktop/all-files/datsets/dept.txt")
val EmpDF = EmpFile.map(x=>x.split(",")).map(x=> EmpHeader(x(0).toInt,x(1),x(2),x(3).toInt,x(4).toInt)).toDF //EmpDF.printSchema
val DeptDF = DeptFile.map(x=>x.split(",")).map(x=> DeptHeader(x(0).toInt,x(1))).toDF
EmpDF.registerTempTable("Employee")
DeptDF.registerTempTable("Department")
//01//max_sal
sqlContext.sql("SELECT * from Employee").groupBy($"Dept_Id").agg(max("Salary").alias("max_solution")).show()
//02//rank_sal
//sqlContext.sql(" SELECT * row_number() OVER(PARTITION BY Emp_Id ORDER BY Salary DESC) AS Rank from Employee").show
//sqlContext.sql("SELECT First_Name, RANK() OVER (ORDER BY Salary) AS rank FROM Employee").show
EmpDF.orderBy(asc("Salary")).show
}}
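The commented-out rank queries above are left incomplete. A minimal sketch of a per-department salary rank using the DataFrame window API could look like the following (this is an assumption on my part, not part of the original code; note that in Spark 1.5.x window functions need the HiveContext that is commented out above rather than the plain SQLContext):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{rank, desc}

val byDeptSalaryDesc = Window.partitionBy("Dept_Id").orderBy(desc("Salary"))
EmpDF.withColumn("Rank", rank().over(byDeptSalaryDesc)).show()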
=====================================================================================
-----------------build.sbt-------------------------
name := "Sql spark"
version := "1.0"
scalaVersion := "2.10.4"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.5.2"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.5.2"
----------------------------------------
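If the jar is only ever launched through spark-submit (where Spark is already on the classpath), the Spark dependencies are often marked as provided; this mainly matters if you later build a fat jar with a plugin such as sbt-assembly. A hedged variant of the two dependency lines:

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.5.2" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.5.2" % "provided"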
=================================Eclipse============================================
=================================spark submit===========================
Build with sbt package
Please find the attached project folder.
Follow the folder structure below and run sbt package to build the jar file required for spark-submit:
Project folder -> simple.sbt
Project folder -> src -> main -> scala -> <source code>.scala
Go to the terminal -> cd to the project folder -> run sbt package
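For example, a hedged walk-through using the same paths as the spark-submit example below:

cd /home/edureka/Desktop/all-files/codes/sibanisbt
sbt package

The jar is then created at target/scala-2.10/sql-spark_2.10-1.0.jar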
syntax
spark-submit --class <class/object name> <complete jar path>
For example (use the exact jar path from your project):
spark-submit --class sql1 file:///home/edureka/Desktop/all-files/codes/sibanisbt/target/scala-2.10/sql-spark_2.10-1.0.jar
=================Output======================
17/11/02 07:33:39 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 3.0, whose tasks have all completed, from pool
17/11/02 07:33:39 INFO scheduler.DAGScheduler: ResultStage 3 (show at sql1.scala:33) finished in 3.917 s
17/11/02 07:33:39 INFO scheduler.DAGScheduler: Job 1 finished: show at sql1.scala:33, took 3.986554 s
+-------+------------+
|Dept_Id|max_solution|
+-------+------------+
| 1| 30000|
| 2| 40000|
| 3| 10000|
+-------+------------+
17/11/02 07:33:39 INFO spark.SparkContext: Starting job: show at sql1.scala:37
17/11/02 07:33:39 INFO scheduler.DAGScheduler: Got job 2 (show at sql1.scala:37) with 2 output partitions
17/11/02 07:33:39 INFO scheduler.DAGScheduler: Final stage: ResultStage 4(show at sql1.scala:37)
17/11/02 07:33:39 INFO scheduler.DAGScheduler: Parents of final stage: List()
17/11/02 07:33:39 INFO scheduler.DAGScheduler: Missing parents: List()
17/11/02 07:33:39 INFO scheduler.DAGScheduler: Submitting ResultStage 4 (MapPartitionsRDD[18] at show at sql1.scala:37), which has no missing parents
17/11/02 07:33:39 INFO storage.MemoryStore: ensureFreeSpace(5176) called with curMem=257917, maxMem=560497950
17/11/02 07:33:39 INFO storage.MemoryStore: Block broadcast_5 stored as values in memory (estimated size 5.1 KB, free 534.3 MB)
17/11/02 07:33:39 INFO storage.MemoryStore: ensureFreeSpace(2827) called with curMem=263093, maxMem=560497950
17/11/02 07:33:39 INFO storage.MemoryStore: Block broadcast_5_piece0 stored as bytes in memory (estimated size 2.8 KB, free 534.3 MB)
17/11/02 07:33:39 INFO storage.BlockManagerInfo: Added broadcast_5_piece0 in memory on localhost:60184 (size: 2.8 KB, free: 534.5 MB)
17/11/02 07:33:39 INFO spark.SparkContext: Created broadcast 5 from broadcast at DAGScheduler.scala:861
17/11/02 07:33:39 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from ResultStage 4 (MapPartitionsRDD[18] at show at sql1.scala:37)
17/11/02 07:33:39 INFO scheduler.TaskSchedulerImpl: Adding task set 4.0 with 2 tasks
17/11/02 07:33:39 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 4.0 (TID 202, localhost, PROCESS_LOCAL, 2221 bytes)
17/11/02 07:33:39 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 4.0 (TID 203, localhost, PROCESS_LOCAL, 2221 bytes)
17/11/02 07:33:39 INFO executor.Executor: Running task 0.0 in stage 4.0 (TID 202)
17/11/02 07:33:39 INFO executor.Executor: Running task 1.0 in stage 4.0 (TID 203)
17/11/02 07:33:39 INFO rdd.HadoopRDD: Input split: file:/home/edureka/Desktop/all-files/datsets/emp.txt:35+35
17/11/02 07:33:39 INFO rdd.HadoopRDD: Input split: file:/home/edureka/Desktop/all-files/datsets/emp.txt:0+35
17/11/02 07:33:39 INFO executor.Executor: Finished task 0.0 in stage 4.0 (TID 202). 3556 bytes result sent to driver
17/11/02 07:33:39 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 4.0 (TID 202) in 50 ms on localhost (1/2)
17/11/02 07:33:39 INFO executor.Executor: Finished task 1.0 in stage 4.0 (TID 203). 3476 bytes result sent to driver
17/11/02 07:33:39 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 4.0 (TID 203) in 52 ms on localhost (2/2)
17/11/02 07:33:39 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 4.0, whose tasks have all completed, from pool
17/11/02 07:33:39 INFO scheduler.DAGScheduler: ResultStage 4 (show at sql1.scala:37) finished in 0.043 s
17/11/02 07:33:39 INFO scheduler.DAGScheduler: Job 2 finished: show at sql1.scala:37, took 0.063828 s
+------+----------+---------+-------+------+
|Emp_Id|First_Name|Last_Name|Dept_Id|Salary|
+------+----------+---------+-------+------+
| 3| Honey| Sing| 3| 10000|
| 1| Guru| Randhawa| 1| 30000|
| 2| Sharry| Mann| 2| 40000|
+------+----------+---------+-------+------+
17/11/02 07:33:39 INFO spark.SparkContext: Invoking stop() from shutdown hook
17/11/02 07:33:39 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/static/sql,null}
17/11/02 07:33:39 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/SQL/execution/json,null}
17/11/02 07:33:39 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/SQL/execution,null}
17/11/02 07:33:39 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/SQL/json,null}
17/11/02 07:33:39 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/SQL,null}
17/11/02 07:33:39 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/metrics/json,null}
17/11/02 07:33:39 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage/kill,null}
17/11/02 07:33:39 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/api,null}
Hope it helps!
If you want to know more about Apache Spark with Scala, it's highly recommended to go for an Apache Spark certification course today.
Thanks!!