-
Notifications
You must be signed in to change notification settings - Fork 10
/
conftest.py
586 lines (458 loc) · 18 KB
/
conftest.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
from unittest.mock import Mock, patch
import boto3
import factory
import pytest
from botocore.stub import Stubber
from django.conf import settings
from django.core.cache import cache
from django.core.management import call_command
from django.db.models.signals import post_save
from moto import mock_aws
from opensearchpy.helpers.test import get_test_client
from pytest_django.lazy_django import skip_if_no_django
from datahub.core.constants import AdministrativeArea
from datahub.core.queues.scheduler import DataHubScheduler
from datahub.core.test_utils import create_test_user, HawkAPITestClient
from datahub.dnb_api.utils import format_dnb_company
from datahub.documents.utils import get_s3_client_for_bucket
from datahub.ingest.constants import TEST_AWS_REGION, TEST_S3_BUCKET_NAME
from datahub.metadata.test.factories import SectorFactory
from datahub.search.apps import get_search_app_by_model, get_search_apps
from datahub.search.bulk_sync import sync_objects
from datahub.search.opensearch import (
alias_exists,
create_index,
delete_alias,
delete_index,
index_exists,
)
from datahub.search.signals import SignalReceiver
@pytest.fixture(scope='session')
def django_db_setup(pytestconfig, django_db_setup, django_db_blocker):
"""Fixture for DB setup."""
with django_db_blocker.unblock():
call_command('loadinitialmetadata', force=True)
@pytest.fixture(scope='session', autouse=True)
def set_faker_locale():
"""Sets the default locale for Faker."""
with factory.Faker.override_default_locale('en_GB'):
yield
@pytest.fixture
def api_request_factory():
"""Django REST framework ApiRequestFactory instance."""
skip_if_no_django()
from rest_framework.test import APIRequestFactory
return APIRequestFactory()
@pytest.fixture
def api_client():
"""Django REST framework ApiClient instance."""
skip_if_no_django()
from rest_framework.test import APIClient
return APIClient()
@pytest.fixture
def hawk_api_client():
"""Hawk API client fixture."""
yield HawkAPITestClient()
class _ReturnValueTracker:
def __init__(self, cls, method_name):
self.return_values = []
self.original_callable = getattr(cls, method_name)
def make_mock(self):
def _spy(*args, **kwargs):
return_value = self.original_callable(*args, **kwargs)
self.return_values.append(return_value)
return return_value
return _spy
@pytest.fixture
def track_return_values(monkeypatch):
"""
Fixture that can be used to track the return values of a callable.
Usage example:
# obj could be a class or a module (for example)
def test_something(track_return_values):
tracker = track_return_values(obj, 'name_of_callable')
...
assert tracker.return_values == [1, 2, 3]
"""
def _patch(obj, callable_name):
tracker = _ReturnValueTracker(obj, callable_name)
monkeypatch.setattr(obj, callable_name, tracker.make_mock())
return tracker
yield _patch
# AWS
@pytest.fixture
def aws_credentials():
"""Mocked AWS credentials for moto."""
with patch.dict('os.environ', {
'AWS_ACCESS_KEY_ID': 'test-key-id',
'AWS_SECRET_ACCESS_KEY': 'test-secret',
'AWS_SECURITY_TOKEN': 'test-token',
'AWS_SESSION_TOKEN': 'test-token',
'AWS_DEFAULT_REGION': TEST_AWS_REGION,
}):
yield
@pytest.fixture
def s3_client(aws_credentials):
"""Fixture for a mocked S3 client.
Also creates a bucket named `test-bucket` in the same region.
"""
with mock_aws():
s3_client = boto3.client('s3', region_name=TEST_AWS_REGION)
s3_client.create_bucket(
Bucket=TEST_S3_BUCKET_NAME,
CreateBucketConfiguration={'LocationConstraint': TEST_AWS_REGION},
)
yield s3_client
@pytest.fixture()
def s3_stubber():
"""S3 stubber using the botocore Stubber class."""
s3_client = get_s3_client_for_bucket('default')
with Stubber(s3_client) as s3_stubber:
yield s3_stubber
@pytest.fixture()
def local_memory_cache():
"""Get local memory cache."""
yield
cache.clear()
@pytest.fixture
def synchronous_thread_pool(monkeypatch):
"""Run everything submitted to thread pools executor in sync."""
monkeypatch.setattr(
'datahub.core.thread_pool._submit_to_thread_pool',
_synchronous_submit_to_thread_pool,
)
@pytest.fixture
def synchronous_on_commit(monkeypatch):
"""During a test run a transaction is never committed, so we have to improvise."""
monkeypatch.setattr('django.db.transaction.on_commit', _synchronous_on_commit)
def _synchronous_submit_to_thread_pool(fn, *args, **kwargs):
fn(*args, **kwargs)
def _synchronous_on_commit(fn):
fn()
@pytest.fixture
def hierarchical_sectors():
"""Creates three test sectors in a hierarchy."""
parent = None
sectors = []
for _ in range(3):
sector = SectorFactory(parent=parent)
sectors.append(sector)
parent = sector
yield sectors
# SEARCH
@pytest.fixture(scope='session')
def _opensearch_client(worker_id):
"""
Makes the OpenSearch test helper client available.
Also patches settings.ES_INDEX_PREFIX using the xdist worker ID so that each process
gets unique indices when running tests using multiple processes using pytest -n.
"""
# pytest's monkeypatch does not work in session fixtures, but there is no need to restore
# the value so we just overwrite it normally
settings.OPENSEARCH_INDEX_PREFIX = f'test_{worker_id}'
from opensearch_dsl.connections import connections
client = get_test_client(nowait=False)
connections.add_connection('default', client)
yield client
@pytest.fixture(scope='session')
def _opensearch_session(_opensearch_client):
"""
Session-scoped fixture that creates OpenSearch indexes that persist for the entire test
session.
"""
# Create models in the test index
for search_app in get_search_apps():
# Clean up in case of any aborted test runs
index_name = search_app.search_model.get_target_index_name()
read_alias = search_app.search_model.get_read_alias()
write_alias = search_app.search_model.get_write_alias()
if index_exists(index_name):
delete_index(index_name)
if alias_exists(read_alias):
delete_alias(read_alias)
if alias_exists(write_alias):
delete_alias(write_alias)
# Create indices and aliases
alias_names = (read_alias, write_alias)
create_index(
index_name, search_app.search_model._doc_type.mapping, alias_names=alias_names,
)
yield _opensearch_client
for search_app in get_search_apps():
delete_index(search_app.search_model.get_target_index_name())
@pytest.fixture
def opensearch(_opensearch_session):
"""
Function-scoped pytest fixture that:
- ensures OpenSearch is available for the test
- deletes all documents from OpenSearch at the end of the test.
"""
yield _opensearch_session
_opensearch_session.indices.refresh()
indices = [search_app.search_model.get_target_index_name() for search_app in get_search_apps()]
_opensearch_session.delete_by_query(
indices,
body={'query': {'match_all': {}}},
)
_opensearch_session.indices.refresh()
@pytest.fixture
def opensearch_with_signals(opensearch, synchronous_on_commit):
"""
Function-scoped pytest fixture that:
- ensures OpenSearch is available for the test
- connects search signal receivers so that OpenSearch documents are automatically
created for model instances saved during the test
- deletes all documents from OpenSearch at the end of the test
Use this fixture when specifically testing search signal receivers.
Call opensearch_with_signals.indices.refresh() after creating objects to refresh all
search indices and ensure synced objects are available for querying.
"""
for search_app in get_search_apps():
search_app.connect_signals()
yield opensearch
for search_app in get_search_apps():
search_app.disconnect_signals()
class SavedObjectCollector:
"""
Collects the search apps of saved search objects and indexes those apps in bulk in
OpenSearch.
"""
def __init__(self, opensearch_client, apps_to_collect):
"""
Initialises the collector.
:param apps_to_collect: the search apps to monitor the `post_save` signal for (and sync
saved objects for when `flush_and_refresh()` is called)
"""
self.collected_apps = set()
self.opensearch_client = opensearch_client
self.signal_receivers_to_connect = [
SignalReceiver(post_save, search_app.queryset.model, self._collect)
for search_app in set(apps_to_collect)
]
# Disconnect all existing search post_save signal receivers (in case they were connected)
self.signal_receivers_to_disable = [
receiver
for search_app in get_search_apps()
for receiver in search_app.get_signal_receivers()
if receiver.signal is post_save
]
def __enter__(self):
"""Enable the collector by connecting our post_save signals."""
for receiver in self.signal_receivers_to_connect:
receiver.connect()
for receiver in self.signal_receivers_to_disable:
receiver.disable()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Disable the collector by disconnecting our post_save signals."""
for receiver in self.signal_receivers_to_connect:
receiver.disconnect()
for receiver in self.signal_receivers_to_disable:
receiver.enable()
def flush_and_refresh(self):
"""Sync objects of all collected apps to OpenSearch and refresh search indices."""
for search_app in self.collected_apps:
search_model = search_app.search_model
read_indices, write_index = search_model.get_read_and_write_indices()
sync_objects(search_model, search_app.queryset.all(), read_indices, write_index)
self.collected_apps.clear()
self.opensearch_client.indices.refresh()
def _collect(self, obj):
"""
Logic run on post_save for models of all search apps.
Note: This does not use transaction.on_commit(), because transactions in tests
are not committed. Be careful if reusing this logic in production code (as you would
usually want to delay syncing until the transaction is committed).
"""
model = obj.__class__
search_app = get_search_app_by_model(model)
self.collected_apps.add(search_app)
@pytest.fixture
def opensearch_collector_context_manager(opensearch, synchronous_on_commit, request):
"""
Slightly lower-level version of opensearch_with_collector.
Function-scoped pytest fixture that:
- ensures OpenSearch is available for the test
- deletes all documents from OpenSearch at the end of the test
- yields a context manager that can be used to collects all model objects saved so
they can be synced to OpenSearch in bulk
Call opensearch_collector_context_manager.flush_and_refresh() to sync collected objects to
OpenSearch and refresh all indices.
In most cases, you should not use this fixture directly, but use opensearch_with_collector or
opensearch_with_signals instead.
"""
marker_apps = {
app
for marker in request.node.iter_markers('opensearch_collector_apps')
for app in marker.args
}
apps = marker_apps or get_search_apps()
yield SavedObjectCollector(opensearch, apps)
@pytest.fixture
def opensearch_with_collector(opensearch_collector_context_manager):
"""
Function-scoped pytest fixture that:
- ensures OpenSearch is available for the test
- collects all model objects saved so they can be synced to OpenSearch in bulk
- deletes all documents from OpenSearch at the end of the test
Use this fixture for search tests that don't specifically test signal receivers.
Call opensearch_with_collector.flush_and_refresh() to sync collected objects to OpenSearch and
refresh all indices.
"""
with opensearch_collector_context_manager as collector:
yield collector
@pytest.fixture
def mock_opensearch_client(monkeypatch):
"""Patches the OpenSearch library so that a mock client is used."""
mock_client = Mock()
monkeypatch.setattr('opensearch_dsl.connections.connections.get_connection', mock_client)
yield mock_client
@pytest.fixture
def mock_connection_for_create_index(monkeypatch):
"""Patches the OpenSearch library so that a mock client is used."""
mock_client = Mock()
monkeypatch.setattr('opensearch_dsl.connections.connections.get_connection', mock_client)
monkeypatch.setattr('opensearch_dsl.index.get_connection', mock_client)
yield mock_client
@pytest.fixture
def dnb_response_uk():
"""
Returns a UK-based DNB company.
"""
return {
'results': [
{
'address_country': 'GB',
'address_county': '',
'address_area_name': None,
'address_line_1': 'Unit 10, Ockham Drive',
'address_line_2': '',
'address_postcode': 'UB6 0F2',
'address_town': 'GREENFORD',
'annual_sales': 50651895.0,
'annual_sales_currency': 'USD',
'domain': 'foo.com',
'duns_number': '123456789',
'employee_number': 260,
'global_ultimate_duns_number': '291332174',
'global_ultimate_primary_name': 'FOO BICYCLE LIMITED',
'industry_codes': [
{
'code': '336991',
'description': 'Motorcycle, Bicycle, and Parts Manufacturing',
'priority': 1,
'typeDescription': 'North American Industry Classification System 2017',
'typeDnbCode': 30832,
},
{
'code': '1927',
'description': 'Motorcycle Manufacturing',
'priority': 1,
'typeDescription': 'D&B Hoovers Industry Code',
'typeDnbCode': 25838,
},
],
'is_annual_sales_estimated': None,
'is_employees_number_estimated': True,
'is_out_of_business': False,
'legal_status': 'corporation',
'primary_industry_codes': [
{
'usSicV4': '3751',
'usSicV4Description': 'Mfg motorcycles/bicycles',
},
],
'primary_name': 'FOO BICYCLE LIMITED',
'registered_address_country': 'GB',
'registered_address_county': '',
'registered_address_line_1': 'C/O LONE VARY',
'registered_address_line_2': '',
'registered_address_postcode': 'UB6 0F2',
'registered_address_town': 'GREENFORD',
'registration_numbers': [
{
'registration_number': '01261539',
'registration_type': 'uk_companies_house_number',
},
],
'trading_names': [],
},
],
}
@pytest.fixture
def formatted_dnb_company(dnb_response_uk):
"""
Get formatted DNB company data.
"""
return format_dnb_company(dnb_response_uk['results'][0])
@pytest.fixture
def formatted_dnb_company_area(dnb_response_uk):
"""
Get formatted DNB company data.
"""
dnb_response_area = dnb_response_uk['results'][0].copy()
return format_dnb_company(dnb_response_area)
@pytest.fixture
def formatted_dnb_company_area_non_uk(dnb_response_non_uk):
"""
Get formatted DNB company data.
"""
dnb_response_area = dnb_response_non_uk['results'][0].copy()
administrative_areas = [area.value.name for area in AdministrativeArea]
if dnb_response_area['address_area_name'] in administrative_areas:
dnb_response_area.update(
address_area_name=dnb_response_area['address_area_name'],
)
return format_dnb_company(dnb_response_area)
@pytest.fixture
def search_support_user():
"""A user with permissions for search_support views."""
return create_test_user(permission_codenames=['view_simplemodel', 'view_relatedmodel'])
def pytest_addoption(parser):
"""Adds a new flag to pytest to skip excluded tests"""
parser.addoption(
'--skip-excluded', '--se',
action='store_true',
default=False,
help='Skip excluded tests from running',
)
def pytest_collection_modifyitems(config, items):
"""Skip excluded tests"""
if config.getoption('--skip-excluded') is False:
return
for item in items:
if any([
m.name == 'excluded' or m.name.startswith('excluded_')
for m in item.iter_markers()
]):
item.add_marker(pytest.mark.skip(reason='Test marked as excluded'))
@pytest.fixture()
def queue():
with DataHubScheduler('burst-no-fork') as queue:
try:
yield queue
finally:
queue.clear()
@pytest.fixture()
def async_queue():
with DataHubScheduler('burst-no-fork', is_async=True) as queue:
try:
yield queue
finally:
queue.clear()
@pytest.fixture()
def fork_queue():
"""Don't use this if you are going to do work as this will block any tests running"""
with DataHubScheduler('fork') as queue:
try:
yield queue
finally:
queue.clear()
@pytest.fixture
def metadata_client(hawk_api_client):
"""Hawk API client fixture configured to use credentials with the metadata scope."""
hawk_api_client.set_credentials(
'test-id-with-metadata-scope',
'test-key-with-metadata-scope',
)
yield hawk_api_client