推荐系统-ALS协同过滤算法代码实现

该文详细介绍了如何利用Spark的mllib库实现交替最小二乘(ALS)算法,该算法用于协同过滤,处理用户-商品评分矩阵的缺失值。数据从Cassandra数据库读取,经过训练和测试数据的划分,通过遍历不同的模型参数找到最佳的隐含因子数、迭代次数和惩罚值,以最小化均方误差(RMSE)。最终,ALS模型用于为用户推荐商品,并将结果保存回Cassandra。

摘要生成于 C知道 ,由 DeepSeek-R1 满血版支持, 前往体验 >

从协同过滤的分类来说,ALS(Alternating Least Squares,交替最小二乘)算法属于User-Item CF,也叫做混合CF,它同时考虑了User和Item两个方面,通过数量相对少的未被观察到的隐藏因子,来解释大量用户和物品之间潜在联系。ALS基于矩阵分解通过降维的方法来补全用户-物品矩阵,对矩阵中没有出现的值进行估计。

用户和物品的关系,可以抽象为如下的三元组:<User,Item,Rating>。其中,Rating是用户对商品的评分,表征用户对该商品的喜好程度。

ALS基本假设:任何一个评分矩阵均可近似分解成两个低维的用户特征矩阵和物品特征矩阵。矩阵分解过程可理解成将用户和物品均抽象的映射到相同的低维潜在特征空间中。因此其基本思想是对稀疏矩阵进行模型分解,评估出缺失项的值,以此来得到一个基本的训练模型,然后依照此模型可以针对新的用户和物品数据进行评估。ALS是采用交替的最小二乘法来算出缺失项,交替最小二乘法是在最小二乘法的基础上发展而来的。

1、spark代码实现

1.1 数据入口

case class ProductRating(userId:Int, productId:Int, score:Double)

/** 训练最好模型输出

  • @param bestModel 模型
  • @param bestRanks 隐含因子
  • @param bestIters 迭代次数
  • @param bestLambdas 惩罚值
  • @param bestRmse 最佳方差值**/
    case class BestModel(bestModel:Option[MatrixFactorizationModel], bestRanks:Int, bestIters:Int, bestLambdas:Double, bestRmse:Double)

def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("ALSTrainer")
    //创建sparkSession
    val spark = SparkSession.builder().config(sparkConf).getOrCreate()
    //加载数据,作为rating, rdd需要应用asl
    val ratingRDD = getDFFromCass(spark, "cdp", "t_user_item_rating")
      .as[ProductRating]
      .rdd
      .map(
        rating => Rating(rating.userId, rating.productId, rating.score)
      )
    //数据切分为训练集合测试集
    val splits = ratingRDD.randomSplit(Array(0.8, 0.2))
    val trainingRDD = splits(0)
    val testingRDD = splits(1)

    //核心实现,输出最优参数
    val bestModel = RmseUtil.predictBestRmse(trainingRDD, testingRDD)
    println("bestModel" + bestModel.bestRmse)

    val itemRecs = recommender(spark, ratingRDD, 10)
    //output result to cassandra
    saveToCass(itemRecs.toDF(), "cdp", "t_user_recs")

    spark.stop()
  }

1.2 数据加载

我们使用cassandra大数据库,实现数据的输入与存储;

 def saveToCass(saveDF: DataFrame, keyspace: String, tableName: String): Unit = {
    saveDF.write
      .format("org.apache.spark.sql.cassandra")
      .options(Map("keyspace" -> keyspace, "table" -> tableName))
      .mode(SaveMode.Append)
      .option("spark.cassandra.output.consistency.level", "ONE")
      .save()
  }

  def getDFFromCass(spark: SparkSession, keyspace: String, tableName: String): DataFrame = {
    val userItemDF = spark.read
      .format("org.apache.spark.sql.cassandra")
      .options(Map("keyspace" -> keyspace, "table" -> tableName))
      .load()
      .toDF("userId", "itemId", "rating")
    userItemDF
  }

1.3 基于spark mllib物品推荐

建立ALS算法模型,设置模型参数(通过模型参数评估获得最优解),调用recommendProductsForUsers方法为用户推荐指定数量的物品。

  def recommender(spark: SparkSession, ratingRDD: RDD[Rating],  recommendNum: Int): DataFrame={
    val splits = ratingRDD.randomSplit(Array(0.8, 0.2))
    val trainRDD= splits(0)
    val testRDD = splits(1)
    //建立ALS推荐模型
    val model = new ALS()
      .setRank(5)
      .setIterations(20)
      .setLambda(0.01)
      .setImplicitPrefs(false)
      .setUserBlocks(-1)
      .setProductBlocks(-1)
      //设置ratingRDD为所有用户推荐
      .run(trainRDD)

    val testUsersProductRDD = testRDD.map { case Rating(user, product, rate) => (user, product) }
    //得到预测评分的数据集
    val predictionRDD = model.predict(testUsersProductRDD).map {
      case Rating(user, product, rate) => ((user, product), rate)
    }
    //真实评分数据集与预测评分数据集进行合并
    val ratesAndPreds = testRDD.map { case Rating(user, product, rate) => ((user, product), rate) }.join(predictionRDD)
    //计算RMSE,这里的r1就是真实结果,r2就是预测结果
    val MSE = ratesAndPreds.map {
      case ((user, product), (r1, r2)) =>
        val err = (r1 - r2)
        err * err
    }.mean()
    println("Mean Squared Error = " + MSE)

    //用户推荐recommendNum个商品
    val userSubsetRecs = model.recommendProductsForUsers(recommendNum)
    //推荐商品列表
    val itemRecDF = userSubsetRecs.toDF("userId", "recommends")
    itemRecDF.show(5)
    itemRecDF
  }

1.4 模型参数评估

预测模型评估,预测出最好的模型参数BestModel

object RmseUtil {
  /**
   * 训练集合
   * @param trainingData 训练集合
   * @param testingData 测试集合
   * @return
   */
  def predictBestRmse(trainingData:RDD[Rating], testingData:RDD[Rating]): BestModel = {
    var bestModel: Option[MatrixFactorizationModel] = None
    var bestRanks = -1
    var bestIters = 0
    var bestLambdas = -1.0
    var bestRmse = Double.MaxValue
    //多重迭代法求最佳参数模型
    //迭代次数
    val numIters = List(5, 10, 20)
    //隐含因子
    val numRanks = List(8, 10, 12)
    //惩罚值(正则化值)
    val numLambdas = List(0.01, 0.1, 1)
    //共3*3*3种组合,每种组合迭代次数又不一样,在此会消耗大量时间
    for (rank <- numRanks; iter <- numIters; lambdas <- numLambdas) {
      //als参数为 训练集合 隐含因子 迭代次数 惩罚因子
      val model = ALS.train(trainingData, rank, iter, lambdas)
      val validationRmse = rmseComputer(model, testingData)
      //逐步迭代
      if (validationRmse < bestRmse) {
        bestModel = Some(model)
        bestRmse = validationRmse
        bestIters = iter
        bestLambdas = lambdas
        bestRanks = rank
      }
    }
    BestModel(bestModel, bestRanks, bestIters, bestLambdas, bestRmse)
  }
}

/**
   *
   * @param model       训练模型
   * @param dataOfTest  用于测试数据集合(一般是笛卡尔积)
   * @return
   */
  def rmseComputer(model: MatrixFactorizationModel, dataOfTest: RDD[Rating]):Double= {
    //预测评分矩阵:预测返回结果<user product rating>
    val predictResult = model.predict(dataOfTest.map(item => (item.user, item.product)))
    //将预测值和测试值组成一个map然后比较预测的评分值和实际值
    val predict = predictResult.map(item => ((item.user, item.product), item.rating))
    val actual = dataOfTest.map(item => ((item.user, item.product), item.rating))
    val predJoinPrevActual = predict.join(actual).values
    //直接调用回归库函数需要传入一个(prediction,actualValue)
    val evaluator = new RegressionMetrics(predJoinPrevActual)
    evaluator.meanAbsoluteError
  }
ALS(Alternating Least Squares)协同过滤算法是一种常用的推荐算法,可以用于商品推荐系统。下面我们将介绍如何使用ALS算法实现商品推荐系统。 1. 数据准备 首先需要准备数据,包括用户、商品和评分。通常评分可以是1到5的整数,表示用户对商品的兴趣程度。 2. 将数据转换为矩阵 将数据转换为一个矩阵,其中行表示用户,列表示商品,矩阵中的元素表示用户对商品的评分。 3. 分解矩阵 使用ALS算法将矩阵分解为用户矩阵和商品矩阵,其中用户矩阵包含用户的特征向量,商品矩阵包含商品的特征向量。 4. 计算预测评分 使用用户矩阵和商品矩阵计算预测评分矩阵,其中预测评分矩阵中的元素表示对应用户对对应商品的预测评分。 5. 推荐商品 根据预测评分矩阵推荐商品给用户,通常选择预测评分最高的商品作为推荐结果。 下面是一个基于ALS算法的商品推荐系统的示例代码: ```python import numpy as np from scipy.sparse.linalg import spsolve class ALSRecommender: def __init__(self, n_users, n_items, n_factors=20, reg=0.01): self.n_users = n_users self.n_items = n_items self.n_factors = n_factors self.reg = reg def fit(self, X, n_iterations=10): self.user_factors = np.random.normal(size=(self.n_users, self.n_factors)) self.item_factors = np.random.normal(size=(self.n_items, self.n_factors)) X = X.tocoo() for i in range(n_iterations): self.user_factors = self._als_step(X, self.user_factors, self.item_factors) self.item_factors = self._als_step(X.T, self.item_factors, self.user_factors) def _als_step(self, X, solve_vecs, fixed_vecs): A = fixed_vecs.T.dot(fixed_vecs) + np.eye(self.n_factors) * self.reg b = X.dot(fixed_vecs) solve_vecs = spsolve(A, b.T) return solve_vecs.T def predict(self, user_id, item_id): return self.user_factors[user_id].dot(self.item_factors[item_id]) def recommend(self, user_id, n=10): scores = np.dot(self.user_factors[user_id], self.item_factors.T) top_items = np.argsort(scores)[::-1][:n] return top_items ``` 这个代码中,我们使用了numpy和scipy库来进行矩阵计算和求解线性方程组。在fit函数中,我们使用ALS算法迭代更新用户矩阵和商品矩阵,直到收敛。在predict函数中,我们使用用户矩阵和商品矩阵预测对应用户对对应商品的评分。在recommend函数中,我们根据预测评分矩阵推荐商品给用户。 使用这个类可以很方便地实现基于ALS算法的商品推荐系统
评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包

打赏作者

琉璃梦境

你的鼓励将是我创作的最大动力

¥1 ¥2 ¥4 ¥6 ¥10 ¥20
扫码支付:¥1
获取中
扫码支付

您的余额不足,请更换扫码支付或充值

打赏作者

实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值