diff --git a/binder/environment.yml b/binder/environment.yml index 87a37e80..e56a0977 100644 --- a/binder/environment.yml +++ b/binder/environment.yml @@ -5,7 +5,7 @@ dependencies: - bokeh=2.1.1 - dask=2.20.0 - dask-image=0.2.0 - - dask-ml=1.5.0 + - dask-ml=1.6.0 - dask-labextension=2.0.2 - jupyterlab=2.1 - nodejs=14 diff --git a/machine-learning/text-vectorization.ipynb b/machine-learning/text-vectorization.ipynb index 9770699a..6ca6d5e7 100644 --- a/machine-learning/text-vectorization.ipynb +++ b/machine-learning/text-vectorization.ipynb @@ -4,16 +4,9 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "# Text Vectorization Pipeline\n", + "# Working with Text Data\n", "\n", - "This example illustrates how Dask-ML can be used to classify large textual datasets in parallel.\n", - "It is adapted from [this scikit-learn example](https://scikit-learn.org/stable/auto_examples/applications/plot_out_of_core_classification.html#sphx-glr-auto-examples-applications-plot-out-of-core-classification-py).\n", - "\n", - "The primary differences are that\n", - "\n", - "* We fit the entire model, including text vectorization, as a pipeline.\n", - "* We use dask collections like [Dask Bag](https://docs.dask.org/en/latest/bag.html), [Dask Dataframe](https://docs.dask.org/en/latest/dataframe.html), and [Dask Array](https://docs.dask.org/en/latest/array.html)\n", - " rather than generators to work with larger than memory datasets." + "Dask-ML includes several ways to [process text data](https://ml.dask.org/modules/api.html#dask-ml-feature-extraction-text-feature-extraction). Typically these work with the [`Dask DataFrame`](https://docs.dask.org/en/latest/dataframe.html) or [`Bag`](https://docs.dask.org/en/latest/bag.html) collections, which can reference larger-than-memory datasets stored on disk or in distributed memory on a Dask Cluster." ] }, { @@ -22,9 +15,9 @@ "metadata": {}, "outputs": [], "source": [ - "from dask.distributed import Client, progress\n", + "from dask.distributed import Client\n", "\n", - "client = Client(n_workers=2, threads_per_worker=2, memory_limit='2GB')\n", + "client = Client(n_workers=4, threads_per_worker=1, memory_limit='2GB')\n", "client" ] }, { @@ -32,9 +25,7 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "## Fetch the data\n", - "\n", - "Scikit-Learn provides a utility to fetch the newsgroups dataset." + "In this example, we'll work with the 20 newsgroups dataset from scikit-learn. Each element in the dataset has a bit of metadata and the full text of a post." ] }, { @@ -45,20 +36,15 @@ "source": [ "import sklearn.datasets\n", "\n", - "bunch = sklearn.datasets.fetch_20newsgroups()" + "news = sklearn.datasets.fetch_20newsgroups()\n", + "print(news.data[0][:500])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "The data from scikit-learn isn't *too* large, so the data is just\n", - "returned in memory. Each document is a string. The target we're predicting\n", - "is an integer, which codes the topic of the post.\n", - "\n", - "We'll load the documents and targets directly into a dask DataFrame.\n", - "In practice, on a larger than memory dataset, you would likely load the\n", - "documents from disk or cloud storage using `dask.bag` or `dask.delayed`." + "The `data` attribute is a list of documents (strings). We'll load the dataset using `dask.bag.from_sequence`, but in practice you would want to load the data on the workers.
See https://docs.dask.org/en/latest/best-practices.html#load-data-with-dask and https://docs.dask.org/en/latest/delayed-best-practices.html#don-t-call-dask-delayed-on-other-dask-collections." ] }, { @@ -67,20 +53,45 @@ "metadata": {}, "outputs": [], "source": [ - "import dask.dataframe as dd\n", - "import pandas as pd\n", + "import dask\n", + "import numpy as np\n", + "import dask.bag as db\n", "\n", - "df = dd.from_pandas(pd.DataFrame({\"text\": bunch.data, \"target\": bunch.target}),\n", - " npartitions=25)\n", + "documents = db.from_sequence(news['data'], npartitions=10).persist()\n", + "documents" ] }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Feature Extraction\n", "\n", - "df" + "Dask-ML's feature extractors turn Bags or DataFrames of raw documents (strings) into Dask Arrays backed by scipy.sparse matrices.\n", + "\n", + "If the limitations of `HashingVectorizer` (no inverse transform, no IDF weighting) are acceptable, then we strongly recommend using it over something like `CountVectorizer`. `HashingVectorizer` is completely stateless and so is much easier (and faster) to use in a distributed setting.\n", + "\n", + "Note that because `HashingVectorizer` is stateless, the calls to `fit` and `transform` are nearly instant." ] }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import dask_ml.feature_extraction\n", "\n", + "hashing_vectorizer = dask_ml.feature_extraction.text.HashingVectorizer()\n", + "%time hashing_vectorizer.fit(documents)\n", + "%time transformed = hashing_vectorizer.transform(documents)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "Each row in the `text` column has a bit of metadata and the full text of a post." + "It's only when you `.compute()` the result that the data is loaded and the transformation actually runs." ] }, { @@ -89,23 +100,58 @@ "metadata": {}, "outputs": [], "source": [ - "print(df.head().loc[0, 'text'][:500])" + "%time transformed.compute()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "## Feature Hashing" + "`CountVectorizer` is not stateless unless you provide a `vocabulary` ahead of time. When no vocabulary is provided, `CountVectorizer.fit` or `CountVectorizer.fit_transform` will need to load data to discover the unique set of terms in the documents." ] }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "vectorizer = dask_ml.feature_extraction.text.CountVectorizer()\n", + "%time result = vectorizer.fit_transform(documents)" ] }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Now `.fit_transform` (and `fit` and `transform`) is much more expensive since all the documents must be loaded to determine the `vocabulary`.\n", + "\n", + "The result is again a Dask `Array` backed by `scipy.sparse.csr_matrix` objects. We can bring it back to the client with `.compute()`." ] }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "%time result.compute()" ] }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Notice that we persisted `documents` earlier. If possible, persisting the input documents is preferable to avoid making two passes over the data: one to discover the vocabulary and a second to transform. If the dataset is larger than (distributed) memory, then two passes will be necessary."
] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "Dask's [HashingVectorizer](https://ml.dask.org/modules/generated/dask_ml.feature_extraction.text.HashingVectorizer.html#dask_ml.feature_extraction.text.HashingVectorizer) provides a similar API to [scikit-learn's implementation](https://scikit-learn.org/stable/modules/generated/sklearn.feature_extraction.text.HashingVectorizer.html). In fact, Dask-ML's implementation uses scikit-learn's, applying it to each partition of the input `dask.dataframe.Series` or `dask.bag.Bag`.\n", + "## A note on vocabularies\n", "\n", - "Transformation, once we actually compute the result, happens in parallel and returns a dask Array." + "You can also provide a vocabulary ahead of time, which avoids the need for making two passes over the data. This makes operations like `vectorizer.transform` instantaneous, since no vocabulary needs to be discovered. However, vocabularies can become quite large. To avoid bloating the size of the `CountVectorizer` object itself, consider scattering the vocabulary to the cluster ahead of time and passing the resulting future as the `vocabulary`. Dask-ML's `CountVectorizer` works just fine when the `vocabulary` is a pointer to a piece of data on the cluster." ] }, { @@ -114,20 +160,61 @@ "metadata": {}, "outputs": [], "source": [ - "import dask_ml.feature_extraction.text\n", + "# reuse the vocabulary from the previously fitted estimator.\n", + "# In practice this would come from an external source.\n", + "vocabulary = vectorizer.vocabulary_\n", + "remote_vocabulary, = client.scatter([vocabulary], broadcast=True)\n", "\n", - "vect = dask_ml.feature_extraction.text.HashingVectorizer()\n", - "X = vect.fit_transform(df['text'])\n", - "X" + "vectorizer2 = dask_ml.feature_extraction.text.CountVectorizer(\n", + " vocabulary=remote_vocabulary\n", + ")" ] }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "`CountVectorizer.transform` doesn't need to do any real work now, so it's fast." ] }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "%time result = vectorizer2.transform(documents)" ] }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "%time result.compute()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "The output array `X` has unknown chunk sizes becase the input dask Series or Bags don't know their own length.\n", + "See https://scikit-learn.org/stable/modules/feature_extraction.html#vectorizing-a-large-text-corpus-with-the-hashing-trick for more on the problems with large vocabularies; it recommends \"feature hashing\" as a possible solution.\n", + "\n", + "## Feature Hashing\n", "\n", - "Each block in `X` is a `scipy.sparse` matrix." + "Feature hashing transforms a DataFrame or Bag of inputs (mappings or strings) to a sparse array. It is completely stateless, and so doesn't suffer from the same issues as `CountVectorizer`. See https://scikit-learn.org/stable/modules/feature_extraction.html#feature-hashing for more." ] }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "hasher = dask_ml.feature_extraction.text.FeatureHasher(input_type=\"string\")\n", + "result = hasher.transform(documents)\n", + "result" ] }, { @@ -136,14 +223,51 @@ "metadata": {}, "outputs": [], "source": [ - "X.blocks[0].compute()" + "%time result.compute()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "This is a document-term matrix. Each row is the hashed representation of the original post."
+ "## Text Vectorization Pipeline" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "The rest of this example is adapted from [this scikit-learn example](https://scikit-learn.org/stable/auto_examples/applications/plot_out_of_core_classification.html#sphx-glr-auto-examples-applications-plot-out-of-core-classification-py).\n", + "\n", + "The primary differences are that\n", + "\n", + "* We fit the entire model, including text vectorization, as a pipeline.\n", + "* We use dask collections like [Dask Bag](https://docs.dask.org/en/latest/bag.html), [Dask Dataframe](https://docs.dask.org/en/latest/dataframe.html), and [Dask Array](https://docs.dask.org/en/latest/array.html)\n", + " rather than generators to work with larger than memory datasets." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "We'll load the documents and targets directly into a dask DataFrame.\n", + "In practice, on a larger than memory dataset, you would likely load the\n", + "documents from disk or cloud storage using `dask.bag` or `dask.delayed`." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import dask.dataframe as dd\n", + "import pandas as pd\n", + "\n", + "df = dd.from_pandas(pd.DataFrame({\"text\": news.data, \"target\": news.target}),\n", + " npartitions=25)\n", + "\n", + "df" ] }, { @@ -164,7 +288,7 @@ "metadata": {}, "outputs": [], "source": [ - "bunch.target_names" + "news.target_names" ] }, { @@ -175,7 +299,7 @@ "source": [ "import numpy as np\n", "\n", - "positive = np.arange(len(bunch.target_names))[['comp' in x for x in bunch.target_names]]\n", + "positive = np.arange(len(news.target_names))[['comp' in x for x in news.target_names]]\n", "y = df['target'].isin(positive).astype(int)\n", "y" ] @@ -210,6 +334,7 @@ "sgd = sklearn.linear_model.SGDClassifier(\n", " tol=1e-3\n", ")\n", + "vect = dask_ml.feature_extraction.text.HashingVectorizer()\n", "clf = dask_ml.wrappers.Incremental(\n", " sgd, scoring='accuracy', assume_equal_chunks=True\n", ")\n", @@ -293,7 +418,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.7.4" + "version": "3.8.5" } }, "nbformat": 4,