Saturday, January 4, 2020

Basic Transformations with Apache Spark

Being an automation developer, it wasn't easy for me to enter the Big Data domain. Theoretical concepts are never enough, so for hands-on practice I tried several simple and moderately complex problems in Apache Spark. I have worked with the Spark 1.6 and Spark 2.4 versions; I haven't tested Spark 3.0 yet.

So, in this blog, I will cover all the operations I practiced, along with some real-time project scenarios I have experienced. I won't be covering theory about RDDs and DATAFRAMES (I am assuming you already know these concepts). Below is the list of versions I used while working on these scenarios:
1) Apache Spark 1.6
2) Scala 2.11.8
3) Apache Spark 2.4 

Personally, I never prefer the RDD API, since it is slower than the Spark SQL API (DataFrames go through the Catalyst optimizer, while RDD lambdas are opaque to Spark), but you should know everything about the technology you learn. The other reason, I feel, is that coding with RDDs is always more difficult than with DATAFRAMES or DATASETS.

Since Spark supports HQL (Hive Query Language), I will also provide Hive solutions for some of the questions.
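For example, once the DataFrames below are loaded, a minimal sketch (Spark 2.x) of running the Hive-style answers from the same shell is to register a temp view first:

// Register the employees DataFrame as a SQL view, then query it with HQL-style SQL
employeesDf.createOrReplaceTempView("employees")
spark.sql("SELECT emp_name, job_name FROM employees").show()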

I have used the spark-shell to run all the queries. If you want to practice in an IDE instead, you will have to configure Spark with that IDE and create the spark variable yourself before using it, as sketched below.
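Here is a minimal sketch of that setup for Spark 2.x (the app name is just a placeholder; the spark-shell creates spark and sc for you automatically):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("BasicTransformations") // any app name will do
  .master("local[*]")              // local mode is enough for practice
  .getOrCreate()
val sc = spark.sparkContext
import spark.implicits._ // needed for the $"column" syntax and toDF() used below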

Also, subqueries in Spark are not straightforward, so in the latter part I will cover one example of that scenario.

Download all the datasets using this GitHub link. 

Now, let's start with the loading of different datasets using RDD and DATAFRAMES.

Using Spark 1.6 (RDD):



//For Department
case class department(dep_id:java.lang.Long, dep_name:String, dep_location:String)

val rd1 = sc.textFile("file:///home/cloudera/Desktop/Employees/department_data.txt").filter(p => p != null && p.trim.length > 0)

val departmentDf = rd1.map(_.split(",")).map(p => department(if(p(0).length > 0) p(0).toLong else 0L, p(1), p(2))).toDF()

departmentDf.show()

//For Salary Grade Table
case class salary_grade(grade:java.lang.Integer, min_salary:java.lang.Integer, max_salary:java.lang.Integer)

val rd2 = sc.textFile("file:///home/cloudera/Desktop/Employees/salary_grade_data.txt").filter(p => p != null && p.trim.length > 0)

val salaryGradeDf = rd2.map(_.split(",")).map(p => salary_grade(if(p(0).length > 0) p(0).toInt else 0, if(p(1).length > 0) p(1).toInt else 0, if(p(2).length > 0) p(2).toInt else 0)).toDF()

salaryGradeDf.show()

//For employees table
case class employees(emp_id:java.lang.Long, emp_name:String, job_name:String, manager_id:java.lang.Long, hire_date:String, salary:java.lang.Double, commision:java.lang.Double, dep_id:java.lang.Long)

val employeesRdd = sc.textFile("file:///home/cloudera/Desktop/Employees/employees.txt").filter(p => p != null && p.trim.length > 0)

val employeesDf = employeesRdd.map(_.split(",")).map(p => employees(if(p(0).length > 0) p(0).toLong else 0L, p(1), p(2), if(p(3).length > 0) p(3).toLong else 0L, p(4), if(p(5).length > 0) p(5).toDouble else 0D, if(p(6).length > 0) p(6).toDouble else 0D, if(p(7).length > 0) p(7).toLong else 0L)).toDF()

employeesDf.show()



The reason I created case classes and used the boxed Java types (java.lang.Long, java.lang.Integer, java.lang.Double) for the fields is null handling: with these types the schema is inferred with nullable = true, which means that if your initial data has null values, loading won't throw any exceptions and the nulls are stored as-is. You can handle them later on.
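For example, printing the schema of the department DataFrame should show output along these lines, with nullable = true on each field thanks to the boxed types in the case class:

departmentDf.printSchema()
// root
//  |-- dep_id: long (nullable = true)
//  |-- dep_name: string (nullable = true)
//  |-- dep_location: string (nullable = true)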

Using Spark 2.4 (DATAFRAME):

val departmentDf = spark.read.format("csv").option("header","true").option("inferSchema","true").load("path of your dataset")
val employeesDf = spark.read.format("csv").option("header","true").option("inferSchema","true").load("path of your dataset")
val salaryGradeDf = spark.read.format("csv").option("header","true").option("inferSchema","true").load("path of your dataset")

The "inferSchema" option makes Spark identify the data type of each column and store the data accordingly. If you don't use this option while loading the data, all the columns will be treated as strings, and you might then have to cast the columns to the data types your requirements call for, as sketched below.

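A minimal sketch of that manual casting, assuming the file was loaded without inferSchema (rawDf and typedDf are just illustrative names):

// Without inferSchema, every column comes in as a string
val rawDf = spark.read.format("csv").option("header","true").load("path of your dataset")
// Cast the numeric columns to the types you actually need
val typedDf = rawDf
  .withColumn("salary", rawDf("salary").cast("double"))
  .withColumn("dep_id", rawDf("dep_id").cast("long"))
typedDf.printSchema()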
Once the datasets are loaded, try displaying each of them. If something goes wrong while displaying them, you might have to go back and look at your code. Let's start with the questions:

Write a query to list the emp_name and the salary increased by 15%, expressed in dollars:

import org.apache.spark.sql.functions._ // for concat, lit, col, lower, date_format, year, length, trim, avg, count and current_timestamp used below

RDD:
employeesRdd.map(_.split(",")).map(a => (a(1), a(5).toDouble * 1.15 + " $")).foreach(println)

DATAFRAME:


employeesDf.select(col("emp_name"), concat(col("salary") * 1.15, lit(" $")).alias("Salary")).show()



Write a query to produce the output of employees as follows:
Output:

Employee
JONAS(manager)



RDD:


employeesRdd.map(_.split(",")).map(a => a(1) + "(" + a(2).toLowerCase + ")").foreach(println)


DATAFRAME:
employeesDf.select(concat(col("emp_name"), lit("("), lower(col("job_name")), lit(")")).alias("Employee Name with designation")).show()

HIVE:
SELECT concat(emp_name, '(', lower(job_name), ')') FROM employees;

Write a query to list the employees with the hire date in a format like February 22, 1991:

DATAFRAME:

employeesDf.withColumn("Formatted Date", date_format(col("hire_date"), "MMMM dd, yyyy")).drop("hire_date").show()

HIVE:
select emp_name, date_format(hire_date, 'MMMM dd, yyyy') from employees;

Write a query to count the number of characters in each name, ignoring the surrounding spaces:

DATAFRAME:
employeesDf.select(length(trim(col("emp_name"))).alias("Name Length")).show()

RDD:
employeesRdd.map(_.split(",")).map(a => a(1).trim.length).foreach(println)
HIVE:
select length(trim(emp_name)) from employees;

Write a query to display the unique department and job combinations:

DATAFRAME:
employeesDf.select($"job_name",$"dep_id").distinct.show()


RDD:

employeesRdd.map(_.split(",")).map(a => (a(2), a(7))).distinct.foreach(println)

Write a query to list the employees who do not belong to department 2001:

RDD:
employeesRdd.map(_.split(",")).filter(a => a(7) != "2001").collect

DATAFRAME:
employeesDf.filter($"dep_id" =!= 2001).show()

Write a query to list the employees who joined before 1991:

DATAFRAME:
employeesDf.filter(year($"hire_date").lt(1991)).show()

Write a query to display the average salaries of all the employees who work as ANALYST:

DATAFRAME:
employeesDf.groupBy($"job_name").agg(avg($"salary")).
where($"job_name" === "ANALYST").show()

Write a query to display all the details of the employees, along with the salary increased by 25%, for those whose increased salary is more than 3000:

DATAFRAME:
employeesDf.filter(col("salary") * 1.25 > 3000).select(col("emp_id"), col("emp_name"), col("job_name"),
col("manager_id"), col("hire_date"), col("salary"), (col("salary") * 1.25).
alias("Modified Salary"), col("commision"), col("dep_id")).show()

HIVE:
select emp_name, salary, (salary*1.25) as modified_salary from employees where (salary*1.25) > 3000;

Write a query to list the employees who joined in the month of January:

DATAFRAME:
employeesDf.filter(date_format(col("hire_date"), "MMMM") === "January").show(false)

HIVE:
select * from employees where date_format(hire_date,'MM') = '01';

List the name of employees and their manager separated by the string 'works for':

DATAFRAME:

employeesDf.as("df1").join(employeesDf.as("df2"), $"df1.manager_id" === $"df2.emp_id").
select(concat($"df1.emp_name", lit(" works for "), $"df2.emp_name").
alias("WORKS FOR COLUMN")).show(false)

HIVE:

SELECT concat(e.emp_name, ' works for ', m.emp_name) as works_for FROM employees e, employees m WHERE e.manager_id = m.emp_id;

List the employees whose experience is more than 10 years:

DATAFRAME:
employeesDf.filter(year(current_timestamp())-year(col("hire_date")) > 10).select("*").show()

HIVE:
select * from Employees where (YEAR(current_timestamp())-year(hire_date)) > 10

Write a query to list those employees whose salary contains only 3 digits:

DATAFRAME:

employeesDf.filter(col("salary").between(100,999)).select("*").show()

HIVE:
select * from Employees where salary between 100 and 999


List the ID, name, salary, and job_name of the employees where -
1. The annual salary is below 34000, but they receive some commission, which should not be more than the salary,

2. And the designation is SALESMAN, working for department 3001.

DATAFRAME:
employeesDf.filter(($"salary" + $"commision") * 12 < 34000 && col("commision").isNotNull &&
($"commision" < $"salary") &&
col("job_name") === "SALESMAN" && col("dep_id") === 3001).select($"emp_id", $"emp_name", $"salary", $"job_name").show()

HIVE:
SELECT emp_id, emp_name, salary, job_name FROM employees WHERE 12*(salary+commision) < 34000 AND commision IS NOT NULL AND commision < salary AND job_name = 'SALESMAN' AND dep_id = 3001;


Query to list the employees who joined on the following dates: 1st May, 20th Feb, and 3rd Dec in the year 1991:

DATAFRAME:
employeesDf.filter(year(col("hire_date")) === 1991 && (date_format($"hire_date", "dd") === "01" ||
date_format($"hire_date", "dd") === "20" || date_format($"hire_date", "dd") === "03")).show()

Using the ISIN operator:

val years = Seq("1991")
val dates = Seq("01", "03", "20")

employeesDf.filter(date_format(col("hire_date"), "yyyy").isin(years: _*) &&
date_format(col("hire_date"), "dd").isin(dates: _*)).show()

HIVE:
select * from employees where date_format(hire_date,'yyyy') = '1991' AND (date_format(hire_date,'dd') = '01' OR date_format(hire_date,'dd') = '03' OR date_format(hire_date,'dd') = '20');

Using the IN operator:
select * from employees where date_format(hire_date,'yyyy') in ('1991')
AND date_format(hire_date,'dd') IN ('01','03','20');

List the name, job name, salary, grade and department name of employees except
CLERK and sort result set on the basis of the highest salary:

DATAFRAME:
employeesDf.as("df1").join(departmentDf.as("df2"), $"df1.dep_id" === $"df2.dep_id").
join(salaryGradeDf.as("df3"), $"df1.salary".between($"df3.min_salary", $"df3.max_salary")).
filter($"df1.job_name" =!= "CLERK").
select($"df1.emp_name", $"df1.job_name", $"df1.salary", $"df3.grade", $"df2.dep_name").
orderBy($"df1.salary".desc).show()

List the names of those employees starting with 'A' and with six characters in length:

DATAFRAME:
employeesDf.filter($"emp_name".like("A%") && length(trim($"emp_name"))===6).show()

HIVE:
select * from employees where emp_name like 'A%' and length(trim(emp_name)) = 6;

Write a query to list the departments where at least two employees are working:

DATAFRAME:
employeesDf.groupBy($"dep_id").agg(count("*").alias("cnt")).filter($"cnt" >= 2).select($"dep_id", $"cnt").show()

HIVE:
SELECT dep_id,count(*) FROM employees GROUP BY dep_id HAVING count(*) >= 2;

List the names of the departments where at least 2 employees are working:

DATAFRAME:
employeesDf.as("df1").join(departmentDf.as("df2"), $"df1.dep_id" === $"df2.dep_id").
select($"dep_name").groupBy($"dep_name").agg(count($"dep_name").as("cnt")).
filter($"cnt" >= 2).show()

HIVE:
SELECT d.dep_name,count(*) FROM employees e,department d WHERE e.dep_id = d.dep_id GROUP BY d.dep_name HAVING count(*) >= 2;


So, I hope that with these examples you will be in better shape to solve more problems on your own. Below are some examples that are commonly asked in interviews:

To find the nth max or min salary (here we are finding the 2nd max salary):


sqlContext.sql("select * from (SELECT dep_name, salary, DENSE_RANK() over(ORDER BY salary desc) as rank FROM department) as A where rank = 2").show

To find the minimum salary, just order by salary in ascending order and use rank = 1.
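For instance, the minimum salary version of the same query would be (just flipping the ordering to ascending):

sqlContext.sql("select * from (SELECT dep_name, salary, DENSE_RANK() over(ORDER BY salary asc) as rank FROM department) as A where rank = 1").show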

RDD:


val rdd = sc.textFile("file:///home/cloudera/Desktop/dep_ex.txt") //Loading data


//Selecting specific salary column
scala> val salary=rdd.map(_.split(",")).map(p=>(p(2).toLong)).collect
salary: Array[Long] = Array(20000, 30000, 4000, 20000, 30000, 40000, 25000)


// Sorting it in descending order . 
scala> val maxSortedList=salary.distinct.sortBy(p=>(-p.toLong)) //Negation is put for desc 
maxSortedList: Array[Long] = Array(40000, 30000, 25000, 20000, 4000)


//Printing max 
scala> maxSortedList.take(1).foreach(println)
40000


scala> val secondHighestSalary=maxSortedList.zipWithIndex.filter(p=>p._2==1).map(_._1)
//Where 1 is the index of the element holding the second highest salary (since indexes start
//from 0), and map removes the index from the resulting array, leaving just the salary

Note: zipWithIndex pairs each element with its index, like (40000,0), (30000,1), and so on. That is why p._2 stands for the index; comparing it with 1 returns the value paired with index 1, which here is the second highest salary, 30000.
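As a small extension (a sketch using only standard Scala collection methods, nothing Spark-specific), the same zipWithIndex idea generalizes to the nth highest salary:

// n is 1-based: nthHighestSalary(1) is the max, nthHighestSalary(2) the second highest, and so on
def nthHighestSalary(n: Int): Option[Long] =
  maxSortedList.zipWithIndex.find(_._2 == n - 1).map(_._1)

nthHighestSalary(2).foreach(println) // 30000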

Dense_rank, rank and row_number:


scala> sqlContext.sql("SELECT dep_name, salary, DENSE_RANK() over(ORDER BY salary desc) as dense_rank, row_number() over(order by salary desc) as row_number, rank() over(order by salary desc) as rank FROM department").show()
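To see how the three differ on ties, here is how I would expect the ranking columns to come out for the salary values loaded above (dep_name omitted): dense_rank does not leave gaps after ties, rank does leave gaps, and row_number is always unique.

salary:     40000  30000  30000  25000  20000  20000  4000
dense_rank:     1      2      2      3      4      4     5
row_number:     1      2      3      4      5      6     7
rank:           1      2      2      4      5      5     7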

SUBQUERIES:
Write a query in Spark Dataframe to display all the details of managers:

val managerIdDf = employeesDf.filter($"manager_id" =!= 0).select($"manager_id").distinct // Selecting manager ids

val managerIds = managerIdDf.rdd.map(r => r(0).asInstanceOf[Long]).collect.toList // Converting the DataFrame to a List, since we have to pass it to the isin operator

employeesDf.filter($"emp_id".isin(managerIds: _*)).show() // Displaying the managers' details
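A note on the design: collecting the manager ids to the driver is fine for a small list, but for a large one you can keep everything distributed. Here is a sketch of the same subquery using Spark's left_semi join type (which returns only the matching rows from the left side, with no columns from the right):

employeesDf.as("e").join(managerIdDf.as("m"), $"e.emp_id" === $"m.manager_id", "left_semi").show()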

I would also like to add a few scenarios of readers' choice. If you face any difficulties, have doubts about any of the concepts or code, or want to see something new, please leave a comment. I would be happy to solve them.

I can learn from you too!