文章目录
本文内容第一部分给出Pyspark常见算子的用法,第二部分则参考书籍《Python spark2.0 Hadoop机器学习与大数据实战》的电影推荐章节。本文内容为大数据实时分析项目提供基本的入门知识。
1、PySpark简介
本节内容的图文一部分参考了这篇文章《PySpark 的背后原理 》,个人欣赏此博客作者,博文质量高,看完受益匪浅!Spark的内容不再累赘,可参考本博客《深入理解Spark》。PySpark的工作原理图示如下:
在这里,Py4J 是一个用 Python 和 Java 编写的库,它可以让Python代码实现动态访问JVM的Java对象,同时JVM也能够回调 Python对象。因此PySpark就是在Spark外围包装一层Python API,借助Py4j实现Python和Java的交互(这里的交互就是通过socket实现,传字节码),进而实现通过Python编写Spark应用程序。
在Driver端,PySparkContext通过Py4J启动一个JVM并产生一个JavaSparkContext;在Executor端,则不需要借助Py4j,因为Executor端运行的是由Driver传过来的Task业务逻辑(其实就是java的字节码)。
2、Pyspark接口用法
读取数据源
PySpark支持多种数据源读取,常见接口如下:
sc.pickleFile() # <class 'pyspark.rdd.RDD'>
sc.textFile() # <class 'pyspark.rdd.RDD'>
spark.read.json() # <class 'pyspark.sql.dataframe.DataFrame'>
spark.read.text() # <class 'pyspark.sql.dataframe.DataFrame'>
例如读取本地要注意,格式为file://+文件绝对路径
sc.textFile("file:///home/mparsian/dna_seq.txt")
# 读取hdfs上文件数据
sc.textFile("your_hadoop/data/moves.txt")
常用算子
Spark的算子分为两类:Transformation和Action。
Transformation仅仅是定义逻辑,并不会立即执行,有lazy特性,目的是将一个RDD转为新的RDD,可以基于RDDs形成lineage(DAG图);
Action:触发Job运行,真正触发driver运行job;
第一类算子:Transformation
- map(func): 返回一个新的RDD,func会作用于每个map的key,例如在wordcount例子要
rdd.map(lambda a, (a, 1))
将数据转换成(a, 1)的形式以便之后做reduce
word_rdd = sc.parallelize (
["foo", "bar", "foo", "pyspark", "kafka","kafka", 10,10]
)
word_map_rdd = word_rdd.map(lambda w: (w, 1))
mapping = word_map_rdd.collect()
print(mapping)
#输出
[('foo', 1), ('bar', 1), ('foo', 1), ('pyspark', 1), ('kafka', 1), ('kafka', 1), (10, 1), (10, 1)]
- mappartitions(func, partition): Return a new RDD by applying a function to each partition of this RDD.和map不同的地方在于map的func应用于每个元素,而这里的func会应用于每个分区,能够有效减少调用开销,减少func初始化次数。减少了初始化的内存开销。
例如将一个数据集合分成2个区,再对每个区进行累加,该方法适合对超大数据集合的分区累加处理,例如有1亿个item,分成100个分区,有10台服务器,那么每台服务器就可以负责自己10个分区的数据累加处理。
官方也提到mappartitions中如果一个分区太大,一次计算的话可能直接导致内存溢出。
rdd = sc.parallelize([10, 22, 3, 4], 2)
def f(each_partition):
yield sum(each_partition)
rdd.glom().collect()
#输出:
[[10, 22], [3, 4]]
rdd.mapPartitions(f).glom().collect()
[[32], [7]]
- filter(func): 返回一个新的RDD,func会作用于每个map的key,用于筛选数据集
rdd = sc.parallelize (["fooo", "bbbar", "foo", " ", "Aoo"])
rdd.filter(lambda x: 'foo' in x).collect()
# ['fooo', 'foo']
- flatMap(func): 返回一个新的RDD,func用在每个item,并把item切分为多个元素返回,例如wordcount例子的分类
rdd = sc.parallelize (["this is pyspark", "this is spark"])
rdd.flatMap(lambda line:line.split(' ')).collect()
#可以看到每个item为一句话,经过func后,分解为多个单词(多个元素)
# ['this', 'is', 'pyspark', 'this', 'is', 'spark']
rdd = sc.parallelize ((1,2,3))
rdd.flatMap(lambda x:(2*x,3*x)).collect()
# 对原来每个item分别乘2乘3,func返回两个item
# [2, 3, 4, 6, 6, 9]
- flatMapValues(func):flatMapValues类似于mapValues,不同的在于flatMapValues应用于元素为key-value对的RDD中Value。每个一kv对的Value被输入函数映射为一系列的值,然后这些值再与原RDD中的Key组成一系列新的KV对。
rdd = sc.parallelize([("name", ["foo", "bar", "aoo"]), ("age", ["12", "20"])])
rdd.flatMapValues(lambda x:x).collect()
# 输出结果
[('name', 'foo'),
('name', 'bar'),
('name', 'aoo'),
('age', '12'),
('age', '20')]
- mapValues(func): 返回一个新的RDD,对RDD中的每一个value应用函数func。
rdd = sc.parallelize([("name", ["foo", "bar", "aoo"]), ("age", ["12", "20"])])
rdd.mapValues(lambda value:len(value)).collect()
# [('name', 3), ('age', 2)]
- distinct(): 去除重复的元素
rdd = sc.parallelize([("a", 1),("a", 10) ,("b", 1), ("a", 1)])
rdd.distinct().collect()
# [('a', 1), ('a', 10), ('b', 1)]
- subtractByKey(other): 删除在RDD1与RDD2的key相同的项
rdd1 = sc.parallelize([("a", 1),("a", 10) ,("b", 1), ("a", 1)])
rdd2 = sc.parallelize([("a", 1),("a", 10) ,("c", 1), ("a", 1)])
rdd1.subtractByKey(rdd2).collect()
# [('b', 1)]
- subtract(other): 取差集
rdd1 = sc.parallelize([("a", 1),("a", 10) ,("b", 1), ("a", 1)])
rdd2 = sc.parallelize([("a", 1),("a", 10) ,("c", 1), ("a", 1)])
rdd1.subtract(rdd2).collect()
# [('b', 1)]
- intersection(other): 交集运算,保留在两个RDD中都有的元素
rdd1 = sc