diff --git a/firebase_admin/functions.py b/firebase_admin/functions.py new file mode 100644 index 000000000..b39ee0a66 --- /dev/null +++ b/firebase_admin/functions.py @@ -0,0 +1,437 @@ +# Copyright 2024 Google Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Firebase Functions module.""" + +from __future__ import annotations +from datetime import datetime, timedelta +from urllib import parse +import re +import json +from base64 import b64encode +from typing import Any, Optional, Dict +from dataclasses import dataclass +from google.auth.compute_engine import Credentials as ComputeEngineCredentials + +import requests +import firebase_admin +from firebase_admin import App +from firebase_admin import _http_client +from firebase_admin import _utils + +_FUNCTIONS_ATTRIBUTE = '_functions' + +__all__ = [ + 'TaskOptions', + + 'task_queue', +] + + +_CLOUD_TASKS_API_RESOURCE_PATH = \ + 'projects/{project_id}/locations/{location_id}/queues/{resource_id}/tasks' +_CLOUD_TASKS_API_URL_FORMAT = \ + 'https://cloudtasks.googleapis.com/v2/' + _CLOUD_TASKS_API_RESOURCE_PATH +_FIREBASE_FUNCTION_URL_FORMAT = \ + 'https://{location_id}-{project_id}.cloudfunctions.net/{resource_id}' + +_FUNCTIONS_HEADERS = { + 'X-GOOG-API-FORMAT-VERSION': '2', + 'X-FIREBASE-CLIENT': 'fire-admin-python/{0}'.format(firebase_admin.__version__), +} + +# Default canonical location ID of the task queue. +_DEFAULT_LOCATION = 'us-central1' + +def _get_functions_service(app) -> _FunctionsService: + return _utils.get_app_service(app, _FUNCTIONS_ATTRIBUTE, _FunctionsService) + +def task_queue( + function_name: str, + extension_id: Optional[str] = None, + app: Optional[App] = None + ) -> TaskQueue: + """Creates a reference to a TaskQueue for a given function name. + + The function name can be either: + 1. A fully qualified function resource name: + `projects/{project-id}/locations/{location-id}/functions/{function-name}` + + 2. A partial resource name with location and function name, in which case + the runtime project ID is used: + `locations/{location-id}/functions/{function-name}` + + 3. A partial function name, in which case the runtime project ID and the + default location, `us-central1`, is used: + `{function-name}` + + Args: + function_name: Name of the function. + extension_id: Firebase extension ID (optional). + app: An App instance (optional). + + Returns: + TaskQueue: A TaskQueue instance. + + Raises: + ValueError: If the input arguments are invalid. + """ + return _get_functions_service(app).task_queue(function_name, extension_id) + +class _FunctionsService: + """Service class that implements Firebase Functions functionality.""" + def __init__(self, app: App): + self._project_id = app.project_id + if not self._project_id: + raise ValueError( + 'Project ID is required to access the Cloud Functions service. Either set the ' + 'projectId option, or use service account credentials. Alternatively, set the ' + 'GOOGLE_CLOUD_PROJECT environment variable.') + + self._credential = app.credential.get_credential() + self._http_client = _http_client.JsonHttpClient(credential=self._credential) + + def task_queue(self, function_name: str, extension_id: Optional[str] = None) -> TaskQueue: + """Creates a TaskQueue instance.""" + return TaskQueue( + function_name, extension_id, self._project_id, self._credential, self._http_client) + + @classmethod + def handle_functions_error(cls, error: Any): + """Handles errors received from the Cloud Functions API.""" + + return _utils.handle_platform_error_from_requests(error) + +class TaskQueue: + """TaskQueue class that implements Firebase Cloud Tasks Queues functionality.""" + def __init__( + self, + function_name: str, + extension_id: Optional[str], + project_id, + credential, + http_client + ) -> None: + + # Validate function_name + _Validators.check_non_empty_string('function_name', function_name) + + self._project_id = project_id + self._credential = credential + self._http_client = http_client + self._function_name = function_name + self._extension_id = extension_id + # Parse resources from function_name + self._resource = self._parse_resource_name(self._function_name, 'functions') + + # Apply defaults and validate resource_id + self._resource.project_id = self._resource.project_id or self._project_id + self._resource.location_id = self._resource.location_id or _DEFAULT_LOCATION + _Validators.check_non_empty_string('resource.resource_id', self._resource.resource_id) + # Validate extension_id if provided and edit resources depending + if self._extension_id is not None: + _Validators.check_non_empty_string('extension_id', self._extension_id) + self._resource.resource_id = f'ext-{self._extension_id}-{self._resource.resource_id}' + + + def enqueue(self, task_data: Any, opts: Optional[TaskOptions] = None) -> str: + """Creates a task and adds it to the queue. Tasks cannot be updated after creation. + + This action requires `cloudtasks.tasks.create` IAM permission on the service account. + + Args: + task_data: The data payload of the task. + opts: Options when enqueuing a new task (optional). + + Raises: + FirebaseError: If an error occurs while requesting the task to be queued by + the Cloud Functions service. + ValueError: If the input arguments are invalid. + + Returns: + str: The ID of the task relative to this queue. + """ + task = self._validate_task_options(task_data, self._resource, opts) + service_url = self._get_url(self._resource, _CLOUD_TASKS_API_URL_FORMAT) + task_payload = self._update_task_payload(task, self._resource, self._extension_id) + try: + resp = self._http_client.body( + 'post', + url=service_url, + headers=_FUNCTIONS_HEADERS, + json={'task': task_payload.__dict__} + ) + task_name = resp.get('name', None) + task_resource = \ + self._parse_resource_name(task_name, f'queues/{self._resource.resource_id}/tasks') + return task_resource.resource_id + except requests.exceptions.RequestException as error: + raise _FunctionsService.handle_functions_error(error) + + def delete(self, task_id: str) -> None: + """Deletes an enqueued task if it has not yet started. + + This action requires `cloudtasks.tasks.delete` IAM permission on the service account. + + Args: + task_id: The ID of the task relative to this queue. + + Raises: + FirebaseError: If an error occurs while requesting the task to be deleted by + the Cloud Functions service. + ValueError: If the input arguments are invalid. + """ + _Validators.check_non_empty_string('task_id', task_id) + service_url = self._get_url(self._resource, _CLOUD_TASKS_API_URL_FORMAT + f'/{task_id}') + try: + self._http_client.body( + 'delete', + url=service_url, + headers=_FUNCTIONS_HEADERS, + ) + except requests.exceptions.RequestException as error: + raise _FunctionsService.handle_functions_error(error) + + + def _parse_resource_name(self, resource_name: str, resource_id_key: str) -> Resource: + """Parses a full or partial resource path into a ``Resource``.""" + if '/' not in resource_name: + return Resource(resource_id=resource_name) + + reg = f'^(projects/([^/]+)/)?locations/([^/]+)/{resource_id_key}/([^/]+)$' + match = re.search(reg, resource_name) + if match is None: + raise ValueError('Invalid resource name format.') + return Resource(project_id=match[2], location_id=match[3], resource_id=match[4]) + + def _get_url(self, resource: Resource, url_format: str) -> str: + """Generates url path from a ``Resource`` and url format string.""" + return url_format.format( + project_id=resource.project_id, + location_id=resource.location_id, + resource_id=resource.resource_id) + + def _validate_task_options( + self, + data: Any, + resource: Resource, + opts: Optional[TaskOptions] = None + ) -> Task: + """Validate and create a Task from optional ``TaskOptions``.""" + task_http_request = { + 'url': '', + 'oidc_token': { + 'service_account_email': '' + }, + 'body': b64encode(json.dumps(data).encode()).decode(), + 'headers': { + 'Content-Type': 'application/json', + } + } + task = Task(http_request=task_http_request) + + if opts is not None: + if opts.headers is not None: + task.http_request['headers'] = {**task.http_request['headers'], **opts.headers} + if opts.schedule_time is not None and opts.schedule_delay_seconds is not None: + raise ValueError( + 'Both sechdule_delay_seconds and schedule_time cannot be set at the same time.') + if opts.schedule_time is not None and opts.schedule_delay_seconds is None: + if not isinstance(opts.schedule_time, datetime): + raise ValueError('schedule_time should be UTC datetime.') + task.schedule_time = opts.schedule_time.strftime('%Y-%m-%dT%H:%M:%S.%fZ') + if opts.schedule_delay_seconds is not None and opts.schedule_time is None: + if not isinstance(opts.schedule_delay_seconds, int) \ + or opts.schedule_delay_seconds < 0: + raise ValueError('schedule_delay_seconds should be positive int.') + schedule_time = datetime.utcnow() + timedelta(seconds=opts.schedule_delay_seconds) + task.schedule_time = schedule_time.strftime('%Y-%m-%dT%H:%M:%S.%fZ') + if opts.dispatch_deadline_seconds is not None: + if not isinstance(opts.dispatch_deadline_seconds, int) \ + or opts.dispatch_deadline_seconds < 15 \ + or opts.dispatch_deadline_seconds > 1800: + raise ValueError( + 'dispatch_deadline_seconds should be int in the range of 15s to ' + '1800s (30 mins).') + task.dispatch_deadline = f'{opts.dispatch_deadline_seconds}s' + if opts.task_id is not None: + if not _Validators.is_task_id(opts.task_id): + raise ValueError( + 'task_id can contain only letters ([A-Za-z]), numbers ([0-9]), hyphens (-)' + ', or underscores (_). The maximum length is 500 characters.') + task.name = self._get_url( + resource, _CLOUD_TASKS_API_RESOURCE_PATH + f'/{opts.task_id}') + if opts.uri is not None: + if not _Validators.is_url(opts.uri): + raise ValueError( + 'uri must be a valid RFC3986 URI string using the https or http schema.') + task.http_request['url'] = opts.uri + return task + + def _update_task_payload(self, task: Task, resource: Resource, extension_id: str) -> Task: + """Prepares task to be sent with credentials.""" + # Get function url from task or generate from resources + if not _Validators.is_non_empty_string(task.http_request['url']): + task.http_request['url'] = self._get_url(resource, _FIREBASE_FUNCTION_URL_FORMAT) + # If extension id is provided, it emplies that it is being run from a deployed extension. + # Meaning that it's credential should be a Compute Engine Credential. + if _Validators.is_non_empty_string(extension_id) and \ + isinstance(self._credential, ComputeEngineCredentials): + + id_token = self._credential.token + task.http_request['headers'] = \ + {**task.http_request['headers'], 'Authorization': f'Bearer ${id_token}'} + # Delete oidc token + del task.http_request['oidc_token'] + else: + task.http_request['oidc_token'] = \ + {'service_account_email': self._credential.service_account_email} + return task + + +class _Validators: + """A collection of data validation utilities.""" + @classmethod + def check_non_empty_string(cls, label: str, value: Any): + """Checks if given value is a non-empty string and throws error if not.""" + if not isinstance(value, str): + raise ValueError('{0} "{1}" must be a string.'.format(label, value)) + if value == '': + raise ValueError('{0} "{1}" must be a non-empty string.'.format(label, value)) + + @classmethod + def is_non_empty_string(cls, value: Any): + """Checks if given value is a non-empty string and returns bool.""" + if not isinstance(value, str) or value == '': + return False + return True + + @classmethod + def is_task_id(cls, task_id: Any): + """Checks if given value is a valid task id.""" + reg = '^[A-Za-z0-9_-]+$' + if re.match(reg, task_id) is not None and len(task_id) <= 500: + return True + return False + + @classmethod + def is_url(cls, url: Any): + """Checks if given value is a valid url.""" + if not isinstance(url, str): + return False + try: + parsed = parse.urlparse(url) + if not parsed.netloc or parsed.scheme not in ['http', 'https']: + return False + return True + except Exception: # pylint: disable=broad-except + return False + + +@dataclass +class TaskOptions: + """Task Options that can be applied to a Task. + Args: + schedule_delay_seconds: The number of seconds after the current time at which to attempt or + retry the task. Should only be set if ``schedule_time`` is not set. + + schedule_time: The time when the task is scheduled to be attempted or retried. Should only + be set if ``schedule_delay_seconds`` is not set. + + dispatch_deadline_seconds: The deadline for requests sent to the worker. If the worker does + not respond by this deadline then the request is cancelled and the attempt is marked as + a ``DEADLINE_EXCEEDED`` failure. Cloud Tasks will retry the task according to the + ``RetryConfig``. The default is 10 minutes. The deadline must be in the range of 15 + seconds and 30 minutes (1800 seconds). + + task_id: The ID to use for the enqueued task. If not provided, one will be automatically + generated. + + If provided, an explicitly specified task ID enables task de-duplication. + Task IDs should be strings that contain only letters ([A-Za-z]), numbers ([0-9]), + hyphens (-), and underscores (_) with a maximum length of 500 characters. If a task's + ID is identical to that of an existing task or a task that was deleted or executed + recently then the call will throw an error with code "functions/task-already-exists". + Another task with the same ID can't be created for ~1hour after the original task was + deleted or executed. + + Because there is an extra lookup cost to identify duplicate task IDs, setting ID + significantly increases latency. + + Also, note that the infrastructure relies on an approximately uniform distribution + of task IDs to store and serve tasks efficiently. For this reason, using hashed strings + for the task ID or for the prefix of the task ID is recommended. Choosing task IDs that + are sequential or have sequential prefixes, for example using a timestamp, causes an + increase in latency and error rates in all task commands. + + "Push IDs" from the Firebase Realtime Database make poor IDs because they are based on + timestamps and will cause contention (slowdowns) in your task queue. Reversed push IDs + however form a perfect distribution and are an ideal key. To reverse a string in Python + use ``reversedString = someString[::-1]`` + + headers: HTTP request headers to include in the request to the task queue function. These + headers represent a subset of the headers that will accompany the task's HTTP request. + Some HTTP request headers will be ignored or replaced: `Authorization`, `Host`, + `Content-Length`, `User-Agent` and others cannot be overridden. + + A complete list of these ignored or replaced headers can be found in the following + definition of the HttpRequest.headers property: + https://cloud.google.com/tasks/docs/reference/rest/v2/projects.locations.queues.tasks#httprequest + + By default, Content-Type is set to 'application/json'. + + The size of the headers must be less than 80KB. + + uri: The full URL that the request will be sent to. Must be a valid RFC3986 https or + http URL. + """ + schedule_delay_seconds: Optional[int] = None + schedule_time: Optional[datetime] = None + dispatch_deadline_seconds: Optional[int] = None + task_id: Optional[str] = None + headers: Optional[Dict[str, str]] = None + uri: Optional[str] = None + +@dataclass +class Task: + """Contains the relevant fields for enqueueing tasks that trigger Cloud Functions. + + This is a limited subset of the Cloud Functions `Task` resource. See the following + page for definitions of this class's properties: + https://cloud.google.com/tasks/docs/reference/rest/v2/projects.locations.queues.tasks#resource:-task + + Args: + httpRequest: The request to be made by the task worker. + name: The name of the function. See the Cloud docs for the format of this property. + schedule_time: The time when the task is scheduled to be attempted or retried. + dispatch_deadline: The deadline for requests sent to the worker. + """ + http_request: Dict[str, Optional[str | dict]] + name: Optional[str] = None + schedule_time: Optional[str] = None + dispatch_deadline: Optional[str] = None + + +@dataclass +class Resource: + """Contains the parsed address of a resource. + + Args: + resource_id: The ID of the resource. + project_id: The project ID of the resource. + location_id: The location ID of the resource. + """ + resource_id: str + project_id: Optional[str] = None + location_id: Optional[str] = None diff --git a/integration/test_functions.py b/integration/test_functions.py new file mode 100644 index 000000000..606798436 --- /dev/null +++ b/integration/test_functions.py @@ -0,0 +1,56 @@ +# Copyright 2024 Google Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Integration tests for firebase_admin.functions module.""" + +import pytest + +import firebase_admin +from firebase_admin import functions +from integration import conftest + + +@pytest.fixture(scope='module') +def app(request): + cred, _ = conftest.integration_conf(request) + return firebase_admin.initialize_app(cred, name='integration-functions') + + +class TestFunctions: + + _TEST_FUNCTIONS_PARAMS = [ + {'function_name': 'function-name'}, + {'function_name': 'projects/test-project/locations/test-location/functions/function-name'}, + {'function_name': 'function-name', 'extension_id': 'extension-id'}, + { + 'function_name': \ + 'projects/test-project/locations/test-location/functions/function-name', + 'extension_id': 'extension-id' + } + ] + + @pytest.mark.parametrize('task_queue_params', _TEST_FUNCTIONS_PARAMS) + def test_task_queue(self, task_queue_params): + queue = functions.task_queue(**task_queue_params) + assert queue is not None + assert callable(queue.enqueue) + assert callable(queue.delete) + + @pytest.mark.parametrize('task_queue_params', _TEST_FUNCTIONS_PARAMS) + def test_task_queue_app(self, task_queue_params, app): + assert app.name == 'integration-functions' + queue = functions.task_queue(**task_queue_params, app=app) + assert queue is not None + assert callable(queue.enqueue) + assert callable(queue.delete) diff --git a/tests/test_functions.py b/tests/test_functions.py new file mode 100644 index 000000000..75809c1ad --- /dev/null +++ b/tests/test_functions.py @@ -0,0 +1,301 @@ +# Copyright 2024 Google Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Test cases for the firebase_admin.functions module.""" + +from datetime import datetime, timedelta +import json +import time +import pytest + +import firebase_admin +from firebase_admin import functions +from tests import testutils + + +_DEFAULT_DATA = {'city': 'Seattle'} +_CLOUD_TASKS_URL = 'https://cloudtasks.googleapis.com/v2/' +_DEFAULT_TASK_PATH = \ + 'projects/test-project/locations/us-central1/queues/test-function-name/tasks/test-task-id' +_DEFAULT_REQUEST_URL = \ + _CLOUD_TASKS_URL + 'projects/test-project/locations/us-central1/queues/test-function-name/tasks' +_DEFAULT_TASK_URL = _CLOUD_TASKS_URL + _DEFAULT_TASK_PATH +_DEFAULT_RESPONSE = json.dumps({'name': _DEFAULT_TASK_PATH}) +_ENQUEUE_TIME = datetime.utcnow() +_SCHEDULE_TIME = _ENQUEUE_TIME + timedelta(seconds=100) + +class TestTaskQueue: + @classmethod + def setup_class(cls): + cred = testutils.MockCredential() + firebase_admin.initialize_app(cred, {'projectId': 'test-project'}) + + @classmethod + def teardown_class(cls): + testutils.cleanup_apps() + + def _instrument_functions_service(self, app=None, status=200, payload=_DEFAULT_RESPONSE): + if not app: + app = firebase_admin.get_app() + functions_service = functions._get_functions_service(app) + recorder = [] + functions_service._http_client.session.mount( + _CLOUD_TASKS_URL, + testutils.MockAdapter(payload, status, recorder)) + return functions_service, recorder + + def test_task_queue_no_project_id(self): + def evaluate(): + app = firebase_admin.initialize_app(testutils.MockCredential(), name='no-project-id') + with pytest.raises(ValueError): + functions.task_queue('test-function-name', app=app) + testutils.run_without_project_id(evaluate) + + @pytest.mark.parametrize('function_name', [ + 'projects/test-project/locations/us-central1/functions/test-function-name', + 'locations/us-central1/functions/test-function-name', + 'test-function-name', + ]) + def test_task_queue_function_name(self, function_name): + queue = functions.task_queue(function_name) + assert queue._resource.resource_id == 'test-function-name' + assert queue._resource.project_id == 'test-project' + assert queue._resource.location_id == 'us-central1' + + def test_task_queue_empty_function_name_error(self): + with pytest.raises(ValueError) as excinfo: + functions.task_queue('') + assert str(excinfo.value) == 'function_name "" must be a non-empty string.' + + def test_task_queue_non_string_function_name_error(self): + with pytest.raises(ValueError) as excinfo: + functions.task_queue(1234) + assert str(excinfo.value) == 'function_name "1234" must be a string.' + + @pytest.mark.parametrize('function_name', [ + '/test', + 'test/', + 'test-project/us-central1/test-function-name', + 'projects/test-project/functions/test-function-name', + 'functions/test-function-name', + ]) + def test_task_queue_invalid_function_name_error(self, function_name): + with pytest.raises(ValueError) as excinfo: + functions.task_queue(function_name) + assert str(excinfo.value) == 'Invalid resource name format.' + + def test_task_queue_extension_id(self): + queue = functions.task_queue("test-function-name", "test-extension-id") + assert queue._resource.resource_id == 'ext-test-extension-id-test-function-name' + assert queue._resource.project_id == 'test-project' + assert queue._resource.location_id == 'us-central1' + + def test_task_queue_empty_extension_id_error(self): + with pytest.raises(ValueError) as excinfo: + functions.task_queue('test-function-name', '') + assert str(excinfo.value) == 'extension_id "" must be a non-empty string.' + + def test_task_queue_non_string_extension_id_error(self): + with pytest.raises(ValueError) as excinfo: + functions.task_queue('test-function-name', 1234) + assert str(excinfo.value) == 'extension_id "1234" must be a string.' + + + def test_task_enqueue(self): + _, recorder = self._instrument_functions_service() + queue = functions.task_queue('test-function-name') + task_id = queue.enqueue(_DEFAULT_DATA) + assert len(recorder) == 1 + assert recorder[0].method == 'POST' + assert recorder[0].url == _DEFAULT_REQUEST_URL + assert recorder[0].headers['Content-Type'] == 'application/json' + assert recorder[0].headers['Authorization'] == 'Bearer mock-token' + assert task_id == 'test-task-id' + + def test_task_enqueue_with_extension(self): + resource_name = ( + 'projects/test-project/locations/us-central1/queues/' + 'ext-test-extension-id-test-function-name/tasks' + ) + extension_response = json.dumps({'name': resource_name + '/test-task-id'}) + _, recorder = self._instrument_functions_service(payload=extension_response) + queue = functions.task_queue('test-function-name', 'test-extension-id') + task_id = queue.enqueue(_DEFAULT_DATA) + assert len(recorder) == 1 + assert recorder[0].method == 'POST' + assert recorder[0].url == _CLOUD_TASKS_URL + resource_name + assert recorder[0].headers['Content-Type'] == 'application/json' + assert recorder[0].headers['Authorization'] == 'Bearer mock-token' + assert task_id == 'test-task-id' + + def test_task_delete(self): + _, recorder = self._instrument_functions_service() + queue = functions.task_queue('test-function-name') + queue.delete('test-task-id') + assert len(recorder) == 1 + assert recorder[0].method == 'DELETE' + assert recorder[0].url == _DEFAULT_TASK_URL + + +class TestTaskQueueOptions: + + _DEFAULT_TASK_OPTS = {'schedule_delay_seconds': None, 'schedule_time': None, \ + 'dispatch_deadline_seconds': None, 'task_id': None, 'headers': None} + + non_alphanumeric_chars = [ + ',', '.', '?', '!', ':', ';', "'", '"', '(', ')', '[', ']', '{', '}', + '@', '&', '*', '+', '=', '$', '%', '#', '~', '\\', '/', '|', '^', + '\t', '\n', '\r', '\f', '\v', '\0', '\a', '\b', + 'é', 'ç', 'ö', '❤️', '€', '¥', '£', '←', '→', '↑', '↓', 'π', 'Ω', 'ß' + ] + + @classmethod + def setup_class(cls): + cred = testutils.MockCredential() + firebase_admin.initialize_app(cred, {'projectId': 'test-project'}) + + @classmethod + def teardown_class(cls): + testutils.cleanup_apps() + + def _instrument_functions_service(self, app=None, status=200, payload=_DEFAULT_RESPONSE): + if not app: + app = firebase_admin.get_app() + functions_service = functions._get_functions_service(app) + recorder = [] + functions_service._http_client.session.mount( + _CLOUD_TASKS_URL, + testutils.MockAdapter(payload, status, recorder)) + return functions_service, recorder + + + @pytest.mark.parametrize('task_opts_params', [ + { + 'schedule_delay_seconds': 100, + 'schedule_time': None, + 'dispatch_deadline_seconds': 200, + 'task_id': 'test-task-id', + 'headers': {'x-test-header': 'test-header-value'}, + 'uri': 'https://google.com' + }, + { + 'schedule_delay_seconds': None, + 'schedule_time': _SCHEDULE_TIME, + 'dispatch_deadline_seconds': 200, + 'task_id': 'test-task-id', + 'headers': {'x-test-header': 'test-header-value'}, + 'uri': 'http://google.com' + }, + ]) + def test_task_options(self, task_opts_params): + _, recorder = self._instrument_functions_service() + queue = functions.task_queue('test-function-name') + task_opts = functions.TaskOptions(**task_opts_params) + queue.enqueue(_DEFAULT_DATA, task_opts) + + assert len(recorder) == 1 + task = json.loads(recorder[0].body.decode())['task'] + + schedule_time = datetime.fromisoformat(task['schedule_time'][:-1]) + delta = abs(schedule_time - _SCHEDULE_TIME) + assert delta <= timedelta(seconds=15) + + assert task['dispatch_deadline'] == '200s' + assert task['http_request']['headers']['x-test-header'] == 'test-header-value' + assert task['http_request']['url'] in ['http://google.com', 'https://google.com'] + assert task['name'] == _DEFAULT_TASK_PATH + + + def test_schedule_set_twice_error(self): + _, recorder = self._instrument_functions_service() + opts = functions.TaskOptions(schedule_delay_seconds=100, schedule_time=datetime.utcnow()) + queue = functions.task_queue('test-function-name') + with pytest.raises(ValueError) as excinfo: + queue.enqueue(_DEFAULT_DATA, opts) + assert len(recorder) == 0 + assert str(excinfo.value) == \ + 'Both sechdule_delay_seconds and schedule_time cannot be set at the same time.' + + + @pytest.mark.parametrize('schedule_time', [ + time.time(), + str(datetime.utcnow()), + datetime.utcnow().isoformat(), + datetime.utcnow().isoformat() + 'Z', + '', ' ' + ]) + def test_invalid_schedule_time_error(self, schedule_time): + _, recorder = self._instrument_functions_service() + opts = functions.TaskOptions(schedule_time=schedule_time) + queue = functions.task_queue('test-function-name') + with pytest.raises(ValueError) as excinfo: + queue.enqueue(_DEFAULT_DATA, opts) + assert len(recorder) == 0 + assert str(excinfo.value) == 'schedule_time should be UTC datetime.' + + + @pytest.mark.parametrize('schedule_delay_seconds', [ + -1, '100', '-1', '', ' ', -1.23, 1.23 + ]) + def test_invalid_schedule_delay_seconds_error(self, schedule_delay_seconds): + _, recorder = self._instrument_functions_service() + opts = functions.TaskOptions(schedule_delay_seconds=schedule_delay_seconds) + queue = functions.task_queue('test-function-name') + with pytest.raises(ValueError) as excinfo: + queue.enqueue(_DEFAULT_DATA, opts) + assert len(recorder) == 0 + assert str(excinfo.value) == 'schedule_delay_seconds should be positive int.' + + + @pytest.mark.parametrize('dispatch_deadline_seconds', [ + 14, 1801, -15, -1800, 0, '100', '-1', '', ' ', -1.23, 1.23, + ]) + def test_invalid_dispatch_deadline_seconds_error(self, dispatch_deadline_seconds): + _, recorder = self._instrument_functions_service() + opts = functions.TaskOptions(dispatch_deadline_seconds=dispatch_deadline_seconds) + queue = functions.task_queue('test-function-name') + with pytest.raises(ValueError) as excinfo: + queue.enqueue(_DEFAULT_DATA, opts) + assert len(recorder) == 0 + assert str(excinfo.value) == \ + 'dispatch_deadline_seconds should be int in the range of 15s to 1800s (30 mins).' + + + @pytest.mark.parametrize('task_id', [ + '', ' ', 'task/1', 'task.1', 'a'*501, *non_alphanumeric_chars + ]) + def test_invalid_task_id_error(self, task_id): + _, recorder = self._instrument_functions_service() + opts = functions.TaskOptions(task_id=task_id) + queue = functions.task_queue('test-function-name') + with pytest.raises(ValueError) as excinfo: + queue.enqueue(_DEFAULT_DATA, opts) + assert len(recorder) == 0 + assert str(excinfo.value) == ( + 'task_id can contain only letters ([A-Za-z]), numbers ([0-9]), ' + 'hyphens (-), or underscores (_). The maximum length is 500 characters.' + ) + + @pytest.mark.parametrize('uri', [ + '', ' ', 'a', 'foo', 'image.jpg', [], {}, True, 'google.com', 'www.google.com' + ]) + def test_invalid_uri_error(self, uri): + _, recorder = self._instrument_functions_service() + opts = functions.TaskOptions(uri=uri) + queue = functions.task_queue('test-function-name') + with pytest.raises(ValueError) as excinfo: + queue.enqueue(_DEFAULT_DATA, opts) + assert len(recorder) == 0 + assert str(excinfo.value) == \ + 'uri must be a valid RFC3986 URI string using the https or http schema.' diff --git a/tests/testutils.py b/tests/testutils.py index e52b90d1a..ab4fb40cb 100644 --- a/tests/testutils.py +++ b/tests/testutils.py @@ -18,7 +18,7 @@ import pytest -from google.auth import credentials +from google.auth import credentials, compute_engine from google.auth import transport from requests import adapters from requests import models @@ -119,6 +119,10 @@ class MockGoogleCredential(credentials.Credentials): def refresh(self, request): self.token = 'mock-token' + @property + def service_account_email(self): + return 'mock-email' + class MockCredential(firebase_admin.credentials.Base): """A mock Firebase credential implementation.""" @@ -129,6 +133,19 @@ def __init__(self): def get_credential(self): return self._g_credential +class MockGoogleComputeEngineCredential(compute_engine.Credentials): + """A mock Compute Engine credential""" + def refresh(self, request): + self.token = 'mock-compute-engine-token' + +class MockComputeEngineCredential(firebase_admin.credentials.Base): + """A mock Firebase credential implementation.""" + + def __init__(self): + self._g_credential = MockGoogleComputeEngineCredential() + + def get_credential(self): + return self._g_credential class MockMultiRequestAdapter(adapters.HTTPAdapter): """A mock HTTP adapter that supports multiple responses for the Python requests module."""