协同过滤推荐算法用户特征(协同过滤算法分布式实现)
协同过滤推荐算法用户特征(协同过滤算法分布式实现)1. 同现相似度 ▌ 相似度计算公式 图1 基于用户的协同过滤示例 基于物品的协同过滤(ItemCF),就是基于物品所对应的用户评分得到物品向量(以用户为维度,得到物品向量),计算相似物品,然后再进行推荐,如图2所示。 图2 基于物品的协同过滤示例
导读: 本文主要介绍协同过滤基础知识,以及分布式实现设计,并最终在 Spark 平台上对同现相似度、Cosine 相似度、欧几里得距离相似度、关联规则进行代码实现。
▌ 协同过滤推荐介绍
协同过滤推荐是最经典、最常用的推荐算法,该算法通过分析用户的兴趣,在用户群中找到指定用户的相似用户,综合这些相似用户对某一信息的评价,形成系统对该指定用户针对此信息的喜好程度的预测。常见的协同过滤有以下2种。
基于用户的协同过滤(UserCF),就是基于用户对物品的评分得到用户向量(以物品为维度,得到用户向量),计算相似用户,然后再进行推荐,如图1所示。
图1 基于用户的协同过滤示例
基于物品的协同过滤(ItemCF),就是基于物品所对应的用户评分得到物品向量(以用户为维度,得到物品向量),计算相似物品,然后再进行推荐,如图2所示。
图2 基于物品的协同过滤示例
▌ 相似度计算公式
1. 同现相似度
物品 A 和物品 B 的同现相似度公式定义如下:
其中分母 N(A) 是喜欢物品 A 的用户数、N(B) 是喜欢物品 B 的用户数,而分子
则是同时喜欢物品 A 和物品 B 的用户数。
2. 欧 几里得 距离
该距离最初用于计算欧几里得空间中两个点的距离,假设 x、y 是 n 维空间中的两个点,它们之间的欧几里得距离如下:
可以看出,当 n=2 时,欧几里得距离就是平面上两个点的距离。
当用欧几里得距离表示相似度时,一般采用以下公式进行转换:距离越小,相似度值越大。
3. 皮尔逊相关系数
皮尔逊相关系数一般用于计算两个定距变量间联系的紧密程度,它的取值范围为 [ - 1 1]。
是 x 和 y 的样品标准偏差。
4. Cosine 相似度
Cosine 相似度被广泛应用于计算文档数据的相似度:
▌ 相似度分布式实现
1. 同现相似度 分布式设计
对于同现相似度矩阵计算,首先以用户 id 为 key 进行 group by 操作,得到每个用户的所有物品集合,然后对每个用户的物品集合进行 flatMap 操作:对物品集合生成两两物品对(物品,物品),其中只生成上三角部分;之后对(物品,物品)对进行 group by 操作,得到物品与物品的总出现次数,随后再根据同现相似度公式:
w(i j) = N(i)∩N(j)/sqrt(N(i)×N(j))
其中分子是 i 与 j 的同现频次,分母的 N(i) 是 i 频次、N(j) 是 j 频次。计算物品与物品的相似度,最终得到所有上三角部分的相似度。过程如图3所示。
图3 分布式同现相似度矩阵计算过程
2. 欧几里得距离相似度分布式设计
对于欧几里得距离相似度的计算,采用离散计算公式:
d(x y) = sqrt(∑((x(i) - y(i))×(x(i) - y(i))))
其中 i 只取 x、y 同现的点,未同现的点不参与相似度计算;
sim(x y) = m / (1 d(x y))
m 为 x、y 重叠数(同现次数)。
对于 Cosine 相似度的计算,采用离散计算公式,其中 i 只取 x、y 同现的点,未同现的点不参与相似度计算。
之后的欧几里得距离相似度计算如下:对(物品,物品)对进行 group by 操作,得到物品与物品的总出现次数,以及(评分 i - 评分 j)平方累加值,并且对累加值开方,最后根据公式 m / (1 d(x y)) 计算相似度。过程如图4所示。
图4 分布式欧几里得距离相似度和 Cosine 相似度计算过程
3. Cosine 相似度计算分布式设计
Cosine 相似度计算如下:对(物品,物品)对进行 group by 操作,得到 x×y=sum(评分i× 评分j),|x|^2=sum(评分i^2),|y|^2=sum(评分j^2),最后根据公式计算相似度。过程如图4所示。
▌ 相似度计算代码实现
对同现相似度、Cosine 相似度、欧几里得距离相似度、关联规则进行代码实现,实现语言为 Scala,实现平台为 Spark,其实现代码如下:
1. 同现相似度计算
/** * 同现相似度计算 * w(i j) = N(i)∩N(j)/sqrt(N(i)*N(j)) * @param user_rdd 用户评分 * @param RDD[ItemSimi] 返回物品相似度 * */ def CooccurrenceSimilarity(user_ds: Dataset[ItemPref]): Dataset[ItemSimi] = { import user_ds.sparkSession.implicits._ // 1 (用户:物品) => (用户:(物品集合)) val user_ds1 = user_ds.groupBy("userid").agg(collect_set("itemid")). withColumnRenamed ("collect_set(itemid)" "itemid_set") // 2 物品:物品,上三角数据 val user_ds2 = user_ds1.flatMap { row => val itemlist = row.getAs[scala.collection.mutable.WrappedArray[String]](1).toArray. sorted val result = new ArrayBuffer[(String String Double)]() for (i <- 0 to itemlist.length - 2) { for (j <- i 1 to itemlist.length - 1) { result = ((itemlist(i) itemlist(j) 1.0)) } } result }.withColumnRenamed("_1" "itemidI").withColumnRenamed("_2" "itemidJ"). withColumnRenamed("_3" "score") // 3 计算物品与物品,上三角,同现频次 val user_ds3 = user_ds2.groupBy("itemidI" "itemidJ").agg(sum("score").as("sumIJ")) // 4 计算物品总共出现的频次 val user_ds0 = user_ds.withColumn("score" lit(1)).groupBy("itemid").agg(sum ("score").as("score")) // 5 计算同现相似度 val user_ds4 = user_ds3.join(user_ds0.withColumnRenamed("itemid" "itemidJ"). withColumnRenamed("score" "sumJ").select("itemidJ" "sumJ") "itemidJ") val user_ds5 = user_ds4.join(user_ds0.withColumnRenamed("itemid" "itemidI").withColumnRenamed("score" "sumI").select("itemidI" "sumI") "itemidI") // 根据公式N(i)∩N(j)/sqrt(N(i)*N(j)) 计算 val user_ds6 = user_ds5.withColumn("result" col("sumIJ") / sqrt(col("sumI") * col("sumJ"))) // 6 上、下三角合并 println(s"user_ds6.count(): ${user_ds6.count()}")val user_ds8 = user_ds6. select("itemidI" "itemidJ" "result").union(user_ds6.select($"itemidJ".as("itemidI") $"itemidI".as("itemidJ") $"result")) println(s"user_ds8.count(): ${user_ds8.count()}") // 7 结果返回 val out = user_ds8.select("itemidI" "itemidJ" "result").map { row => val itemidI = row.getString(0) val itemidJ = row.getString(1) val similar = row.getDouble(2) ItemSimi(itemidI itemidJ similar) } out }
2. Cosine 相似度计算
/** * Cosine相似度计算 * T(x y) = ∑x(i)y(i) / sqrt(∑(x(i)*x(i))) * sqrt(∑(y(i)*y(i))) * @param user_rdd 用户评分 * @param RDD[ItemSimi] 返回物品相似度 * */ def CosineSimilarity(user_ds: Dataset[ItemPref]): Dataset[ItemSimi] = { import user_ds.sparkSession.implicits._ // 1 数据准备 val user_ds1 = user_ds. withColumn("iv" concat_ws(":" $"itemid" $"pref")). groupBy("userid").agg(collect_set("iv")). withColumnRenamed("collect_set(iv)" "itemid_set"). select("userid" "itemid_set") // 2 物品:物品,上三角数据 val user_ds2 = user_ds1.flatMap { row => val itemlist = row.getAs[scala.collection.mutable.WrappedArray [String]](1).toArray. sorted val result = new ArrayBuffer[(String String Double Double)]() for (i <- 0 to itemlist.length - 2) { for (j <- i 1 to itemlist.length - 1) { result = ((itemlist(i).split(":")(0) itemlist(j).split(":")(0) itemlist(i). split(":")(1).toDouble itemlist(j).split(":")(1).toDouble)) } } result }.withColumnRenamed("_1" "itemidI").withColumnRenamed("_2" "itemidJ").withColumnRenamed("_3" "scoreI").withColumnRenamed("_4" "scoreJ") // 3 按照公式计算相似度 // x*y = ∑x(i)y(i) // |x|^2 = ∑(x(i)*x(i)) // |y|^2 = ∑(y(i)*y(i)) // result = x*y / sqrt(|x|^2) * sqrt(|y|^2) val user_ds3 = user_ds2. withColumn("cnt" lit(1)). groupBy("itemidI" "itemidJ"). agg(sum(($"scoreI" * $"scoreJ")).as("sum_xy") sum(($"scoreI" * $"scoreI")).as("sum_x") sum(($"scoreJ" * $"scoreJ")).as("sum_y")). withColumn("result" $"sum_xy" / (sqrt($"sum_x") * sqrt($"sum_y"))) // 4 上、下三角合并 val user_ds8 = user_ds3.select("itemidI" "itemidJ" "result"). union(user_ds3.select($"itemidJ".as("itemidI") $"itemidI".as("itemidJ") $"result")) // 5 结果返回 val out = user_ds8.select("itemidI" "itemidJ" "result").map { row => val itemidI = row.getString(0) val itemidJ = row.getString(1) val similar = row.getDouble(2) ItemSimi(itemidI itemidJ similar) } out }
3. 欧 几里得距离相似度计算
/** * 欧几里得距离相似度计算 * d(x y) = sqrt(∑((x(i)-y(i)) * (x(i)-y(i)))) * sim(x y) = n / (1 d(x y)) * @param user_rdd 用户评分 * @param RDD[ItemSimi] 返回物品相似度 * */ def EuclideanDistanceSimilarity(user_ds: Dataset[ItemPref]): Dataset[ItemSimi] = { import user_ds.sparkSession.implicits._ // 1 数据准备 val user_ds1 = user_ds. withColumn("iv" concat_ws(":" $"itemid" $"pref")). groupBy("userid").agg(collect_set("iv")). withColumnRenamed("collect_set(iv)" "itemid_set"). select("userid" "itemid_set") // 2 物品:物品,上三角数据 val user_ds2 = user_ds1.flatMap { row => val itemlist = row.getAs[scala.collection.mutable.WrappedArray[String]] (1).toArray.sorted val result = new ArrayBuffer[(String String Double Double)]() for (i <- 0 to itemlist.length - 2) { for (j <- i 1 to itemlist.length - 1) { result = ((itemlist(i).split(":")(0) itemlist(j).split(":")(0) itemlist (i).split(":")(1).toDouble itemlist(j).split(":")(1).toDouble)) } } result }.withColumnRenamed("_1" "itemidI").withColumnRenamed("_2" "itemidJ").withColumnRenamed("_3" "scoreI").withColumnRenamed("_4" "scoreJ") // 3 按照公式计算相似度 // dist = sqrt(∑((x(i)-y(i)) * (x(i)-y(i)))) // cntSum = sum(1) // result = cntSum / (1 dist) val user_ds3 = user_ds2. withColumn("cnt" lit(1)). groupBy("itemidI" "itemidJ"). agg(sqrt(sum(($"scoreI" - $"scoreJ") * ($"scoreI" - $"scoreJ"))).as("dist") sum($"cnt").as("cntSum")). withColumn("result" $"cntSum" / (lit(1.0) $"dist")) // 4 上、下三角合并 val user_ds8 = user_ds3.select("itemidI" "itemidJ" "result").union(user_ds3.select ($"itemidJ".as("itemidI") $"itemidI".as("itemidJ") $"result")) // 5 结果返回 val out = user_ds8.select("itemidI" "itemidJ" "result").map { row => val itemidI = row.getString(0) val itemidJ = row.getString(1) val similar = row.getDouble(2) ItemSimi(itemidI itemidJ similar) } out }
4. 关联规则计算
/** * 关联规则计算 * 支持度(support):在所有项集中{X Y}出现的可能性,即项集中同时含有X和Y的概率P(X U Y)/P(I) I表 * 示全部事务 * 置信度(Confidence):在先决条件X发生的条件下,关联结果Y发生的概率,即P(X U Y)/P(X) * 提升度(lift):在含有X的条件下同时含有Y的可能性与在没有X的条件下项集中含有Y的可能性之比,即 * confidence(X => Y)/P(Y) * @param user_rdd 用户评分 * @param RDD[ItemAssociation] 返回物品相似度 * */ def AssociationRules(user_ds: Dataset[ItemPref]): Dataset[ItemAssociation] = { import user_ds.sparkSession.implicits._ // 1 (用户:物品) => (用户:(物品集合)) val user_ds1 = user_ds.groupBy("userid").agg(collect_set("itemid")).withColumnRenamed ("collect_set(itemid)" "itemid_set") // 2 物品:物品,上三角数据 val user_ds2 = user_ds1.flatMap { row => val itemlist = row.getAs[WrappedArray[String]](1).toArray.sorted val result = new ArrayBuffer[(String String Double)]() for (i <- 0 to itemlist.length - 2) { for (j <- i 1 to itemlist.length - 1) { result = ((itemlist(i) itemlist(j) 1.0)) } } result }.withColumnRenamed("_1" "itemidI").withColumnRenamed("_2" "itemidJ"). withColumnRenamed("_3" "score") // 3 计算物品与物品,上三角,同现频次 val user_ds3 = user_ds2.groupBy("itemidI" "itemidJ").agg(sum("score").as("sumIJ")) // 4 计算物品总共出现的频次 val user_ds0 = user_ds.withColumn("score" lit(1)).groupBy("itemid").agg(sum ("score").as("score")) val user_all = user_ds1.count // 5 计算支持度(Support) val user_ds4 = user_ds3.select("itemidI" "itemidJ" "sumIJ"). union(user_ds3.select($"itemidJ".as("itemidI") $"itemidI".as("itemidJ") $"sumIJ")). withColumn("support" $"sumIJ" / user_all.toDouble) // user_ds4.orderBy($"support".desc).show // 6 置信度(Confidence) val user_ds5 = user_ds4. join(user_ds0.withColumnRenamed("itemid" "itemidI").withColumnRenamed("score" "sumI") "itemidI"). withColumn("confidence" $"sumIJ" / $"sumI") // user_ds5.orderBy($"confidence".desc).show // 7 提升度(lift) val user_ds6 = user_ds5. join(user_ds0.withColumnRenamed("itemid" "itemidJ").withColumnRenamed("score" "sumJ") "itemidJ"). withColumn("lift" $"confidence" / ($"sumJ" / user_all.toDouble)) // user_ds6.orderBy($"lift".desc).show // 计算同现相似度 val user_ds8 = user_ds6.withColumn("similar" col("sumIJ") / sqrt(col("sumI") * col("sumJ"))) // user_ds8.orderBy($"similar".desc).show // 8 结果返回 val out = user_ds8.select("itemidI" "itemidJ" "support" "confidence" "lift" "similar").map { row => val itemidI = row.getString(0) val itemidJ = row.getString(1) val support = row.getDouble(2) val confidence = row.getDouble(3) val lift = row.getDouble(4) val similar = row.getDouble(5) ItemAssociation(itemidI itemidJ support confidence lift similar) } out }
▌ 总结
在大规模分布式工程实践中,当样本量级比较大的时候,会导致物品向量或者用户向量维度很高(比如1亿用户,那物品的向量维度会有1亿维),会导致计算性能问题。这里在工程实践中,如何解决这个问题呢,最简单粗暴的方案就是考虑采样方法,进行降维,其中采样包括:对用户进行采样策略和对物品采样策略,最终目的使得计算性能满足所需要的性能要求。当然还有一些高级方案,比如可以借鉴 Facebook 的 Faiss 原理,这里就不具体展开讲解。