-
Notifications
You must be signed in to change notification settings - Fork 3
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
9dd17f4
commit b8e0fd8
Showing
6 changed files
with
446 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,174 @@ | ||
import pytest | ||
from datetime import datetime | ||
from unittest.mock import Mock, patch, MagicMock | ||
from src.analyzer import PoiAnalyzer | ||
from src.database import Database | ||
from src.notification import SlackNotifier | ||
import requests | ||
|
||
@pytest.fixture | ||
def mock_db(): | ||
db = Mock(spec=Database) | ||
# Create a context manager mock | ||
context_mock = MagicMock() | ||
mock_conn = MagicMock() | ||
mock_cursor = MagicMock() | ||
|
||
# Set up the context manager chain | ||
db.get_connection.return_value = context_mock | ||
context_mock.__enter__.return_value = mock_conn | ||
mock_conn.cursor.return_value.__enter__.return_value = mock_cursor | ||
|
||
# Store cursor on db for tests that need it | ||
db._test_cursor = mock_cursor | ||
return db # Return just the db, not the tuple | ||
|
||
@pytest.fixture | ||
def mock_notifier(): | ||
return Mock(spec=SlackNotifier) | ||
|
||
@pytest.fixture | ||
def analyzer(mock_db, mock_notifier): | ||
return PoiAnalyzer(mock_db, mock_notifier) | ||
|
||
def test_analyze_pois_no_discrepancy(analyzer, mock_db): | ||
# Setup | ||
deployment_id = "Qm123" | ||
block_number = 1000 | ||
|
||
# Mock database responses | ||
mock_db.check_notification_sent.return_value = False | ||
mock_db.get_latest_pois.return_value = { | ||
"poi_hash_1": {"indexer1", "indexer2"} # Single POI hash = no discrepancy | ||
} | ||
|
||
# Execute | ||
result = analyzer.analyze_pois(deployment_id, block_number) | ||
|
||
# Assert | ||
assert result is None | ||
mock_db.check_notification_sent.assert_called_once_with(deployment_id, block_number) | ||
mock_db.get_latest_pois.assert_called_once_with(deployment_id, block_number) | ||
|
||
def test_analyze_pois_with_discrepancy(analyzer, mock_db): | ||
# Setup | ||
deployment_id = "Qm123" | ||
block_number = 1000 | ||
|
||
# Mock database responses | ||
mock_db.check_notification_sent.return_value = False | ||
mock_db.get_latest_pois.return_value = { | ||
"poi_hash_1": {"indexer1"}, | ||
"poi_hash_2": {"indexer2"} # Two different POI hashes = discrepancy | ||
} | ||
|
||
# Execute | ||
result = analyzer.analyze_pois(deployment_id, block_number) | ||
|
||
# Assert | ||
assert result is not None | ||
assert result["deployment_cid"] == deployment_id | ||
assert result["block_number"] == block_number | ||
assert result["submissions"] == mock_db.get_latest_pois.return_value | ||
|
||
def test_check_poi_reuse(analyzer): | ||
"""Test POI reuse detection.""" | ||
# Setup | ||
submissions = { | ||
"poi_hash_1": {"indexer1"}, | ||
"poi_hash_2": {"indexer2"} | ||
} | ||
|
||
# Mock database responses | ||
mock_cursor = analyzer.db._test_cursor | ||
mock_cursor.fetchall.return_value = [ | ||
# Match the columns from the query: | ||
# poi, deployment_id, block_number, indexer_address, network_name, submission_time | ||
("poi_hash_1", "deployment1", 1000, b"addr1", "mainnet", datetime.now()), | ||
("poi_hash_1", "deployment2", 900, b"addr2", "mainnet", datetime.now()) | ||
] | ||
|
||
# Execute | ||
result = analyzer._check_poi_reuse(submissions) | ||
|
||
# Assert | ||
assert "poi_hash_1" in result | ||
assert len(result["poi_hash_1"]) == 1 | ||
assert "Previously used" in result["poi_hash_1"][0] | ||
|
||
def test_analyze_pois_already_notified(analyzer, mock_db): | ||
"""Test that we don't re-notify about known discrepancies.""" | ||
deployment_id = "Qm123" | ||
block_number = 1000 | ||
|
||
mock_db.check_notification_sent.return_value = True | ||
|
||
result = analyzer.analyze_pois(deployment_id, block_number) | ||
assert result is None | ||
mock_db.get_latest_pois.assert_not_called() | ||
|
||
def test_analyze_pois_no_submissions(analyzer, mock_db): | ||
"""Test handling of blocks with no POI submissions.""" | ||
deployment_id = "Qm123" | ||
block_number = 1000 | ||
|
||
mock_db.check_notification_sent.return_value = False | ||
mock_db.get_latest_pois.return_value = {} | ||
|
||
result = analyzer.analyze_pois(deployment_id, block_number) | ||
assert result is None | ||
|
||
def test_process_new_submissions_handles_errors(analyzer, mock_db): | ||
"""Test error handling in the main processing loop.""" | ||
# Mock _get_recent_submissions to return some test data | ||
analyzer._get_recent_submissions = Mock(return_value=[ | ||
("Qm123", 1000), | ||
("Qm456", 2000) | ||
]) | ||
|
||
# Make analyze_pois raise an exception for the second submission | ||
def mock_analyze(deployment_id, block_number): | ||
if deployment_id == "Qm456": | ||
raise Exception("Test error") | ||
return None | ||
|
||
analyzer.analyze_pois = Mock(side_effect=mock_analyze) | ||
mock_db.cleanup_old_notifications = Mock() | ||
|
||
# This should not raise an exception and should continue processing | ||
analyzer.process_new_submissions() | ||
|
||
# Verify we tried to process both submissions | ||
assert analyzer.analyze_pois.call_count == 2 | ||
# Verify cleanup was still called | ||
mock_db.cleanup_old_notifications.assert_called_once() | ||
|
||
def test_get_recent_submissions_handles_api_errors(analyzer): | ||
"""Test handling of GraphQL API errors.""" | ||
with patch('requests.post') as mock_post: | ||
# Mock a failed API response | ||
mock_post.side_effect = requests.exceptions.RequestException("API Error") | ||
|
||
result = analyzer._get_recent_submissions() | ||
assert result == [] # Should return empty list on error | ||
|
||
def test_check_poi_reuse_with_multiple_reuses(analyzer): | ||
"""Test POI reuse detection with multiple reuse patterns.""" | ||
submissions = { | ||
"poi_hash_1": {"indexer1"}, | ||
"poi_hash_2": {"indexer2"} | ||
} | ||
|
||
now = datetime.now() | ||
|
||
# Mock database responses | ||
mock_cursor = analyzer.db._test_cursor | ||
mock_cursor.fetchall.return_value = [ | ||
("poi_hash_1", "deployment1", 1000, b"addr1", "mainnet", now), | ||
("poi_hash_1", "deployment2", 900, b"addr2", "mainnet", now), | ||
("poi_hash_2", "deployment1", 1000, b"addr1", "mainnet", now), | ||
("poi_hash_2", "deployment1", 950, b"addr1", "mainnet", now) | ||
] | ||
|
||
result = analyzer._check_poi_reuse(submissions) | ||
assert len(result) == 2 # Both POIs were reused |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,114 @@ | ||
import pytest | ||
from datetime import datetime | ||
from unittest.mock import Mock, MagicMock, patch | ||
from src.database import Database | ||
import psycopg2 | ||
|
||
@pytest.fixture | ||
def mock_conn(): | ||
conn = MagicMock() | ||
with patch('psycopg2.connect', return_value=conn): | ||
yield conn | ||
|
||
@pytest.fixture | ||
def database(mock_conn): | ||
"""Create a Database instance with mocked connection.""" | ||
with patch('psycopg2.pool.SimpleConnectionPool') as mock_pool: | ||
# Mock the connection pool | ||
pool = MagicMock() | ||
pool.getconn.return_value = mock_conn | ||
mock_pool.return_value = pool | ||
|
||
# Mock migrations | ||
with patch('src.migration.MigrationManager') as mock_manager: | ||
mock_manager_instance = Mock() | ||
mock_manager.return_value = mock_manager_instance | ||
|
||
db = Database() | ||
return db | ||
|
||
def test_database_connection_retry(mock_conn): | ||
"""Test that database connection retries on failure.""" | ||
with patch('psycopg2.pool.SimpleConnectionPool') as mock_pool: | ||
# Make pool creation fail twice then succeed | ||
mock_pool.side_effect = [ | ||
psycopg2.Error("Test error"), | ||
psycopg2.Error("Test error"), | ||
MagicMock() # Successful pool | ||
] | ||
|
||
# Mock migrations | ||
with patch('src.migration.MigrationManager') as mock_manager: | ||
mock_manager_instance = Mock() | ||
mock_manager.return_value = mock_manager_instance | ||
with patch('time.sleep'): # Don't actually sleep in tests | ||
db = Database() | ||
# Verify pool was created | ||
assert db.pool is not None | ||
|
||
def test_get_latest_pois(database, mock_conn): | ||
"""Test fetching latest POI submissions.""" | ||
mock_cursor = MagicMock() | ||
mock_cursor.fetchall.return_value = [ | ||
("poi1", "indexer1"), | ||
("poi1", "indexer2"), | ||
("poi2", "indexer3") | ||
] | ||
mock_conn.cursor.return_value.__enter__.return_value = mock_cursor | ||
|
||
result = database.get_latest_pois("deployment1", 1000) | ||
|
||
assert len(result) == 2 # Two unique POIs | ||
assert result["poi1"] == {"indexer1", "indexer2"} | ||
assert result["poi2"] == {"indexer3"} | ||
|
||
def test_check_notification_sent(database, mock_conn): | ||
"""Test checking if notification was already sent.""" | ||
mock_cursor = MagicMock() | ||
mock_cursor.fetchone.return_value = (True,) | ||
mock_conn.cursor.return_value.__enter__.return_value = mock_cursor | ||
|
||
result = database.check_notification_sent("deployment1", 1000) | ||
assert result is True | ||
|
||
def test_record_notification(database, mock_conn): | ||
"""Test recording a notification.""" | ||
mock_cursor = MagicMock() | ||
mock_conn.cursor.return_value.__enter__.return_value = mock_cursor | ||
|
||
database.record_notification("deployment1", 1000, "test message") | ||
|
||
# Verify the INSERT query was executed with correct parameters | ||
mock_cursor.execute.assert_any_call( | ||
""" | ||
WITH current_pois AS ( | ||
SELECT array_agg(DISTINCT p.poi ORDER BY p.poi) as poi_set | ||
FROM pois p | ||
JOIN blocks b ON b.id = p.block_id | ||
JOIN sg_deployments d ON d.id = p.sg_deployment_id | ||
WHERE d.ipfs_cid = %s AND b.number = %s | ||
) | ||
INSERT INTO poi_notifications (deployment_id, block_number, message, sent_at, poi_set) | ||
SELECT %s, %s, %s, NOW(), c.poi_set::bytea[] | ||
FROM current_pois c | ||
""", | ||
("deployment1", 1000, "deployment1", 1000, "test message") | ||
) | ||
mock_conn.commit.assert_called_once() | ||
|
||
def test_cleanup_old_notifications(database, mock_conn): | ||
"""Test cleaning up old notifications.""" | ||
mock_cursor = MagicMock() | ||
mock_conn.cursor.return_value.__enter__.return_value = mock_cursor | ||
|
||
database.cleanup_old_notifications(days=30) | ||
|
||
# Verify the DELETE query was executed with correct parameters | ||
mock_cursor.execute.assert_any_call( | ||
""" | ||
DELETE FROM poi_notifications | ||
WHERE sent_at < NOW() - INTERVAL '%s days' | ||
""", | ||
(30,) | ||
) | ||
mock_conn.commit.assert_called_once() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
import pytest | ||
from unittest.mock import Mock, MagicMock, patch | ||
from src.migration import MigrationManager | ||
import os | ||
|
||
@pytest.fixture | ||
def mock_conn(): | ||
return MagicMock() | ||
|
||
@pytest.fixture | ||
def manager(mock_conn): | ||
return MigrationManager(mock_conn) | ||
|
||
def test_ensure_migration_table(manager, mock_conn): | ||
"""Test migration table creation.""" | ||
mock_cursor = MagicMock() | ||
mock_conn.cursor.return_value.__enter__.return_value = mock_cursor | ||
|
||
# Reset the mock to clear any previous calls | ||
mock_conn.commit.reset_mock() | ||
|
||
manager._ensure_migration_table() | ||
|
||
mock_cursor.execute.assert_called_once() | ||
mock_conn.commit.assert_called_once() | ||
|
||
def test_get_applied_migrations(manager, mock_conn): | ||
"""Test fetching applied migrations.""" | ||
mock_cursor = MagicMock() | ||
mock_cursor.fetchall.return_value = [ | ||
("001_create_notifications_table",), | ||
("002_another_migration",) | ||
] | ||
mock_conn.cursor.return_value.__enter__.return_value = mock_cursor | ||
|
||
result = manager.get_applied_migrations() | ||
assert len(result) == 2 | ||
assert "001_create_notifications_table" in result |
Oops, something went wrong.