Skip to content

Commit

Permalink
Update beacon-ingestion.py
Browse files Browse the repository at this point in the history
  • Loading branch information
CharlesJUDITH authored Nov 6, 2024
1 parent 69bacd7 commit 33a4d6f
Showing 1 changed file with 48 additions and 30 deletions.
78 changes: 48 additions & 30 deletions berachain-beacon-data/beacon-ingestion.py
Original file line number Diff line number Diff line change
Expand Up @@ -1036,40 +1036,58 @@ def main():
'port': args.port
}

try:
print("\nInitializing database connection pool...")
db_pool = DatabaseConnectionPool(args.db_type, args.pool_size, **db_params)

ingestion = CosmosDataIngestion(
args.rpc_url,
db_pool,
batch_size=args.batch_size,
max_workers=args.workers,
validator_mappings_file=args.validator_mappings
)
print("\nInitializing database connection pool...")
db_pool = DatabaseConnectionPool(args.db_type, args.pool_size, **db_params)

ingestion = CosmosDataIngestion(
args.rpc_url,
db_pool,
batch_size=args.batch_size,
max_workers=args.workers,
validator_mappings_file=args.validator_mappings
)

if not args.skip_setup:
print("\nSetting up database tables...")
ingestion.setup_tables()

# Get latest chain height
print("\nGetting latest chain height...")
latest_block_response = requests.get(f"{args.rpc_url}/abci_info")
latest_block_response.raise_for_status()
latest_height = int(latest_block_response.json()["result"]["response"]["last_block_height"])

# Get our last processed height
start_height = ingestion.get_last_processed_block() + 1

print(f"\nStarting ingestion from block {start_height} to {latest_height}")
total_processed = ingestion.ingest_blocks(start_height, latest_height)
print(f"\nSuccessfully processed {total_processed} blocks")

if not args.skip_setup:
print("\nSetting up database tables...")
ingestion.setup_tables()

try:
while True:
# Get latest chain height
latest_block_response = requests.get(
f"{args.rpc_url}/abci_info",
timeout=10
)
latest_block_response.raise_for_status()
latest_height = int(latest_block_response.json()["result"]["response"]["last_block_height"])

# Get our last processed height
start_height = ingestion.get_last_processed_block() + 1

if start_height > latest_height:
print(f"\nCaught up at height {start_height-1}. Waiting for new blocks...")
time.sleep(5) # Wait 5 seconds before checking again
continue

# Process new blocks
print(f"\nProcessing blocks {start_height} to {latest_height}")
total_processed = ingestion.ingest_blocks(start_height, latest_height)
print(f"Successfully processed {total_processed} blocks")

except KeyboardInterrupt:
print("\nShutting down gracefully...")
except Exception as e:
print(f"\nFatal error: {str(e)}")
print(f"\nError occurred: {str(e)}")
raise
finally:
if 'db_pool' in locals():
db_pool.close_all()

if __name__ == "__main__":
import signal

def handle_sigterm(signum, frame):
print("\nReceived termination signal - shutting down...")
sys.exit(0)

signal.signal(signal.SIGTERM, handle_sigterm)
main()

0 comments on commit 33a4d6f

Please sign in to comment.