Commit

Added iPython Notebook for streaming. Entire stream routed to localhost:5555
ashok133 committed Apr 24, 2018
0 parents commit b8cc1c2
Showing 52 changed files with 703 additions and 0 deletions.
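
The commit message routes the Twitter stream to localhost:5555, and the notebook below consumes it with ssc.socketTextStream (using 192.168.0.104, the machine's LAN address). The bridge process itself is not part of this commit; as a minimal hypothetical sketch, any program that serves newline-delimited text on that port will satisfy the receiver:

# feeder.py -- hypothetical stand-in for the Twitter-to-socket bridge;
# NOT part of this commit. Serves newline-delimited text on port 5555,
# which is all that ssc.socketTextStream(...) in the notebook expects.
import socket
import time

HOST, PORT = "0.0.0.0", 5555
SAMPLE = ["trying out #spark streaming", "more on #bigdata and #spark"]

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind((HOST, PORT))
server.listen(1)
conn, _ = server.accept()  # Spark Streaming connects as the client
try:
    while True:
        for line in SAMPLE:
            conn.sendall((line + "\n").encode("utf-8"))
        time.sleep(1)
finally:
    conn.close()
    server.close()
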
Binary file added .checkpoint-1524605250000.crc
Binary file added .checkpoint-1524605260000.crc
Binary file added .checkpoint-1524605270000.crc
Binary file added .checkpoint-1524605280000.crc
Binary file added .checkpoint-1524605290000.crc
Binary file added .checkpoint-1524605300000.crc
Binary file added .checkpoint-1524605310000.crc
Binary file added .checkpoint-1524605320000.crc
Binary file added .checkpoint-1524605330000.crc
Binary file added .checkpoint-1524605340000.crc
6 changes: 6 additions & 0 deletions .ipynb_checkpoints/BDA-test-checkpoint.ipynb
@@ -0,0 +1,6 @@
{
"cells": [],
"metadata": {},
"nbformat": 4,
"nbformat_minor": 2
}
215 changes: 215 additions & 0 deletions .ipynb_checkpoints/twitter_feed_bda-checkpoint.ipynb
@@ -0,0 +1,215 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"from pyspark import SparkContext\n",
"from pyspark.streaming import StreamingContext\n",
"from pyspark.sql import SQLContext\n",
"from pyspark.sql.functions import desc"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"\n",
" <div>\n",
" <p><b>SparkContext</b></p>\n",
"\n",
" <p><a href=\"http://192.168.0.104:4041\">Spark UI</a></p>\n",
"\n",
" <dl>\n",
" <dt>Version</dt>\n",
" <dd><code>v2.3.0</code></dd>\n",
" <dt>Master</dt>\n",
" <dd><code>local[*]</code></dd>\n",
" <dt>AppName</dt>\n",
" <dd><code>PySparkShell</code></dd>\n",
" </dl>\n",
" </div>\n",
" "
],
"text/plain": [
"<SparkContext master=local[*] appName=PySparkShell>"
]
},
"execution_count": 2,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"sc"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"ssc = StreamingContext(sc, 10 )\n",
"sqlContext = SQLContext(sc)\n",
"ssc.checkpoint( \"file:///Users/ashok/Desktop/Untitled Folder\")"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"socket_stream = ssc.socketTextStream(\"192.168.0.104\", 5555)"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [],
"source": [
"lines = socket_stream.window( 20 )"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [],
"source": [
"from collections import namedtuple\n",
"fields = (\"tag\", \"count\" )\n",
"Tweet = namedtuple( 'Tweet', fields )"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [],
"source": [
"( lines.flatMap( lambda text: text.split( \" \" ) )\n",
" .filter( lambda word: word.lower().startswith(\"#\") )\n",
" .map( lambda word: ( word.lower(), 1 ) )\n",
" .reduceByKey( lambda a, b: a + b )\n",
" .map( lambda rec: Tweet( rec[0], rec[1] ) )\n",
" .foreachRDD( lambda rdd: rdd.toDF().sort( desc(\"count\") )\n",
" .limit(10).registerTempTable(\"tweets\") ) )"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"<pyspark.sql.context.SQLContext at 0x111b1d290>"
]
},
"execution_count": 8,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"sqlContext"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [],
"source": [
"ssc.start() "
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [],
"source": [
"import matplotlib.pyplot as plt\n",
"import seaborn as sn\n",
"%matplotlib inline"
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [
{
"ename": "IllegalArgumentException",
"evalue": "u'Unable to locate hive jars to connect to metastore. Please set spark.sql.hive.metastore.jars.'",
"output_type": "error",
"traceback": [
"\u001b[0;31m---------------------------------------------------------------------------\u001b[0m",
"\u001b[0;31mIllegalArgumentException\u001b[0m Traceback (most recent call last)",
"\u001b[0;32m<ipython-input-13-360684cde2c1>\u001b[0m in \u001b[0;36m<module>\u001b[0;34m()\u001b[0m\n\u001b[1;32m 6\u001b[0m \u001b[0;32mwhile\u001b[0m \u001b[0mcount\u001b[0m \u001b[0;34m<\u001b[0m \u001b[0;36m10\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 7\u001b[0m \u001b[0mtime\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msleep\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;36m2\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m----> 8\u001b[0;31m \u001b[0mtop_10_tweets\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0msqlContext\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msql\u001b[0m\u001b[0;34m(\u001b[0m \u001b[0;34m'Select tag, count from tweets'\u001b[0m \u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 9\u001b[0m \u001b[0mtop_10_df\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mtop_10_tweets\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mtoPandas\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 10\u001b[0m \u001b[0mdisplay\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mclear_output\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mwait\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0mTrue\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m/Users/ashok/spark-2.3.0-bin-hadoop2.7/python/pyspark/sql/context.pyc\u001b[0m in \u001b[0;36msql\u001b[0;34m(self, sqlQuery)\u001b[0m\n\u001b[1;32m 351\u001b[0m \u001b[0;34m[\u001b[0m\u001b[0mRow\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mf1\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;36m1\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mf2\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;34mu'row1'\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mRow\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mf1\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;36m2\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mf2\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;34mu'row2'\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mRow\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mf1\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;36m3\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mf2\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;34mu'row3'\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 352\u001b[0m \"\"\"\n\u001b[0;32m--> 353\u001b[0;31m \u001b[0;32mreturn\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msparkSession\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msql\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0msqlQuery\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 354\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 355\u001b[0m \u001b[0;34m@\u001b[0m\u001b[0msince\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;36m1.0\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m/Users/ashok/spark-2.3.0-bin-hadoop2.7/python/pyspark/sql/session.pyc\u001b[0m in \u001b[0;36msql\u001b[0;34m(self, sqlQuery)\u001b[0m\n\u001b[1;32m 706\u001b[0m \u001b[0;34m[\u001b[0m\u001b[0mRow\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mf1\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;36m1\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mf2\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;34mu'row1'\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mRow\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mf1\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;36m2\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mf2\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;34mu'row2'\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mRow\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mf1\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;36m3\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mf2\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;34mu'row3'\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 707\u001b[0m \"\"\"\n\u001b[0;32m--> 708\u001b[0;31m \u001b[0;32mreturn\u001b[0m \u001b[0mDataFrame\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_jsparkSession\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msql\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0msqlQuery\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_wrapped\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 709\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 710\u001b[0m \u001b[0;34m@\u001b[0m\u001b[0msince\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;36m2.0\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m/Users/ashok/spark-2.3.0-bin-hadoop2.7/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py\u001b[0m in \u001b[0;36m__call__\u001b[0;34m(self, *args)\u001b[0m\n\u001b[1;32m 1158\u001b[0m \u001b[0manswer\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mgateway_client\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msend_command\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mcommand\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 1159\u001b[0m return_value = get_return_value(\n\u001b[0;32m-> 1160\u001b[0;31m answer, self.gateway_client, self.target_id, self.name)\n\u001b[0m\u001b[1;32m 1161\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 1162\u001b[0m \u001b[0;32mfor\u001b[0m \u001b[0mtemp_arg\u001b[0m \u001b[0;32min\u001b[0m \u001b[0mtemp_args\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;32m/Users/ashok/spark-2.3.0-bin-hadoop2.7/python/pyspark/sql/utils.pyc\u001b[0m in \u001b[0;36mdeco\u001b[0;34m(*a, **kw)\u001b[0m\n\u001b[1;32m 77\u001b[0m \u001b[0;32mraise\u001b[0m \u001b[0mQueryExecutionException\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0ms\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msplit\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m': '\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;36m1\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0;36m1\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mstackTrace\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 78\u001b[0m \u001b[0;32mif\u001b[0m \u001b[0ms\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mstartswith\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m'java.lang.IllegalArgumentException: '\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m---> 79\u001b[0;31m \u001b[0;32mraise\u001b[0m \u001b[0mIllegalArgumentException\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0ms\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msplit\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m': '\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;36m1\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m[\u001b[0m\u001b[0;36m1\u001b[0m\u001b[0;34m]\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mstackTrace\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m 80\u001b[0m \u001b[0;32mraise\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m 81\u001b[0m \u001b[0;32mreturn\u001b[0m \u001b[0mdeco\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;31mIllegalArgumentException\u001b[0m: u'Unable to locate hive jars to connect to metastore. Please set spark.sql.hive.metastore.jars.'"
]
}
],
"source": [
"import time\n",
"from IPython import display\n",
"\n",
"\n",
"count = 0\n",
"while count < 10:\n",
" time.sleep(2)\n",
" top_10_tweets = sqlContext.sql( 'Select tag, count from tweets' )\n",
" top_10_df = top_10_tweets.toPandas()\n",
" display.clear_output(wait=True)\n",
" sn.plt.figure( figsize = ( 10, 8 ) )\n",
" sn.barplot( x=\"count\", y=\"tag\", data=top_10_df)\n",
" sn.plt.show()\n",
" count = count + 1"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 2",
"language": "python",
"name": "python2"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 2
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython2",
"version": "2.7.14"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
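
The IllegalArgumentException recorded in the last executed cell is raised when sqlContext.sql() forces Spark to build a Hive-backed catalog and the Hive jars cannot be found. Assuming Hive support is not actually needed here, one hedged workaround is to force the in-memory catalog, either at launch time (pyspark --conf spark.sql.catalogImplementation=in-memory) or through the session builder before the first SQL call:

# Hypothetical workaround for the "Unable to locate hive jars" error above:
# use Spark SQL's in-memory catalog so no Hive metastore is required.
# Caveat: if a SparkContext already exists, getOrCreate() reuses it and
# this config may be ignored, so the launch-time flag is the safer route.
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("twitter_feed_bda")
         .config("spark.sql.catalogImplementation", "in-memory")
         .getOrCreate())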
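
Two smaller fragilities in the final cells are worth flagging: registerTempTable has been deprecated since Spark 2.0 in favor of createOrReplaceTempView, and sn.plt only resolves because older seaborn versions re-exported matplotlib.pyplot. A minimal sketch of the polling loop without either dependency, assuming the notebook's sqlContext and the "tweets" temp view are in scope:

# Hedged rewrite of the plotting loop from the last cell: calls pyplot
# directly rather than through the unofficial sn.plt alias. Assumes the
# streaming pipeline has registered "tweets" and ssc.start() has run.
import time
import matplotlib.pyplot as plt
import seaborn as sn
from IPython import display

for _ in range(10):
    time.sleep(2)
    top_10_df = sqlContext.sql("SELECT tag, count FROM tweets").toPandas()
    display.clear_output(wait=True)
    plt.figure(figsize=(10, 8))
    sn.barplot(x="count", y="tag", data=top_10_df)
    plt.show()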
