diff --git a/connector/examples/scala/LogisticRegressionTesting.ipynb b/connector/examples/scala/LogisticRegressionTesting.ipynb new file mode 100644 index 0000000..cc62aba --- /dev/null +++ b/connector/examples/scala/LogisticRegressionTesting.ipynb @@ -0,0 +1,507 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "*Copyright (c) Microsoft Corporation. All rights reserved.*\n", + "\n", + "*Licensed under the MIT License.*" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Baseline Spark Model for AI on Accumulo\n", + "\n", + "\n", + "In this notebook, we test a Spark MLlib logistic regression model using the manual test set provided in [sentiment140](http://help.sentiment140.com/for-students/?source=post_page---------------------------) twitter data. [Microsoft Accumulo Spark Connector (MASC)](https://github.com/microsoft/masc) is used for handling data IO between Accumulo and Spark. \n", + "\n", + "Before running this notebook, please\n", + "* make sure you have Accumulo 2.0.0 and Spark 2.4.3 installed\n", + "* create and activate a conda environment with Apache Toree installed\n", + "* download accumulo-spark-datasource jar and accumulo-spark-iterator jar\n", + "* run commands like the following to install a Jupyter toree kernel\n", + "```\n", + "# Replace the jar file path based on your situation\n", + "JAR=\"file:///home/rba1/twitter-sentiment/lib/accumulo-spark-datasource-1.0.0-SNAPSHOT-shaded.jar\"\n", + "jupyter toree install \\\n", + " --replace \\\n", + " --user \\\n", + " --kernel_name=accumulo \\\n", + " --spark_home=${SPARK_HOME} \\\n", + " --spark_opts=\"--master yarn --jars $JAR \\\n", + " --packages org.apache.spark:spark-avro_2.11:2.4.3,com.microsoft.ml.spark:mmlspark_2.11:0.18.1 \\\n", + " --driver-memory 8g \\\n", + " --executor-memory 6g \\\n", + " --driver-cores 2 \\\n", + " --executor-cores 2 \\\n", + " --num-executors 16\"\n", + "```" + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "Waiting for a Spark session to start..." + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Spark version 2.4.3\n", + "Scala version 2.11.12\n", + "\n" + ] + }, + { + "data": { + "text/plain": [ + "Waiting for a Spark session to start..." + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "(spark.eventLog.enabled,true)\n", + "(spark.repl.local.jars,file:///home/rba1/twitter-sentiment/lib/accumulo-spark-datasource-1.0.0-SNAPSHOT-shaded.jar,file:///home/rba1/.ivy2/jars/org.apache.spark_spark-avro_2.11-2.4.3.jar,file:///home/rba1/.ivy2/jars/com.microsoft.ml.spark_mmlspark_2.11-0.18.1.jar,file:///home/rba1/.ivy2/jars/org.spark-project.spark_unused-1.0.0.jar,file:///home/rba1/.ivy2/jars/org.scalactic_scalactic_2.11-3.0.5.jar,file:///home/rba1/.ivy2/jars/org.scalatest_scalatest_2.11-3.0.5.jar,file:///home/rba1/.ivy2/jars/io.spray_spray-json_2.11-1.3.2.jar,file:///home/rba1/.ivy2/jars/com.microsoft.cntk_cntk-2.4.jar,file:///home/rba1/.ivy2/jars/org.openpnp_opencv-3.2.0-1.jar,file:///home/rba1/.ivy2/jars/com.jcraft_jsch-0.1.54.jar,file:///home/rba1/.ivy2/jars/org.apache.httpcomponents_httpclient-4.5.6.jar,file:///home/rba1/.ivy2/jars/com.microsoft.ml.lightgbm_lightgbmlib-2.2.350.jar,file:///home/rba1/.ivy2/jars/com.github.vowpalwabbit_vw-jni-8.7.0.2.jar,file:///home/rba1/.ivy2/jars/org.scala-lang_scala-reflect-2.11.12.jar,file:///home/rba1/.ivy2/jars/org.scala-lang.modules_scala-xml_2.11-1.0.6.jar,file:///home/rba1/.ivy2/jars/org.apache.httpcomponents_httpcore-4.4.10.jar,file:///home/rba1/.ivy2/jars/commons-logging_commons-logging-1.2.jar,file:///home/rba1/.ivy2/jars/commons-codec_commons-codec-1.10.jar)\n", + "(spark.driver.host,rbaaccucluster2-0)\n", + "(spark.eventLog.dir,hdfs://rbaaccucluster2/spark/history)\n", + "(spark.app.name,TwitterSentimentClassification)\n", + "(spark.jars,file:/home/rba1/.local/share/jupyter/kernels/accumulo_scala/lib/toree-assembly-0.3.0-incubating.jar)\n", + "(spark.executor.id,driver)\n", + "(spark.executor.instances,16)\n", + "(spark.driver.port,43618)\n", + "(spark.executor.cores,2)\n", + "(spark.ui.proxyBase,/proxy/application_1574703914462_0011)\n", + "(spark.driver.appUIAddress,http://rbaaccucluster2-0:4040)\n", + "(spark.executor.memory,6g)\n", + "(spark.master,yarn)\n", + "(spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_HOSTS,rbaaccucluster2-0)\n", + "(spark.yarn.dist.jars,file:///home/rba1/twitter-sentiment/lib/accumulo-spark-datasource-1.0.0-SNAPSHOT-shaded.jar,file:///home/rba1/.ivy2/jars/org.apache.spark_spark-avro_2.11-2.4.3.jar,file:///home/rba1/.ivy2/jars/com.microsoft.ml.spark_mmlspark_2.11-0.18.1.jar,file:///home/rba1/.ivy2/jars/org.spark-project.spark_unused-1.0.0.jar,file:///home/rba1/.ivy2/jars/org.scalactic_scalactic_2.11-3.0.5.jar,file:///home/rba1/.ivy2/jars/org.scalatest_scalatest_2.11-3.0.5.jar,file:///home/rba1/.ivy2/jars/io.spray_spray-json_2.11-1.3.2.jar,file:///home/rba1/.ivy2/jars/com.microsoft.cntk_cntk-2.4.jar,file:///home/rba1/.ivy2/jars/org.openpnp_opencv-3.2.0-1.jar,file:///home/rba1/.ivy2/jars/com.jcraft_jsch-0.1.54.jar,file:///home/rba1/.ivy2/jars/org.apache.httpcomponents_httpclient-4.5.6.jar,file:///home/rba1/.ivy2/jars/com.microsoft.ml.lightgbm_lightgbmlib-2.2.350.jar,file:///home/rba1/.ivy2/jars/com.github.vowpalwabbit_vw-jni-8.7.0.2.jar,file:///home/rba1/.ivy2/jars/org.scala-lang_scala-reflect-2.11.12.jar,file:///home/rba1/.ivy2/jars/org.scala-lang.modules_scala-xml_2.11-1.0.6.jar,file:///home/rba1/.ivy2/jars/org.apache.httpcomponents_httpcore-4.4.10.jar,file:///home/rba1/.ivy2/jars/commons-logging_commons-logging-1.2.jar,file:///home/rba1/.ivy2/jars/commons-codec_commons-codec-1.10.jar)\n", + "(spark.yarn.secondary.jars,accumulo-spark-datasource-1.0.0-SNAPSHOT-shaded.jar,org.apache.spark_spark-avro_2.11-2.4.3.jar,com.microsoft.ml.spark_mmlspark_2.11-0.18.1.jar,org.spark-project.spark_unused-1.0.0.jar,org.scalactic_scalactic_2.11-3.0.5.jar,org.scalatest_scalatest_2.11-3.0.5.jar,io.spray_spray-json_2.11-1.3.2.jar,com.microsoft.cntk_cntk-2.4.jar,org.openpnp_opencv-3.2.0-1.jar,com.jcraft_jsch-0.1.54.jar,org.apache.httpcomponents_httpclient-4.5.6.jar,com.microsoft.ml.lightgbm_lightgbmlib-2.2.350.jar,com.github.vowpalwabbit_vw-jni-8.7.0.2.jar,org.scala-lang_scala-reflect-2.11.12.jar,org.scala-lang.modules_scala-xml_2.11-1.0.6.jar,org.apache.httpcomponents_httpcore-4.4.10.jar,commons-logging_commons-logging-1.2.jar,commons-codec_commons-codec-1.10.jar)\n", + "(spark.history.fs.logDirectory,hdfs://rbaaccucluster2/spark/history)\n", + "(spark.app.id,application_1574703914462_0012)\n", + "(spark.driver.memory,8g)\n", + "(spark.repl.class.outputDir,/tmp/spark-1b7837b6-868e-46a1-b913-ad380765c939/repl-e4a8c358-6a71-4be6-8662-84d916fede29)\n", + "(spark.submit.deployMode,client)\n", + "(spark.repl.class.uri,spark://rbaaccucluster2-0:43618/classes)\n", + "(spark.ui.filters,org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter)\n", + "(spark.yarn.historyServer.address,rbaaccucluster2-0:18080)\n", + "(spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_URI_BASES,http://rbaaccucluster2-0:8088/proxy/application_1574703914462_0012)\n" + ] + }, + { + "data": { + "text/plain": [ + "conf = org.apache.spark.SparkConf@67ed829\n" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/plain": [ + "org.apache.spark.SparkConf@67ed829" + ] + }, + "execution_count": 1, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "import org.apache.spark.{SparkConf, SparkContext}\n", + "\n", + "// Stop existing spark context and create new one\n", + "sc.stop()\n", + "\n", + "val conf = new SparkConf()\n", + "conf.setAppName(\"TwitterSentimentClassification\")\n", + "\n", + "new SparkContext(conf)\n", + "\n", + "println(\"Spark version %s\".format(sc.version))\n", + "println(\"Scala %s\".format(util.Properties.versionString))\n", + "println\n", + "sc.getConf.getAll.foreach(println)" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "PROPS_PATH = /home/rba1/install/accumulo-2.0.0/conf/accumulo-client.properties\n", + "TEST_TABLE_NAME = twitter_test_data\n", + "sqlContext = org.apache.spark.sql.SQLContext@f6c16c1\n", + "schema = StructType(StructField(sentiment,DoubleType,true), StructField(id,StringType,true), StructField(date,StringType,true), StructField(query_string,StringType,true), StructField(user,StringType,true), StructField(text,StringType,true))\n" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/plain": [ + "warning: there was one deprecation warning; re-run with -deprecation for details\n" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/plain": [ + "StructType(StructField(sentiment,DoubleType,true), StructField(id,StringType,true), StructField(date,StringType,true), StructField(query_string,StringType,true), StructField(user,StringType,true), StructField(text,StringType,true))" + ] + }, + "execution_count": 2, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "import org.apache.spark.sql.types.{LongType, DoubleType, StringType, StructField, StructType}\n", + "import org.apache.accumulo.core.client.Accumulo\n", + "import scala.collection.JavaConverters._\n", + "\n", + "// client property file path\n", + "val PROPS_PATH = \"/home/rba1/install/accumulo-2.0.0/conf/accumulo-client.properties\"\n", + "val TEST_TABLE_NAME = \"twitter_test_data\"\n", + "val sqlContext = new org.apache.spark.sql.SQLContext(sc)\n", + "val schema = StructType(Array(\n", + " StructField(\"sentiment\", DoubleType),\n", + " StructField(\"id\", StringType),\n", + " StructField(\"date\", StringType),\n", + " StructField(\"query_string\", StringType),\n", + " StructField(\"user\", StringType),\n", + " StructField(\"text\", StringType)\n", + "))" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Ingest Twitter Data to Accumulo" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Time to ingest twitter data to Accumulo: 7.089090169s\n" + ] + }, + { + "data": { + "text/plain": [ + "file_path = testdata_manual.csv\n", + "df = [sentiment: double, id: string ... 4 more fields]\n", + "t0 = 17497687973225\n", + "props = {auth.type=password, auth.principal=root, table=twitter_test_data, instance.zookeepers=rbaaccucluster2-0:2181,rbaaccucluster2-1:2181,rbaaccucluster2-2:2181, instance.name=muchos, rowKey=id, auth.token=secret}\n", + "t1 = 17504777063394\n" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/plain": [ + "17504777063394" + ] + }, + "execution_count": 3, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "// need to upload data to hdfs first via \n", + "// hdfs dfs -put /home/rba1/twitter-sentiment/testdata_manual.csv testdata_manual.csv\n", + "val file_path = \"testdata_manual.csv\"\n", + "val df = spark.read.format(\"csv\").schema(schema).load(file_path)\n", + "\n", + "var t0 = System.nanoTime()\n", + "val props = Accumulo.newClientProperties().from(PROPS_PATH).build()\n", + "props.put(\"table\", TEST_TABLE_NAME)\n", + "props.put(\"rowKey\", \"id\")\n", + "df.write.format(\"org.apache.accumulo\").options(props.asScala).save()\n", + "var t1 = System.nanoTime()\n", + "println(\"Time to ingest twitter data to Accumulo: \" + (t1 - t0)*1e-9 + \"s\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Load Test Data from Accumulo" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Reading test data from Accumulo...\n", + "Time to load test data: 3.8040814810000003s\n" + ] + }, + { + "data": { + "text/plain": [ + "t0 = 17506429770434\n", + "test_df = [sentiment: double, id: string ... 5 more fields]\n", + "t1 = 17510233851915\n", + "read_time = 3.8040814810000003\n" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/plain": [ + "3.8040814810000003" + ] + }, + "execution_count": 4, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "println(\"Reading test data from Accumulo...\")\n", + "var t0 = System.nanoTime()\n", + "var test_df = spark.read\n", + " .format(\"org.apache.accumulo\")\n", + " .options(props.asScala)\n", + " .schema(schema)\n", + " .load()\n", + "test_df.cache().count()\n", + "var t1 = System.nanoTime()\n", + "val read_time = (t1 - t0)*1e-9\n", + "println(\"Time to load test data: \" + read_time + \"s\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Data Preparation" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "test_df = [label: double, text: string]\n" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/plain": [ + "[label: double, text: string]" + ] + }, + "execution_count": 5, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "import org.apache.spark.sql.functions.{rand, when}\n", + "\n", + "test_df = test_df.withColumn(\"label\", 'sentiment.cast(\"Int\"))\n", + " .select('label as 'label, 'text as 'text)\n", + " .withColumn(\"label\", when('label > 0, 1.0D).otherwise(0.0D))" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Load Model" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "lrModel = pipeline_4bbac66c2954\n" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/plain": [ + "pipeline_4bbac66c2954" + ] + }, + "execution_count": 6, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "import org.apache.spark.ml.classification.LogisticRegressionModel\n", + "import org.apache.spark.ml.PipelineModel\n", + "\n", + "val lrModel = PipelineModel.load(\"./model/lrModel_twitter_sentiment\")" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Time to make prediction: 1.0488603810000001s\n" + ] + }, + { + "data": { + "text/plain": [ + "t0 = 17522302272295\n", + "lrPred = [label: double, text: string ... 5 more fields]\n", + "t1 = 17523351132676\n", + "infer_time = 1.0488603810000001\n" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/plain": [ + "1.0488603810000001" + ] + }, + "execution_count": 7, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "var t0 = System.nanoTime()\n", + "val lrPred = lrModel.transform(test_df)\n", + "lrPred.select(\"text\", \"label\", \"prediction\").cache().count()\n", + "var t1 = System.nanoTime()\n", + "val infer_time = (t1 - t0)*1e-9\n", + "println(\"Time to make prediction: \" + infer_time + \"s\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Compute AUC" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Test AUC = 0.779740\n" + ] + }, + { + "data": { + "text/plain": [ + "sparkEvaluator = binEval_20214645f5fa\n", + "test_AUC = 0.7797402185965467\n" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/plain": [ + "0.7797402185965467" + ] + }, + "execution_count": 8, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator\n", + "\n", + "val sparkEvaluator = new BinaryClassificationEvaluator()\n", + " .setRawPredictionCol(\"prediction\")\n", + " .setLabelCol(\"label\")\n", + "val test_AUC = sparkEvaluator.evaluate(lrPred)\n", + "println(\"Test AUC = %f\".format(test_AUC))" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "accumulo - Scala", + "language": "scala", + "name": "accumulo_scala" + }, + "language_info": { + "codemirror_mode": "text/x-scala", + "file_extension": ".scala", + "mimetype": "text/x-scala", + "name": "scala", + "pygments_lexer": "scala", + "version": "2.11.12" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/connector/examples/scala/LogisticRegressionTraining.ipynb b/connector/examples/scala/LogisticRegressionTraining.ipynb new file mode 100644 index 0000000..8fe19f3 --- /dev/null +++ b/connector/examples/scala/LogisticRegressionTraining.ipynb @@ -0,0 +1,489 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "*Copyright (c) Microsoft Corporation. All rights reserved.*\n", + "\n", + "*Licensed under the MIT License.*" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Baseline Spark Model for AI on Accumulo\n", + "\n", + "\n", + "In this notebook, we train a Spark MLlib logistic regression model using [sentiment140](http://help.sentiment140.com/for-students/?source=post_page---------------------------) twitter data. [Microsoft Accumulo Spark Connector (MASC)](https://github.com/microsoft/masc) is used for handling data IO between Accumulo and Spark. \n", + "\n", + "Before running this notebook, please\n", + "* make sure you have Accumulo 2.0.0 and Spark 2.4.3 installed\n", + "* create and activate a conda environment with Apache Toree installed\n", + "* download accumulo-spark-datasource jar and accumulo-spark-iterator jar\n", + "* run commands like the following to install a Jupyter toree kernel\n", + "```\n", + "# Replace the jar file path based on your situation\n", + "JAR=\"file:///home/rba1/twitter-sentiment/lib/accumulo-spark-datasource-1.0.0-SNAPSHOT-shaded.jar\"\n", + "jupyter toree install \\\n", + " --replace \\\n", + " --user \\\n", + " --kernel_name=accumulo \\\n", + " --spark_home=${SPARK_HOME} \\\n", + " --spark_opts=\"--master yarn --jars $JAR \\\n", + " --packages org.apache.spark:spark-avro_2.11:2.4.3,com.microsoft.ml.spark:mmlspark_2.11:0.18.1 \\\n", + " --driver-memory 8g \\\n", + " --executor-memory 6g \\\n", + " --driver-cores 2 \\\n", + " --executor-cores 2 \\\n", + " --num-executors 16\"\n", + "```" + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "Waiting for a Spark session to start..." + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Spark version 2.4.3\n", + "Scala version 2.11.12\n", + "\n" + ] + }, + { + "data": { + "text/plain": [ + "Waiting for a Spark session to start..." + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "(spark.eventLog.enabled,true)\n", + "(spark.driver.port,41187)\n", + "(spark.repl.local.jars,file:///home/rba1/twitter-sentiment/lib/accumulo-spark-datasource-1.0.0-SNAPSHOT-shaded.jar,file:///home/rba1/.ivy2/jars/org.apache.spark_spark-avro_2.11-2.4.3.jar,file:///home/rba1/.ivy2/jars/com.microsoft.ml.spark_mmlspark_2.11-0.18.1.jar,file:///home/rba1/.ivy2/jars/org.spark-project.spark_unused-1.0.0.jar,file:///home/rba1/.ivy2/jars/org.scalactic_scalactic_2.11-3.0.5.jar,file:///home/rba1/.ivy2/jars/org.scalatest_scalatest_2.11-3.0.5.jar,file:///home/rba1/.ivy2/jars/io.spray_spray-json_2.11-1.3.2.jar,file:///home/rba1/.ivy2/jars/com.microsoft.cntk_cntk-2.4.jar,file:///home/rba1/.ivy2/jars/org.openpnp_opencv-3.2.0-1.jar,file:///home/rba1/.ivy2/jars/com.jcraft_jsch-0.1.54.jar,file:///home/rba1/.ivy2/jars/org.apache.httpcomponents_httpclient-4.5.6.jar,file:///home/rba1/.ivy2/jars/com.microsoft.ml.lightgbm_lightgbmlib-2.2.350.jar,file:///home/rba1/.ivy2/jars/com.github.vowpalwabbit_vw-jni-8.7.0.2.jar,file:///home/rba1/.ivy2/jars/org.scala-lang_scala-reflect-2.11.12.jar,file:///home/rba1/.ivy2/jars/org.scala-lang.modules_scala-xml_2.11-1.0.6.jar,file:///home/rba1/.ivy2/jars/org.apache.httpcomponents_httpcore-4.4.10.jar,file:///home/rba1/.ivy2/jars/commons-logging_commons-logging-1.2.jar,file:///home/rba1/.ivy2/jars/commons-codec_commons-codec-1.10.jar)\n", + "(spark.driver.host,rbaaccucluster2-0)\n", + "(spark.eventLog.dir,hdfs://rbaaccucluster2/spark/history)\n", + "(spark.app.name,TwitterSentimentClassification)\n", + "(spark.jars,file:/home/rba1/.local/share/jupyter/kernels/accumulo_scala/lib/toree-assembly-0.3.0-incubating.jar)\n", + "(spark.app.id,application_1574703914462_0010)\n", + "(spark.executor.id,driver)\n", + "(spark.executor.instances,16)\n", + "(spark.driver.appUIAddress,http://rbaaccucluster2-0:4040)\n", + "(spark.executor.cores,2)\n", + "(spark.repl.class.uri,spark://rbaaccucluster2-0:41187/classes)\n", + "(spark.executor.memory,6g)\n", + "(spark.master,yarn)\n", + "(spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_HOSTS,rbaaccucluster2-0)\n", + "(spark.yarn.dist.jars,file:///home/rba1/twitter-sentiment/lib/accumulo-spark-datasource-1.0.0-SNAPSHOT-shaded.jar,file:///home/rba1/.ivy2/jars/org.apache.spark_spark-avro_2.11-2.4.3.jar,file:///home/rba1/.ivy2/jars/com.microsoft.ml.spark_mmlspark_2.11-0.18.1.jar,file:///home/rba1/.ivy2/jars/org.spark-project.spark_unused-1.0.0.jar,file:///home/rba1/.ivy2/jars/org.scalactic_scalactic_2.11-3.0.5.jar,file:///home/rba1/.ivy2/jars/org.scalatest_scalatest_2.11-3.0.5.jar,file:///home/rba1/.ivy2/jars/io.spray_spray-json_2.11-1.3.2.jar,file:///home/rba1/.ivy2/jars/com.microsoft.cntk_cntk-2.4.jar,file:///home/rba1/.ivy2/jars/org.openpnp_opencv-3.2.0-1.jar,file:///home/rba1/.ivy2/jars/com.jcraft_jsch-0.1.54.jar,file:///home/rba1/.ivy2/jars/org.apache.httpcomponents_httpclient-4.5.6.jar,file:///home/rba1/.ivy2/jars/com.microsoft.ml.lightgbm_lightgbmlib-2.2.350.jar,file:///home/rba1/.ivy2/jars/com.github.vowpalwabbit_vw-jni-8.7.0.2.jar,file:///home/rba1/.ivy2/jars/org.scala-lang_scala-reflect-2.11.12.jar,file:///home/rba1/.ivy2/jars/org.scala-lang.modules_scala-xml_2.11-1.0.6.jar,file:///home/rba1/.ivy2/jars/org.apache.httpcomponents_httpcore-4.4.10.jar,file:///home/rba1/.ivy2/jars/commons-logging_commons-logging-1.2.jar,file:///home/rba1/.ivy2/jars/commons-codec_commons-codec-1.10.jar)\n", + "(spark.yarn.secondary.jars,accumulo-spark-datasource-1.0.0-SNAPSHOT-shaded.jar,org.apache.spark_spark-avro_2.11-2.4.3.jar,com.microsoft.ml.spark_mmlspark_2.11-0.18.1.jar,org.spark-project.spark_unused-1.0.0.jar,org.scalactic_scalactic_2.11-3.0.5.jar,org.scalatest_scalatest_2.11-3.0.5.jar,io.spray_spray-json_2.11-1.3.2.jar,com.microsoft.cntk_cntk-2.4.jar,org.openpnp_opencv-3.2.0-1.jar,com.jcraft_jsch-0.1.54.jar,org.apache.httpcomponents_httpclient-4.5.6.jar,com.microsoft.ml.lightgbm_lightgbmlib-2.2.350.jar,com.github.vowpalwabbit_vw-jni-8.7.0.2.jar,org.scala-lang_scala-reflect-2.11.12.jar,org.scala-lang.modules_scala-xml_2.11-1.0.6.jar,org.apache.httpcomponents_httpcore-4.4.10.jar,commons-logging_commons-logging-1.2.jar,commons-codec_commons-codec-1.10.jar)\n", + "(spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_URI_BASES,http://rbaaccucluster2-0:8088/proxy/application_1574703914462_0010)\n", + "(spark.repl.class.outputDir,/tmp/spark-5fa440df-d9f2-4249-800e-292c5f42435e/repl-c02b6092-3f21-4a43-9276-2acb332b8768)\n", + "(spark.history.fs.logDirectory,hdfs://rbaaccucluster2/spark/history)\n", + "(spark.driver.memory,8g)\n", + "(spark.submit.deployMode,client)\n", + "(spark.ui.proxyBase,/proxy/application_1574703914462_0009)\n", + "(spark.ui.filters,org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter)\n", + "(spark.yarn.historyServer.address,rbaaccucluster2-0:18080)\n" + ] + }, + { + "data": { + "text/plain": [ + "conf = org.apache.spark.SparkConf@3f3f7d39\n" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/plain": [ + "org.apache.spark.SparkConf@3f3f7d39" + ] + }, + "execution_count": 1, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "import org.apache.spark.{SparkConf, SparkContext}\n", + "\n", + "// Stop existing spark context and create new one\n", + "sc.stop()\n", + "\n", + "val conf = new SparkConf()\n", + "conf.setAppName(\"TwitterSentimentClassification\")\n", + "\n", + "new SparkContext(conf)\n", + "\n", + "println(\"Spark version %s\".format(sc.version))\n", + "println(\"Scala %s\".format(util.Properties.versionString))\n", + "println\n", + "sc.getConf.getAll.foreach(println)" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "PROPS_PATH = /home/rba1/install/accumulo-2.0.0/conf/accumulo-client.properties\n", + "TRAIN_TABLE_NAME = twitter_train_data\n", + "sqlContext = org.apache.spark.sql.SQLContext@7d175c9d\n", + "schema = StructType(StructField(sentiment,DoubleType,true), StructField(id,StringType,true), StructField(date,StringType,true), StructField(query_string,StringType,true), StructField(user,StringType,true), StructField(text,StringType,true))\n" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/plain": [ + "warning: there was one deprecation warning; re-run with -deprecation for details\n" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/plain": [ + "StructType(StructField(sentiment,DoubleType,true), StructField(id,StringType,true), StructField(date,StringType,true), StructField(query_string,StringType,true), StructField(user,StringType,true), StructField(text,StringType,true))" + ] + }, + "execution_count": 2, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "import org.apache.spark.sql.types.{LongType, DoubleType, StringType, StructField, StructType}\n", + "import org.apache.accumulo.core.client.Accumulo\n", + "import scala.collection.JavaConverters._\n", + "\n", + "// client property file path\n", + "val PROPS_PATH = \"/home/rba1/install/accumulo-2.0.0/conf/accumulo-client.properties\"\n", + "val TRAIN_TABLE_NAME = \"twitter_train_data\"\n", + "val sqlContext = new org.apache.spark.sql.SQLContext(sc)\n", + "val schema = StructType(Array(\n", + " StructField(\"sentiment\", DoubleType),\n", + " StructField(\"id\", StringType),\n", + " StructField(\"date\", StringType),\n", + " StructField(\"query_string\", StringType),\n", + " StructField(\"user\", StringType),\n", + " StructField(\"text\", StringType)\n", + "))" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Ingest Twitter Data to Accumulo" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Time to ingest twitter data to Accumulo: 25.651309321000003s\n" + ] + }, + { + "data": { + "text/plain": [ + "file_path = sentiment140_prefix.csv\n", + "df = [sentiment: double, id: string ... 4 more fields]\n", + "t0 = 17142067724217\n", + "props = {auth.type=password, auth.principal=root, table=twitter_train_data, instance.zookeepers=rbaaccucluster2-0:2181,rbaaccucluster2-1:2181,rbaaccucluster2-2:2181, instance.name=muchos, rowKey=id, auth.token=secret}\n", + "t1 = 17167719033538\n" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/plain": [ + "17167719033538" + ] + }, + "execution_count": 3, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "// need to upload data to hdfs first via \n", + "// hdfs dfs -put /home/rba1/twitter-sentiment/sentiment140_prefix.csv sentiment140_prefix.csv\n", + "val file_path = \"sentiment140_prefix.csv\"\n", + "val df = spark.read.format(\"csv\").schema(schema).load(file_path)\n", + "\n", + "var t0 = System.nanoTime()\n", + "val props = Accumulo.newClientProperties().from(PROPS_PATH).build()\n", + "props.put(\"table\", TRAIN_TABLE_NAME)\n", + "props.put(\"rowKey\", \"id\")\n", + "df.write.format(\"org.apache.accumulo\").options(props.asScala).save()\n", + "var t1 = System.nanoTime()\n", + "println(\"Time to ingest twitter data to Accumulo: \" + (t1 - t0)*1e-9 + \"s\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Load Training Data from Accumulo" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Reading training data from Accumulo...\n", + "Time to load training data: 22.800518474s\n" + ] + }, + { + "data": { + "text/plain": [ + "t0 = 17169477786088\n", + "train_df = [sentiment: double, id: string ... 5 more fields]\n", + "t1 = 17192278304562\n", + "read_time = 22.800518474\n" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/plain": [ + "22.800518474" + ] + }, + "execution_count": 4, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "println(\"Reading training data from Accumulo...\")\n", + "var t0 = System.nanoTime()\n", + "var train_df = spark.read\n", + " .format(\"org.apache.accumulo\")\n", + " .options(props.asScala)\n", + " .schema(schema)\n", + " .load()\n", + "train_df.cache().count()\n", + "var t1 = System.nanoTime()\n", + "val read_time = (t1 - t0)*1e-9\n", + "println(\"Time to load training data: \" + read_time + \"s\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Data Preparation" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "train_df = [label: double, text: string]\n" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/plain": [ + "[label: double, text: string]" + ] + }, + "execution_count": 5, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "import org.apache.spark.sql.functions.{rand, when}\n", + "\n", + "train_df = train_df.orderBy(rand()) // Randomly permute the data for online training\n", + " .withColumn(\"label\", 'sentiment.cast(\"Int\"))\n", + " .select('label as 'label, 'text as 'text)\n", + " .withColumn(\"label\", when('label > 0, 1.0D).otherwise(0.0D))" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Feature Engineering and Model Training" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "tokenizer = regexTok_3b80a218d04e\n", + "hashingTF = hashingTF_c9a0382c9e47\n", + "lr = logreg_ea82587065e1\n" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/plain": [ + "logreg_ea82587065e1" + ] + }, + "execution_count": 6, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "import org.apache.spark.ml.feature.{RegexTokenizer, HashingTF}\n", + "import org.apache.spark.ml.classification.LogisticRegression\n", + "import org.apache.spark.ml.Pipeline\n", + "import scala.math.pow\n", + "\n", + "val tokenizer = new RegexTokenizer()\n", + " .setGaps(false)\n", + " .setPattern(\"\\\\p{L}+\")\n", + " .setInputCol(\"text\")\n", + " .setOutputCol(\"words\")\n", + "\n", + "val hashingTF = new HashingTF()\n", + " .setInputCol(\"words\")\n", + " .setOutputCol(\"features\")\n", + " .setNumFeatures(pow(2, 18).toInt)\n", + "\n", + "val lr = new LogisticRegression()\n", + " .setMaxIter(1)\n", + " .setRegParam(0.2)\n", + " .setElasticNetParam(0.0)" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Time to train logistic regression model: 29.547905881000002s\n" + ] + }, + { + "data": { + "text/plain": [ + "lr_pipeline = pipeline_4bbac66c2954\n", + "t0 = 17196179663420\n", + "lrModel = pipeline_4bbac66c2954\n", + "t1 = 17225727569301\n", + "train_time = 29.547905881000002\n" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/plain": [ + "29.547905881000002" + ] + }, + "execution_count": 7, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "// define a training pipeline\n", + "val lr_pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, lr))\n", + "\n", + "var t0 = System.nanoTime()\n", + "val lrModel = lr_pipeline.fit(train_df)\n", + "var t1 = System.nanoTime()\n", + "val train_time = (t1 - t0)*1e-9\n", + "println(\"Time to train logistic regression model: \" + train_time + \"s\")" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": {}, + "outputs": [], + "source": [ + "// Save model to hdfs\n", + "lrModel.write.overwrite().save(\"./model/lrModel_twitter_sentiment\")" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "accumulo - Scala", + "language": "scala", + "name": "accumulo_scala" + }, + "language_info": { + "codemirror_mode": "text/x-scala", + "file_extension": ".scala", + "mimetype": "text/x-scala", + "name": "scala", + "pygments_lexer": "scala", + "version": "2.11.12" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/connector/examples/scala/VowpalWabbitTesting.ipynb b/connector/examples/scala/VowpalWabbitTesting.ipynb new file mode 100644 index 0000000..b792ca8 --- /dev/null +++ b/connector/examples/scala/VowpalWabbitTesting.ipynb @@ -0,0 +1,507 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "*Copyright (c) Microsoft Corporation. All rights reserved.*\n", + "\n", + "*Licensed under the MIT License.*" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# VW MMLSpark Model for AI on Accumulo\n", + "\n", + "\n", + "In this notebook, we test a Vowpal Wabbit model in MMLSpark using the manual test set provided in [sentiment140](http://help.sentiment140.com/for-students/?source=post_page---------------------------) twitter data. [Microsoft Accumulo Spark Connector (MASC)](https://github.com/microsoft/masc) is used for handling data IO between Accumulo and Spark. \n", + "\n", + "Before running this notebook, please\n", + "* make sure you have Accumulo 2.0.0 and Spark 2.4.3 installed\n", + "* create and activate a conda environment with Apache Toree installed\n", + "* download accumulo-spark-datasource jar and accumulo-spark-iterator jar\n", + "* run commands like the following to install a Jupyter toree kernel\n", + "```\n", + "# Replace the jar file path based on your situation\n", + "JAR=\"file:///home/rba1/twitter-sentiment/lib/accumulo-spark-datasource-1.0.0-SNAPSHOT-shaded.jar\"\n", + "jupyter toree install \\\n", + " --replace \\\n", + " --user \\\n", + " --kernel_name=accumulo \\\n", + " --spark_home=${SPARK_HOME} \\\n", + " --spark_opts=\"--master yarn --jars $JAR \\\n", + " --packages org.apache.spark:spark-avro_2.11:2.4.3,com.microsoft.ml.spark:mmlspark_2.11:0.18.1 \\\n", + " --driver-memory 8g \\\n", + " --executor-memory 6g \\\n", + " --driver-cores 2 \\\n", + " --executor-cores 2 \\\n", + " --num-executors 16\"\n", + "```" + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "Waiting for a Spark session to start..." + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Spark version 2.4.3\n", + "Scala version 2.11.12\n", + "\n" + ] + }, + { + "data": { + "text/plain": [ + "Waiting for a Spark session to start..." + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "(spark.eventLog.enabled,true)\n", + "(spark.repl.local.jars,file:///home/rba1/twitter-sentiment/lib/accumulo-spark-datasource-1.0.0-SNAPSHOT-shaded.jar,file:///home/rba1/.ivy2/jars/org.apache.spark_spark-avro_2.11-2.4.3.jar,file:///home/rba1/.ivy2/jars/com.microsoft.ml.spark_mmlspark_2.11-0.18.1.jar,file:///home/rba1/.ivy2/jars/org.spark-project.spark_unused-1.0.0.jar,file:///home/rba1/.ivy2/jars/org.scalactic_scalactic_2.11-3.0.5.jar,file:///home/rba1/.ivy2/jars/org.scalatest_scalatest_2.11-3.0.5.jar,file:///home/rba1/.ivy2/jars/io.spray_spray-json_2.11-1.3.2.jar,file:///home/rba1/.ivy2/jars/com.microsoft.cntk_cntk-2.4.jar,file:///home/rba1/.ivy2/jars/org.openpnp_opencv-3.2.0-1.jar,file:///home/rba1/.ivy2/jars/com.jcraft_jsch-0.1.54.jar,file:///home/rba1/.ivy2/jars/org.apache.httpcomponents_httpclient-4.5.6.jar,file:///home/rba1/.ivy2/jars/com.microsoft.ml.lightgbm_lightgbmlib-2.2.350.jar,file:///home/rba1/.ivy2/jars/com.github.vowpalwabbit_vw-jni-8.7.0.2.jar,file:///home/rba1/.ivy2/jars/org.scala-lang_scala-reflect-2.11.12.jar,file:///home/rba1/.ivy2/jars/org.scala-lang.modules_scala-xml_2.11-1.0.6.jar,file:///home/rba1/.ivy2/jars/org.apache.httpcomponents_httpcore-4.4.10.jar,file:///home/rba1/.ivy2/jars/commons-logging_commons-logging-1.2.jar,file:///home/rba1/.ivy2/jars/commons-codec_commons-codec-1.10.jar)\n", + "(spark.ui.proxyBase,/proxy/application_1574703914462_0017)\n", + "(spark.repl.class.uri,spark://rbaaccucluster2-0:32956/classes)\n", + "(spark.driver.host,rbaaccucluster2-0)\n", + "(spark.eventLog.dir,hdfs://rbaaccucluster2/spark/history)\n", + "(spark.app.name,TwitterSentimentClassification)\n", + "(spark.jars,file:/home/rba1/.local/share/jupyter/kernels/accumulo_scala/lib/toree-assembly-0.3.0-incubating.jar)\n", + "(spark.executor.id,driver)\n", + "(spark.repl.class.outputDir,/tmp/spark-432ecf38-b9e8-4f71-8167-804c44347573/repl-6264d938-7c5a-493f-af70-311fa661f3f8)\n", + "(spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_URI_BASES,http://rbaaccucluster2-0:8088/proxy/application_1574703914462_0018)\n", + "(spark.executor.instances,16)\n", + "(spark.executor.cores,2)\n", + "(spark.driver.port,32956)\n", + "(spark.driver.appUIAddress,http://rbaaccucluster2-0:4040)\n", + "(spark.app.id,application_1574703914462_0018)\n", + "(spark.executor.memory,6g)\n", + "(spark.master,yarn)\n", + "(spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_HOSTS,rbaaccucluster2-0)\n", + "(spark.yarn.dist.jars,file:///home/rba1/twitter-sentiment/lib/accumulo-spark-datasource-1.0.0-SNAPSHOT-shaded.jar,file:///home/rba1/.ivy2/jars/org.apache.spark_spark-avro_2.11-2.4.3.jar,file:///home/rba1/.ivy2/jars/com.microsoft.ml.spark_mmlspark_2.11-0.18.1.jar,file:///home/rba1/.ivy2/jars/org.spark-project.spark_unused-1.0.0.jar,file:///home/rba1/.ivy2/jars/org.scalactic_scalactic_2.11-3.0.5.jar,file:///home/rba1/.ivy2/jars/org.scalatest_scalatest_2.11-3.0.5.jar,file:///home/rba1/.ivy2/jars/io.spray_spray-json_2.11-1.3.2.jar,file:///home/rba1/.ivy2/jars/com.microsoft.cntk_cntk-2.4.jar,file:///home/rba1/.ivy2/jars/org.openpnp_opencv-3.2.0-1.jar,file:///home/rba1/.ivy2/jars/com.jcraft_jsch-0.1.54.jar,file:///home/rba1/.ivy2/jars/org.apache.httpcomponents_httpclient-4.5.6.jar,file:///home/rba1/.ivy2/jars/com.microsoft.ml.lightgbm_lightgbmlib-2.2.350.jar,file:///home/rba1/.ivy2/jars/com.github.vowpalwabbit_vw-jni-8.7.0.2.jar,file:///home/rba1/.ivy2/jars/org.scala-lang_scala-reflect-2.11.12.jar,file:///home/rba1/.ivy2/jars/org.scala-lang.modules_scala-xml_2.11-1.0.6.jar,file:///home/rba1/.ivy2/jars/org.apache.httpcomponents_httpcore-4.4.10.jar,file:///home/rba1/.ivy2/jars/commons-logging_commons-logging-1.2.jar,file:///home/rba1/.ivy2/jars/commons-codec_commons-codec-1.10.jar)\n", + "(spark.yarn.secondary.jars,accumulo-spark-datasource-1.0.0-SNAPSHOT-shaded.jar,org.apache.spark_spark-avro_2.11-2.4.3.jar,com.microsoft.ml.spark_mmlspark_2.11-0.18.1.jar,org.spark-project.spark_unused-1.0.0.jar,org.scalactic_scalactic_2.11-3.0.5.jar,org.scalatest_scalatest_2.11-3.0.5.jar,io.spray_spray-json_2.11-1.3.2.jar,com.microsoft.cntk_cntk-2.4.jar,org.openpnp_opencv-3.2.0-1.jar,com.jcraft_jsch-0.1.54.jar,org.apache.httpcomponents_httpclient-4.5.6.jar,com.microsoft.ml.lightgbm_lightgbmlib-2.2.350.jar,com.github.vowpalwabbit_vw-jni-8.7.0.2.jar,org.scala-lang_scala-reflect-2.11.12.jar,org.scala-lang.modules_scala-xml_2.11-1.0.6.jar,org.apache.httpcomponents_httpcore-4.4.10.jar,commons-logging_commons-logging-1.2.jar,commons-codec_commons-codec-1.10.jar)\n", + "(spark.history.fs.logDirectory,hdfs://rbaaccucluster2/spark/history)\n", + "(spark.driver.memory,8g)\n", + "(spark.submit.deployMode,client)\n", + "(spark.ui.filters,org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter)\n", + "(spark.yarn.historyServer.address,rbaaccucluster2-0:18080)\n" + ] + }, + { + "data": { + "text/plain": [ + "conf = org.apache.spark.SparkConf@4d03b080\n" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/plain": [ + "org.apache.spark.SparkConf@4d03b080" + ] + }, + "execution_count": 1, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "import org.apache.spark.{SparkConf, SparkContext}\n", + "\n", + "// Stop existing spark context and create new one\n", + "sc.stop()\n", + "\n", + "val conf = new SparkConf()\n", + "conf.setAppName(\"TwitterSentimentClassification\")\n", + "\n", + "new SparkContext(conf)\n", + "\n", + "println(\"Spark version %s\".format(sc.version))\n", + "println(\"Scala %s\".format(util.Properties.versionString))\n", + "println\n", + "sc.getConf.getAll.foreach(println)" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "PROPS_PATH = /home/rba1/install/accumulo-2.0.0/conf/accumulo-client.properties\n", + "TEST_TABLE_NAME = twitter_test_data\n", + "sqlContext = org.apache.spark.sql.SQLContext@2eed267d\n", + "schema = StructType(StructField(sentiment,DoubleType,true), StructField(id,StringType,true), StructField(date,StringType,true), StructField(query_string,StringType,true), StructField(user,StringType,true), StructField(text,StringType,true))\n" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/plain": [ + "warning: there was one deprecation warning; re-run with -deprecation for details\n" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/plain": [ + "StructType(StructField(sentiment,DoubleType,true), StructField(id,StringType,true), StructField(date,StringType,true), StructField(query_string,StringType,true), StructField(user,StringType,true), StructField(text,StringType,true))" + ] + }, + "execution_count": 2, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "import org.apache.spark.sql.types.{LongType, DoubleType, StringType, StructField, StructType}\n", + "import org.apache.accumulo.core.client.Accumulo\n", + "import scala.collection.JavaConverters._\n", + "\n", + "// client property file path\n", + "val PROPS_PATH = \"/home/rba1/install/accumulo-2.0.0/conf/accumulo-client.properties\"\n", + "val TEST_TABLE_NAME = \"twitter_test_data\"\n", + "val sqlContext = new org.apache.spark.sql.SQLContext(sc)\n", + "val schema = StructType(Array(\n", + " StructField(\"sentiment\", DoubleType),\n", + " StructField(\"id\", StringType),\n", + " StructField(\"date\", StringType),\n", + " StructField(\"query_string\", StringType),\n", + " StructField(\"user\", StringType),\n", + " StructField(\"text\", StringType)\n", + "))" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Ingest Twitter Data to Accumulo" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Time to ingest twitter data to Accumulo: 4.65183017s\n" + ] + }, + { + "data": { + "text/plain": [ + "file_path = testdata_manual.csv\n", + "df = [sentiment: double, id: string ... 4 more fields]\n", + "t0 = 19161648859768\n", + "props = {auth.type=password, auth.principal=root, table=twitter_test_data, instance.zookeepers=rbaaccucluster2-0:2181,rbaaccucluster2-1:2181,rbaaccucluster2-2:2181, instance.name=muchos, rowKey=id, auth.token=secret}\n", + "t1 = 19166300689938\n" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/plain": [ + "19166300689938" + ] + }, + "execution_count": 3, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "// need to upload data to hdfs first via \n", + "// hdfs dfs -put /home/rba1/twitter-sentiment/testdata_manual.csv testdata_manual.csv\n", + "val file_path = \"testdata_manual.csv\"\n", + "val df = spark.read.format(\"csv\").schema(schema).load(file_path)\n", + "\n", + "var t0 = System.nanoTime()\n", + "val props = Accumulo.newClientProperties().from(PROPS_PATH).build()\n", + "props.put(\"table\", TEST_TABLE_NAME)\n", + "props.put(\"rowKey\", \"id\")\n", + "df.write.format(\"org.apache.accumulo\").options(props.asScala).save()\n", + "var t1 = System.nanoTime()\n", + "println(\"Time to ingest twitter data to Accumulo: \" + (t1 - t0)*1e-9 + \"s\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Load Test Data from Accumulo" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Reading test data from Accumulo...\n", + "Time to load test data: 3.6648699220000003s\n" + ] + }, + { + "data": { + "text/plain": [ + "t0 = 19168230563355\n", + "test_df = [sentiment: double, id: string ... 5 more fields]\n", + "t1 = 19171895433277\n", + "read_time = 3.6648699220000003\n" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/plain": [ + "3.6648699220000003" + ] + }, + "execution_count": 4, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "println(\"Reading test data from Accumulo...\")\n", + "var t0 = System.nanoTime()\n", + "var test_df = spark.read\n", + " .format(\"org.apache.accumulo\")\n", + " .options(props.asScala)\n", + " .schema(schema)\n", + " .load()\n", + "test_df.cache().count()\n", + "var t1 = System.nanoTime()\n", + "val read_time = (t1 - t0)*1e-9\n", + "println(\"Time to load test data: \" + read_time + \"s\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Data Preparation" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "test_df = [label: double, text: string]\n" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/plain": [ + "[label: double, text: string]" + ] + }, + "execution_count": 5, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "import org.apache.spark.sql.functions.{rand, when}\n", + "\n", + "test_df = test_df.withColumn(\"label\", 'sentiment.cast(\"Int\"))\n", + " .select('label as 'label, 'text as 'text)\n", + " .withColumn(\"label\", when('label > 0, 1.0D).otherwise(0.0D))" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Load Model" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "vwModel = pipeline_3d0899e64c3e\n" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/plain": [ + "pipeline_3d0899e64c3e" + ] + }, + "execution_count": 6, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "import org.apache.spark.ml.PipelineModel\n", + "import com.microsoft.ml.spark.vw.VowpalWabbitClassificationModel\n", + "\n", + "val vwModel = PipelineModel.load(\"./model/vwModel_twitter_sentiment\")" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Time to make prediction: 2.606229307s\n" + ] + }, + { + "data": { + "text/plain": [ + "t0 = 19180899641455\n", + "vwPred = [label: double, text: string ... 4 more fields]\n", + "t1 = 19183505870762\n", + "infer_time = 2.606229307\n" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/plain": [ + "2.606229307" + ] + }, + "execution_count": 7, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "var t0 = System.nanoTime()\n", + "val vwPred = vwModel.transform(test_df)\n", + "vwPred.select(\"text\", \"label\", \"prediction\").cache().count()\n", + "var t1 = System.nanoTime()\n", + "val infer_time = (t1 - t0)*1e-9\n", + "println(\"Time to make prediction: \" + infer_time + \"s\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Compute AUC" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Test AUC = 0.792175\n" + ] + }, + { + "data": { + "text/plain": [ + "sparkEvaluator = binEval_5c9a350032fc\n", + "test_AUC = 0.7921748772374465\n" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/plain": [ + "0.7921748772374465" + ] + }, + "execution_count": 8, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator\n", + "\n", + "val sparkEvaluator = new BinaryClassificationEvaluator()\n", + " .setRawPredictionCol(\"prediction\")\n", + " .setLabelCol(\"label\")\n", + "val test_AUC = sparkEvaluator.evaluate(vwPred)\n", + "println(\"Test AUC = %f\".format(test_AUC))" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "accumulo - Scala", + "language": "scala", + "name": "accumulo_scala" + }, + "language_info": { + "codemirror_mode": "text/x-scala", + "file_extension": ".scala", + "mimetype": "text/x-scala", + "name": "scala", + "pygments_lexer": "scala", + "version": "2.11.12" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/connector/examples/scala/VowpalWabbitTraining.ipynb b/connector/examples/scala/VowpalWabbitTraining.ipynb new file mode 100644 index 0000000..08f2c29 --- /dev/null +++ b/connector/examples/scala/VowpalWabbitTraining.ipynb @@ -0,0 +1,481 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "*Copyright (c) Microsoft Corporation. All rights reserved.*\n", + "\n", + "*Licensed under the MIT License.*" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# VW MMLSpark Model for AI on Accumulo\n", + "\n", + "\n", + "In this notebook, we train a Vowpal Wabbit model in MMLSpark using [sentiment140](http://help.sentiment140.com/for-students/?source=post_page---------------------------) twitter data. [Microsoft Accumulo Spark Connector (MASC)](https://github.com/microsoft/masc) is used for handling data IO between Accumulo and Spark. \n", + "\n", + "Before running this notebook, please\n", + "* make sure you have Accumulo 2.0.0 and Spark 2.4.3 installed\n", + "* create and activate a conda environment with Apache Toree installed\n", + "* download accumulo-spark-datasource jar and accumulo-spark-iterator jar\n", + "* run commands like the following to install a Jupyter toree kernel\n", + "```\n", + "# Replace the jar file path based on your situation\n", + "JAR=\"file:///home/rba1/twitter-sentiment/lib/accumulo-spark-datasource-1.0.0-SNAPSHOT-shaded.jar\"\n", + "jupyter toree install \\\n", + " --replace \\\n", + " --user \\\n", + " --kernel_name=accumulo \\\n", + " --spark_home=${SPARK_HOME} \\\n", + " --spark_opts=\"--master yarn --jars $JAR \\\n", + " --packages org.apache.spark:spark-avro_2.11:2.4.3,com.microsoft.ml.spark:mmlspark_2.11:0.18.1 \\\n", + " --driver-memory 8g \\\n", + " --executor-memory 6g \\\n", + " --driver-cores 2 \\\n", + " --executor-cores 2 \\\n", + " --num-executors 16\"\n", + "```" + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "Waiting for a Spark session to start..." + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Spark version 2.4.3\n", + "Scala version 2.11.12\n", + "\n" + ] + }, + { + "data": { + "text/plain": [ + "Waiting for a Spark session to start..." + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "(spark.eventLog.enabled,true)\n", + "(spark.repl.local.jars,file:///home/rba1/twitter-sentiment/lib/accumulo-spark-datasource-1.0.0-SNAPSHOT-shaded.jar,file:///home/rba1/.ivy2/jars/org.apache.spark_spark-avro_2.11-2.4.3.jar,file:///home/rba1/.ivy2/jars/com.microsoft.ml.spark_mmlspark_2.11-0.18.1.jar,file:///home/rba1/.ivy2/jars/org.spark-project.spark_unused-1.0.0.jar,file:///home/rba1/.ivy2/jars/org.scalactic_scalactic_2.11-3.0.5.jar,file:///home/rba1/.ivy2/jars/org.scalatest_scalatest_2.11-3.0.5.jar,file:///home/rba1/.ivy2/jars/io.spray_spray-json_2.11-1.3.2.jar,file:///home/rba1/.ivy2/jars/com.microsoft.cntk_cntk-2.4.jar,file:///home/rba1/.ivy2/jars/org.openpnp_opencv-3.2.0-1.jar,file:///home/rba1/.ivy2/jars/com.jcraft_jsch-0.1.54.jar,file:///home/rba1/.ivy2/jars/org.apache.httpcomponents_httpclient-4.5.6.jar,file:///home/rba1/.ivy2/jars/com.microsoft.ml.lightgbm_lightgbmlib-2.2.350.jar,file:///home/rba1/.ivy2/jars/com.github.vowpalwabbit_vw-jni-8.7.0.2.jar,file:///home/rba1/.ivy2/jars/org.scala-lang_scala-reflect-2.11.12.jar,file:///home/rba1/.ivy2/jars/org.scala-lang.modules_scala-xml_2.11-1.0.6.jar,file:///home/rba1/.ivy2/jars/org.apache.httpcomponents_httpcore-4.4.10.jar,file:///home/rba1/.ivy2/jars/commons-logging_commons-logging-1.2.jar,file:///home/rba1/.ivy2/jars/commons-codec_commons-codec-1.10.jar)\n", + "(spark.repl.class.uri,spark://rbaaccucluster2-0:44299/classes)\n", + "(spark.driver.host,rbaaccucluster2-0)\n", + "(spark.eventLog.dir,hdfs://rbaaccucluster2/spark/history)\n", + "(spark.app.name,TwitterSentimentClassification)\n", + "(spark.jars,file:/home/rba1/.local/share/jupyter/kernels/accumulo_scala/lib/toree-assembly-0.3.0-incubating.jar)\n", + "(spark.executor.id,driver)\n", + "(spark.executor.instances,16)\n", + "(spark.driver.appUIAddress,http://rbaaccucluster2-0:4040)\n", + "(spark.executor.cores,2)\n", + "(spark.driver.port,44299)\n", + "(spark.executor.memory,6g)\n", + "(spark.master,yarn)\n", + "(spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_HOSTS,rbaaccucluster2-0)\n", + "(spark.yarn.dist.jars,file:///home/rba1/twitter-sentiment/lib/accumulo-spark-datasource-1.0.0-SNAPSHOT-shaded.jar,file:///home/rba1/.ivy2/jars/org.apache.spark_spark-avro_2.11-2.4.3.jar,file:///home/rba1/.ivy2/jars/com.microsoft.ml.spark_mmlspark_2.11-0.18.1.jar,file:///home/rba1/.ivy2/jars/org.spark-project.spark_unused-1.0.0.jar,file:///home/rba1/.ivy2/jars/org.scalactic_scalactic_2.11-3.0.5.jar,file:///home/rba1/.ivy2/jars/org.scalatest_scalatest_2.11-3.0.5.jar,file:///home/rba1/.ivy2/jars/io.spray_spray-json_2.11-1.3.2.jar,file:///home/rba1/.ivy2/jars/com.microsoft.cntk_cntk-2.4.jar,file:///home/rba1/.ivy2/jars/org.openpnp_opencv-3.2.0-1.jar,file:///home/rba1/.ivy2/jars/com.jcraft_jsch-0.1.54.jar,file:///home/rba1/.ivy2/jars/org.apache.httpcomponents_httpclient-4.5.6.jar,file:///home/rba1/.ivy2/jars/com.microsoft.ml.lightgbm_lightgbmlib-2.2.350.jar,file:///home/rba1/.ivy2/jars/com.github.vowpalwabbit_vw-jni-8.7.0.2.jar,file:///home/rba1/.ivy2/jars/org.scala-lang_scala-reflect-2.11.12.jar,file:///home/rba1/.ivy2/jars/org.scala-lang.modules_scala-xml_2.11-1.0.6.jar,file:///home/rba1/.ivy2/jars/org.apache.httpcomponents_httpcore-4.4.10.jar,file:///home/rba1/.ivy2/jars/commons-logging_commons-logging-1.2.jar,file:///home/rba1/.ivy2/jars/commons-codec_commons-codec-1.10.jar)\n", + "(spark.yarn.secondary.jars,accumulo-spark-datasource-1.0.0-SNAPSHOT-shaded.jar,org.apache.spark_spark-avro_2.11-2.4.3.jar,com.microsoft.ml.spark_mmlspark_2.11-0.18.1.jar,org.spark-project.spark_unused-1.0.0.jar,org.scalactic_scalactic_2.11-3.0.5.jar,org.scalatest_scalatest_2.11-3.0.5.jar,io.spray_spray-json_2.11-1.3.2.jar,com.microsoft.cntk_cntk-2.4.jar,org.openpnp_opencv-3.2.0-1.jar,com.jcraft_jsch-0.1.54.jar,org.apache.httpcomponents_httpclient-4.5.6.jar,com.microsoft.ml.lightgbm_lightgbmlib-2.2.350.jar,com.github.vowpalwabbit_vw-jni-8.7.0.2.jar,org.scala-lang_scala-reflect-2.11.12.jar,org.scala-lang.modules_scala-xml_2.11-1.0.6.jar,org.apache.httpcomponents_httpcore-4.4.10.jar,commons-logging_commons-logging-1.2.jar,commons-codec_commons-codec-1.10.jar)\n", + "(spark.ui.proxyBase,/proxy/application_1574703914462_0013)\n", + "(spark.history.fs.logDirectory,hdfs://rbaaccucluster2/spark/history)\n", + "(spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_URI_BASES,http://rbaaccucluster2-0:8088/proxy/application_1574703914462_0014)\n", + "(spark.driver.memory,8g)\n", + "(spark.app.id,application_1574703914462_0014)\n", + "(spark.submit.deployMode,client)\n", + "(spark.ui.filters,org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter)\n", + "(spark.yarn.historyServer.address,rbaaccucluster2-0:18080)\n", + "(spark.repl.class.outputDir,/tmp/spark-d2e52880-61e4-46e6-a918-5e2a333695e1/repl-ddfc57e1-7aeb-43e0-8606-db2f241d25cf)\n" + ] + }, + { + "data": { + "text/plain": [ + "conf = org.apache.spark.SparkConf@3440396b\n" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/plain": [ + "org.apache.spark.SparkConf@3440396b" + ] + }, + "execution_count": 1, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "import org.apache.spark.{SparkConf, SparkContext}\n", + "\n", + "// Stop existing spark context and create new one\n", + "sc.stop()\n", + "\n", + "val conf = new SparkConf()\n", + "conf.setAppName(\"TwitterSentimentClassification\")\n", + "\n", + "new SparkContext(conf)\n", + "\n", + "println(\"Spark version %s\".format(sc.version))\n", + "println(\"Scala %s\".format(util.Properties.versionString))\n", + "println\n", + "sc.getConf.getAll.foreach(println)" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "PROPS_PATH = /home/rba1/install/accumulo-2.0.0/conf/accumulo-client.properties\n", + "TRAIN_TABLE_NAME = twitter_train_data\n", + "sqlContext = org.apache.spark.sql.SQLContext@74dbb18b\n", + "schema = StructType(StructField(sentiment,DoubleType,true), StructField(id,StringType,true), StructField(date,StringType,true), StructField(query_string,StringType,true), StructField(user,StringType,true), StructField(text,StringType,true))\n" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/plain": [ + "warning: there was one deprecation warning; re-run with -deprecation for details\n" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/plain": [ + "StructType(StructField(sentiment,DoubleType,true), StructField(id,StringType,true), StructField(date,StringType,true), StructField(query_string,StringType,true), StructField(user,StringType,true), StructField(text,StringType,true))" + ] + }, + "execution_count": 2, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "import org.apache.spark.sql.types.{LongType, DoubleType, StringType, StructField, StructType}\n", + "import org.apache.accumulo.core.client.Accumulo\n", + "import scala.collection.JavaConverters._\n", + "\n", + "// client property file path\n", + "val PROPS_PATH = \"/home/rba1/install/accumulo-2.0.0/conf/accumulo-client.properties\"\n", + "val TRAIN_TABLE_NAME = \"twitter_train_data\"\n", + "val sqlContext = new org.apache.spark.sql.SQLContext(sc)\n", + "val schema = StructType(Array(\n", + " StructField(\"sentiment\", DoubleType),\n", + " StructField(\"id\", StringType),\n", + " StructField(\"date\", StringType),\n", + " StructField(\"query_string\", StringType),\n", + " StructField(\"user\", StringType),\n", + " StructField(\"text\", StringType)\n", + "))" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Ingest Twitter Data to Accumulo" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Time to ingest twitter data to Accumulo: 22.002847971s\n" + ] + }, + { + "data": { + "text/plain": [ + "file_path = sentiment140_prefix.csv\n", + "df = [sentiment: double, id: string ... 4 more fields]\n", + "t0 = 18420090028815\n", + "props = {auth.type=password, auth.principal=root, table=twitter_train_data, instance.zookeepers=rbaaccucluster2-0:2181,rbaaccucluster2-1:2181,rbaaccucluster2-2:2181, instance.name=muchos, rowKey=id, auth.token=secret}\n", + "t1 = 18442092876786\n" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/plain": [ + "18442092876786" + ] + }, + "execution_count": 3, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "// need to upload data to hdfs first via \n", + "// hdfs dfs -put /home/rba1/twitter-sentiment/sentiment140_prefix.csv sentiment140_prefix.csv\n", + "val file_path = \"sentiment140_prefix.csv\"\n", + "val df = spark.read.format(\"csv\").schema(schema).load(file_path)\n", + "\n", + "var t0 = System.nanoTime()\n", + "val props = Accumulo.newClientProperties().from(PROPS_PATH).build()\n", + "props.put(\"table\", TRAIN_TABLE_NAME)\n", + "props.put(\"rowKey\", \"id\")\n", + "df.write.format(\"org.apache.accumulo\").options(props.asScala).save()\n", + "var t1 = System.nanoTime()\n", + "println(\"Time to ingest twitter data to Accumulo: \" + (t1 - t0)*1e-9 + \"s\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Load Training Data from Accumulo" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Reading training data from Accumulo...\n", + "Time to load training data: 34.577519024000004s\n" + ] + }, + { + "data": { + "text/plain": [ + "t0 = 18443853066868\n", + "train_df = [sentiment: double, id: string ... 5 more fields]\n", + "t1 = 18478430585892\n", + "read_time = 34.577519024000004\n" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/plain": [ + "34.577519024000004" + ] + }, + "execution_count": 4, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "println(\"Reading training data from Accumulo...\")\n", + "var t0 = System.nanoTime()\n", + "var train_df = spark.read\n", + " .format(\"org.apache.accumulo\")\n", + " .options(props.asScala)\n", + " .schema(schema)\n", + " .load()\n", + "train_df.cache().count()\n", + "var t1 = System.nanoTime()\n", + "val read_time = (t1 - t0)*1e-9\n", + "println(\"Time to load training data: \" + read_time + \"s\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Data Preparation" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "train_df = [label: double, text: string]\n" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/plain": [ + "[label: double, text: string]" + ] + }, + "execution_count": 5, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "import org.apache.spark.sql.functions.{rand, when}\n", + "\n", + "train_df = train_df.orderBy(rand()) // Randomly permute the data for online training\n", + " .withColumn(\"label\", 'sentiment.cast(\"Int\"))\n", + " .select('label as 'label, 'text as 'text)\n", + " .withColumn(\"label\", when('label > 0, 1.0D).otherwise(-1.0D))" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Feature Engineering and Model Training" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "vwFeaturizer = VowpalWabbitFeaturizer_f1627b144bf8\n", + "vwParams = --loss_function=logistic --quiet --holdout_off\n", + "vw = VowpalWabbitClassifier_62338413e2e0\n" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/plain": [ + "VowpalWabbitClassifier_62338413e2e0" + ] + }, + "execution_count": 6, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "import org.apache.spark.ml.Pipeline\n", + "import com.microsoft.ml.spark.vw.{VowpalWabbitFeaturizer, VowpalWabbitClassifier}\n", + "\n", + "val vwFeaturizer = new VowpalWabbitFeaturizer()\n", + " .setStringSplitInputCols(Array(\"text\"))\n", + " .setOutputCol(\"features\")\n", + "\n", + "val vwParams = \"--loss_function=logistic --quiet --holdout_off\"\n", + "val vw = new VowpalWabbitClassifier()\n", + " .setLabelCol(\"label\")\n", + " .setArgs(vwParams)\n", + " .setNumPasses(1)" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Time to train Vowpal Wabbit model: 19.832663679s\n" + ] + }, + { + "data": { + "text/plain": [ + "vw_pipeline = pipeline_3d0899e64c3e\n", + "t0 = 18482474609412\n", + "vwModel = pipeline_3d0899e64c3e\n", + "t1 = 18502307273091\n", + "train_time = 19.832663679\n" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/plain": [ + "19.832663679" + ] + }, + "execution_count": 7, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "// define a training pipeline\n", + "val vw_pipeline = new Pipeline().setStages(Array(vwFeaturizer, vw))\n", + "\n", + "var t0 = System.nanoTime()\n", + "val vwModel = vw_pipeline.fit(train_df)\n", + "var t1 = System.nanoTime()\n", + "val train_time = (t1 - t0)*1e-9\n", + "println(\"Time to train Vowpal Wabbit model: \" + train_time + \"s\")" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": {}, + "outputs": [], + "source": [ + "// Save model to hdfs\n", + "vwModel.write.overwrite().save(\"./model/vwModel_twitter_sentiment\")" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "accumulo - Scala", + "language": "scala", + "name": "accumulo_scala" + }, + "language_info": { + "codemirror_mode": "text/x-scala", + "file_extension": ".scala", + "mimetype": "text/x-scala", + "name": "scala", + "pygments_lexer": "scala", + "version": "2.11.12" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +}