OpenAI Text Embeddings v2 (#29)
Fetch text embeddings from OpenAI on a per-tweet basis, so we can compare against the per-user approach.
s2t2 authored Feb 2, 2024
1 parent 1b8372d commit fe9e414
Showing 31 changed files with 6,502 additions and 6 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -5,6 +5,7 @@
google-credentials.json
google-credentials-shared.json

data/*.csv

data/*/*.csv
data/*/*.csv.gz
13 changes: 10 additions & 3 deletions README.md
@@ -49,6 +49,8 @@ OPENAI_API_KEY="sk__________"

GOOGLE_APPLICATION_CREDENTIALS="/path/to/openai-embeddings-2023/google-credentials.json"
BUCKET_NAME="my-bucket"

DATASET_ADDRESS="my_project.my_dataset"
```

## Usage
@@ -62,16 +64,14 @@ python -m app.openai_service
```


### Dataset Loading
### Embeddings per User (v1)

Demonstrate ability to load the dataset:

```sh
python -m app.dataset
```

### Data Analysis

Perform machine learning and other analyses on the data:

OpenAI Embeddings:
@@ -87,6 +87,13 @@ Word2Vec Embeddings:
+ [Classification](app/word2vec_classification/README.md)


### Embeddings per Tweet (v2)

OpenAI Embeddings:

+ [Fetching Embeddings](app/openai_embeddings/per_tweet/README.md)


## Testing

```sh
82 changes: 82 additions & 0 deletions app/bq_service.py
@@ -0,0 +1,82 @@
# https://raw.githubusercontent.com/s2t2/tweet-analysis-2023/main/app/bq_service.py

import os
from datetime import datetime

from dotenv import load_dotenv
from google.cloud import bigquery
#from google.cloud.bigquery import QueryJobConfig, ScalarQueryParameter
from pandas import DataFrame

from app.google_apis import GOOGLE_APPLICATION_CREDENTIALS # implicit check by google.cloud

load_dotenv()

#GOOGLE_APPLICATION_CREDENTIALS = os.getenv("GOOGLE_APPLICATION_CREDENTIALS") # implicit check by google.cloud

# used by child classes only, defined here for convenience
DATASET_ADDRESS = os.getenv("DATASET_ADDRESS", default="tweet-collector-py.impeachment_development") # "MY_PROJECT.MY_DATASET"


class BigQueryService():

    def __init__(self, client=None, dataset_address=DATASET_ADDRESS):
        self.client = client or bigquery.Client()
        self.dataset_address = dataset_address

    def execute_query(self, sql, verbose=True):
        if verbose:
            print(sql)
        job = self.client.query(sql)
        return job.result()

    def query_to_df(self, sql, verbose=True):
        """high-level wrapper to return a DataFrame"""
        results = self.execute_query(sql, verbose=verbose)
        records = [dict(row) for row in list(results)]
        df = DataFrame(records)
        return df

    @staticmethod
    def split_into_batches(my_list, batch_size=10_000):
        """Splits a list into evenly sized batches"""
        # h/t: https://stackoverflow.com/questions/312443/how-do-you-split-a-list-into-evenly-sized-chunks
        for i in range(0, len(my_list), batch_size):
            yield my_list[i : i + batch_size]

    @staticmethod
    def generate_timestamp(dt=None):
        """Formats datetime object for storing in BigQuery. Uses current time by default."""
        dt = dt or datetime.now()
        return dt.strftime("%Y-%m-%d %H:%M:%S")

    def insert_records_in_batches(self, table, records, batch_size=5_000):
        """
        Inserts records in batches because attempting to insert too many rows at once
        may result in google.api_core.exceptions.BadRequest: 400

        Params:
            table (table ID string, Table, or TableReference)
            records (list of dictionaries)
        """
        rows_to_insert = [list(d.values()) for d in records]
        #errors = self.client.insert_rows(table, rows_to_insert)
        errors = []
        batches = list(BigQueryService.split_into_batches(rows_to_insert, batch_size=batch_size))
        for batch in batches:
            errors += self.client.insert_rows(table, batch)
        return errors


if __name__ == "__main__":

    service = BigQueryService()
    client = service.client
    print("PROJECT:", client.project)

    print("DATASETS:")
    datasets = list(client.list_datasets())
    for ds in datasets:
        #print("...", ds.project, ds.dataset_id)
        print("...", ds.reference)
11 changes: 11 additions & 0 deletions app/google_apis.py
@@ -0,0 +1,11 @@




import os

from dotenv import load_dotenv

load_dotenv()

GOOGLE_APPLICATION_CREDENTIALS = os.getenv("GOOGLE_APPLICATION_CREDENTIALS") # implicit check by google.cloud
4 changes: 3 additions & 1 deletion app/model_storage.py
@@ -5,9 +5,11 @@
from google.cloud import storage as gcs
from dotenv import load_dotenv

from app.google_apis import GOOGLE_APPLICATION_CREDENTIALS # implicit check by google.cloud

load_dotenv()

GOOGLE_APPLICATION_CREDENTIALS = os.getenv("GOOGLE_APPLICATION_CREDENTIALS") # implicit check by google.cloud for env var
#GOOGLE_APPLICATION_CREDENTIALS = os.getenv("GOOGLE_APPLICATION_CREDENTIALS") # implicit check by google.cloud for env var

#PROJECT_ID = os.getenv("GOOGLE_PROJECT_NAME") # "my-project"
BUCKET_NAME = os.getenv("BUCKET_NAME") # "my-bucket" needs to be globally unique!
226 changes: 226 additions & 0 deletions app/openai_embeddings_v2/README.md
@@ -0,0 +1,226 @@
# OpenAI Embeddings (v2)

Get embeddings not only per user, but also per tweet, so we can compare the two approaches. Pull a new sample of tweets for the users we have been analyzing, but this time keep track of which tweets are used, to aid the comparison.

## Setup

Run migrations as necessary. First, create a table of all tweets from each user in the sample:

```sql
--CREATE TABLE `tweet-collector-py.impeachment_production.botometer_sample_max_10` as (
--CREATE TABLE `tweet-collector-py.impeachment_production.botometer_sample_max_50` as (
CREATE TABLE `tweet-collector-py.impeachment_production.botometer_sample` as (
WITH ranked_tweets AS (
SELECT
u.user_id, t.status_id, t.status_text, t.created_at,
ROW_NUMBER() OVER (PARTITION BY u.user_id ORDER BY RAND()) AS row_num
FROM (
SELECT DISTINCT user_id
FROM `tweet-collector-py.impeachment_production.botometer_sample_openai_tweet_embeddings_20230724`
) u
JOIN `tweet-collector-py.impeachment_production.tweets_v2` t on t.user_id = u.user_id
ORDER BY u.user_id, t.created_at
--LIMIT 10
)

SELECT user_id, row_num,
status_id, status_text, created_at,
FROM ranked_tweets
-- WHERE row_num <= 10 -- MAX_TWEETS_PER_USER
-- WHERE row_num <= 50 -- MAX_TWEETS_PER_USER

);
```


To sample from this table, choose a `MAX_TWEETS_PER_USER` (set to 50 by default). First count how many users and tweets the threshold yields, then materialize the sample into its own table:

```sql
SELECT
count(distinct user_id) as user_count -- 7566
,count(distinct status_id) as status_count -- 183727
FROM `tweet-collector-py.impeachment_production.botometer_sample`
WHERE row_num <= 50 -- MAX_TWEETS_PER_USER
```

```sql
CREATE TABLE `tweet-collector-py.impeachment_production.botometer_sample_max_50` as (
SELECT *
FROM `tweet-collector-py.impeachment_production.botometer_sample`
WHERE row_num <= 50
ORDER BY user_id, row_num
)
```

The 7,566 users in this sample have 183,727 tweets.

Create a table of unique texts with identifiers (many tweets share the same text, so we only need to fetch one embedding per distinct text):

```sql
DROP TABLE IF EXISTS `tweet-collector-py.impeachment_production.botometer_sample_max_50_texts_map`;
CREATE TABLE IF NOT EXISTS `tweet-collector-py.impeachment_production.botometer_sample_max_50_texts_map` as (
--WITH texts_map as (
SELECT --s.user_id, s.row_num, s.status_id, s.status_text, s.created_at
ROW_NUMBER() OVER () AS status_text_id
,s.status_text
,count(DISTINCT s.status_id) as status_count
,array_agg(DISTINCT s.status_id) as status_ids
,count(DISTINCT s.user_id) as user_count
,array_agg(DISTINCT s.user_id) as user_ids
FROM `tweet-collector-py.impeachment_production.botometer_sample` s
WHERE s.row_num <= 50 -- MAX_TWEETS_PER_USER
GROUP BY 2
--ORDER BY status_count desc
--)
--SELECT status_text, status_count, status_id
--FROM texts_map,
--UNNEST(status_ids) AS status_id
)
```

Of the 183,727 tweets in this sample, there are 80,205 unique texts.

Migrate table to receive text embeddings:

```sql
CREATE TABLE IF NOT EXISTS `tweet-collector-py.impeachment_production.botometer_sample_max_50_openai_text_embeddings` (
status_text_id INT64,
embeddings ARRAY<FLOAT64>
)
```

Migrate table to receive user embeddings:

```sql
DROP TABLE IF EXISTS `tweet-collector-py.impeachment_production.botometer_sample_max_50_openai_user_embeddings`;
CREATE TABLE IF NOT EXISTS `tweet-collector-py.impeachment_production.botometer_sample_max_50_openai_user_embeddings` (
user_id INT64,
embeddings ARRAY<FLOAT64>
)
```

## Embeddings

### User-level Embeddings

Fetch user-level embeddings and store them in BQ:

```sh
python -m app.openai_embeddings.per_user

USERS_LIMIT=10 python -m app.openai_embeddings.per_user
USERS_LIMIT=100 python -m app.openai_embeddings.per_user
USERS_LIMIT=1000 python -m app.openai_embeddings.per_user
```
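
For orientation, here is a minimal sketch of what a job like this could look like. It is not the repo's actual `per_user` implementation: it assumes the `openai` Python client (>= 1.0), simply concatenates each user's sampled tweet texts before embedding, and assumes the table names from the migrations above live in the dataset given by `DATASET_ADDRESS`.

```python
# Hypothetical sketch of a per-user embeddings job (not the actual app.openai_embeddings.per_user module).
import os

from openai import OpenAI  # assumes openai >= 1.0

from app.bq_service import BigQueryService, DATASET_ADDRESS

USERS_LIMIT = int(os.getenv("USERS_LIMIT", default="100"))

bq = BigQueryService()
openai_client = OpenAI()  # reads OPENAI_API_KEY from the environment

# Gather each user's sampled tweets (assumes the botometer_sample table created above):
sql = f"""
    SELECT user_id, STRING_AGG(status_text, ' ') AS user_text
    FROM `{DATASET_ADDRESS}.botometer_sample`
    WHERE row_num <= 50 -- MAX_TWEETS_PER_USER
    GROUP BY user_id
    LIMIT {USERS_LIMIT}
"""
df = bq.query_to_df(sql)

records = []
for _, row in df.iterrows():
    # One embedding per user, computed over the concatenated tweet texts (model name is an assumption):
    response = openai_client.embeddings.create(model="text-embedding-ada-002", input=row["user_text"])
    records.append({"user_id": row["user_id"], "embeddings": response.data[0].embedding})

# Store in the user embeddings table created by the migration above:
table = bq.client.get_table(f"{DATASET_ADDRESS}.botometer_sample_max_50_openai_user_embeddings")
errors = bq.insert_records_in_batches(table, records)
print("ERRORS:", errors)
```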

Monitoring the results:

```sql
SELECT
count(distinct s.user_id) as user_count
,count(distinct case when emb.user_id is not null then s.user_id end) as users_collected
,count(distinct case when emb.user_id is not null then s.user_id end) / count(distinct s.user_id) as pct_collected
FROM `tweet-collector-py.impeachment_production.botometer_sample` s
LEFT JOIN `tweet-collector-py.impeachment_production.botometer_sample_max_50_openai_user_embeddings` emb
ON s.user_id = emb.user_id

```


### Tweet-level Embeddings

Fetch tweet-level embeddings and store them in BQ:

```sh
python -m app.openai_embeddings.per_tweet.embeddings_job

TEXTS_LIMIT=10 python -m app.openai_embeddings.per_tweet
TEXTS_LIMIT=1500 python -m app.openai_embeddings.per_tweet
TEXTS_LIMIT=10000 python -m app.openai_embeddings.per_tweet
TEXTS_LIMIT=250000 python -m app.openai_embeddings.per_tweet
```
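
Similarly, here is a minimal sketch of a per-text job (again, not the actual `per_tweet` module): it batches distinct texts through the embeddings API, which accepts a list of inputs, and writes one row per `status_text_id` into the table from the migration above. Table names, batch size, and model name are assumptions.

```python
# Hypothetical sketch of a per-text embeddings job (not the actual app.openai_embeddings.per_tweet module).
import os

from openai import OpenAI  # assumes openai >= 1.0

from app.bq_service import BigQueryService, DATASET_ADDRESS

TEXTS_LIMIT = int(os.getenv("TEXTS_LIMIT", default="1500"))
BATCH_SIZE = 100  # number of texts per embeddings request (assumed, not tuned)

bq = BigQueryService()
openai_client = OpenAI()

# Fetch distinct texts that do not have embeddings yet:
sql = f"""
    SELECT txt.status_text_id, txt.status_text
    FROM `{DATASET_ADDRESS}.botometer_sample_max_50_texts_map` txt
    LEFT JOIN `{DATASET_ADDRESS}.botometer_sample_max_50_openai_text_embeddings` emb
        ON txt.status_text_id = emb.status_text_id
    WHERE emb.status_text_id IS NULL
    LIMIT {TEXTS_LIMIT}
"""
texts = bq.query_to_df(sql).to_dict("records")

table = bq.client.get_table(f"{DATASET_ADDRESS}.botometer_sample_max_50_openai_text_embeddings")
for batch in BigQueryService.split_into_batches(texts, batch_size=BATCH_SIZE):
    # The embeddings endpoint accepts a list of inputs, so send one request per batch:
    response = openai_client.embeddings.create(
        model="text-embedding-ada-002",  # model name is an assumption
        input=[t["status_text"] for t in batch]
    )
    records = [
        {"status_text_id": t["status_text_id"], "embeddings": item.embedding}
        for t, item in zip(batch, response.data)
    ]
    errors = bq.insert_records_in_batches(table, records)
    print("BATCH SAVED. ERRORS:", errors)
```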

Monitoring the results:

```sql
SELECT count(distinct status_text_id) as text_count
FROM `tweet-collector-py.impeachment_production.botometer_sample_max_50_openai_text_embeddings` emb
```


Reconstruct a table with one embedding per status (embeddings were originally fetched once per distinct text):


```sql
CREATE TABLE `tweet-collector-py.impeachment_production.botometer_sample_max_50_openai_status_embeddings` as (
WITH lookup_table as (
SELECT txt.status_text_id ,status_id
FROM `tweet-collector-py.impeachment_production.botometer_sample_max_50_texts_map` txt,
UNNEST(txt.status_ids) as status_id
)

SELECT txt.status_id, txt.status_text_id, emb.embeddings
FROM lookup_table txt
JOIN `tweet-collector-py.impeachment_production.botometer_sample_max_50_openai_text_embeddings` emb
ON txt.status_text_id = emb.status_text_id
ORDER BY 2
--LIMIT 10

)
```

It looks like there may be some duplicates, so create a de-duplicated version of the table:

```sql
-- SELECT status_id, count(*) as row_count
-- FROM `tweet-collector-py.impeachment_production.botometer_sample_max_50_openai_status_embeddings`
-- GROUP BY 1
-- HAVING row_count > 1
-- ORDER BY 2 DESC
-- -- 14652 example status ids: 1212493877673779200, 1212848708171321344, 1217970948529364992

-- SELECT status_id, status_text_id, count(*) as row_count
-- FROM `tweet-collector-py.impeachment_production.botometer_sample_max_50_openai_status_embeddings`
-- GROUP BY 1,2
-- HAVING row_count > 1
-- ORDER BY 2 DESC
-- -- 14652 dups, example status ids: 1212493877673779200, 1212848708171321344, 1217970948529364992

CREATE TABLE `tweet-collector-py.impeachment_production.botometer_sample_max_50_openai_status_embeddings_v2` as (
-- DE-DUPLICATED :-)
SELECT status_id, status_text_id, any_value(embeddings) as embeddings
FROM `tweet-collector-py.impeachment_production.botometer_sample_max_50_openai_status_embeddings`
GROUP BY 1,2
)

--SELECT count(distinct status_id) as status_count
--FROM `tweet-collector-py.impeachment_production.botometer_sample_max_50_openai_status_embeddings_v2`
---- 183727
```

Add the user-level info back to the table for the convenience of future queries (we can always exclude it from later selects).

```sql
CREATE TABLE `tweet-collector-py.impeachment_production.botometer_sample_max_50_openai_status_embeddings_v3` as (
SELECT s.user_id, s.status_id, s.status_text, s.created_at
, array_length(emb.embeddings) as embeds_length
,emb.embeddings
FROM `tweet-collector-py.impeachment_production.botometer_sample` s
JOIN `tweet-collector-py.impeachment_production.botometer_sample_max_50_openai_status_embeddings_v2` emb
ON s.status_id = emb.status_id
-- LIMIT 10000
)

```

The embeddings alone exceed the 1GB BQ export limit, so we have to [export to GCS](https://cloud.google.com/bigquery/docs/exporting-data) or stream the results via a notebook.
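
As a sketch of the GCS route, the BigQuery client's `extract_table` can shard the export across multiple files via a wildcard URI; since CSV exports do not support the repeated `embeddings` column, newline-delimited JSON is used here. Bucket name and object paths are assumptions.

```python
# Hypothetical GCS export sketch (bucket name and object paths are assumptions).
import os

from google.cloud import bigquery

from app.bq_service import BigQueryService, DATASET_ADDRESS

BUCKET_NAME = os.getenv("BUCKET_NAME", default="my-bucket")

bq = BigQueryService()
table_id = f"{DATASET_ADDRESS}.botometer_sample_max_50_openai_status_embeddings_v3"

# A wildcard URI lets BigQuery shard the export into multiple files, each under the 1 GB per-file limit.
# CSV exports cannot represent the repeated `embeddings` column, so export newline-delimited JSON instead.
destination_uri = f"gs://{BUCKET_NAME}/openai_embeddings_v2/status_embeddings_v3_*.json.gz"

job_config = bigquery.ExtractJobConfig(
    destination_format=bigquery.DestinationFormat.NEWLINE_DELIMITED_JSON,
    compression=bigquery.Compression.GZIP,
)
extract_job = bq.client.extract_table(table_id, destination_uri, job_config=job_config)
extract_job.result()  # wait for the export job to finish
print("EXPORTED TO:", destination_uri)
```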



## Exporting CSV files to Drive

See [notebooks](/notebooks/openai_embeddings_v2/README.md).

## Analysis

See [notebooks](/notebooks/openai_embeddings_v2/README.md).