exclude empty messages when scraping; disable saving *_all.csv
p-phung committed Mar 15, 2023
1 parent 9203f4e commit 1909745
Showing 2 changed files with 36 additions and 33 deletions.
7 changes: 5 additions & 2 deletions pipeline/src/pipeline/get_data.py
@@ -376,17 +376,20 @@ async def scrape_messages(telegram_client, telegram_channels, start_date, end_da
wait_time = 5
):
reply = None
if message is not None:
print('message.text: ', message.text)
if message.text:
df_messages = arrange_telegram_messages(df_messages, message, reply, channel)
if channel_entity.broadcast and message.post and message.replies:
df_replies = pd.DataFrame()
try:
async for reply in telegram_client.iter_messages(
channel_entity,
offset_date=start_date,
reverse=True,
reply_to=message.id,
wait_time = 5
):
if reply is not None:
if reply.text:
df_replies = arrange_telegram_messages(df_replies, message, reply, channel)
time.sleep(5)
if len(df_replies) > 0:
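The net effect of this hunk is that a message (or reply) is passed to arrange_telegram_messages only when its text is non-empty. Below is a minimal sketch of that filter; FakeMessage and arrange are hypothetical stand-ins for Telethon's Message object and the repo's arrange_telegram_messages helper, used purely for illustration:

from collections import namedtuple
import pandas as pd

# Hypothetical stand-in for Telethon's Message
FakeMessage = namedtuple("FakeMessage", ["id", "text"])

def arrange(df, message):
    # stand-in for arrange_telegram_messages: append one row per kept message
    row = pd.DataFrame([{"id": message.id, "text": message.text}])
    return pd.concat([df, row], ignore_index=True)

df_messages = pd.DataFrame()
for message in [FakeMessage(1, "hello"), FakeMessage(2, ""), FakeMessage(3, None)]:
    if message.text:  # the new check: falsy text ("" or None) is skipped
        df_messages = arrange(df_messages, message)

print(len(df_messages))  # 1 -- empty messages are excluded from the dataframe

The same guard (if reply.text:) is applied to replies before they are appended to df_replies.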
62 changes: 31 additions & 31 deletions pipeline/src/pipeline/utils.py
@@ -896,37 +896,37 @@ def save_data(name, directory, data, id, sm_code, config):
with open(tweets_path, "rb") as upload_file:
blob_client.upload_blob(upload_file, overwrite=True)

# append to existing twitter dataframe
if "rcrc" in data.columns:
final_table_columns = ["index", "source", "member_count", "message_count", \
"text", "datetime", "id", "date", "rcrc", "cva", "full_text_en"]
data.drop(columns=[col for col in data if col not in final_table_columns], inplace=True)
if containsNumber(name):
name = "_".join(name.split('_')[0:3])

data_all_path = f"./{directory}/{name}_all.csv"
try:
if not config["skip-datalake"]:
blob_client = get_blob_service_client(f'{directory}/{name}_all.csv', config)
with open(data_all_path, "wb") as download_file:
download_file.write(blob_client.download_blob().readall())
data_old = pd.read_csv(data_all_path) # , lines=True)
data_all = data_old.append(data, ignore_index=True)
except:
data_all = data.copy()

# drop duplicates and save
try:
data_all = data_all.drop_duplicates(subset=[id])
except:
pass
data_all.to_csv(data_all_path, index=False, encoding="utf-8")

# upload to datalake
if not config["skip-datalake"]:
blob_client = get_blob_service_client(f'{directory}/{name}_all.csv', config)
with open(data_all_path, "rb") as upload_file:
blob_client.upload_blob(upload_file, overwrite=True)
# # append to existing twitter dataframe
# if "rcrc" in data.columns:
# final_table_columns = ["index", "source", "member_count", "message_count", \
# "text", "datetime", "id", "date", "rcrc", "cva", "full_text_en"]
# data.drop(columns=[col for col in data if col not in final_table_columns], inplace=True)
# if containsNumber(name):
# name = "_".join(name.split('_')[0:3])

# data_all_path = f"./{directory}/{name}_all.csv"
# try:
# if not config["skip-datalake"]:
# blob_client = get_blob_service_client(f'{directory}/{name}_all.csv', config)
# with open(data_all_path, "wb") as download_file:
# download_file.write(blob_client.download_blob().readall())
# data_old = pd.read_csv(data_all_path) # , lines=True)
# data_all = data_old.append(data, ignore_index=True)
# except:
# data_all = data.copy()

# # drop duplicates and save
# try:
# data_all = data_all.drop_duplicates(subset=[id])
# except:
# pass
# data_all.to_csv(data_all_path, index=False, encoding="utf-8")

# # upload to datalake
# if not config["skip-datalake"]:
# blob_client = get_blob_service_client(f'{directory}/{name}_all.csv', config)
# with open(data_all_path, "rb") as upload_file:
# blob_client.upload_blob(upload_file, overwrite=True)


def read_db(sm_code, start_date, end_date, config):
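With the block above commented out, save_data now only uploads the per-run CSV; the cumulative *_all.csv append, deduplicate, and upload step no longer runs. For reference, a hedged sketch of that disabled pattern, rewritten with pd.concat since DataFrame.append is deprecated (pandas 1.4) and removed (pandas 2.0); the path and id column here are placeholders, not the repo's actual configuration:

import os
import pandas as pd

def append_all_csv(data: pd.DataFrame, data_all_path: str, id_col: str) -> pd.DataFrame:
    # Merge new rows into the cumulative CSV if it exists, otherwise start fresh
    if os.path.exists(data_all_path):
        data_old = pd.read_csv(data_all_path)
        data_all = pd.concat([data_old, data], ignore_index=True)
    else:
        data_all = data.copy()
    # Drop duplicates on the id column when present, as the disabled code did
    if id_col in data_all.columns:
        data_all = data_all.drop_duplicates(subset=[id_col])
    data_all.to_csv(data_all_path, index=False, encoding="utf-8")
    return data_all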
