Spark SQL DataFrames

SQLContext

SQLContext is the entry point for Spark SQL functionality and is created from the existing SparkContext (sc), which the Spark shell provides automatically.

$ spark-shell
scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)

employee.json

Each record is a complete JSON object on its own line (the JSON Lines format expected by Spark's JSON reader), so there are no trailing commas:

	{"id" : "1201", "name" : "satish", "age" : "25"}
	{"id" : "1202", "name" : "krishna", "age" : "28"}
	{"id" : "1203", "name" : "amith", "age" : "39"}
	{"id" : "1204", "name" : "javed", "age" : "23"}
	{"id" : "1205", "name" : "prudvi", "age" : "23"}

DataFrame Operations

scala> val dfs = sqlContext.read.json("employee.json")
scala> dfs.show()
scala> dfs.printSchema()
scala> dfs.select("name").show()
scala> dfs.filter(dfs("age") > 23).show()
scala> dfs.groupBy("age").count().show()
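
Column values can also be combined into expressions and results ordered, in the same style; a minimal sketch against the same dfs DataFrame (note that age was read from the JSON file as a string):

scala> dfs.select(dfs("name"), dfs("age") + 1).show()   // the string column age is cast to a number for the arithmetic
scala> dfs.orderBy(dfs("age").desc).show()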

Running SQL Queries Programmatically

There are two ways to provide a schema when converting an RDD into a DataFrame:

Inferring the Schema using Reflection - uses reflection on a Scala case class to generate the schema of an RDD that contains objects of that class.

Programmatically Specifying the Schema - builds a schema explicitly through the StructType API and applies it to an existing RDD of Rows.

Inferring the Schema using Reflection

Example

employee.txt

1201, satish, 25
1202, krishna, 28
1203, amith, 39
1204, javed, 23
1205, prudvi, 23

Start the Spark Shell

$ spark-shell

Create SQLContext

scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)

Import Implicit Conversions

Importing sqlContext.implicits._ brings the implicit conversions (such as toDF) needed to turn RDDs into DataFrames into scope:

scala> import sqlContext.implicits._

Create Case Class

scala> case class Employee(id: Int, name: String, age: Int)
defined class Employee

Create RDD and Apply Transformations

scala> val empl = sc.textFile("employee.txt").map(_.split(",")).map(e => Employee(e(0).trim.toInt, e(1).trim, e(2).trim.toInt)).toDF()

Output: empl: org.apache.spark.sql.DataFrame = [id: int, name: string … 1 more field]

Store the DataFrame Data in a Table

scala> empl.registerTempTable("employee")
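
On Spark 2.x, registerTempTable is deprecated in favour of createOrReplaceTempView; a one-line equivalent, if you are on a newer release:

scala> empl.createOrReplaceTempView("employee")   // Spark 2.x replacement for registerTempTable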

Select Query on DataFrame

scala> val allrecords = sqlContext.sql("SELECT * FROM employee")
scala> allrecords.show()

Where Clause SQL Query on DataFrame

scala> val agefilter = sqlContext.sql("SELECT * FROM employee WHERE age >= 20 AND age <= 35")
scala> agefilter.show()
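
Other SQL constructs run the same way against the registered table; a minimal sketch of an aggregate query (the variable name agecounts is illustrative):

scala> val agecounts = sqlContext.sql("SELECT age, COUNT(*) AS cnt FROM employee GROUP BY age")
scala> agecounts.show()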

Fetch ID values from agefilter DataFrame using column index

scala> agefilter.map(t => "ID: " + t(0)).collect().foreach(println)
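
Columns can also be read by name rather than by position; a minimal sketch using Row.getAs on the same agefilter DataFrame:

scala> agefilter.map(t => "Name: " + t.getAs[String]("name")).collect().foreach(println)   // getAs looks the column up by name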

Programmatically Specifying the Schema

Example

employee.txt

Use the same comma-separated employee.txt file created in the previous example.

Start the Spark Shell

$ spark-shell

Create SQLContext

scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)

Read Input from Text File

scala> val employee = sc.textFile("employee.txt")

Create an Encoded Schema in a String Format

scala> val schemaString = "id name age"

Import the Required Classes

scala> import org.apache.spark.sql.Row
scala> import org.apache.spark.sql.types.{StructType, StructField, StringType}

Generate Schema

scala> val schema = StructType(schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
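
The generated schema is equivalent to writing the StructType out by hand; a minimal sketch of the expanded form (the name explicitSchema is illustrative, and every field is a nullable string):

scala> val explicitSchema = StructType(Seq(
     |   StructField("id", StringType, true),
     |   StructField("name", StringType, true),
     |   StructField("age", StringType, true)))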

Apply Transformations to Convert the Text File Records into Rows

Because every field in the schema is declared as StringType, the values are kept as trimmed strings rather than converted to integers:

scala> val rowRDD = employee.map(_.split(",")).map(e => Row(e(0).trim, e(1).trim, e(2).trim))

Apply the Schema to the RowRDD to Create a DataFrame

scala> val employeeDF = sqlContext.createDataFrame(rowRDD, schema)
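
Since the schema declared every field as StringType, all three columns come out as strings; this can be confirmed with printSchema:

scala> employeeDF.printSchema()   // every column is string because of the programmatic schema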

Store DataFrame Data into Table

scala> employeeDF.registerTempTable("employee")

Select Query on DataFrame

scala> val allrecords = sqlContext.sql("SELECT * FROM employee")
scala> allrecords.show()
