From d58034492a2e8cc51c57d38d2019c93501c3023b Mon Sep 17 00:00:00 2001 From: Daniel Ciborowski Date: Sat, 7 Sep 2024 02:47:19 +0000 Subject: [PATCH 01/12] feat: add string to sar --- .../python/synapse/ml/recommendation/SAR.py | 29 + .../synapse/ml/recommendation/SARModel.py | 5 + .../azure/synapse/ml/recommendation/SAR.scala | 39 +- .../synapse/ml/recommendation/SARModel.scala | 3 - .../recommendation/test_ranking.py | 64 ++- .../synapse/ml/recommendation/SARSpec.scala | 538 ++++++++++-------- .../estimators/core/_Recommendation.md | 119 ++++ 7 files changed, 531 insertions(+), 266 deletions(-) create mode 100644 core/src/main/python/synapse/ml/recommendation/SAR.py diff --git a/core/src/main/python/synapse/ml/recommendation/SAR.py b/core/src/main/python/synapse/ml/recommendation/SAR.py new file mode 100644 index 0000000000..f8b3c42ebe --- /dev/null +++ b/core/src/main/python/synapse/ml/recommendation/SAR.py @@ -0,0 +1,29 @@ +# Copyright (C) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See LICENSE in the project root for information. + +import sys + +if sys.version >= "3": + basestring = str + +from synapse.ml.core.schema.Utils import * +from synapse.ml.recommendation._SAR import _SAR + +@inherit_doc +class SAR(_SAR): + def __init__(self, **kwargs): + _SAR.__init__(self, **kwargs) + + def calculateUserItemAffinities(self, dataset): + if dataset.schema[self.getUserCol()].dataType == StringType(): + dataset = dataset.withColumn(self.getUserCol(), dataset[self.getUserCol()].cast("int")) + if dataset.schema[self.getItemCol()].dataType == StringType(): + dataset = dataset.withColumn(self.getItemCol(), dataset[self.getItemCol()].cast("int")) + return self._call_java("calculateUserItemAffinities", dataset) + + def calculateItemItemSimilarity(self, dataset): + if dataset.schema[self.getUserCol()].dataType == StringType(): + dataset = dataset.withColumn(self.getUserCol(), dataset[self.getUserCol()].cast("int")) + if dataset.schema[self.getItemCol()].dataType == StringType(): + dataset = dataset.withColumn(self.getItemCol(), dataset[self.getItemCol()].cast("int")) + return self._call_java("calculateItemItemSimilarity", dataset) diff --git a/core/src/main/python/synapse/ml/recommendation/SARModel.py b/core/src/main/python/synapse/ml/recommendation/SARModel.py index 03162cbcc2..d2e777bbc5 100644 --- a/core/src/main/python/synapse/ml/recommendation/SARModel.py +++ b/core/src/main/python/synapse/ml/recommendation/SARModel.py @@ -15,3 +15,8 @@ class SARModel(_SARModel): def recommendForAllUsers(self, numItems): return self._call_java("recommendForAllUsers", numItems) + + def recommendForUserSubset(self, dataset, numItems): + if dataset.schema[self.getUserCol()].dataType == StringType(): + dataset = dataset.withColumn(self.getUserCol(), dataset[self.getUserCol()].cast("int")) + return self._call_java("recommendForUserSubset", dataset, numItems) diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/recommendation/SAR.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/recommendation/SAR.scala index 6e9d0ace45..de32514492 100644 --- a/core/src/main/scala/com/microsoft/azure/synapse/ml/recommendation/SAR.scala +++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/recommendation/SAR.scala @@ -1,6 +1,3 @@ -// Copyright (C) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. See LICENSE in project root for information. 
- package com.microsoft.azure.synapse.ml.recommendation import breeze.linalg.{CSCMatrix => BSM} @@ -13,7 +10,7 @@ import org.apache.spark.ml.util.{DefaultParamsReadable, DefaultParamsWritable, I import org.apache.spark.mllib.linalg import org.apache.spark.mllib.linalg.{DenseVector, Matrices, SparseMatrix} import org.apache.spark.sql.functions.{col, collect_list, sum, udf, _} -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.types.{StringType, StructType} import org.apache.spark.sql.{DataFrame, Dataset} import java.text.SimpleDateFormat @@ -106,8 +103,22 @@ class SAR(override val uid: String) extends Estimator[SARModel] (0 to numItems.value).map(i => map.getOrElse(i, 0.0).toFloat).toArray }) - dataset - .withColumn(C.AffinityCol, (dataset.columns.contains(getTimeCol), dataset.columns.contains(getRatingCol)) match { + val userColType = dataset.schema(getUserCol).dataType + val itemColType = dataset.schema(getItemCol).dataType + + val castedDataset = (userColType, itemColType) match { + case (StringType, StringType) => + dataset.withColumn(getUserCol, col(getUserCol).cast("int")) + .withColumn(getItemCol, col(getItemCol).cast("int")) + case (StringType, _) => + dataset.withColumn(getUserCol, col(getUserCol).cast("int")) + case (_, StringType) => + dataset.withColumn(getItemCol, col(getItemCol).cast("int")) + case _ => dataset + } + + castedDataset + .withColumn(C.AffinityCol, (castedDataset.columns.contains(getTimeCol), castedDataset.columns.contains(getRatingCol)) match { case (true, true) => blendWeights(timeDecay(col(getTimeCol)), col(getRatingCol)) case (true, false) => timeDecay(col(getTimeCol)) case (false, true) => col(getRatingCol) @@ -197,7 +208,21 @@ class SAR(override val uid: String) extends Estimator[SARModel] }) }) - dataset + val userColType = dataset.schema(getUserCol).dataType + val itemColType = dataset.schema(getItemCol).dataType + + val castedDataset = (userColType, itemColType) match { + case (StringType, StringType) => + dataset.withColumn(getUserCol, col(getUserCol).cast("int")) + .withColumn(getItemCol, col(getItemCol).cast("int")) + case (StringType, _) => + dataset.withColumn(getUserCol, col(getUserCol).cast("int")) + case (_, StringType) => + dataset.withColumn(getItemCol, col(getItemCol).cast("int")) + case _ => dataset + } + + castedDataset .select(col(getItemCol), col(getUserCol)) .groupBy(getItemCol).agg(collect_list(getUserCol) as "collect_list") .withColumn(C.FeaturesCol, createItemFeaturesVector(col("collect_list"))) diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/recommendation/SARModel.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/recommendation/SARModel.scala index 83a5de4022..818f9971da 100644 --- a/core/src/main/scala/com/microsoft/azure/synapse/ml/recommendation/SARModel.scala +++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/recommendation/SARModel.scala @@ -1,6 +1,3 @@ -// Copyright (C) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. See LICENSE in project root for information. 
- package com.microsoft.azure.synapse.ml.recommendation import com.microsoft.azure.synapse.ml.codegen.Wrappable diff --git a/core/src/test/python/synapsemltest/recommendation/test_ranking.py b/core/src/test/python/synapsemltest/recommendation/test_ranking.py index d2d439c374..c4b9aebff7 100644 --- a/core/src/test/python/synapsemltest/recommendation/test_ranking.py +++ b/core/src/test/python/synapsemltest/recommendation/test_ranking.py @@ -67,10 +67,52 @@ .cache() ) +ratings_with_strings = ( + spark.createDataFrame( + [ + ("user0", "item1", 4, 4), + ("user0", "item3", 1, 1), + ("user0", "item4", 5, 5), + ("user0", "item5", 3, 3), + ("user0", "item7", 3, 3), + ("user0", "item9", 3, 3), + ("user0", "item10", 3, 3), + ("user1", "item1", 4, 4), + ("user1", "item2", 5, 5), + ("user1", "item3", 1, 1), + ("user1", "item6", 4, 4), + ("user1", "item7", 5, 5), + ("user1", "item8", 1, 1), + ("user1", "item10", 3, 3), + ("user2", "item1", 4, 4), + ("user2", "item2", 1, 1), + ("user2", "item3", 1, 1), + ("user2", "item4", 5, 5), + ("user2", "item5", 3, 3), + ("user2", "item6", 4, 4), + ("user2", "item8", 1, 1), + ("user2", "item9", 5, 5), + ("user2", "item10", 3, 3), + ("user3", "item2", 5, 5), + ("user3", "item3", 1, 1), + ("user3", "item4", 5, 5), + ("user3", "item5", 3, 3), + ("user3", "item6", 4, 4), + ("user3", "item7", 5, 5), + ("user3", "item8", 1, 1), + ("user3", "item9", 5, 5), + ("user3", "item10", 3, 3), + ], + ["originalCustomerID", "newCategoryID", "rating", "notTime"], + ) + .coalesce(1) + .cache() +) + class RankingSpec(unittest.TestCase): @staticmethod - def adapter_evaluator(algo): + def adapter_evaluator(algo, data): recommendation_indexer = RecommendationIndexer( userInputCol=USER_ID, userOutputCol=USER_ID_INDEX, @@ -80,7 +122,7 @@ def adapter_evaluator(algo): adapter = RankingAdapter(mode="allUsers", k=5, recommender=algo) pipeline = Pipeline(stages=[recommendation_indexer, adapter]) - output = pipeline.fit(ratings).transform(ratings) + output = pipeline.fit(data).transform(data) print(str(output.take(1)) + "\n") metrics = ["ndcgAt", "fcp", "mrr"] @@ -91,13 +133,17 @@ def adapter_evaluator(algo): + str(RankingEvaluator(k=3, metricName=metric).evaluate(output)), ) - # def test_adapter_evaluator_als(self): - # als = ALS(userCol=USER_ID_INDEX, itemCol=ITEM_ID_INDEX, ratingCol=RATING_ID) - # self.adapter_evaluator(als) - # - # def test_adapter_evaluator_sar(self): - # sar = SAR(userCol=USER_ID_INDEX, itemCol=ITEM_ID_INDEX, ratingCol=RATING_ID) - # self.adapter_evaluator(sar) + def test_adapter_evaluator_als(self): + als = ALS(userCol=USER_ID_INDEX, itemCol=ITEM_ID_INDEX, ratingCol=RATING_ID) + self.adapter_evaluator(als, ratings) + + def test_adapter_evaluator_sar(self): + sar = SAR(userCol=USER_ID_INDEX, itemCol=ITEM_ID_INDEX, ratingCol=RATING_ID) + self.adapter_evaluator(sar, ratings) + + def test_adapter_evaluator_sar_with_strings(self): + sar = SAR(userCol=USER_ID_INDEX, itemCol=ITEM_ID_INDEX, ratingCol=RATING_ID) + self.adapter_evaluator(sar, ratings_with_strings) def test_all_tiny(self): customer_index = StringIndexer(inputCol=USER_ID, outputCol=USER_ID_INDEX) diff --git a/core/src/test/scala/com/microsoft/azure/synapse/ml/recommendation/SARSpec.scala b/core/src/test/scala/com/microsoft/azure/synapse/ml/recommendation/SARSpec.scala index 501652f68b..9fb1049a1c 100644 --- a/core/src/test/scala/com/microsoft/azure/synapse/ml/recommendation/SARSpec.scala +++ b/core/src/test/scala/com/microsoft/azure/synapse/ml/recommendation/SARSpec.scala @@ -1,247 +1,291 @@ -// Copyright (C) 
Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. See LICENSE in project root for information. - -package com.microsoft.azure.synapse.ml.recommendation - -import com.microsoft.azure.synapse.ml.core.test.fuzzing.{EstimatorFuzzing, TestObject, TransformerFuzzing} -import org.apache.spark.ml.Pipeline -import org.apache.spark.ml.util.MLReadable -import org.apache.spark.sql.DataFrame -import org.apache.spark.sql.functions.{col, udf} - -import scala.language.existentials - -class SARSpec extends RankingTestBase with EstimatorFuzzing[SAR] { - override def testObjects(): List[TestObject[SAR]] = { - List( - new TestObject(new SAR() - .setUserCol(recommendationIndexer.getUserOutputCol) - .setItemCol(recommendationIndexer.getItemOutputCol) - .setRatingCol(ratingCol), transformedDf) - ) - } - - override def reader: SAR.type = SAR - - override val epsilon = .3 - - override def modelReader: SARModel.type = SARModel - - test("SAR") { - - val algo = sar - .setSupportThreshold(1) - .setSimilarityFunction("jacccard") - .setActivityTimeFormat("EEE MMM dd HH:mm:ss Z yyyy") - - val adapter: RankingAdapter = new RankingAdapter() - .setK(5) - .setRecommender(algo) - - val recopipeline = new Pipeline() - .setStages(Array(recommendationIndexer, adapter)) - .fit(ratings) - - val output = recopipeline.transform(ratings) - - val evaluator: RankingEvaluator = new RankingEvaluator() - .setK(5) - .setNItems(10) - - assert(evaluator.setMetricName("ndcgAt").evaluate(output) === 0.602819875812812) - assert(evaluator.setMetricName("fcp").evaluate(output) === 0.05 || - evaluator.setMetricName("fcp").evaluate(output) === 0.1) - assert(evaluator.setMetricName("mrr").evaluate(output) === 1.0) - - val users: DataFrame = spark - .createDataFrame(Seq(("0","0"),("1","1"))) - .toDF(userColIndex, itemColIndex) - - val recs = recopipeline.stages(1).asInstanceOf[RankingAdapterModel].getRecommenderModel - .asInstanceOf[SARModel].recommendForUserSubset(users, 10) - assert(recs.count == 2) - } - - lazy val testFile: String = getClass.getResource("/demoUsage.csv.gz").getPath - lazy val simCount1: String = getClass.getResource("/sim_count1.csv.gz").getPath - lazy val simLift1: String = getClass.getResource("/sim_lift1.csv.gz").getPath - lazy val simJac1: String = getClass.getResource("/sim_jac1.csv.gz").getPath - lazy val simCount3: String = getClass.getResource("/sim_count3.csv.gz").getPath - lazy val simLift3: String = getClass.getResource("/sim_lift3.csv.gz").getPath - lazy val simJac3: String = getClass.getResource("/sim_jac3.csv.gz").getPath - lazy val userAff: String = getClass.getResource("/user_aff.csv.gz").getPath - lazy val userpredCount3: String = getClass.getResource("/userpred_count3_userid_only.csv.gz").getPath - lazy val userpredLift3: String = getClass.getResource("/userpred_lift3_userid_only.csv.gz").getPath - lazy val userpredJac3: String = getClass.getResource("/userpred_jac3_userid_only.csv.gz").getPath - - private lazy val tlcSampleData: DataFrame = spark.read - .option("header", "true") //reading the headers - .option("inferSchema", "true") - .csv(testFile).na.drop.cache - - test("tlc test sim count1")( - SarTLCSpec.testAffinityMatrices(tlcSampleData, 1, "cooc", simCount1, userAff)) - - test("tlc test sim lift1")( - SarTLCSpec.testAffinityMatrices(tlcSampleData, 1, "lift", simLift1, userAff)) - - test("tlc test sim jac1")( - SarTLCSpec.testAffinityMatrices(tlcSampleData, 1, "jaccard", simJac1, userAff)) - - test("tlc test sim count3")( - SarTLCSpec.testAffinityMatrices(tlcSampleData, 3, 
"cooc", simCount3, userAff)) - - test("tlc test sim lift3")( - SarTLCSpec.testAffinityMatrices(tlcSampleData, 3, "lift", simLift3, userAff)) - - test("tlc test sim jac3")( - SarTLCSpec.testAffinityMatrices(tlcSampleData, 3, "jaccard", simJac3, userAff)) - - test("tlc test userpred count3 userid only")( - SarTLCSpec.testProductRecommendations(tlcSampleData, 3, "cooc", simCount3, userAff, userpredCount3)) - - test("tlc test userpred lift3 userid only")( - SarTLCSpec.testProductRecommendations(tlcSampleData, 3, "lift", simLift3, userAff, userpredLift3)) - - test("tlc test userpred jac3 userid only")( - SarTLCSpec.testProductRecommendations(tlcSampleData, 3, "jaccard", simJac3, userAff, userpredJac3)) - -} - -class SARModelSpec extends RankingTestBase with TransformerFuzzing[SARModel] { - override def testObjects(): Seq[TestObject[SARModel]] = { - List( - new TestObject(new SAR() - .setUserCol(recommendationIndexer.getUserOutputCol) - .setItemCol(recommendationIndexer.getItemOutputCol) - .setRatingCol(ratingCol) - .fit(transformedDf), transformedDf) - ) - } - - override def reader: MLReadable[_] = SARModel - -} - -object SarTLCSpec extends RankingTestBase { - //scalastyle:off field.name - override lazy val userCol = "userId" - override lazy val itemCol = "productId" - override lazy val ratingCol = "rating" - override lazy val userColIndex = "customerID" - override lazy val itemColIndex = "itemID" - //scalastyle:on field.name - - def testAffinityMatrices(tlcSampleData: DataFrame, - threshold: Int, - similarityFunction: String, - simFile: String, - user_aff: String): - (SARModel, RecommendationIndexerModel) = { - - val ratings = tlcSampleData - - val recommendationIndexerModel = recommendationIndexer.fit(ratings) - val transformedDf = recommendationIndexerModel.transform(ratings) - - val itemMap = recommendationIndexerModel.getItemIndex - - val model = sar - .setSupportThreshold(threshold) - .setSimilarityFunction(similarityFunction) - .setStartTime("2015/06/09T19:39:37") - .setStartTimeFormat("yyyy/MM/dd'T'h:mm:ss") - .fit(transformedDf) - - val simMap = model.getItemDataFrame.collect().map(row => { - val itemI = itemMap.getOrElse(row.getDouble(0).toInt, "-1") - val similarityVectorMap = row.getList(1).toArray.zipWithIndex.map(t => (itemMap.getOrElse(t._2, "-1"), t._1)) - .toMap - itemI -> similarityVectorMap - }).toMap - - val itemAff = spark.read.option("header", "true").csv(simFile) - itemAff.collect().foreach(row => { - val itemI = row.getString(0) - itemAff.drop("_c0").schema.fieldNames.foreach(itemJ => { - val groundTrueScore = row.getAs[String](itemJ).toFloat - val sparkSarScore = simMap.getOrElse(itemI, Map()).getOrElse(itemJ, "-1.0") - assert(groundTrueScore == sparkSarScore) - }) - }) - (model, recommendationIndexerModel) - } - - // scalastyle:off method.length - def testProductRecommendations(tlcSampleData: DataFrame, - threshold: Int, - similarityFunction: String, - simFile: String, - user_aff: String, - userPredFile: String): Unit = { - - val (model, recommendationIndexerModel) = testAffinityMatrices(tlcSampleData, threshold, similarityFunction, - simFile, - user_aff) - - val recoverUser = recommendationIndexerModel.recoverUser() - val recoverItem = recommendationIndexerModel.recoverItem() - - val usersProducts = tlcSampleData - .filter(col("userId") === "0003000098E85347") - .select("productId") - .distinct() - .collect() - .map(_.getString(0)) - - val usersProductsBC = spark.sparkContext.broadcast(usersProducts) - - val itemMapBC = 
spark.sparkContext.broadcast(recommendationIndexerModel.getItemIndex) - - val filterScore = udf((items: Seq[Int], ratings: Seq[Float]) => { - items.zipWithIndex - .filter(p => { - val itemId = itemMapBC.value.getOrElse[String](p._1, "-1") - val bol = usersProductsBC.value.contains(itemId) - !bol - }).map(p => (p._1, ratings.toList(p._2))) - }) - - val row = model.recommendForAllUsers(10 + usersProducts.length) - .select(col("customerID"), filterScore(col("recommendations.itemID"), col("recommendations.rating")) as - "recommendations") - .select(col("customerID"), col("recommendations._1") as "itemID", col("recommendations._2") as "rating") - .select( - recoverUser(col("customerID")) as "customerID", - recoverItem(col("itemID")(0)) as "rec1", - recoverItem(col("itemID")(1)) as "rec2", - recoverItem(col("itemID")(2)) as "rec3", - recoverItem(col("itemID")(3)) as "rec4", - recoverItem(col("itemID")(4)) as "rec5", - recoverItem(col("itemID")(5)) as "rec6", - recoverItem(col("itemID")(6)) as "rec7", - recoverItem(col("itemID")(7)) as "rec8", - recoverItem(col("itemID")(8)) as "rec9", - recoverItem(col("itemID")(9)) as "rec10", - col("rating")(0) as "score1", - col("rating")(1) as "score2", - col("rating")(2) as "score3", - col("rating")(3) as "score4", - col("rating")(4) as "score5", - col("rating")(5) as "score6", - col("rating")(6) as "score7", - col("rating")(7) as "score8", - col("rating")(8) as "score9", - col("rating")(9) as "score10") - .filter(col("customerID") === "0003000098E85347") - .take(1) - - val answer = spark.read.option("header", "true").csv(userPredFile).collect() - - assert(row(0).getString(0) == "0003000098E85347", "Assert Customer ID's Match") - (0 to 10).foreach(i => assert(row(0).getString(i) == answer(0).getString(i))) - (11 to 20).foreach(i => assert("%.3f".format(row(0).getFloat(i)) == "%.3f".format(answer(0).getString(i).toFloat))) - () - } - // scalastyle:on method.length -} + // Copyright (C) Microsoft Corporation. All rights reserved. + // Licensed under the MIT License. See LICENSE in the project root for information. 
+ + package com.microsoft.azure.synapse.ml.recommendation + + import com.microsoft.azure.synapse.ml.core.test.fuzzing.{EstimatorFuzzing, TestObject, TransformerFuzzing} + import org.apache.spark.ml.Pipeline + import org.apache.spark.ml.util.MLReadable + import org.apache.spark.sql.DataFrame + import org.apache.spark.sql.functions.{col, udf} + + import scala.language.existentials + + class SARSpec extends RankingTestBase with EstimatorFuzzing[SAR] { + override def testObjects(): List[TestObject[SAR]] = { + List( + new TestObject(new SAR() + .setUserCol(recommendationIndexer.getUserOutputCol) + .setItemCol(recommendationIndexer.getItemOutputCol) + .setRatingCol(ratingCol), transformedDf), + new TestObject(new SAR() + .setUserCol(recommendationIndexer.getUserOutputCol) + .setItemCol(recommendationIndexer.getItemOutputCol) + .setRatingCol(ratingCol), transformedDfWithStrings) + ) + } + + override def reader: SAR.type = SAR + + override val epsilon = .3 + + override def modelReader: SARModel.type = SARModel + + test("SAR") { + + val algo = sar + .setSupportThreshold(1) + .setSimilarityFunction("jacccard") + .setActivityTimeFormat("EEE MMM dd HH:mm:ss Z yyyy") + + val adapter: RankingAdapter = new RankingAdapter() + .setK(5) + .setRecommender(algo) + + val recopipeline = new Pipeline() + .setStages(Array(recommendationIndexer, adapter)) + .fit(ratings) + + val output = recopipeline.transform(ratings) + + val evaluator: RankingEvaluator = new RankingEvaluator() + .setK(5) + .setNItems(10) + + assert(evaluator.setMetricName("ndcgAt").evaluate(output) === 0.602819875812812) + assert(evaluator.setMetricName("fcp").evaluate(output) === 0.05 || + evaluator.setMetricName("fcp").evaluate(output) === 0.1) + assert(evaluator.setMetricName("mrr").evaluate(output) === 1.0) + + val users: DataFrame = spark + .createDataFrame(Seq(("0","0"),("1","1"))) + .toDF(userColIndex, itemColIndex) + + val recs = recopipeline.stages(1).asInstanceOf[RankingAdapterModel].getRecommenderModel + .asInstanceOf[SARModel].recommendForUserSubset(users, 10) + assert(recs.count == 2) + } + + test("SAR with string userCol and itemCol") { + + val algo = sar + .setSupportThreshold(1) + .setSimilarityFunction("jacccard") + .setActivityTimeFormat("EEE MMM dd HH:mm:ss Z yyyy") + + val adapter: RankingAdapter = new RankingAdapter() + .setK(5) + .setRecommender(algo) + + val recopipeline = new Pipeline() + .setStages(Array(recommendationIndexer, adapter)) + .fit(ratingsWithStrings) + + val output = recopipeline.transform(ratingsWithStrings) + + val evaluator: RankingEvaluator = new RankingEvaluator() + .setK(5) + .setNItems(10) + + assert(evaluator.setMetricName("ndcgAt").evaluate(output) === 0.602819875812812) + assert(evaluator.setMetricName("fcp").evaluate(output) === 0.05 || + evaluator.setMetricName("fcp").evaluate(output) === 0.1) + assert(evaluator.setMetricName("mrr").evaluate(output) === 1.0) + + val users: DataFrame = spark + .createDataFrame(Seq(("user0","item0"),("user1","item1"))) + .toDF(userColIndex, itemColIndex) + + val recs = recopipeline.stages(1).asInstanceOf[RankingAdapterModel].getRecommenderModel + .asInstanceOf[SARModel].recommendForUserSubset(users, 10) + assert(recs.count == 2) + } + + lazy val testFile: String = getClass.getResource("/demoUsage.csv.gz").getPath + lazy val simCount1: String = getClass.getResource("/sim_count1.csv.gz").getPath + lazy val simLift1: String = getClass.getResource("/sim_lift1.csv.gz").getPath + lazy val simJac1: String = getClass.getResource("/sim_jac1.csv.gz").getPath + lazy val 
simCount3: String = getClass.getResource("/sim_count3.csv.gz").getPath + lazy val simLift3: String = getClass.getResource("/sim_lift3.csv.gz").getPath + lazy val simJac3: String = getClass.getResource("/sim_jac3.csv.gz").getPath + lazy val userAff: String = getClass.getResource("/user_aff.csv.gz").getPath + lazy val userpredCount3: String = getClass.getResource("/userpred_count3_userid_only.csv.gz").getPath + lazy val userpredLift3: String = getClass.getResource("/userpred_lift3_userid_only.csv.gz").getPath + lazy val userpredJac3: String = getClass.getResource("/userpred_jac3_userid_only.csv.gz").getPath + + private lazy val tlcSampleData: DataFrame = spark.read + .option("header", "true") //reading the headers + .option("inferSchema", "true") + .csv(testFile).na.drop.cache + + test("tlc test sim count1")( + SarTLCSpec.testAffinityMatrices(tlcSampleData, 1, "cooc", simCount1, userAff)) + + test("tlc test sim lift1")( + SarTLCSpec.testAffinityMatrices(tlcSampleData, 1, "lift", simLift1, userAff)) + + test("tlc test sim jac1")( + SarTLCSpec.testAffinityMatrices(tlcSampleData, 1, "jaccard", simJac1, userAff)) + + test("tlc test sim count3")( + SarTLCSpec.testAffinityMatrices(tlcSampleData, 3, "cooc", simCount3, userAff)) + + test("tlc test sim lift3")( + SarTLCSpec.testAffinityMatrices(tlcSampleData, 3, "lift", simLift3, userAff)) + + test("tlc test sim jac3")( + SarTLCSpec.testAffinityMatrices(tlcSampleData, 3, "jaccard", simJac3, userAff)) + + test("tlc test userpred count3 userid only")( + SarTLCSpec.testProductRecommendations(tlcSampleData, 3, "cooc", simCount3, userAff, userpredCount3)) + + test("tlc test userpred lift3 userid only")( + SarTLCSpec.testProductRecommendations(tlcSampleData, 3, "lift", simLift3, userAff, userpredLift3)) + + test("tlc test userpred jac3 userid only")( + SarTLCSpec.testProductRecommendations(tlcSampleData, 3, "jaccard", simJac3, userAff, userpredJac3)) + + } + + class SARModelSpec extends RankingTestBase with TransformerFuzzing[SARModel] { + override def testObjects(): Seq[TestObject[SARModel]] = { + List( + new TestObject(new SAR() + .setUserCol(recommendationIndexer.getUserOutputCol) + .setItemCol(recommendationIndexer.getItemOutputCol) + .setRatingCol(ratingCol) + .fit(transformedDf), transformedDf), + new TestObject(new SAR() + .setUserCol(recommendationIndexer.getUserOutputCol) + .setItemCol(recommendationIndexer.getItemOutputCol) + .setRatingCol(ratingCol) + .fit(transformedDfWithStrings), transformedDfWithStrings) + ) + } + + override def reader: MLReadable[_] = SARModel + + } + + object SarTLCSpec extends RankingTestBase { + //scalastyle:off field.name + override lazy val userCol = "userId" + override lazy val itemCol = "productId" + override lazy val ratingCol = "rating" + override lazy val userColIndex = "customerID" + override lazy val itemColIndex = "itemID" + //scalastyle:on field.name + + def testAffinityMatrices(tlcSampleData: DataFrame, + threshold: Int, + similarityFunction: String, + simFile: String, + user_aff: String): + (SARModel, RecommendationIndexerModel) = { + + val ratings = tlcSampleData + + val recommendationIndexerModel = recommendationIndexer.fit(ratings) + val transformedDf = recommendationIndexerModel.transform(ratings) + + val itemMap = recommendationIndexerModel.getItemIndex + + val model = sar + .setSupportThreshold(threshold) + .setSimilarityFunction(similarityFunction) + .setStartTime("2015/06/09T19:39:37") + .setStartTimeFormat("yyyy/MM/dd'T'h:mm:ss") + .fit(transformedDf) + + val simMap = 
model.getItemDataFrame.collect().map(row => { + val itemI = itemMap.getOrElse(row.getDouble(0).toInt, "-1") + val similarityVectorMap = row.getList(1).toArray.zipWithIndex.map(t => (itemMap.getOrElse(t._2, "-1"), t._1)) + .toMap + itemI -> similarityVectorMap + }).toMap + + val itemAff = spark.read.option("header", "true").csv(simFile) + itemAff.collect().foreach(row => { + val itemI = row.getString(0) + itemAff.drop("_c0").schema.fieldNames.foreach(itemJ => { + val groundTrueScore = row.getAs[String](itemJ).toFloat + val sparkSarScore = simMap.getOrElse(itemI, Map()).getOrElse(itemJ, "-1.0") + assert(groundTrueScore == sparkSarScore) + }) + }) + (model, recommendationIndexerModel) + } + + // scalastyle:off method.length + def testProductRecommendations(tlcSampleData: DataFrame, + threshold: Int, + similarityFunction: String, + simFile: String, + user_aff: String, + userPredFile: String): Unit = { + + val (model, recommendationIndexerModel) = testAffinityMatrices(tlcSampleData, threshold, similarityFunction, + simFile, + user_aff) + + val recoverUser = recommendationIndexerModel.recoverUser() + val recoverItem = recommendationIndexerModel.recoverItem() + + val usersProducts = tlcSampleData + .filter(col("userId") === "0003000098E85347") + .select("productId") + .distinct() + .collect() + .map(_.getString(0)) + + val usersProductsBC = spark.sparkContext.broadcast(usersProducts) + + val itemMapBC = spark.sparkContext.broadcast(recommendationIndexerModel.getItemIndex) + + val filterScore = udf((items: Seq[Int], ratings: Seq[Float]) => { + items.zipWithIndex + .filter(p => { + val itemId = itemMapBC.value.getOrElse[String](p._1, "-1") + val bol = usersProductsBC.value.contains(itemId) + !bol + }).map(p => (p._1, ratings.toList(p._2))) + }) + + val row = model.recommendForAllUsers(10 + usersProducts.length) + .select(col("customerID"), filterScore(col("recommendations.itemID"), col("recommendations.rating")) as + "recommendations") + .select(col("customerID"), col("recommendations._1") as "itemID", col("recommendations._2") as "rating") + .select( + recoverUser(col("customerID")) as "customerID", + recoverItem(col("itemID")(0)) as "rec1", + recoverItem(col("itemID")(1)) as "rec2", + recoverItem(col("itemID")(2)) as "rec3", + recoverItem(col("itemID")(3)) as "rec4", + recoverItem(col("itemID")(4)) as "rec5", + recoverItem(col("itemID")(5)) as "rec6", + recoverItem(col("itemID")(6)) as "rec7", + recoverItem(col("itemID")(7)) as "rec8", + recoverItem(col("itemID")(8)) as "rec9", + recoverItem(col("itemID")(9)) as "rec10", + col("rating")(0) as "score1", + col("rating")(1) as "score2", + col("rating")(2) as "score3", + col("rating")(3) as "score4", + col("rating")(4) as "score5", + col("rating")(5) as "score6", + col("rating")(6) as "score7", + col("rating")(7) as "score8", + col("rating")(8) as "score9", + col("rating")(9) as "score10") + .filter(col("customerID") === "0003000098E85347") + .take(1) + + val answer = spark.read.option("header", "true").csv(userPredFile).collect() + + assert(row(0).getString(0) == "0003000098E85347", "Assert Customer ID's Match") + (0 to 10).foreach(i => assert(row(0).getString(i) == answer(0).getString(i))) + (11 to 20).foreach(i => assert("%.3f".format(row(0).getFloat(i)) == "%.3f".format(answer(0).getString(i).toFloat))) + () + } + // scalastyle:on method.length + } diff --git a/docs/Quick Examples/estimators/core/_Recommendation.md b/docs/Quick Examples/estimators/core/_Recommendation.md index 98f9501736..9be63c369e 100644 --- a/docs/Quick 
Examples/estimators/core/_Recommendation.md +++ b/docs/Quick Examples/estimators/core/_Recommendation.md @@ -61,6 +61,43 @@ ratings = (spark.createDataFrame([ .dropDuplicates() .cache()) +ratings_with_strings = (spark.createDataFrame([ + ("user0", "item1", 4, 4), + ("user0", "item3", 1, 1), + ("user0", "item4", 5, 5), + ("user0", "item5", 3, 3), + ("user0", "item7", 3, 3), + ("user0", "item9", 3, 3), + ("user0", "item10", 3, 3), + ("user1", "item1", 4, 4), + ("user1", "item2", 5, 5), + ("user1", "item3", 1, 1), + ("user1", "item6", 4, 4), + ("user1", "item7", 5, 5), + ("user1", "item8", 1, 1), + ("user1", "item10", 3, 3), + ("user2", "item1", 4, 4), + ("user2", "item2", 1, 1), + ("user2", "item3", 1, 1), + ("user2", "item4", 5, 5), + ("user2", "item5", 3, 3), + ("user2", "item6", 4, 4), + ("user2", "item8", 1, 1), + ("user2", "item9", 5, 5), + ("user2", "item10", 3, 3), + ("user3", "item2", 5, 5), + ("user3", "item3", 1, 1), + ("user3", "item4", 5, 5), + ("user3", "item5", 3, 3), + ("user3", "item6", 4, 4), + ("user3", "item7", 5, 5), + ("user3", "item8", 1, 1), + ("user3", "item9", 5, 5), + ("user3", "item10", 3, 3) + ], ["originalCustomerID", "newCategoryID", "rating", "notTime"]) + .coalesce(1) + .cache()) + recommendationIndexer = (RecommendationIndexer() .setUserInputCol("customerIDOrg") .setUserOutputCol("customerID") @@ -275,6 +312,43 @@ ratings = (spark.createDataFrame([ .dropDuplicates() .cache()) +ratings_with_strings = (spark.createDataFrame([ + ("user0", "item1", 4, 4), + ("user0", "item3", 1, 1), + ("user0", "item4", 5, 5), + ("user0", "item5", 3, 3), + ("user0", "item7", 3, 3), + ("user0", "item9", 3, 3), + ("user0", "item10", 3, 3), + ("user1", "item1", 4, 4), + ("user1", "item2", 5, 5), + ("user1", "item3", 1, 1), + ("user1", "item6", 4, 4), + ("user1", "item7", 5, 5), + ("user1", "item8", 1, 1), + ("user1", "item10", 3, 3), + ("user2", "item1", 4, 4), + ("user2", "item2", 1, 1), + ("user2", "item3", 1, 1), + ("user2", "item4", 5, 5), + ("user2", "item5", 3, 3), + ("user2", "item6", 4, 4), + ("user2", "item8", 1, 1), + ("user2", "item9", 5, 5), + ("user2", "item10", 3, 3), + ("user3", "item2", 5, 5), + ("user3", "item3", 1, 1), + ("user3", "item4", 5, 5), + ("user3", "item5", 3, 3), + ("user3", "item6", 4, 4), + ("user3", "item7", 5, 5), + ("user3", "item8", 1, 1), + ("user3", "item9", 5, 5), + ("user3", "item10", 3, 3) + ], ["originalCustomerID", "newCategoryID", "rating", "notTime"]) + .coalesce(1) + .cache()) + recommendationIndexer = (RecommendationIndexer() .setUserInputCol("customerIDOrg") .setUserOutputCol("customerID") @@ -298,6 +372,10 @@ adapter = (RankingAdapter() res1 = recommendationIndexer.fit(ratings).transform(ratings).cache() adapter.fit(res1).transform(res1).show() + +res2 = recommendationIndexer.fit(ratings_with_strings).transform(ratings_with_strings).cache() + +adapter.fit(res2).transform(res2).show() ``` @@ -344,6 +422,43 @@ val ratings = (Seq( .dropDuplicates() .cache()) +val ratings_with_strings = (Seq( + ("user0", "item1", 4, 4), + ("user0", "item3", 1, 1), + ("user0", "item4", 5, 5), + ("user0", "item5", 3, 3), + ("user0", "item7", 3, 3), + ("user0", "item9", 3, 3), + ("user0", "item10", 3, 3), + ("user1", "item1", 4, 4), + ("user1", "item2", 5, 5), + ("user1", "item3", 1, 1), + ("user1", "item6", 4, 4), + ("user1", "item7", 5, 5), + ("user1", "item8", 1, 1), + ("user1", "item10", 3, 3), + ("user2", "item1", 4, 4), + ("user2", "item2", 1, 1), + ("user2", "item3", 1, 1), + ("user2", "item4", 5, 5), + ("user2", "item5", 3, 3), + ("user2", "item6", 4, 
4), + ("user2", "item8", 1, 1), + ("user2", "item9", 5, 5), + ("user2", "item10", 3, 3), + ("user3", "item2", 5, 5), + ("user3", "item3", 1, 1), + ("user3", "item4", 5, 5), + ("user3", "item5", 3, 3), + ("user3", "item6", 4, 4), + ("user3", "item7", 5, 5), + ("user3", "item8", 1, 1), + ("user3", "item9", 5, 5), + ("user3", "item10", 3, 3)) + .toDF("originalCustomerID", "newCategoryID", "rating", "notTime") + .coalesce(1) + .cache()) + val recommendationIndexer = (new RecommendationIndexer() .setUserInputCol("customerIDOrg") .setUserOutputCol("customerID") @@ -367,6 +482,10 @@ val adapter = (new RankingAdapter() val res1 = recommendationIndexer.fit(ratings).transform(ratings).cache() adapter.fit(res1).transform(res1).show() + +val res2 = recommendationIndexer.fit(ratings_with_strings).transform(ratings_with_strings).cache() + +adapter.fit(res2).transform(res2).show() ``` From 302fb7c689ff8b777b4668cf689191c4daad15ac Mon Sep 17 00:00:00 2001 From: Daniel Ciborowski Date: Fri, 6 Sep 2024 22:48:29 -0400 Subject: [PATCH 02/12] Delete core/src/main/python/synapse/ml/recommendation/SAR.py --- .../python/synapse/ml/recommendation/SAR.py | 29 ------------------- 1 file changed, 29 deletions(-) delete mode 100644 core/src/main/python/synapse/ml/recommendation/SAR.py diff --git a/core/src/main/python/synapse/ml/recommendation/SAR.py b/core/src/main/python/synapse/ml/recommendation/SAR.py deleted file mode 100644 index f8b3c42ebe..0000000000 --- a/core/src/main/python/synapse/ml/recommendation/SAR.py +++ /dev/null @@ -1,29 +0,0 @@ -# Copyright (C) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. See LICENSE in the project root for information. - -import sys - -if sys.version >= "3": - basestring = str - -from synapse.ml.core.schema.Utils import * -from synapse.ml.recommendation._SAR import _SAR - -@inherit_doc -class SAR(_SAR): - def __init__(self, **kwargs): - _SAR.__init__(self, **kwargs) - - def calculateUserItemAffinities(self, dataset): - if dataset.schema[self.getUserCol()].dataType == StringType(): - dataset = dataset.withColumn(self.getUserCol(), dataset[self.getUserCol()].cast("int")) - if dataset.schema[self.getItemCol()].dataType == StringType(): - dataset = dataset.withColumn(self.getItemCol(), dataset[self.getItemCol()].cast("int")) - return self._call_java("calculateUserItemAffinities", dataset) - - def calculateItemItemSimilarity(self, dataset): - if dataset.schema[self.getUserCol()].dataType == StringType(): - dataset = dataset.withColumn(self.getUserCol(), dataset[self.getUserCol()].cast("int")) - if dataset.schema[self.getItemCol()].dataType == StringType(): - dataset = dataset.withColumn(self.getItemCol(), dataset[self.getItemCol()].cast("int")) - return self._call_java("calculateItemItemSimilarity", dataset) From 8418cc06ac60f31dacc6865b7fd41356b74303eb Mon Sep 17 00:00:00 2001 From: Daniel Ciborowski Date: Fri, 6 Sep 2024 22:49:56 -0400 Subject: [PATCH 03/12] Update core/src/main/scala/com/microsoft/azure/synapse/ml/recommendation/SARModel.scala --- .../microsoft/azure/synapse/ml/recommendation/SARModel.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/recommendation/SARModel.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/recommendation/SARModel.scala index 818f9971da..83a5de4022 100644 --- a/core/src/main/scala/com/microsoft/azure/synapse/ml/recommendation/SARModel.scala +++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/recommendation/SARModel.scala @@ -1,3 +1,6 @@ +// 
Copyright (C) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. See LICENSE in project root for information. + package com.microsoft.azure.synapse.ml.recommendation import com.microsoft.azure.synapse.ml.codegen.Wrappable From f84e051e0ecfd429453715a1fbc6735485a9d4ba Mon Sep 17 00:00:00 2001 From: Daniel Ciborowski Date: Fri, 6 Sep 2024 22:50:08 -0400 Subject: [PATCH 04/12] Update core/src/main/scala/com/microsoft/azure/synapse/ml/recommendation/SAR.scala --- .../com/microsoft/azure/synapse/ml/recommendation/SAR.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/recommendation/SAR.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/recommendation/SAR.scala index de32514492..c8fcf2830b 100644 --- a/core/src/main/scala/com/microsoft/azure/synapse/ml/recommendation/SAR.scala +++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/recommendation/SAR.scala @@ -1,3 +1,6 @@ +// Copyright (C) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. See LICENSE in project root for information. + package com.microsoft.azure.synapse.ml.recommendation import breeze.linalg.{CSCMatrix => BSM} From a38b134ca4a424f4175d3bc78f33348d7ec13333 Mon Sep 17 00:00:00 2001 From: Daniel Ciborowski Date: Fri, 6 Sep 2024 23:48:08 -0400 Subject: [PATCH 05/12] Update SARSpec.scala --- .../synapse/ml/recommendation/SARSpec.scala | 582 +++++++++--------- 1 file changed, 291 insertions(+), 291 deletions(-) diff --git a/core/src/test/scala/com/microsoft/azure/synapse/ml/recommendation/SARSpec.scala b/core/src/test/scala/com/microsoft/azure/synapse/ml/recommendation/SARSpec.scala index 9fb1049a1c..0d41b29e28 100644 --- a/core/src/test/scala/com/microsoft/azure/synapse/ml/recommendation/SARSpec.scala +++ b/core/src/test/scala/com/microsoft/azure/synapse/ml/recommendation/SARSpec.scala @@ -1,291 +1,291 @@ - // Copyright (C) Microsoft Corporation. All rights reserved. - // Licensed under the MIT License. See LICENSE in the project root for information. 
- - package com.microsoft.azure.synapse.ml.recommendation - - import com.microsoft.azure.synapse.ml.core.test.fuzzing.{EstimatorFuzzing, TestObject, TransformerFuzzing} - import org.apache.spark.ml.Pipeline - import org.apache.spark.ml.util.MLReadable - import org.apache.spark.sql.DataFrame - import org.apache.spark.sql.functions.{col, udf} - - import scala.language.existentials - - class SARSpec extends RankingTestBase with EstimatorFuzzing[SAR] { - override def testObjects(): List[TestObject[SAR]] = { - List( - new TestObject(new SAR() - .setUserCol(recommendationIndexer.getUserOutputCol) - .setItemCol(recommendationIndexer.getItemOutputCol) - .setRatingCol(ratingCol), transformedDf), - new TestObject(new SAR() - .setUserCol(recommendationIndexer.getUserOutputCol) - .setItemCol(recommendationIndexer.getItemOutputCol) - .setRatingCol(ratingCol), transformedDfWithStrings) - ) - } - - override def reader: SAR.type = SAR - - override val epsilon = .3 - - override def modelReader: SARModel.type = SARModel - - test("SAR") { - - val algo = sar - .setSupportThreshold(1) - .setSimilarityFunction("jacccard") - .setActivityTimeFormat("EEE MMM dd HH:mm:ss Z yyyy") - - val adapter: RankingAdapter = new RankingAdapter() - .setK(5) - .setRecommender(algo) - - val recopipeline = new Pipeline() - .setStages(Array(recommendationIndexer, adapter)) - .fit(ratings) - - val output = recopipeline.transform(ratings) - - val evaluator: RankingEvaluator = new RankingEvaluator() - .setK(5) - .setNItems(10) - - assert(evaluator.setMetricName("ndcgAt").evaluate(output) === 0.602819875812812) - assert(evaluator.setMetricName("fcp").evaluate(output) === 0.05 || - evaluator.setMetricName("fcp").evaluate(output) === 0.1) - assert(evaluator.setMetricName("mrr").evaluate(output) === 1.0) - - val users: DataFrame = spark - .createDataFrame(Seq(("0","0"),("1","1"))) - .toDF(userColIndex, itemColIndex) - - val recs = recopipeline.stages(1).asInstanceOf[RankingAdapterModel].getRecommenderModel - .asInstanceOf[SARModel].recommendForUserSubset(users, 10) - assert(recs.count == 2) - } - - test("SAR with string userCol and itemCol") { - - val algo = sar - .setSupportThreshold(1) - .setSimilarityFunction("jacccard") - .setActivityTimeFormat("EEE MMM dd HH:mm:ss Z yyyy") - - val adapter: RankingAdapter = new RankingAdapter() - .setK(5) - .setRecommender(algo) - - val recopipeline = new Pipeline() - .setStages(Array(recommendationIndexer, adapter)) - .fit(ratingsWithStrings) - - val output = recopipeline.transform(ratingsWithStrings) - - val evaluator: RankingEvaluator = new RankingEvaluator() - .setK(5) - .setNItems(10) - - assert(evaluator.setMetricName("ndcgAt").evaluate(output) === 0.602819875812812) - assert(evaluator.setMetricName("fcp").evaluate(output) === 0.05 || - evaluator.setMetricName("fcp").evaluate(output) === 0.1) - assert(evaluator.setMetricName("mrr").evaluate(output) === 1.0) - - val users: DataFrame = spark - .createDataFrame(Seq(("user0","item0"),("user1","item1"))) - .toDF(userColIndex, itemColIndex) - - val recs = recopipeline.stages(1).asInstanceOf[RankingAdapterModel].getRecommenderModel - .asInstanceOf[SARModel].recommendForUserSubset(users, 10) - assert(recs.count == 2) - } - - lazy val testFile: String = getClass.getResource("/demoUsage.csv.gz").getPath - lazy val simCount1: String = getClass.getResource("/sim_count1.csv.gz").getPath - lazy val simLift1: String = getClass.getResource("/sim_lift1.csv.gz").getPath - lazy val simJac1: String = getClass.getResource("/sim_jac1.csv.gz").getPath - lazy val 
simCount3: String = getClass.getResource("/sim_count3.csv.gz").getPath - lazy val simLift3: String = getClass.getResource("/sim_lift3.csv.gz").getPath - lazy val simJac3: String = getClass.getResource("/sim_jac3.csv.gz").getPath - lazy val userAff: String = getClass.getResource("/user_aff.csv.gz").getPath - lazy val userpredCount3: String = getClass.getResource("/userpred_count3_userid_only.csv.gz").getPath - lazy val userpredLift3: String = getClass.getResource("/userpred_lift3_userid_only.csv.gz").getPath - lazy val userpredJac3: String = getClass.getResource("/userpred_jac3_userid_only.csv.gz").getPath - - private lazy val tlcSampleData: DataFrame = spark.read - .option("header", "true") //reading the headers - .option("inferSchema", "true") - .csv(testFile).na.drop.cache - - test("tlc test sim count1")( - SarTLCSpec.testAffinityMatrices(tlcSampleData, 1, "cooc", simCount1, userAff)) - - test("tlc test sim lift1")( - SarTLCSpec.testAffinityMatrices(tlcSampleData, 1, "lift", simLift1, userAff)) - - test("tlc test sim jac1")( - SarTLCSpec.testAffinityMatrices(tlcSampleData, 1, "jaccard", simJac1, userAff)) - - test("tlc test sim count3")( - SarTLCSpec.testAffinityMatrices(tlcSampleData, 3, "cooc", simCount3, userAff)) - - test("tlc test sim lift3")( - SarTLCSpec.testAffinityMatrices(tlcSampleData, 3, "lift", simLift3, userAff)) - - test("tlc test sim jac3")( - SarTLCSpec.testAffinityMatrices(tlcSampleData, 3, "jaccard", simJac3, userAff)) - - test("tlc test userpred count3 userid only")( - SarTLCSpec.testProductRecommendations(tlcSampleData, 3, "cooc", simCount3, userAff, userpredCount3)) - - test("tlc test userpred lift3 userid only")( - SarTLCSpec.testProductRecommendations(tlcSampleData, 3, "lift", simLift3, userAff, userpredLift3)) - - test("tlc test userpred jac3 userid only")( - SarTLCSpec.testProductRecommendations(tlcSampleData, 3, "jaccard", simJac3, userAff, userpredJac3)) - - } - - class SARModelSpec extends RankingTestBase with TransformerFuzzing[SARModel] { - override def testObjects(): Seq[TestObject[SARModel]] = { - List( - new TestObject(new SAR() - .setUserCol(recommendationIndexer.getUserOutputCol) - .setItemCol(recommendationIndexer.getItemOutputCol) - .setRatingCol(ratingCol) - .fit(transformedDf), transformedDf), - new TestObject(new SAR() - .setUserCol(recommendationIndexer.getUserOutputCol) - .setItemCol(recommendationIndexer.getItemOutputCol) - .setRatingCol(ratingCol) - .fit(transformedDfWithStrings), transformedDfWithStrings) - ) - } - - override def reader: MLReadable[_] = SARModel - - } - - object SarTLCSpec extends RankingTestBase { - //scalastyle:off field.name - override lazy val userCol = "userId" - override lazy val itemCol = "productId" - override lazy val ratingCol = "rating" - override lazy val userColIndex = "customerID" - override lazy val itemColIndex = "itemID" - //scalastyle:on field.name - - def testAffinityMatrices(tlcSampleData: DataFrame, - threshold: Int, - similarityFunction: String, - simFile: String, - user_aff: String): - (SARModel, RecommendationIndexerModel) = { - - val ratings = tlcSampleData - - val recommendationIndexerModel = recommendationIndexer.fit(ratings) - val transformedDf = recommendationIndexerModel.transform(ratings) - - val itemMap = recommendationIndexerModel.getItemIndex - - val model = sar - .setSupportThreshold(threshold) - .setSimilarityFunction(similarityFunction) - .setStartTime("2015/06/09T19:39:37") - .setStartTimeFormat("yyyy/MM/dd'T'h:mm:ss") - .fit(transformedDf) - - val simMap = 
model.getItemDataFrame.collect().map(row => { - val itemI = itemMap.getOrElse(row.getDouble(0).toInt, "-1") - val similarityVectorMap = row.getList(1).toArray.zipWithIndex.map(t => (itemMap.getOrElse(t._2, "-1"), t._1)) - .toMap - itemI -> similarityVectorMap - }).toMap - - val itemAff = spark.read.option("header", "true").csv(simFile) - itemAff.collect().foreach(row => { - val itemI = row.getString(0) - itemAff.drop("_c0").schema.fieldNames.foreach(itemJ => { - val groundTrueScore = row.getAs[String](itemJ).toFloat - val sparkSarScore = simMap.getOrElse(itemI, Map()).getOrElse(itemJ, "-1.0") - assert(groundTrueScore == sparkSarScore) - }) - }) - (model, recommendationIndexerModel) - } - - // scalastyle:off method.length - def testProductRecommendations(tlcSampleData: DataFrame, - threshold: Int, - similarityFunction: String, - simFile: String, - user_aff: String, - userPredFile: String): Unit = { - - val (model, recommendationIndexerModel) = testAffinityMatrices(tlcSampleData, threshold, similarityFunction, - simFile, - user_aff) - - val recoverUser = recommendationIndexerModel.recoverUser() - val recoverItem = recommendationIndexerModel.recoverItem() - - val usersProducts = tlcSampleData - .filter(col("userId") === "0003000098E85347") - .select("productId") - .distinct() - .collect() - .map(_.getString(0)) - - val usersProductsBC = spark.sparkContext.broadcast(usersProducts) - - val itemMapBC = spark.sparkContext.broadcast(recommendationIndexerModel.getItemIndex) - - val filterScore = udf((items: Seq[Int], ratings: Seq[Float]) => { - items.zipWithIndex - .filter(p => { - val itemId = itemMapBC.value.getOrElse[String](p._1, "-1") - val bol = usersProductsBC.value.contains(itemId) - !bol - }).map(p => (p._1, ratings.toList(p._2))) - }) - - val row = model.recommendForAllUsers(10 + usersProducts.length) - .select(col("customerID"), filterScore(col("recommendations.itemID"), col("recommendations.rating")) as - "recommendations") - .select(col("customerID"), col("recommendations._1") as "itemID", col("recommendations._2") as "rating") - .select( - recoverUser(col("customerID")) as "customerID", - recoverItem(col("itemID")(0)) as "rec1", - recoverItem(col("itemID")(1)) as "rec2", - recoverItem(col("itemID")(2)) as "rec3", - recoverItem(col("itemID")(3)) as "rec4", - recoverItem(col("itemID")(4)) as "rec5", - recoverItem(col("itemID")(5)) as "rec6", - recoverItem(col("itemID")(6)) as "rec7", - recoverItem(col("itemID")(7)) as "rec8", - recoverItem(col("itemID")(8)) as "rec9", - recoverItem(col("itemID")(9)) as "rec10", - col("rating")(0) as "score1", - col("rating")(1) as "score2", - col("rating")(2) as "score3", - col("rating")(3) as "score4", - col("rating")(4) as "score5", - col("rating")(5) as "score6", - col("rating")(6) as "score7", - col("rating")(7) as "score8", - col("rating")(8) as "score9", - col("rating")(9) as "score10") - .filter(col("customerID") === "0003000098E85347") - .take(1) - - val answer = spark.read.option("header", "true").csv(userPredFile).collect() - - assert(row(0).getString(0) == "0003000098E85347", "Assert Customer ID's Match") - (0 to 10).foreach(i => assert(row(0).getString(i) == answer(0).getString(i))) - (11 to 20).foreach(i => assert("%.3f".format(row(0).getFloat(i)) == "%.3f".format(answer(0).getString(i).toFloat))) - () - } - // scalastyle:on method.length - } +// Copyright (C) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. See LICENSE in the project root for information. 
+ +package com.microsoft.azure.synapse.ml.recommendation + +import com.microsoft.azure.synapse.ml.core.test.fuzzing.{EstimatorFuzzing, TestObject, TransformerFuzzing} +import org.apache.spark.ml.Pipeline +import org.apache.spark.ml.util.MLReadable +import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.functions.{col, udf} + +import scala.language.existentials + +class SARSpec extends RankingTestBase with EstimatorFuzzing[SAR] { + override def testObjects(): List[TestObject[SAR]] = { + List( + new TestObject(new SAR() + .setUserCol(recommendationIndexer.getUserOutputCol) + .setItemCol(recommendationIndexer.getItemOutputCol) + .setRatingCol(ratingCol), transformedDf), + new TestObject(new SAR() + .setUserCol(recommendationIndexer.getUserOutputCol) + .setItemCol(recommendationIndexer.getItemOutputCol) + .setRatingCol(ratingCol), transformedDfWithStrings) + ) + } + + override def reader: SAR.type = SAR + + override val epsilon = .3 + + override def modelReader: SARModel.type = SARModel + + test("SAR") { + + val algo = sar + .setSupportThreshold(1) + .setSimilarityFunction("jacccard") + .setActivityTimeFormat("EEE MMM dd HH:mm:ss Z yyyy") + + val adapter: RankingAdapter = new RankingAdapter() + .setK(5) + .setRecommender(algo) + + val recopipeline = new Pipeline() + .setStages(Array(recommendationIndexer, adapter)) + .fit(ratings) + + val output = recopipeline.transform(ratings) + + val evaluator: RankingEvaluator = new RankingEvaluator() + .setK(5) + .setNItems(10) + + assert(evaluator.setMetricName("ndcgAt").evaluate(output) === 0.602819875812812) + assert(evaluator.setMetricName("fcp").evaluate(output) === 0.05 || + evaluator.setMetricName("fcp").evaluate(output) === 0.1) + assert(evaluator.setMetricName("mrr").evaluate(output) === 1.0) + + val users: DataFrame = spark + .createDataFrame(Seq(("0","0"),("1","1"))) + .toDF(userColIndex, itemColIndex) + + val recs = recopipeline.stages(1).asInstanceOf[RankingAdapterModel].getRecommenderModel + .asInstanceOf[SARModel].recommendForUserSubset(users, 10) + assert(recs.count == 2) + } + + test("SAR with string userCol and itemCol") { + + val algo = sar + .setSupportThreshold(1) + .setSimilarityFunction("jacccard") + .setActivityTimeFormat("EEE MMM dd HH:mm:ss Z yyyy") + + val adapter: RankingAdapter = new RankingAdapter() + .setK(5) + .setRecommender(algo) + + val recopipeline = new Pipeline() + .setStages(Array(recommendationIndexer, adapter)) + .fit(ratingsWithStrings) + + val output = recopipeline.transform(ratingsWithStrings) + + val evaluator: RankingEvaluator = new RankingEvaluator() + .setK(5) + .setNItems(10) + + assert(evaluator.setMetricName("ndcgAt").evaluate(output) === 0.602819875812812) + assert(evaluator.setMetricName("fcp").evaluate(output) === 0.05 || + evaluator.setMetricName("fcp").evaluate(output) === 0.1) + assert(evaluator.setMetricName("mrr").evaluate(output) === 1.0) + + val users: DataFrame = spark + .createDataFrame(Seq(("user0","item0"),("user1","item1"))) + .toDF(userColIndex, itemColIndex) + + val recs = recopipeline.stages(1).asInstanceOf[RankingAdapterModel].getRecommenderModel + .asInstanceOf[SARModel].recommendForUserSubset(users, 10) + assert(recs.count == 2) + } + + lazy val testFile: String = getClass.getResource("/demoUsage.csv.gz").getPath + lazy val simCount1: String = getClass.getResource("/sim_count1.csv.gz").getPath + lazy val simLift1: String = getClass.getResource("/sim_lift1.csv.gz").getPath + lazy val simJac1: String = getClass.getResource("/sim_jac1.csv.gz").getPath + lazy val simCount3: 
String = getClass.getResource("/sim_count3.csv.gz").getPath + lazy val simLift3: String = getClass.getResource("/sim_lift3.csv.gz").getPath + lazy val simJac3: String = getClass.getResource("/sim_jac3.csv.gz").getPath + lazy val userAff: String = getClass.getResource("/user_aff.csv.gz").getPath + lazy val userpredCount3: String = getClass.getResource("/userpred_count3_userid_only.csv.gz").getPath + lazy val userpredLift3: String = getClass.getResource("/userpred_lift3_userid_only.csv.gz").getPath + lazy val userpredJac3: String = getClass.getResource("/userpred_jac3_userid_only.csv.gz").getPath + + private lazy val tlcSampleData: DataFrame = spark.read + .option("header", "true") //reading the headers + .option("inferSchema", "true") + .csv(testFile).na.drop.cache + + test("tlc test sim count1")( + SarTLCSpec.testAffinityMatrices(tlcSampleData, 1, "cooc", simCount1, userAff)) + + test("tlc test sim lift1")( + SarTLCSpec.testAffinityMatrices(tlcSampleData, 1, "lift", simLift1, userAff)) + + test("tlc test sim jac1")( + SarTLCSpec.testAffinityMatrices(tlcSampleData, 1, "jaccard", simJac1, userAff)) + + test("tlc test sim count3")( + SarTLCSpec.testAffinityMatrices(tlcSampleData, 3, "cooc", simCount3, userAff)) + + test("tlc test sim lift3")( + SarTLCSpec.testAffinityMatrices(tlcSampleData, 3, "lift", simLift3, userAff)) + + test("tlc test sim jac3")( + SarTLCSpec.testAffinityMatrices(tlcSampleData, 3, "jaccard", simJac3, userAff)) + + test("tlc test userpred count3 userid only")( + SarTLCSpec.testProductRecommendations(tlcSampleData, 3, "cooc", simCount3, userAff, userpredCount3)) + + test("tlc test userpred lift3 userid only")( + SarTLCSpec.testProductRecommendations(tlcSampleData, 3, "lift", simLift3, userAff, userpredLift3)) + + test("tlc test userpred jac3 userid only")( + SarTLCSpec.testProductRecommendations(tlcSampleData, 3, "jaccard", simJac3, userAff, userpredJac3)) + +} + +class SARModelSpec extends RankingTestBase with TransformerFuzzing[SARModel] { + override def testObjects(): Seq[TestObject[SARModel]] = { + List( + new TestObject(new SAR() + .setUserCol(recommendationIndexer.getUserOutputCol) + .setItemCol(recommendationIndexer.getItemOutputCol) + .setRatingCol(ratingCol) + .fit(transformedDf), transformedDf), + new TestObject(new SAR() + .setUserCol(recommendationIndexer.getUserOutputCol) + .setItemCol(recommendationIndexer.getItemOutputCol) + .setRatingCol(ratingCol) + .fit(transformedDfWithStrings), transformedDfWithStrings) + ) + } + + override def reader: MLReadable[_] = SARModel + +} + +object SarTLCSpec extends RankingTestBase { + //scalastyle:off field.name + override lazy val userCol = "userId" + override lazy val itemCol = "productId" + override lazy val ratingCol = "rating" + override lazy val userColIndex = "customerID" + override lazy val itemColIndex = "itemID" + //scalastyle:on field.name + + def testAffinityMatrices(tlcSampleData: DataFrame, + threshold: Int, + similarityFunction: String, + simFile: String, + user_aff: String): + (SARModel, RecommendationIndexerModel) = { + + val ratings = tlcSampleData + + val recommendationIndexerModel = recommendationIndexer.fit(ratings) + val transformedDf = recommendationIndexerModel.transform(ratings) + + val itemMap = recommendationIndexerModel.getItemIndex + + val model = sar + .setSupportThreshold(threshold) + .setSimilarityFunction(similarityFunction) + .setStartTime("2015/06/09T19:39:37") + .setStartTimeFormat("yyyy/MM/dd'T'h:mm:ss") + .fit(transformedDf) + + val simMap = model.getItemDataFrame.collect().map(row => { 
+ val itemI = itemMap.getOrElse(row.getDouble(0).toInt, "-1") + val similarityVectorMap = row.getList(1).toArray.zipWithIndex.map(t => (itemMap.getOrElse(t._2, "-1"), t._1)) + .toMap + itemI -> similarityVectorMap + }).toMap + + val itemAff = spark.read.option("header", "true").csv(simFile) + itemAff.collect().foreach(row => { + val itemI = row.getString(0) + itemAff.drop("_c0").schema.fieldNames.foreach(itemJ => { + val groundTrueScore = row.getAs[String](itemJ).toFloat + val sparkSarScore = simMap.getOrElse(itemI, Map()).getOrElse(itemJ, "-1.0") + assert(groundTrueScore == sparkSarScore) + }) + }) + (model, recommendationIndexerModel) + } + + // scalastyle:off method.length + def testProductRecommendations(tlcSampleData: DataFrame, + threshold: Int, + similarityFunction: String, + simFile: String, + user_aff: String, + userPredFile: String): Unit = { + + val (model, recommendationIndexerModel) = testAffinityMatrices(tlcSampleData, threshold, similarityFunction, + simFile, + user_aff) + + val recoverUser = recommendationIndexerModel.recoverUser() + val recoverItem = recommendationIndexerModel.recoverItem() + + val usersProducts = tlcSampleData + .filter(col("userId") === "0003000098E85347") + .select("productId") + .distinct() + .collect() + .map(_.getString(0)) + + val usersProductsBC = spark.sparkContext.broadcast(usersProducts) + + val itemMapBC = spark.sparkContext.broadcast(recommendationIndexerModel.getItemIndex) + + val filterScore = udf((items: Seq[Int], ratings: Seq[Float]) => { + items.zipWithIndex + .filter(p => { + val itemId = itemMapBC.value.getOrElse[String](p._1, "-1") + val bol = usersProductsBC.value.contains(itemId) + !bol + }).map(p => (p._1, ratings.toList(p._2))) + }) + + val row = model.recommendForAllUsers(10 + usersProducts.length) + .select(col("customerID"), filterScore(col("recommendations.itemID"), col("recommendations.rating")) as + "recommendations") + .select(col("customerID"), col("recommendations._1") as "itemID", col("recommendations._2") as "rating") + .select( + recoverUser(col("customerID")) as "customerID", + recoverItem(col("itemID")(0)) as "rec1", + recoverItem(col("itemID")(1)) as "rec2", + recoverItem(col("itemID")(2)) as "rec3", + recoverItem(col("itemID")(3)) as "rec4", + recoverItem(col("itemID")(4)) as "rec5", + recoverItem(col("itemID")(5)) as "rec6", + recoverItem(col("itemID")(6)) as "rec7", + recoverItem(col("itemID")(7)) as "rec8", + recoverItem(col("itemID")(8)) as "rec9", + recoverItem(col("itemID")(9)) as "rec10", + col("rating")(0) as "score1", + col("rating")(1) as "score2", + col("rating")(2) as "score3", + col("rating")(3) as "score4", + col("rating")(4) as "score5", + col("rating")(5) as "score6", + col("rating")(6) as "score7", + col("rating")(7) as "score8", + col("rating")(8) as "score9", + col("rating")(9) as "score10") + .filter(col("customerID") === "0003000098E85347") + .take(1) + + val answer = spark.read.option("header", "true").csv(userPredFile).collect() + + assert(row(0).getString(0) == "0003000098E85347", "Assert Customer ID's Match") + (0 to 10).foreach(i => assert(row(0).getString(i) == answer(0).getString(i))) + (11 to 20).foreach(i => assert("%.3f".format(row(0).getFloat(i)) == "%.3f".format(answer(0).getString(i).toFloat))) + () + } + // scalastyle:on method.length +} From 129081da34f5c22eee8a2efef0456fa1cac412fe Mon Sep 17 00:00:00 2001 From: Daniel Ciborowski Date: Fri, 6 Sep 2024 23:48:29 -0400 Subject: [PATCH 06/12] Apply suggestions from code review --- 
.../com/microsoft/azure/synapse/ml/recommendation/SARSpec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/com/microsoft/azure/synapse/ml/recommendation/SARSpec.scala b/core/src/test/scala/com/microsoft/azure/synapse/ml/recommendation/SARSpec.scala index 0d41b29e28..ae9bda71da 100644 --- a/core/src/test/scala/com/microsoft/azure/synapse/ml/recommendation/SARSpec.scala +++ b/core/src/test/scala/com/microsoft/azure/synapse/ml/recommendation/SARSpec.scala @@ -1,5 +1,5 @@ // Copyright (C) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. See LICENSE in the project root for information. +// Licensed under the MIT License. See LICENSE in project root for information. package com.microsoft.azure.synapse.ml.recommendation From fc8a97fa5d381e52c0a65c815d83da6c867981aa Mon Sep 17 00:00:00 2001 From: Daniel Ciborowski Date: Fri, 6 Sep 2024 23:52:57 -0400 Subject: [PATCH 07/12] Apply suggestions from code review --- .../estimators/core/_Recommendation.md | 82 ------------------- 1 file changed, 82 deletions(-) diff --git a/docs/Quick Examples/estimators/core/_Recommendation.md b/docs/Quick Examples/estimators/core/_Recommendation.md index 9be63c369e..3a0d8d0fc8 100644 --- a/docs/Quick Examples/estimators/core/_Recommendation.md +++ b/docs/Quick Examples/estimators/core/_Recommendation.md @@ -312,43 +312,6 @@ ratings = (spark.createDataFrame([ .dropDuplicates() .cache()) -ratings_with_strings = (spark.createDataFrame([ - ("user0", "item1", 4, 4), - ("user0", "item3", 1, 1), - ("user0", "item4", 5, 5), - ("user0", "item5", 3, 3), - ("user0", "item7", 3, 3), - ("user0", "item9", 3, 3), - ("user0", "item10", 3, 3), - ("user1", "item1", 4, 4), - ("user1", "item2", 5, 5), - ("user1", "item3", 1, 1), - ("user1", "item6", 4, 4), - ("user1", "item7", 5, 5), - ("user1", "item8", 1, 1), - ("user1", "item10", 3, 3), - ("user2", "item1", 4, 4), - ("user2", "item2", 1, 1), - ("user2", "item3", 1, 1), - ("user2", "item4", 5, 5), - ("user2", "item5", 3, 3), - ("user2", "item6", 4, 4), - ("user2", "item8", 1, 1), - ("user2", "item9", 5, 5), - ("user2", "item10", 3, 3), - ("user3", "item2", 5, 5), - ("user3", "item3", 1, 1), - ("user3", "item4", 5, 5), - ("user3", "item5", 3, 3), - ("user3", "item6", 4, 4), - ("user3", "item7", 5, 5), - ("user3", "item8", 1, 1), - ("user3", "item9", 5, 5), - ("user3", "item10", 3, 3) - ], ["originalCustomerID", "newCategoryID", "rating", "notTime"]) - .coalesce(1) - .cache()) - recommendationIndexer = (RecommendationIndexer() .setUserInputCol("customerIDOrg") .setUserOutputCol("customerID") @@ -372,10 +335,6 @@ adapter = (RankingAdapter() res1 = recommendationIndexer.fit(ratings).transform(ratings).cache() adapter.fit(res1).transform(res1).show() - -res2 = recommendationIndexer.fit(ratings_with_strings).transform(ratings_with_strings).cache() - -adapter.fit(res2).transform(res2).show() ``` @@ -422,43 +381,6 @@ val ratings = (Seq( .dropDuplicates() .cache()) -val ratings_with_strings = (Seq( - ("user0", "item1", 4, 4), - ("user0", "item3", 1, 1), - ("user0", "item4", 5, 5), - ("user0", "item5", 3, 3), - ("user0", "item7", 3, 3), - ("user0", "item9", 3, 3), - ("user0", "item10", 3, 3), - ("user1", "item1", 4, 4), - ("user1", "item2", 5, 5), - ("user1", "item3", 1, 1), - ("user1", "item6", 4, 4), - ("user1", "item7", 5, 5), - ("user1", "item8", 1, 1), - ("user1", "item10", 3, 3), - ("user2", "item1", 4, 4), - ("user2", "item2", 1, 1), - ("user2", "item3", 1, 1), - ("user2", "item4", 5, 5), - ("user2", "item5", 3, 3), 
- ("user2", "item6", 4, 4), - ("user2", "item8", 1, 1), - ("user2", "item9", 5, 5), - ("user2", "item10", 3, 3), - ("user3", "item2", 5, 5), - ("user3", "item3", 1, 1), - ("user3", "item4", 5, 5), - ("user3", "item5", 3, 3), - ("user3", "item6", 4, 4), - ("user3", "item7", 5, 5), - ("user3", "item8", 1, 1), - ("user3", "item9", 5, 5), - ("user3", "item10", 3, 3)) - .toDF("originalCustomerID", "newCategoryID", "rating", "notTime") - .coalesce(1) - .cache()) - val recommendationIndexer = (new RecommendationIndexer() .setUserInputCol("customerIDOrg") .setUserOutputCol("customerID") @@ -482,10 +404,6 @@ val adapter = (new RankingAdapter() val res1 = recommendationIndexer.fit(ratings).transform(ratings).cache() adapter.fit(res1).transform(res1).show() - -val res2 = recommendationIndexer.fit(ratings_with_strings).transform(ratings_with_strings).cache() - -adapter.fit(res2).transform(res2).show() ``` From 92685b824cdce007eac834347ad70de6216cb697 Mon Sep 17 00:00:00 2001 From: Daniel Ciborowski Date: Fri, 6 Sep 2024 23:53:23 -0400 Subject: [PATCH 08/12] Update docs/Quick Examples/estimators/core/_Recommendation.md --- .../estimators/core/_Recommendation.md | 37 ------------------- 1 file changed, 37 deletions(-) diff --git a/docs/Quick Examples/estimators/core/_Recommendation.md b/docs/Quick Examples/estimators/core/_Recommendation.md index 3a0d8d0fc8..98f9501736 100644 --- a/docs/Quick Examples/estimators/core/_Recommendation.md +++ b/docs/Quick Examples/estimators/core/_Recommendation.md @@ -61,43 +61,6 @@ ratings = (spark.createDataFrame([ .dropDuplicates() .cache()) -ratings_with_strings = (spark.createDataFrame([ - ("user0", "item1", 4, 4), - ("user0", "item3", 1, 1), - ("user0", "item4", 5, 5), - ("user0", "item5", 3, 3), - ("user0", "item7", 3, 3), - ("user0", "item9", 3, 3), - ("user0", "item10", 3, 3), - ("user1", "item1", 4, 4), - ("user1", "item2", 5, 5), - ("user1", "item3", 1, 1), - ("user1", "item6", 4, 4), - ("user1", "item7", 5, 5), - ("user1", "item8", 1, 1), - ("user1", "item10", 3, 3), - ("user2", "item1", 4, 4), - ("user2", "item2", 1, 1), - ("user2", "item3", 1, 1), - ("user2", "item4", 5, 5), - ("user2", "item5", 3, 3), - ("user2", "item6", 4, 4), - ("user2", "item8", 1, 1), - ("user2", "item9", 5, 5), - ("user2", "item10", 3, 3), - ("user3", "item2", 5, 5), - ("user3", "item3", 1, 1), - ("user3", "item4", 5, 5), - ("user3", "item5", 3, 3), - ("user3", "item6", 4, 4), - ("user3", "item7", 5, 5), - ("user3", "item8", 1, 1), - ("user3", "item9", 5, 5), - ("user3", "item10", 3, 3) - ], ["originalCustomerID", "newCategoryID", "rating", "notTime"]) - .coalesce(1) - .cache()) - recommendationIndexer = (RecommendationIndexer() .setUserInputCol("customerIDOrg") .setUserOutputCol("customerID") From dbade6e5faed148ad6d9c1ddd2c5e1e172bddcf1 Mon Sep 17 00:00:00 2001 From: Daniel Ciborowski Date: Fri, 6 Sep 2024 23:54:58 -0400 Subject: [PATCH 09/12] Apply suggestions from code review --- core/src/main/python/synapse/ml/recommendation/SARModel.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/core/src/main/python/synapse/ml/recommendation/SARModel.py b/core/src/main/python/synapse/ml/recommendation/SARModel.py index d2e777bbc5..03162cbcc2 100644 --- a/core/src/main/python/synapse/ml/recommendation/SARModel.py +++ b/core/src/main/python/synapse/ml/recommendation/SARModel.py @@ -15,8 +15,3 @@ class SARModel(_SARModel): def recommendForAllUsers(self, numItems): return self._call_java("recommendForAllUsers", numItems) - - def recommendForUserSubset(self, dataset, numItems): - if 
dataset.schema[self.getUserCol()].dataType == StringType(): - dataset = dataset.withColumn(self.getUserCol(), dataset[self.getUserCol()].cast("int")) - return self._call_java("recommendForUserSubset", dataset, numItems) From 09557ea7bafd2c0c129acc40afd79ab6b3e54d28 Mon Sep 17 00:00:00 2001 From: Daniel Ciborowski Date: Sat, 7 Sep 2024 00:25:44 -0400 Subject: [PATCH 10/12] Update SAR.scala to handle integer types for userId and itemId * **SAR.scala** - Update `calculateUserItemAffinities` method to handle integer types for `userId` and `itemId` - Update `calculateItemItemSimilarity` method to handle integer types for `userId` and `itemId` * **test_ranking.py** - Add test cases to verify the functionality of SAR model with integer types for `userId` and `itemId` --- .../azure/synapse/ml/recommendation/SAR.scala | 14 +++++- .../recommendation/test_ranking.py | 46 +++++++++++++++++++ 2 files changed, 59 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/com/microsoft/azure/synapse/ml/recommendation/SAR.scala b/core/src/main/scala/com/microsoft/azure/synapse/ml/recommendation/SAR.scala index c8fcf2830b..1661db2e28 100644 --- a/core/src/main/scala/com/microsoft/azure/synapse/ml/recommendation/SAR.scala +++ b/core/src/main/scala/com/microsoft/azure/synapse/ml/recommendation/SAR.scala @@ -13,7 +13,7 @@ import org.apache.spark.ml.util.{DefaultParamsReadable, DefaultParamsWritable, I import org.apache.spark.mllib.linalg import org.apache.spark.mllib.linalg.{DenseVector, Matrices, SparseMatrix} import org.apache.spark.sql.functions.{col, collect_list, sum, udf, _} -import org.apache.spark.sql.types.{StringType, StructType} +import org.apache.spark.sql.types.{StringType, StructType, IntegerType, LongType} import org.apache.spark.sql.{DataFrame, Dataset} import java.text.SimpleDateFormat @@ -117,6 +117,12 @@ class SAR(override val uid: String) extends Estimator[SARModel] dataset.withColumn(getUserCol, col(getUserCol).cast("int")) case (_, StringType) => dataset.withColumn(getItemCol, col(getItemCol).cast("int")) + case (IntegerType, IntegerType) => + dataset.withColumn(getUserCol, col(getUserCol).cast("int")) + .withColumn(getItemCol, col(getItemCol).cast("int")) + case (LongType, LongType) => + dataset.withColumn(getUserCol, col(getUserCol).cast("long")) + .withColumn(getItemCol, col(getItemCol).cast("long")) case _ => dataset } @@ -222,6 +228,12 @@ class SAR(override val uid: String) extends Estimator[SARModel] dataset.withColumn(getUserCol, col(getUserCol).cast("int")) case (_, StringType) => dataset.withColumn(getItemCol, col(getItemCol).cast("int")) + case (IntegerType, IntegerType) => + dataset.withColumn(getUserCol, col(getUserCol).cast("int")) + .withColumn(getItemCol, col(getItemCol).cast("int")) + case (LongType, LongType) => + dataset.withColumn(getUserCol, col(getUserCol).cast("long")) + .withColumn(getItemCol, col(getItemCol).cast("long")) case _ => dataset } diff --git a/core/src/test/python/synapsemltest/recommendation/test_ranking.py b/core/src/test/python/synapsemltest/recommendation/test_ranking.py index c4b9aebff7..88f015d0d4 100644 --- a/core/src/test/python/synapsemltest/recommendation/test_ranking.py +++ b/core/src/test/python/synapsemltest/recommendation/test_ranking.py @@ -109,6 +109,48 @@ .cache() ) +ratings_with_integers = ( + spark.createDataFrame( + [ + (0, 1, 4, 4), + (0, 3, 1, 1), + (0, 4, 5, 5), + (0, 5, 3, 3), + (0, 7, 3, 3), + (0, 9, 3, 3), + (0, 10, 3, 3), + (1, 1, 4, 4), + (1, 2, 5, 5), + (1, 3, 1, 1), + (1, 6, 4, 4), + (1, 7, 5, 5), + (1, 8, 1, 1), + (1, 
10, 3, 3), + (2, 1, 4, 4), + (2, 2, 1, 1), + (2, 3, 1, 1), + (2, 4, 5, 5), + (2, 5, 3, 3), + (2, 6, 4, 4), + (2, 8, 1, 1), + (2, 9, 5, 5), + (2, 10, 3, 3), + (3, 2, 5, 5), + (3, 3, 1, 1), + (3, 4, 5, 5), + (3, 5, 3, 3), + (3, 6, 4, 4), + (3, 7, 5, 5), + (3, 8, 1, 1), + (3, 9, 5, 5), + (3, 10, 3, 3), + ], + ["originalCustomerID", "newCategoryID", "rating", "notTime"], + ) + .coalesce(1) + .cache() +) + class RankingSpec(unittest.TestCase): @staticmethod @@ -145,6 +187,10 @@ def test_adapter_evaluator_sar_with_strings(self): sar = SAR(userCol=USER_ID_INDEX, itemCol=ITEM_ID_INDEX, ratingCol=RATING_ID) self.adapter_evaluator(sar, ratings_with_strings) + def test_adapter_evaluator_sar_with_integers(self): + sar = SAR(userCol=USER_ID_INDEX, itemCol=ITEM_ID_INDEX, ratingCol=RATING_ID) + self.adapter_evaluator(sar, ratings_with_integers) + def test_all_tiny(self): customer_index = StringIndexer(inputCol=USER_ID, outputCol=USER_ID_INDEX) ratings_index = StringIndexer(inputCol=ITEM_ID, outputCol=ITEM_ID_INDEX) From 11dae039e7da44d86bd90b8734bde15202e1ae94 Mon Sep 17 00:00:00 2001 From: Daniel Ciborowski Date: Sat, 7 Sep 2024 15:29:47 -0400 Subject: [PATCH 11/12] Discard changes to core/src/test/scala/com/microsoft/azure/synapse/ml/recommendation/SARSpec.scala --- .../synapse/ml/recommendation/SARSpec.scala | 48 +------------------ 1 file changed, 2 insertions(+), 46 deletions(-) diff --git a/core/src/test/scala/com/microsoft/azure/synapse/ml/recommendation/SARSpec.scala b/core/src/test/scala/com/microsoft/azure/synapse/ml/recommendation/SARSpec.scala index ae9bda71da..501652f68b 100644 --- a/core/src/test/scala/com/microsoft/azure/synapse/ml/recommendation/SARSpec.scala +++ b/core/src/test/scala/com/microsoft/azure/synapse/ml/recommendation/SARSpec.scala @@ -17,11 +17,7 @@ class SARSpec extends RankingTestBase with EstimatorFuzzing[SAR] { new TestObject(new SAR() .setUserCol(recommendationIndexer.getUserOutputCol) .setItemCol(recommendationIndexer.getItemOutputCol) - .setRatingCol(ratingCol), transformedDf), - new TestObject(new SAR() - .setUserCol(recommendationIndexer.getUserOutputCol) - .setItemCol(recommendationIndexer.getItemOutputCol) - .setRatingCol(ratingCol), transformedDfWithStrings) + .setRatingCol(ratingCol), transformedDf) ) } @@ -66,41 +62,6 @@ class SARSpec extends RankingTestBase with EstimatorFuzzing[SAR] { assert(recs.count == 2) } - test("SAR with string userCol and itemCol") { - - val algo = sar - .setSupportThreshold(1) - .setSimilarityFunction("jacccard") - .setActivityTimeFormat("EEE MMM dd HH:mm:ss Z yyyy") - - val adapter: RankingAdapter = new RankingAdapter() - .setK(5) - .setRecommender(algo) - - val recopipeline = new Pipeline() - .setStages(Array(recommendationIndexer, adapter)) - .fit(ratingsWithStrings) - - val output = recopipeline.transform(ratingsWithStrings) - - val evaluator: RankingEvaluator = new RankingEvaluator() - .setK(5) - .setNItems(10) - - assert(evaluator.setMetricName("ndcgAt").evaluate(output) === 0.602819875812812) - assert(evaluator.setMetricName("fcp").evaluate(output) === 0.05 || - evaluator.setMetricName("fcp").evaluate(output) === 0.1) - assert(evaluator.setMetricName("mrr").evaluate(output) === 1.0) - - val users: DataFrame = spark - .createDataFrame(Seq(("user0","item0"),("user1","item1"))) - .toDF(userColIndex, itemColIndex) - - val recs = recopipeline.stages(1).asInstanceOf[RankingAdapterModel].getRecommenderModel - .asInstanceOf[SARModel].recommendForUserSubset(users, 10) - assert(recs.count == 2) - } - lazy val testFile: String = 
getClass.getResource("/demoUsage.csv.gz").getPath
   lazy val simCount1: String = getClass.getResource("/sim_count1.csv.gz").getPath
   lazy val simLift1: String = getClass.getResource("/sim_lift1.csv.gz").getPath
@@ -154,12 +115,7 @@ class SARModelSpec extends RankingTestBase with TransformerFuzzing[SARModel] {
         .setUserCol(recommendationIndexer.getUserOutputCol)
         .setItemCol(recommendationIndexer.getItemOutputCol)
         .setRatingCol(ratingCol)
-        .fit(transformedDf), transformedDf),
-      new TestObject(new SAR()
-        .setUserCol(recommendationIndexer.getUserOutputCol)
-        .setItemCol(recommendationIndexer.getItemOutputCol)
-        .setRatingCol(ratingCol)
-        .fit(transformedDfWithStrings), transformedDfWithStrings)
+        .fit(transformedDf), transformedDf)
     )
   }
 
From 9981bee2df43056e302cc48a530965f3727b8d63 Mon Sep 17 00:00:00 2001
From: Daniel Ciborowski
Date: Sat, 7 Sep 2024 15:39:12 -0400
Subject: [PATCH 12/12] Add new test cases for User Column with Strings and other datatypes in SARSpec.scala

* Add a test case for a string user column, indexed end to end through the
  RecommendationIndexer pipeline
* Add a test case for user ids of mixed formats (numeric and non-numeric
  strings), passed to SAR directly
* Verify that the string-casting logic in SAR.scala handles both cases

---
 .../synapse/ml/recommendation/SARSpec.scala | 126 +++++++++++++++++-
 1 file changed, 123 insertions(+), 3 deletions(-)

diff --git a/core/src/test/scala/com/microsoft/azure/synapse/ml/recommendation/SARSpec.scala b/core/src/test/scala/com/microsoft/azure/synapse/ml/recommendation/SARSpec.scala
index 501652f68b..e3abbb0423 100644
--- a/core/src/test/scala/com/microsoft/azure/synapse/ml/recommendation/SARSpec.scala
+++ b/core/src/test/scala/com/microsoft/azure/synapse/ml/recommendation/SARSpec.scala
@@ -1,6 +1,3 @@
-// Copyright (C) Microsoft Corporation. All rights reserved.
-// Licensed under the MIT License. See LICENSE in project root for information.
-
 package com.microsoft.azure.synapse.ml.recommendation
 
 import com.microsoft.azure.synapse.ml.core.test.fuzzing.{EstimatorFuzzing, TestObject, TransformerFuzzing}
@@ -106,6 +103,129 @@ class SARSpec extends RankingTestBase with EstimatorFuzzing[SAR] {
   test("tlc test userpred jac3 userid only")(
     SarTLCSpec.testProductRecommendations(tlcSampleData, 3, "jaccard", simJac3, userAff, userpredJac3))
 
+  test("SAR with String User Column") {
+    val stringUserCol = "stringUserId"
+    val stringItemCol = "stringItemId"
+
+    val stringRatings: DataFrame = spark
+      .createDataFrame(Seq(
+        ("user1", "item1", 2),
+        ("user1", "item3", 1),
+        ("user1", "item4", 5),
+        ("user2", "item1", 4),
+        ("user2", "item2", 5),
+        ("user2", "item3", 1),
+        ("user3", "item1", 4),
+        ("user3", "item3", 1),
+        ("user3", "item4", 5)
+      ))
+      .toDF(stringUserCol, stringItemCol, ratingCol)
+      .dropDuplicates()
+      .cache()
+
+    val stringRecommendationIndexer: RecommendationIndexer = new RecommendationIndexer()
+      .setUserInputCol(stringUserCol)
+      .setUserOutputCol(userColIndex)
+      .setItemInputCol(stringItemCol)
+      .setItemOutputCol(itemColIndex)
+      .setRatingCol(ratingCol)
+
+    val transformedStringDf: DataFrame = stringRecommendationIndexer.fit(stringRatings)
+      .transform(stringRatings).cache()
+
+    val algo = new SAR()
+      .setUserCol(stringRecommendationIndexer.getUserOutputCol)
+      .setItemCol(stringRecommendationIndexer.getItemOutputCol)
+      .setRatingCol(ratingCol)
+      .setSupportThreshold(1)
+      .setSimilarityFunction("jaccard")
+      .setActivityTimeFormat("EEE MMM dd HH:mm:ss Z yyyy")
+
+    val adapter: RankingAdapter = new RankingAdapter()
+      .setK(5)
+      .setRecommender(algo)
+
+    val recopipeline = new Pipeline()
+      .setStages(Array(stringRecommendationIndexer, adapter))
+      .fit(stringRatings)
+
+    val output = recopipeline.transform(stringRatings)
+
+    val evaluator: RankingEvaluator = new RankingEvaluator()
+      .setK(5)
+      .setNItems(10)
+
+    assert(evaluator.setMetricName("ndcgAt").evaluate(output) > 0.0)
+    assert(evaluator.setMetricName("fcp").evaluate(output) > 0.0)
+    assert(evaluator.setMetricName("mrr").evaluate(output) > 0.0)
+
+    val users: DataFrame = spark
+      .createDataFrame(Seq(("user1", "item1"), ("user2", "item2")))
+      .toDF(stringUserCol, stringItemCol)
+
+    val recs = recopipeline.stages(1).asInstanceOf[RankingAdapterModel].getRecommenderModel
+      .asInstanceOf[SARModel].recommendForUserSubset(users, 10)
+    assert(recs.count == 2)
+  }
+
+  test("SAR with Different DataTypes in User Column") {
+    val mixedUserCol = "mixedUserId"
+    val mixedItemCol = "mixedItemId"
+
+    // a DataFrame column holds a single type, so ids of mixed origin are encoded as strings
+    val mixedRatings: DataFrame = spark
+      .createDataFrame(Seq(
+        ("1", "item1", 2),
+        ("1", "item3", 1),
+        ("1", "item4", 5),
+        ("2", "item1", 4),
+        ("2", "item2", 5),
+        ("2", "item3", 1),
+        ("3", "item1", 4),
+        ("3", "item3", 1),
+        ("3", "item4", 5),
+        ("user4", "item1", 3),
+        ("user4", "item2", 2),
+        ("user4", "item3", 4)
+      ))
+      .toDF(mixedUserCol, mixedItemCol, ratingCol)
+      .dropDuplicates()
+      .cache()
+
+    val algo = new SAR()
+      .setUserCol(mixedUserCol)
+      .setItemCol(mixedItemCol)
+      .setRatingCol(ratingCol)
+      .setSupportThreshold(1)
+      .setSimilarityFunction("jaccard")
+      .setActivityTimeFormat("EEE MMM dd HH:mm:ss Z yyyy")
+
+    val adapter: RankingAdapter = new RankingAdapter()
+      .setK(5)
+      .setRecommender(algo)
+
+    val recopipeline = new Pipeline()
+      .setStages(Array(adapter))
+      .fit(mixedRatings)
+
+    val output = recopipeline.transform(mixedRatings)
+
+    val evaluator: RankingEvaluator = new RankingEvaluator()
+      .setK(5)
+      .setNItems(10)
+
+    assert(evaluator.setMetricName("ndcgAt").evaluate(output) > 0.0)
+    assert(evaluator.setMetricName("fcp").evaluate(output) > 0.0)
+    assert(evaluator.setMetricName("mrr").evaluate(output) > 0.0)
+
+    val users: DataFrame = spark
+      .createDataFrame(Seq(("1", "item1"), ("2", "item2"), ("user4", "item3")))
+      .toDF(mixedUserCol, mixedItemCol)
+
+    val recs = recopipeline.stages(0).asInstanceOf[RankingAdapterModel].getRecommenderModel
+      .asInstanceOf[SARModel].recommendForUserSubset(users, 10)
+    assert(recs.count == 3)
+  }
+}
 
 class SARModelSpec extends RankingTestBase with TransformerFuzzing[SARModel] {