基于PySpark和ALS算法实现基本的电影推荐流程

本文介绍了PySpark的基本概念和接口用法,重点讲解了如何使用PySpark和ALS算法实现电影推荐流程。从Py4J的原理到Spark算子的分类,再到数据读取和转换,详细阐述了数据预处理过程。通过读取用户数据,训练ALS模型,调用模型进行推荐,并讨论了项目中的难点和推荐系统的架构设计。

摘要生成于 C知道 ,由 DeepSeek-R1 满血版支持, 前往体验 >


  本文内容第一部分给出Pyspark常见算子的用法,第二部分则参考书籍《Python spark2.0 Hadoop机器学习与大数据实战》的电影推荐章节。本文内容为大数据实时分析项目提供基本的入门知识。

1、PySpark简介

  本节内容的图文一部分参考了这篇文章《PySpark 的背后原理 》,个人欣赏此博客作者,博文质量高,看完受益匪浅!Spark的内容不再累赘,可参考本博客《深入理解Spark》。PySpark的工作原理图示如下:
在这里插入图片描述  在这里,Py4J 是一个用 Python 和 Java 编写的库,它可以让Python代码实现动态访问JVM的Java对象,同时JVM也能够回调 Python对象。因此PySpark就是在Spark外围包装一层Python API,借助Py4j实现Python和Java的交互(这里的交互就是通过socket实现,传字节码),进而实现通过Python编写Spark应用程序。
在这里插入图片描述  在Driver端,PySparkContext通过Py4J启动一个JVM并产生一个JavaSparkContext;在Executor端,则不需要借助Py4j,因为Executor端运行的是由Driver传过来的Task业务逻辑(其实就是java的字节码)。

2、Pyspark接口用法
读取数据源

PySpark支持多种数据源读取,常见接口如下:

sc.pickleFile() # <class 'pyspark.rdd.RDD'>
sc.textFile() # <class 'pyspark.rdd.RDD'>
spark.read.json() # <class 'pyspark.sql.dataframe.DataFrame'>
spark.read.text() # <class 'pyspark.sql.dataframe.DataFrame'>

例如读取本地要注意,格式为file://+文件绝对路径

sc.textFile("file:///home/mparsian/dna_seq.txt")
# 读取hdfs上文件数据
sc.textFile("your_hadoop/data/moves.txt")
常用算子

Spark的算子分为两类:Transformation和Action。
Transformation仅仅是定义逻辑,并不会立即执行,有lazy特性,目的是将一个RDD转为新的RDD,可以基于RDDs形成lineage(DAG图);
Action:触发Job运行,真正触发driver运行job;

第一类算子:Transformation

  • map(func): 返回一个新的RDD,func会作用于每个map的key,例如在wordcount例子要rdd.map(lambda a, (a, 1))将数据转换成(a, 1)的形式以便之后做reduce
word_rdd = sc.parallelize (
   ["foo", "bar", "foo", "pyspark", "kafka","kafka", 10,10]
   )
word_map_rdd = word_rdd.map(lambda w: (w, 1))
mapping = word_map_rdd.collect()
print(mapping)
#输出
[('foo', 1), ('bar', 1), ('foo', 1), ('pyspark', 1), ('kafka', 1), ('kafka', 1), (10, 1), (10, 1)]
  • mappartitions(func, partition): Return a new RDD by applying a function to each partition of this RDD.和map不同的地方在于map的func应用于每个元素,而这里的func会应用于每个分区,能够有效减少调用开销,减少func初始化次数。减少了初始化的内存开销。
    例如将一个数据集合分成2个区,再对每个区进行累加,该方法适合对超大数据集合的分区累加处理,例如有1亿个item,分成100个分区,有10台服务器,那么每台服务器就可以负责自己10个分区的数据累加处理。
    官方也提到mappartitions中如果一个分区太大,一次计算的话可能直接导致内存溢出。
  rdd = sc.parallelize([10, 22, 3, 4], 2)
  def f(each_partition): 
  yield sum(each_partition)
  rdd.glom().collect()
  #输出:
  [[10, 22], [3, 4]]
  rdd.mapPartitions(f).glom().collect()
  [[32], [7]]
  • filter(func): 返回一个新的RDD,func会作用于每个map的key,用于筛选数据集
 rdd = sc.parallelize (["fooo", "bbbar", "foo", " ", "Aoo"])
 rdd.filter(lambda x: 'foo' in x).collect()
 # ['fooo', 'foo']
  • flatMap(func): 返回一个新的RDD,func用在每个item,并把item切分为多个元素返回,例如wordcount例子的分类
  rdd = sc.parallelize (["this is pyspark", "this is spark"])
  rdd.flatMap(lambda line:line.split(' ')).collect()
  #可以看到每个item为一句话,经过func后,分解为多个单词(多个元素)
  # ['this', 'is', 'pyspark', 'this', 'is', 'spark']
rdd = sc.parallelize ((1,2,3))
rdd.flatMap(lambda x:(2*x,3*x)).collect()
# 对原来每个item分别乘2乘3,func返回两个item
# [2, 3, 4, 6, 6, 9]
  • flatMapValues(func):flatMapValues类似于mapValues,不同的在于flatMapValues应用于元素为key-value对的RDD中Value。每个一kv对的Value被输入函数映射为一系列的值,然后这些值再与原RDD中的Key组成一系列新的KV对。
rdd = sc.parallelize([("name", ["foo", "bar", "aoo"]), ("age", ["12", "20"])])
rdd.flatMapValues(lambda x:x).collect()
# 输出结果
[('name', 'foo'),
 ('name', 'bar'),
 ('name', 'aoo'),
 ('age', '12'),
 ('age', '20')]
  • mapValues(func): 返回一个新的RDD,对RDD中的每一个value应用函数func。
 rdd = sc.parallelize([("name", ["foo", "bar", "aoo"]), ("age", ["12", "20"])])
 rdd.mapValues(lambda value:len(value)).collect()
 # [('name', 3), ('age', 2)]
  • distinct(): 去除重复的元素
  rdd = sc.parallelize([("a", 1),("a", 10) ,("b", 1), ("a", 1)])
  rdd.distinct().collect()
  # [('a', 1), ('a', 10), ('b', 1)]
  • subtractByKey(other): 删除在RDD1与RDD2的key相同的项
  rdd1 = sc.parallelize([("a", 1),("a", 10) ,("b", 1), ("a", 1)])
  rdd2 = sc.parallelize([("a", 1),("a", 10) ,("c", 1), ("a", 1)])
  rdd1.subtractByKey(rdd2).collect()
  # [('b', 1)]
  • subtract(other): 取差集
  rdd1 = sc.parallelize([("a", 1),("a", 10) ,("b", 1), ("a", 1)])
  rdd2 = sc.parallelize([("a", 1),("a", 10) ,("c", 1), ("a", 1)])
  rdd1.subtract(rdd2).collect()
  # [('b', 1)]
  • intersection(other): 交集运算,保留在两个RDD中都有的元素
rdd1 = sc
评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值