编写Spark SQL查询程序
首先在maven项目的pom.xml中添加Spark SQL的依赖
org.apache.spark
spark-sql_2.10
1.5.2
通过反射推断Schema
val sc:SparkContext //定义一个SparkContext类型的常量sc,SparkContext是Spark中提交作业的唯一通道
val sqlContext = new SqlContext(sc)//根据sc new一个SqlContext对象,该对象是处理SparkSQL的
import sqlContext._ //引入sqlContext中的所有方法,这些方法是处理SQL语句的基础
case class Person(name:String,age:String)//定义一个Person类,case class是后面数据能够生产SchemaRDD的关键
val people:RDD[Person] = sc.textFile(“people.txt”).map(_.split(“,”)).map(p => Person(p(0),p(1).toInt))//定义一个RDD数组,类型为Person,从people.txt文件中读取数据生成RDD,根据,进行split之后进行map操作,将每一行记录都生成对应的Person对象
people.registerAsTable(“people”)//将得到的RDD数组注册为表“people”
val teenagers = sql(“select name from people where age >= 10 && age <= 19”)//定义要执行的sql语句
teenagers.map(t => “Name:” + t(0)).collect().foreach(println)//循环打印出teenagers中的每个对象的名字
创建一个object为cn.itcast.spark.sql.InferringSchema
package cn.itcast.spark.sql
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
object InferringSchema {
def main(args: Array[String]) {
//创建SparkConf()并设置App名称 val conf = new SparkConf().setAppName("SQL-1") //SQLContext要依赖SparkContext val sc = new SparkContext(conf) //创建SQLContext val sqlContext = new SQLContext(sc) //从指定的地址创建RDD val lineRDD = sc.textFile(args(0)).map(_.split(" ")) //创建case class //将RDD和case class关联 val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt)) //导入隐式转换,如果不到人无法将RDD转换成DataFrame //将RDD转换成DataFrame import sqlContext.implicits._ val personDF = personRDD.toDF //注册表 personDF.registerTempTable("t_person") //传入SQL val df = sqlContext.sql("select * from t_person order by age desc limit 2") //将结果以JSON的方式存储到指定位置 df.write.json(args(1)) //停止Spark Context sc.stop()
12345678910111213141516171819202122232425}
}
//case class一定要放到外面
case class Person(id: Int, name: String, age: Int)
将程序打成jar包,上传到spark集群,提交Spark任务
/usr/local/spark-1.5.2-bin-hadoop2.6/bin/spark-submit
–class cn.itcast.spark.sql.InferringSchema
–master spark://cosa:7077
/root/spark-mvn-1.0-SNAPSHOT.jar
hdfs://cosa:9000/person.txt
hdfs://cosa:9000/out
查看运行结果
hdfs dfs -cat hdfs://cosa:9000/out/part-r-*
通过StructType直接指定Schema
创建一个object为cn.itcast.spark.sql.SpecifyingSchema
package cn.itcast.spark.sql
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types._
import org.apache.spark.{SparkContext, SparkConf}
/**
* Created by ZX on 2015/12/11.
*/
object SpecifyingSchema {
def main(args: Array[String]) {
//创建SparkConf()并设置App名称
val conf = new SparkConf().setAppName(“SQL-2”)
//SQLContext要依赖SparkContext
val sc = new SparkContext(conf)
//创建SQLContext
val sqlContext = new SQLContext(sc)
//从指定的地址创建RDD
val personRDD = sc.textFile(args(0)).map(_.split(” “))
//通过StructType直接指定每个字段的schema
val schema = StructType(
List(
StructField(“id”, IntegerType, true),
StructField(“name”, StringType, true),
StructField(“age”, IntegerType, true)
)
)
//将RDD映射到rowRDD
val rowRDD = personRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).toInt))
//将schema信息应用到rowRDD上
val personDataFrame = sqlContext.createDataFrame(rowRDD, schema)
//注册表
personDataFrame.registerTempTable(“t_person”)
//执行SQL
val df = sqlContext.sql(“select * from t_person order by age desc limit 4”)
//将结果以JSON的方式存储到指定位置
df.write.json(args(1))
//停止Spark Context
sc.stop()
}
}
将程序打成jar包,上传到spark集群,提交Spark任务
/usr/local/spark-1.5.2-bin-hadoop2.6/bin/spark-submit
–class cn.itcast.spark.sql.InferringSchema
–master spark://cosa:7077
/root/spark-mvn-1.0-SNAPSHOT.jar
hdfs://cosa:9000/person.txt
hdfs://cosa:9000/out1
查看结果
hdfs dfs -cat hdfs://cosa:9000/out1/part-r-*
相关知识
使用 SQL 查询编辑器进行查询
SQL语言艺术
分布式名词收集(二)
【转】SQL语言艺术
Spark SQL实验:鸢尾花、影评数据集分析存储
【SQL】已解决:SQL错误(208):对象名‘STRING
《sql 语言艺术》 概要
plsql连接oracle模糊查询中文不成功
全文搜索 (SQL Server)
出现“this is incompatible with sql
网址: 编写Spark SQL查询程序 https://www.huajiangbk.com/newsview1238901.html
上一篇: 甜菜种根收获及贮藏技术 |
下一篇: 燕麦什么时候收获?燕麦的储存方法 |
推荐分享

- 1君子兰什么品种最名贵 十大名 4012
- 2世界上最名贵的10种兰花图片 3364
- 3花圈挽联怎么写? 3286
- 4迷信说家里不能放假花 家里摆 1878
- 5香山红叶什么时候红 1493
- 6花的意思,花的解释,花的拼音 1210
- 7教师节送什么花最合适 1167
- 8勿忘我花图片 1103
- 9橄榄枝的象征意义 1093
- 10洛阳的市花 1039