diff --git a/pipeline/src/pipeline/get_data.py b/pipeline/src/pipeline/get_data.py
index 720fae0..bbd6cfa 100644
--- a/pipeline/src/pipeline/get_data.py
+++ b/pipeline/src/pipeline/get_data.py
@@ -376,17 +376,20 @@ async def scrape_messages(telegram_client, telegram_channels, start_date, end_da
                 wait_time = 5
             ):
                 reply = None
-                if message is not None:
+                print('message.text: ', message.text)
+                if message.text:
                     df_messages = arrange_telegram_messages(df_messages, message, reply, channel)
                     if channel_entity.broadcast and message.post and message.replies:
                         df_replies = pd.DataFrame()
                         try:
                             async for reply in telegram_client.iter_messages(
                                 channel_entity,
+                                offset_date=start_date,
+                                reverse=True,
                                 reply_to=message.id,
                                 wait_time = 5
                             ):
-                                if reply is not None:
+                                if reply.text:
                                     df_replies = arrange_telegram_messages(df_replies, message, reply, channel)
                             time.sleep(5)
                             if len(df_replies) > 0:
diff --git a/pipeline/src/pipeline/utils.py b/pipeline/src/pipeline/utils.py
index 298593d..c8f16de 100644
--- a/pipeline/src/pipeline/utils.py
+++ b/pipeline/src/pipeline/utils.py
@@ -896,37 +896,37 @@ def save_data(name, directory, data, id, sm_code, config):
         with open(tweets_path, "rb") as upload_file:
             blob_client.upload_blob(upload_file, overwrite=True)
 
-    # append to existing twitter dataframe
-    if "rcrc" in data.columns:
-        final_table_columns = ["index", "source", "member_count", "message_count", \
-            "text", "datetime", "id", "date", "rcrc", "cva", "full_text_en"]
-        data.drop(columns=[col for col in data if col not in final_table_columns], inplace=True)
-    if containsNumber(name):
-        name = "_".join(name.split('_')[0:3])
-
-    data_all_path = f"./{directory}/{name}_all.csv"
-    try:
-        if not config["skip-datalake"]:
-            blob_client = get_blob_service_client(f'{directory}/{name}_all.csv', config)
-            with open(data_all_path, "wb") as download_file:
-                download_file.write(blob_client.download_blob().readall())
-        data_old = pd.read_csv(data_all_path)  # , lines=True)
-        data_all = data_old.append(data, ignore_index=True)
-    except:
-        data_all = data.copy()
-
-    # drop duplicates and save
-    try:
-        data_all = data_all.drop_duplicates(subset=[id])
-    except:
-        pass
-    data_all.to_csv(data_all_path, index=False, encoding="utf-8")
-
-    # upload to datalake
-    if not config["skip-datalake"]:
-        blob_client = get_blob_service_client(f'{directory}/{name}_all.csv', config)
-        with open(data_all_path, "rb") as upload_file:
-            blob_client.upload_blob(upload_file, overwrite=True)
+    # # append to existing twitter dataframe
+    # if "rcrc" in data.columns:
+    #     final_table_columns = ["index", "source", "member_count", "message_count", \
+    #         "text", "datetime", "id", "date", "rcrc", "cva", "full_text_en"]
+    #     data.drop(columns=[col for col in data if col not in final_table_columns], inplace=True)
+    # if containsNumber(name):
+    #     name = "_".join(name.split('_')[0:3])
+
+    # data_all_path = f"./{directory}/{name}_all.csv"
+    # try:
+    #     if not config["skip-datalake"]:
+    #         blob_client = get_blob_service_client(f'{directory}/{name}_all.csv', config)
+    #         with open(data_all_path, "wb") as download_file:
+    #             download_file.write(blob_client.download_blob().readall())
+    #     data_old = pd.read_csv(data_all_path)  # , lines=True)
+    #     data_all = data_old.append(data, ignore_index=True)
+    # except:
+    #     data_all = data.copy()
+
+    # # drop duplicates and save
+    # try:
+    #     data_all = data_all.drop_duplicates(subset=[id])
+    # except:
+    #     pass
+    # data_all.to_csv(data_all_path, index=False, encoding="utf-8")
+
+    # # upload to datalake
+    # if not config["skip-datalake"]:
+    #     blob_client = get_blob_service_client(f'{directory}/{name}_all.csv', config)
+    #     with open(data_all_path, "rb") as upload_file:
+    #         blob_client.upload_blob(upload_file, overwrite=True)
 
 
 def read_db(sm_code, start_date, end_date, config):
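For reference, the keyword arguments added in the first hunk (offset_date, reverse, reply_to, wait_time) are standard parameters of Telethon's iter_messages; with reverse=True the iterator walks forward in time from offset_date instead of backwards from the newest message, and message.text is empty for service or media-only messages, which is what the new if-checks filter out. Below is a minimal, self-contained sketch of that pattern only; the session name, API credentials, channel and date are placeholders, not values from the pipeline:

from datetime import datetime, timezone
from telethon import TelegramClient

# Placeholder credentials and channel, for illustration only.
client = TelegramClient("example_session", api_id=12345, api_hash="0123456789abcdef")

async def example():
    channel_entity = await client.get_entity("some_public_channel")
    start_date = datetime(2024, 1, 1, tzinfo=timezone.utc)
    # reverse=True + offset_date: iterate oldest-to-newest starting at start_date,
    # rather than the default newest-to-oldest order; wait_time throttles requests.
    async for message in client.iter_messages(
        channel_entity,
        offset_date=start_date,
        reverse=True,
        wait_time=5,
    ):
        # Empty for service messages and media-only posts, so skip those.
        if message.text:
            print(message.id, message.date, message.text[:80])

with client:
    client.loop.run_until_complete(example())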