Spark SQL DataFrames

SQLContext

Start the Spark shell and create an SQLContext from the existing SparkContext (sc):

$ spark-shell
scala> val sqlcontext = new org.apache.spark.sql.SQLContext(sc)

employee.json (one JSON object per line, as expected by read.json):

	{"id" : "1201", "name" : "satish", "age" : "25"}
	{"id" : "1202", "name" : "krishna", "age" : "28"}
	{"id" : "1203", "name" : "amith", "age" : "39"}
	{"id" : "1204", "name" : "javed", "age" : "23"}
	{"id" : "1205", "name" : "prudvi", "age" : "23"}

DataFrame Operations

scala> val dfs = sqlcontext.read.json("employee.json")
scala> dfs.show()
scala> dfs.printSchema()
scala> dfs.select("name").show()
scala> dfs.filter(dfs("age") > 23).show()
scala> dfs.groupBy("age").count().show()
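
These operations can also be chained with column expressions; a minimal sketch, assuming the implicits are imported to enable the $ column syntax:

scala> import sqlcontext.implicits._
scala> // project two columns, filter on age, and order the result in one chain
scala> dfs.select($"name", $"age").filter($"age" > 23).orderBy($"age").show()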

Running SQL Queries Programmatically

There are two ways to convert an existing RDD into a DataFrame:

Inferring the Schema using Reflection: uses reflection to generate the schema of an RDD that contains specific types of objects.
Programmatically Specifying the Schema: constructs a schema explicitly and then applies it to an existing RDD.

Inferring the Schema using Reflection

Example

employee.txt

1201, satish, 25
1202, krishna, 28
1203, amith, 39
1204, javed, 23
1205, prudvi, 23

Start the Spark Shell

$ spark-shell

Create SQLContext

scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)

Import SQL Functions

scala> import sqlContext.implicits._

Create Case Class

scala> case class Employee(id: Int, name: String, age: Int)
defined class Employee
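
Reflection over the case class fields is what supplies the column names and types. As a quick illustration (a sketch using a hypothetical two-row sample), the same class can be turned into a DataFrame straight from a local collection:

scala> // hypothetical sample; column names and types come from the Employee fields
scala> Seq(Employee(1, "a", 20), Employee(2, "b", 30)).toDF().printSchema()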

Create RDD and Apply Transformations

scala> val empl = sc.textFile("employee.txt").map(_.split(",")).map(e => Employee(e(0).trim.toInt, e(1), e(2).trim.toInt)).toDF()

Output: empl: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]
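
If the text file might contain blank or malformed lines, a slightly more defensive variant (a sketch) filters them out before constructing the case class:

scala> // keep only rows that split into exactly three fields
scala> val emplSafe = sc.textFile("employee.txt").map(_.split(",")).filter(_.length == 3).map(e => Employee(e(0).trim.toInt, e(1).trim, e(2).trim.toInt)).toDF()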

Store the DataFrame Data in a Table

scala> empl.registerTempTable("employee")
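
Note: on Spark 2.0 and later, registerTempTable is deprecated in favor of createOrReplaceTempView; a minimal equivalent sketch:

scala> empl.createOrReplaceTempView("employee")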

Select Query on DataFrame

scala> val allrecords = sqlContext.sql("SELECT * FROM employee")
scala> allrecords.show()

Where Clause SQL Query on DataFrame

scala> val agefilter = sqlContext.sql("SELECT * FROM employee WHERE age >= 20 AND age <= 35")
scala> agefilter.show()
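
The same range filter can be expressed with the DataFrame API instead of SQL; a minimal sketch:

scala> // equivalent to the WHERE clause above, using column expressions
scala> empl.filter(empl("age") >= 20 && empl("age") <= 35).show()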

Fetch ID values from agefilter DataFrame using column index

scala> agefilter.map(t => "ID: " + t(0)).collect().foreach(println)
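
Columns can also be fetched by name rather than by position, which is less brittle if the column order changes; a minimal sketch using Row.getAs:

scala> // same result, but the field is looked up by name instead of index
scala> agefilter.rdd.map(row => "ID: " + row.getAs[Int]("id")).collect().foreach(println)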

Programmatically Specifying the Schema

Example

employee.txt (the same file as in the previous example)

Start the Spark Shell

$ spark-shell

Create SQLContext

scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)

Read Input from Text File

scala> val employee = sc.textFile("employee.txt")

Create an Encoded Schema in a String Format

scala> val schemaString = "id name age"

Import Respective APIs

scala> import org.apache.spark.sql.Row;
scala> import org.apache.spark.sql.types.{StructType, StructField, StringType};

Generate Schema

scala> val schema = StructType(schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
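
If typed columns are preferred over treating every field as a string, the schema can also be built with explicit per-field types; a sketch (note the extra IntegerType import), in which case the Row values built below must carry matching types:

scala> import org.apache.spark.sql.types.IntegerType
scala> // hypothetical typed variant: id and age as integers, name as a string
scala> val typedSchema = StructType(Seq(StructField("id", IntegerType, true), StructField("name", StringType, true), StructField("age", IntegerType, true)))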

Apply Transformation for Reading Data from Text File

Since every field in the schema above is a StringType, each parsed value is kept as a string:

scala> val rowRDD = employee.map(_.split(",")).map(e => Row(e(0).trim, e(1), e(2).trim))

Apply the Schema to the Row RDD

scala> val employeeDF = sqlContext.createDataFrame(rowRDD, schema)
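
As a quick check (sketch), printSchema should report every column as a nullable string, since the schema was built entirely from StringType fields:

scala> employeeDF.printSchema()
root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- age: string (nullable = true)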

Store DataFrame Data into Table

scala> employeeDF.registerTempTable("employee")

Select Query on DataFrame

scala> val allrecords = sqlContext.sql("SELECT * FROM employee")
scala> allrecords.show()
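
Because every column in this programmatically specified schema is a string, numeric comparisons in SQL need an explicit cast; a minimal sketch:

scala> sqlContext.sql("SELECT id, name FROM employee WHERE CAST(age AS INT) > 23").show()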