当前位置:首页 > 竞技风云 > 正文内容

combinebykey 《大数据处理技术Spark》--林子雨

admin22小时前竞技风云4

combinebykey 《大数据处理技术Spark》--林子雨

从林子雨老师的网课上学到的东西,林老师讲的特别清晰,记录一下,防止忘记。

以下是资料的链接:

其他资料:

1. 概述

关系型数据库和非关系型数据库

五种主流的大数据计算框架:

大数据关键技术 分布式处理: 大数据计算模式:

代表性大数据技术之

Hive:会将sql语句转成底层的任务

:帮助选择主节点等

HBase:存储关系数据

Sqoop:完成从关系型数据库和数据之间的导入导出

:可视化的部署等等都归它管理

:可以将程序分发到不同的机器上(计算向数据靠拢)

- 缺点:任务必须要等待所有的map任务完成之后才能进行

- 缺点:每次数据都要写磁盘

Yarn:资源调度和管理框架,帮助调动底层cpu和内存资源用的

代表性大数据技术之Spark

最热门的主流技术。可以和兼容:可以读取HDFS,Hive/兼容;可以和noSQl兼容

spark克服了的操作的缺陷:

spark在的基础上的改进:

代表性大数据技术之Flink

flink性能好,为什么没有spark火?“既生瑜何生亮”+_+

代表性大数据技术之Beam

Beam提供了统一的编程接口起来,可以帮助转换成spark//flink

伪分布实例

伪分布式读取的则是 HDFS 上的数据。要使用 HDFS,首先需要在 HDFS 中创建用户目录:

hdfs dfs -mkdir -p /user/hadoop  # 已经将hadoop中的bin加入到环境变量中

将本地的word.txt复制到分布式文件系统的/user//input中

hdfs dfs -mkdir input # 因为现在使用的是hadoop用户,因此可以使用相对路径
hdfs dfs -put ./word.txt input # put
hdfs dfs -ls input # 可以查看文件列表

将的运行结果取回到本地

hdfs dfs -get output ./output # get
cat ./output/* # cat查看

这里hdfs dfs可以换成:

2. scala 语法

写到了scala 中

5. RDD 5.1 RDD编程

rdd编程-林子雨老师

5.1.1 RDD创建 5.1.1.1 从文件系统中加载数据创建RDD 5.1.1.2 通过并行集合(数组)创建RDD 5.1.2 RDD操作 5.1.2.1 转换操作 5.1.2.2 行动操作 5.1.2.3 惰性机制

只有动作类型的操作才能真正触发计算

5.1.2.4 实例

找出文本文件中单行文本所包含的单词数量的最大值

val lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")
lines.map(line => line.split(" ").size).reduce((a,b) => if (a>b) a else b)

5.1.3 持久化

在Spark中,RDD采用惰性求值的机制combinebykey 《大数据处理技术Spark》--林子雨,每次遇到行动操作,都会从头开始执 行计算。每次调用行动操作,都会触发一次从头开始的计算。这对于迭代计 算而言,代价是很大的,迭代计算经常需要多次重复使用同一组数据。

()的圆括号中包含的是持久化级别参数:

val list = List("Hadoop","Spark","Hive")
val rdd = sc.parallelize(list)
rdd.cache() 
println(rdd.count()) // 第一次行动操作,触发一次真正从头到尾的计算,这时才会执行上面的rdd.cache(),把这个rdd放到缓存中
println(rdd.collect().mkString(",")) //第二次行动操作,不需要触发从头到尾的计算,只需要重复使用上面缓存中的rdd

5.1.4 分区

RDD是弹性分布式数据集,通常RDD很大,会被分成很多个分区,分别保存在不同的节点上。

为什么要分区?(1)增加并行度 (2)减少通信开销

手动设置分区:

自定义分区

import org.apache.spark.{Partitioner, SparkContext, SparkConf}
//自定义分区类,需继承Partitioner类
class UsridPartitioner(numParts:Int) extends Partitioner{
 //覆盖分区数
 override def numPartitions: Int = numParts
 //覆盖分区号获取函数
 override def getPartition(key: Any): Int = {

combinebykey 《大数据处理技术Spark》--林子雨 第1张

key.toString.toInt } } object Test { def main(args: Array[String]) { val conf=new SparkConf() val sc=new SparkContext(conf) //模拟5个分区的数据 val data=sc.parallelize(1 to 10,5) //根据尾号转变为10个分区,分写到10个文件 data.map((_,1)).partitionBy(new UsridPartitioner(10)).map(_._1).saveAsTextFile("/chenm/partition") //占位符 _ } }

5.1.5 打印元素

概括下来是:

5.2 Pair RDD 5.2.1 Pair RDD的创建 第一种创建方式:从文件中加载第二种创建方式:通过并行集合(数组)创建RDD 5.2.2 常用的Pair RDD转换操作

(Hadoop,1)
(Spark,1)
(Hive,1)
(Spark,1)

keys:

pairRDD.keys
pairRDD.keys.foreach(println)

d1.collect
// res16: Array[(String, Int)] = Array((c,8), (b,25), (c,17), (a,42), (b,4), (d,9), (e,17), (c,2), (f,29), (g,21), (b,9))
d1.mapValues(_+1).collect
// res17: Array[(String, Int)] = Array((c,9), (b,26), (c,18), (a,43), (b,5), (d,10), (e,18), (c,3), (f,30), (g,22), (b,10))

5.2.3 一个综合实例

给定一组键值对(“spark”,2),(“”,6),(“”,4),(“spark”,6)combinebykey 《大数据处理技术Spark》--林子雨,键值 对的key表示图书名称,value表示某天图书销量combinebykey,请计算每个键对应的平均值,也就是计算每种图书的每天平均销量。

val rdd = sc.parallelize(Array(("spark",2),("hadoop",6),("hadoop",4),("spark",6)))
rdd.mapValues(x => (x,1)).reduceByKey((x,y) => (x._1+y._1,x._2 + y._2)).mapValues(x => (x._1 / x._2)).collect()

5.3 共享变量

广播变量( )

累加器()

5.4 数据读写 5.4.1 文件数据读写 5.4.1.1 本地文件系统的数据读写

val textFile = sc.textFile("file:///usr/local/spark/mycode/wordcount/word123.txt")
textFile.saveAsTextFile("file:///usr/local/spark/mycode/wordcount/writeback.txt")

5.4.1.2 分布式文件系统HDFS的数据读写

val textFile = sc.textFile("hdfs://localhost:9000/user/hadoop/word.txt")
// val textFile = sc.textFile("/user/hadoop/word.txt")
// val textFile = sc.textFile("word.txt")
textFile.first()

5.4.1.3 JSON文件的数据读写

import scala.util.parsing.json.JSON
val jsonStr = sc.textFile("file:///usr/local/spark/examples/src/main/resources/people.json")
// jsonStr.foreach(println)
val result = jsonStrs.map(s => JSON.parseFull(s))
result.foreach( {r => r match {
  case Some(map: Map[String, Any]) => println(map)
  case None => println("Parsing failed")
  case other => println("Unknown data structure: " + other)

剩下的部分直接参考林老师的ppt吧

5.4.2 读写HBase数据 5.5 程序解析 5.6 综合案例 6 Spark SQL 6.1 简介

hive on spark == Sharkcombinebykey,hive将SQL语句转为MR;Shark将SQL转为Spark的应用程序代码;Shark建立在hive上combinebykey,受限与hive,但是效率提升了10-100倍;MR是进程级别的并行,Shark是线程级别的并行,存在线程安全的保证,因此之后停止了更新Spark SQL。

spark SQL在兼容Hive基础上,只是借鉴了Hive的语法解析

6.2 和RDD区别

spark SQL采用的不是RDD,而是。是结构化的对象,查询效率更高。

6.3 的创建 6.4 从RDD到DF 6.4.1 利用反射机制推断RDD模式

import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.Encoder
import spark.implicits._ //导入包,支持把一个RDD隐式转换为一个DataFrame
case class Person(name: String, age: Long) // 定义一个case class
val peopleDF = spark.sparkContext.textFile("file:///usr/local/spark/examples/src/main/resources/people.txt").map(_.split(",")).map(attributes => Person(attributes(0), attributes(1).trim.toInt)).toDF()
peopleDF.createOrReplaceTempView(“people”) // 必须注册为临时表才能供下面的查询使用
val personsRDD = spark.sql("select name,age from people where age > 20") // 最终生成一个DataFrame
personsRDD.map(t => “Name:+t(0)+,+“Age:+t(1)).show() // DataFrame中的每个元素都是一行记录,包含name和age两个字段,分别用t(0)和t(1)来获取值

combinebykey 《大数据处理技术Spark》--林子雨 第2张

6.4.2 使用编程方式定义RDD模式

当无法提前定义case class时,就需要采用编程方式定义RDD模式

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
val peopleRDD = spark.sparkContext.textFile("file:///usr/local/spark/examples/src/main/resources/people.txt")
//定义一个模式字符串
val schemaString = "name age"
//根据模式字符串生成模式
val fields = schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)
//从上面打印的信息可以看出,schema描述了模式信息,模式中包含name和age两个字段
//对peopleRDD 这个RDD中的每一行元素都进行解析
val rowRDD = peopleRDD.map(_.split(",")).map(attributes => Row(attributes(0), attributes(1).trim))
val peopleDF = spark.createDataFrame(rowRDD, schema)
//必须注册为临时表才能供下面查询使用
peopleDF.createOrReplaceTempView("people")
val results = spark.sql("SELECT name,age FROM people")
results.map(attributes => "name: " + attributes(0)+","+"age:"+attributes(1)).show()

6.4.3 把RDD保存成文件

val peopleDF = spark.read.format("json").load("file:///usr/local/spark/examples/src/main/resources/people.json")
peopleDF.select("name","age").write.format("csv").save("file:///usr/local/spark/mycode/newpeople.csv")
val textFile = sc.textFile("file:///usr/local/spark/mycode/newpeople.csv")

write.()支持输出 json,, jdbc, orc, , csv, text等格式文件

val peopleDF = spark.read.format("json").load("file:///usr/local/spark/examples/src/main/resources/people.json")
df.rdd.saveAsTextFile("file:///usr/local/spark/mycode/newpeople.txt")

6.5 读取和保存数据 6.5.1 读写 6.5.2 通过JDBC连接数据库 安装MySQL及常用操作

mysql的jdbc驱动程序,下载地址

6.5.3 连接Hive读写数据 6.5.3.1 Hive简介和安装

《安装hive,并配置mysql作为元数据库》

6.5.3.2 让Spark包含Hive支持

测试spark版本是否支持Hive

import org.apache.spark.sql.hive.HiveContext
// 支持的输出:import org.apache.spark.sql.hive.HiveContext 

6.5.3.3 在Hive中创建数据库和表 启动: start-all.sh(已经将的路径加入到环境变量中)启动Hive:hive, 添加数据表

// hive脚本下执行
create database if not exists sparktest;//创建数据库sparktest
show databases; 
create table if not exists sparktest.student(id int,name string, gender string, age int);
use sparktest; //切换到sparktest
show tables; //显示sparktest数据库下面有哪些表
insert into student values(1,'Xueqian','F',23); //插入一条记录
insert into student values(2,'Weiliang','M',24); //再插入一条记录
select * from student; //显示student表中的记录

6.5.3.4 连接Hive读写数据 在spark-shell(包含Hive支持)中执行以下命令从Hive中读取数据

import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
case class Record(key: Int, value: String)
val warehouseLocation = "spark-warehouse”
val spark = SparkSession.builder().appName("Spark Hive Example").config("spark.sql.warehouse.dir", warehouseLocation).enableHiveSupport().getOrCreate()
import spark.implicits._
import spark.sql
sql("SELECT * FROM sparktest.student").show()

编写程序向Hive数据库的.表中插入两条数据

// 准备两条数据
val studentRDD = spark.sparkContext.parallelize(Array("3 Rongcheng M 26","4 Guanhua M 27")).map(_.split(" "))
// 设置模式信息
val schema = StructType(List(StructField("id", IntegerType, true),StructField("name", StringType, true),StructField("gender", StringType, true),StructField("age", IntegerType, true)))
// 下面创建Row对象,每个Row对象都是rowRDD中的一行
val rowRDD = studentRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).trim,p(3).toInt))
// 建立起Row对象和模式之间的对应关系,也就是把数据和模式对应起来
val studentDF = spark.createDataFrame(rowRDD, schema)
// 查看studentDF
studentDF.show()
// 注册临时表
studentDF.registerTempTable("tempTable")
// 插入
sql("insert into sparktest.student select * from tempTable")

加入微信交流群:************ ,请猛戳这里→点击入群

扫描二维码推送至手机访问。

版权声明:本文由前沿科技娱乐汇发布,如需转载请注明出处。

本文链接:https://kejiyl.com/post/5228.html

分享给朋友:

“combinebykey 《大数据处理技术Spark》--林子雨” 的相关文章

某著名品牌生产商或经销商,将其品牌及技术

某著名品牌生产商或经销商,将其品牌及技术

在当今的消费市场中,品牌的声誉犹如其生命之线,一旦这条线出现断裂,品牌便可能面临着前所未有的危机。近期,某知名品牌就深陷质量门事件,引发了消费者的强烈不满和纷纷抵制,这一事件犹如一颗巨石投入平静的湖面,激起了层层涟漪,让整个行业都为之震动。该知名品牌一直以来以其高品质、高知名度和广泛的市场份额而著称...

羽毛球汤尤杯赛程

羽毛球汤尤杯赛程

在体育的浩瀚星空中,有这样一支队伍,始终如同一颗璀璨的巨星,散发着无与伦比的光芒,那便是中国羽毛球队。在 2023 年汤尤杯的赛场上,他们再次书写了辉煌的篇章,包揽了两项冠军,王者归来的气势令人震撼。汤姆斯杯,被誉为世界男子羽毛球团体锦标赛的巅峰之战,每一届都吸引着全球羽毛球爱好者的目光。这一次,中...

直播电商法规

直播电商法规

在当今数字化时代,直播电商以其独特的魅力和强大的影响力,迅速崛起成为电商领域的一颗璀璨明星。随着直播电商的飞速发展,一些问题也逐渐暴露出来,诸如虚假宣传、产品质量参差不齐、主播素质良莠不齐等,这些问题不仅损害了消费者的利益,也影响了直播电商行业的健康发展。正是在这样的背景下,直播电商规范出台,行业迎...

中国游泳比赛奥运会

中国游泳比赛奥运会

游泳,作为奥运会的传统大项,一直以来都是各国体育健儿们争夺荣誉的重要战场。在过去的岁月里,中国游泳队在世界舞台上不断努力拼搏,虽取得了不少成绩,但始终与顶尖水平存在一定差距。近年来,中国游泳队在大项上取得了突破性的进展,正以崭新的姿态迈向巴黎奥运会,展现出令人瞩目的风采。中国游泳大项的突破并非一蹴而...

地摊经济的烟火气

地摊经济的烟火气

“夜市千灯照碧云,高楼红袖客纷纷。”这句古诗仿佛在描绘着如今的景象,地摊经济的复苏,让城市的烟火气又回来了。曾经,地摊经济在一些城市被视为影响市容市貌的“顽疾”,遭到了严厉的打击和取缔。随着经济形势的变化和人们生活需求的多样化,地摊经济以其独特的魅力重新走进了人们的视野。地摊经济的复苏,为城市带来了...

国产 3A 电竞大作《破界之战》发布,开启竞技新时代

国产 3A 电竞大作《破界之战》发布,开启竞技新时代

在电竞领域的浩瀚星空中,一颗璀璨的新星正在冉冉升起,那便是国产 3A 电竞大作《破界之战》。这款游戏的发布,宛如一道划破天际的光芒,为中国电竞事业注入了全新的活力与希望,正式开启了竞技的新时代。《破界之战》以其无与伦比的制作水准,展现出了国产 3A 游戏的强大实力。从精美的画面到细腻的场景设计,每一...