combinebykey 《大数据处理技术Spark》--林子雨
combinebykey 《大数据处理技术Spark》--林子雨
从林子雨老师的网课上学到的东西,林老师讲的特别清晰,记录一下,防止忘记。
以下是资料的链接:
其他资料:
1. 概述
关系型数据库和非关系型数据库
五种主流的大数据计算框架:
大数据关键技术 分布式处理: 大数据计算模式:
代表性大数据技术之
Hive:会将sql语句转成底层的任务
:帮助选择主节点等
HBase:存储关系数据
Sqoop:完成从关系型数据库和数据之间的导入导出
:可视化的部署等等都归它管理
:可以将程序分发到不同的机器上(计算向数据靠拢)
- 缺点:任务必须要等待所有的map任务完成之后才能进行
- 缺点:每次数据都要写磁盘
Yarn:资源调度和管理框架,帮助调动底层cpu和内存资源用的
代表性大数据技术之Spark
最热门的主流技术。可以和兼容:可以读取HDFS,Hive/兼容;可以和noSQl兼容
spark克服了的操作的缺陷:
spark在的基础上的改进:
代表性大数据技术之Flink
flink性能好,为什么没有spark火?“既生瑜何生亮”+_+
代表性大数据技术之Beam
Beam提供了统一的编程接口起来,可以帮助转换成spark//flink
伪分布实例
伪分布式读取的则是 HDFS 上的数据。要使用 HDFS,首先需要在 HDFS 中创建用户目录:
hdfs dfs -mkdir -p /user/hadoop # 已经将hadoop中的bin加入到环境变量中
将本地的word.txt复制到分布式文件系统的/user//input中
hdfs dfs -mkdir input # 因为现在使用的是hadoop用户,因此可以使用相对路径
hdfs dfs -put ./word.txt input # put
hdfs dfs -ls input # 可以查看文件列表
将的运行结果取回到本地
hdfs dfs -get output ./output # get
cat ./output/* # cat查看
这里hdfs dfs可以换成:
2. scala 语法
写到了scala 中
5. RDD 5.1 RDD编程
rdd编程-林子雨老师
5.1.1 RDD创建 5.1.1.1 从文件系统中加载数据创建RDD 5.1.1.2 通过并行集合(数组)创建RDD 5.1.2 RDD操作 5.1.2.1 转换操作 5.1.2.2 行动操作 5.1.2.3 惰性机制
只有动作类型的操作才能真正触发计算
5.1.2.4 实例
找出文本文件中单行文本所包含的单词数量的最大值
val lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")
lines.map(line => line.split(" ").size).reduce((a,b) => if (a>b) a else b)
5.1.3 持久化
在Spark中,RDD采用惰性求值的机制combinebykey 《大数据处理技术Spark》--林子雨,每次遇到行动操作,都会从头开始执 行计算。每次调用行动操作,都会触发一次从头开始的计算。这对于迭代计 算而言,代价是很大的,迭代计算经常需要多次重复使用同一组数据。
()的圆括号中包含的是持久化级别参数:
val list = List("Hadoop","Spark","Hive")
val rdd = sc.parallelize(list)
rdd.cache()
println(rdd.count()) // 第一次行动操作,触发一次真正从头到尾的计算,这时才会执行上面的rdd.cache(),把这个rdd放到缓存中
println(rdd.collect().mkString(",")) //第二次行动操作,不需要触发从头到尾的计算,只需要重复使用上面缓存中的rdd
5.1.4 分区
RDD是弹性分布式数据集,通常RDD很大,会被分成很多个分区,分别保存在不同的节点上。
为什么要分区?(1)增加并行度 (2)减少通信开销
手动设置分区:
自定义分区
import org.apache.spark.{Partitioner, SparkContext, SparkConf}
//自定义分区类,需继承Partitioner类
class UsridPartitioner(numParts:Int) extends Partitioner{
//覆盖分区数
override def numPartitions: Int = numParts
//覆盖分区号获取函数
override def getPartition(key: Any): Int = {
key.toString.toInt
}
}
object Test {
def main(args: Array[String]) {
val conf=new SparkConf()
val sc=new SparkContext(conf)
//模拟5个分区的数据
val data=sc.parallelize(1 to 10,5)
//根据尾号转变为10个分区,分写到10个文件
data.map((_,1)).partitionBy(new UsridPartitioner(10)).map(_._1).saveAsTextFile("/chenm/partition") //占位符 _
}
}
5.1.5 打印元素
概括下来是:
5.2 Pair RDD 5.2.1 Pair RDD的创建 第一种创建方式:从文件中加载第二种创建方式:通过并行集合(数组)创建RDD 5.2.2 常用的Pair RDD转换操作
(Hadoop,1)
(Spark,1)
(Hive,1)
(Spark,1)
keys:
pairRDD.keys
pairRDD.keys.foreach(println)
d1.collect
// res16: Array[(String, Int)] = Array((c,8), (b,25), (c,17), (a,42), (b,4), (d,9), (e,17), (c,2), (f,29), (g,21), (b,9))
d1.mapValues(_+1).collect
// res17: Array[(String, Int)] = Array((c,9), (b,26), (c,18), (a,43), (b,5), (d,10), (e,18), (c,3), (f,30), (g,22), (b,10))
5.2.3 一个综合实例
给定一组键值对(“spark”,2),(“”,6),(“”,4),(“spark”,6)combinebykey 《大数据处理技术Spark》--林子雨,键值 对的key表示图书名称,value表示某天图书销量combinebykey,请计算每个键对应的平均值,也就是计算每种图书的每天平均销量。
val rdd = sc.parallelize(Array(("spark",2),("hadoop",6),("hadoop",4),("spark",6)))
rdd.mapValues(x => (x,1)).reduceByKey((x,y) => (x._1+y._1,x._2 + y._2)).mapValues(x => (x._1 / x._2)).collect()
5.3 共享变量
广播变量( )
累加器()
5.4 数据读写 5.4.1 文件数据读写 5.4.1.1 本地文件系统的数据读写
val textFile = sc.textFile("file:///usr/local/spark/mycode/wordcount/word123.txt")
textFile.saveAsTextFile("file:///usr/local/spark/mycode/wordcount/writeback.txt")
5.4.1.2 分布式文件系统HDFS的数据读写
val textFile = sc.textFile("hdfs://localhost:9000/user/hadoop/word.txt")
// val textFile = sc.textFile("/user/hadoop/word.txt")
// val textFile = sc.textFile("word.txt")
textFile.first()
5.4.1.3 JSON文件的数据读写
import scala.util.parsing.json.JSON
val jsonStr = sc.textFile("file:///usr/local/spark/examples/src/main/resources/people.json")
// jsonStr.foreach(println)
val result = jsonStrs.map(s => JSON.parseFull(s))
result.foreach( {r => r match {
case Some(map: Map[String, Any]) => println(map)
case None => println("Parsing failed")
case other => println("Unknown data structure: " + other)
剩下的部分直接参考林老师的ppt吧
5.4.2 读写HBase数据 5.5 程序解析 5.6 综合案例 6 Spark SQL 6.1 简介
hive on spark == Sharkcombinebykey,hive将SQL语句转为MR;Shark将SQL转为Spark的应用程序代码;Shark建立在hive上combinebykey,受限与hive,但是效率提升了10-100倍;MR是进程级别的并行,Shark是线程级别的并行,存在线程安全的保证,因此之后停止了更新Spark SQL。
spark SQL在兼容Hive基础上,只是借鉴了Hive的语法解析
6.2 和RDD区别
spark SQL采用的不是RDD,而是。是结构化的对象,查询效率更高。
6.3 的创建 6.4 从RDD到DF 6.4.1 利用反射机制推断RDD模式
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.Encoder
import spark.implicits._ //导入包,支持把一个RDD隐式转换为一个DataFrame
case class Person(name: String, age: Long) // 定义一个case class
val peopleDF = spark.sparkContext.textFile("file:///usr/local/spark/examples/src/main/resources/people.txt").map(_.split(",")).map(attributes => Person(attributes(0), attributes(1).trim.toInt)).toDF()
peopleDF.createOrReplaceTempView(“people”) // 必须注册为临时表才能供下面的查询使用
val personsRDD = spark.sql("select name,age from people where age > 20") // 最终生成一个DataFrame
personsRDD.map(t => “Name:”+t(0)+“,”+“Age:”+t(1)).show() // DataFrame中的每个元素都是一行记录,包含name和age两个字段,分别用t(0)和t(1)来获取值
6.4.2 使用编程方式定义RDD模式
当无法提前定义case class时,就需要采用编程方式定义RDD模式
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
val peopleRDD = spark.sparkContext.textFile("file:///usr/local/spark/examples/src/main/resources/people.txt")
//定义一个模式字符串
val schemaString = "name age"
//根据模式字符串生成模式
val fields = schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)
//从上面打印的信息可以看出,schema描述了模式信息,模式中包含name和age两个字段
//对peopleRDD 这个RDD中的每一行元素都进行解析
val rowRDD = peopleRDD.map(_.split(",")).map(attributes => Row(attributes(0), attributes(1).trim))
val peopleDF = spark.createDataFrame(rowRDD, schema)
//必须注册为临时表才能供下面查询使用
peopleDF.createOrReplaceTempView("people")
val results = spark.sql("SELECT name,age FROM people")
results.map(attributes => "name: " + attributes(0)+","+"age:"+attributes(1)).show()
6.4.3 把RDD保存成文件
val peopleDF = spark.read.format("json").load("file:///usr/local/spark/examples/src/main/resources/people.json")
peopleDF.select("name","age").write.format("csv").save("file:///usr/local/spark/mycode/newpeople.csv")
val textFile = sc.textFile("file:///usr/local/spark/mycode/newpeople.csv")
write.()支持输出 json,, jdbc, orc, , csv, text等格式文件
val peopleDF = spark.read.format("json").load("file:///usr/local/spark/examples/src/main/resources/people.json")
df.rdd.saveAsTextFile("file:///usr/local/spark/mycode/newpeople.txt")
6.5 读取和保存数据 6.5.1 读写 6.5.2 通过JDBC连接数据库 安装MySQL及常用操作
mysql的jdbc驱动程序,下载地址
6.5.3 连接Hive读写数据 6.5.3.1 Hive简介和安装
《安装hive,并配置mysql作为元数据库》
6.5.3.2 让Spark包含Hive支持
测试spark版本是否支持Hive
import org.apache.spark.sql.hive.HiveContext
// 支持的输出:import org.apache.spark.sql.hive.HiveContext
6.5.3.3 在Hive中创建数据库和表 启动: start-all.sh(已经将的路径加入到环境变量中)启动Hive:hive, 添加数据表
// hive脚本下执行
create database if not exists sparktest;//创建数据库sparktest
show databases;
create table if not exists sparktest.student(id int,name string, gender string, age int);
use sparktest; //切换到sparktest
show tables; //显示sparktest数据库下面有哪些表
insert into student values(1,'Xueqian','F',23); //插入一条记录
insert into student values(2,'Weiliang','M',24); //再插入一条记录
select * from student; //显示student表中的记录
6.5.3.4 连接Hive读写数据 在spark-shell(包含Hive支持)中执行以下命令从Hive中读取数据
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
case class Record(key: Int, value: String)
val warehouseLocation = "spark-warehouse”
val spark = SparkSession.builder().appName("Spark Hive Example").config("spark.sql.warehouse.dir", warehouseLocation).enableHiveSupport().getOrCreate()
import spark.implicits._
import spark.sql
sql("SELECT * FROM sparktest.student").show()
编写程序向Hive数据库的.表中插入两条数据
// 准备两条数据
val studentRDD = spark.sparkContext.parallelize(Array("3 Rongcheng M 26","4 Guanhua M 27")).map(_.split(" "))
// 设置模式信息
val schema = StructType(List(StructField("id", IntegerType, true),StructField("name", StringType, true),StructField("gender", StringType, true),StructField("age", IntegerType, true)))
// 下面创建Row对象,每个Row对象都是rowRDD中的一行
val rowRDD = studentRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).trim,p(3).toInt))
// 建立起Row对象和模式之间的对应关系,也就是把数据和模式对应起来
val studentDF = spark.createDataFrame(rowRDD, schema)
// 查看studentDF
studentDF.show()
// 注册临时表
studentDF.registerTempTable("tempTable")
// 插入
sql("insert into sparktest.student select * from tempTable")
- 随机文章
- 热门文章
- 热评文章