Unable to Filter Pub/Sub Messages by Attributes for Cloud Functions #156

Open
selimacerbas opened this issue Jan 3, 2025 · 4 comments

@selimacerbas

selimacerbas commented Jan 3, 2025

Description:
The module for deploying Google Cloud Functions with Pub/Sub triggers does not support filtering messages at the subscription level. As a result, all messages published to the topic are sent to all Cloud Functions associated with the topic, even if the message attributes are intended to target specific functions. This leads to unnecessary invocation of functions that are not meant to handle those messages.

Problem Statement:
Google Cloud Pub/Sub supports message attribute filtering at the subscription level, which allows only messages matching specific attributes to be delivered to the subscriber. However, the predefined module being used for deploying Cloud Functions does not provide a way to configure these filters on the subscriptions.

Without subscription-level filtering:

- All Cloud Functions subscribed to the same topic will process every message published to that topic.
- It creates unnecessary overhead for the functions that receive messages they are not designed to handle.
- Developers are forced to implement manual filtering logic inside the Cloud Functions themselves, which is inefficient and increases costs.

Example Scenario:
Objective:

We want to use a single Pub/Sub topic to handle scheduled messages from Cloud Scheduler, but ensure that only specific Cloud Functions are triggered based on message attributes.

Desired Flow:

Cloud Scheduler publishes a message to the scheduled-tasks Pub/Sub topic with an attribute function_target. Example message:

{
  "data": "some-payload",
  "attributes": {
    "function_target": "functionA"
  }
}

A subscription for Function A should filter messages with function_target = "functionA".
A subscription for Function B should filter messages with function_target = "functionB".
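
The desired flow above could be sketched in Terraform roughly as follows. This is a minimal sketch, not something the cloud-functions module provides today: it uses the plain google_pubsub_subscription resource and its filter argument, and the variable, local, and subscription names are hypothetical. How each subscription is wired to its function is left out.

```hcl
# Hypothetical sketch: one filtered subscription per target function.
# Names below are illustrative and not part of the module.
variable "project_id" {
  type = string
}

locals {
  # One entry per value of the function_target message attribute.
  function_targets = toset(["functionA", "functionB"])
}

resource "google_pubsub_subscription" "per_function" {
  for_each = local.function_targets

  project = var.project_id
  name    = "scheduled-tasks-${lower(each.key)}"
  topic   = "projects/${var.project_id}/topics/scheduled-tasks"

  # Only messages whose function_target attribute equals this value
  # are delivered to the subscription.
  filter = "attributes.function_target = \"${each.key}\""
}
```

With this shape, a message carrying function_target = "functionA" would match only the first subscription's filter.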

Current Behavior:

The predefined module creates subscriptions without filters.
Both Function A and Function B receive all messages published to the topic, regardless of the function_target attribute.

When the event_filters property is set to a non-null value, it is only checked against the attributes that exist for the defined event_type, so custom message attributes are rejected. For example:

**############ ERROR ##############**

│ Error: Error creating function: googleapi: Error 400: Validation failed for trigger projects/***/locations/europe-west2/triggers/project_id: The request was invalid: invalid argument: event type **google.cloud.pubsub.topic.v1.messagePublished** not supported: attribute action not found within event type
│ 
│   with module.function.module.cloud-function.google_cloudfunctions2_function.function,
│   on .terraform/modules/function.cloud-function/main.tf line 21, in resource "google_cloudfunctions2_function" "function":
│   21: resource "google_cloudfunctions2_function" "function" {
│ 
╵
╷
│ Error: Error creating function: googleapi: Error 400: Validation failed for trigger projects/***/locations/europe-west6/triggers/project_id: The request was invalid: invalid argument: event type **google.cloud.pubsub.topic.v1.messagePublished** not supported: attribute action not found within event type
│ 
│   with module.function.module.cloud-function.google_cloudfunctions2_function.function,
│   on .terraform/modules/function.cloud-function/main.tf line 21, in resource "google_cloudfunctions2_function" "function":
│   21: resource "google_cloudfunctions2_function" "function" {

################## ERROR END ###################

The current configuration checks against the MessagePublishedData schema, which can be found at: https://github.com/googleapis/google-cloudevents/blob/main/jsonschema/google/events/cloud/pubsub/v1/MessagePublishedData.json. The "action" attribute is passed by Cloud Scheduler but is not handled at the subscription level.

image

The part of the Pub/Sub UI shown in the picture above should be handled properly by the module.

@prabhu34
Collaborator

prabhu34 commented Jan 8, 2025

I understand you are expecting filters for the PubSub subscription! If this is supported by Google APIs we should be able to find out the filters or have them implemented in the module as well.

Please share the Terraform module configuration to understand this better. @selimacerbas

@selimacerbas
Author

selimacerbas commented Jan 17, 2025

Sorry for the late response, @prabhu34. Here is the configuration I was using:

module "cloud-function" {
  source  = "GoogleCloudPlatform/cloud-functions/google"
  version = "0.6.0"

  function_name     = var.CLOUD_FUNCTION.name
  project_id        = var.GCP_PROJECT_ID
  function_location = var.REGION
  runtime           = var.CLOUD_FUNCTION.runtime
  entrypoint        = var.CLOUD_FUNCTION.entrypoint

  storage_source = {
    bucket     = module.cloud-storage.name
    object     = google_storage_bucket_object.bucket_object.name
    generation = null
  }

  event_trigger = var.CLOUD_FUNCTION.is_event_triggered == true ? {
    trigger_region        = var.REGION,
    event_type            = "google.cloud.pubsub.topic.v1.messagePublished",
    retry_policy          = "RETRY_POLICY_RETRY",
    service_account_email = "${var.CLOUD_FUNCTION.pubsub_sa_name}@${var.GCP_PROJECT_ID}.iam.gserviceaccount.com",
    pubsub_topic          = "projects/${var.GCP_PROJECT_ID}/topics/${var.CLOUD_FUNCTION.pubsub_topic_name}",
    event_filters         = var.CLOUD_FUNCTION.pubsub_attributes
  } : null

  service_config = {
    available_cpu      = "1"
    min_instance_count = "1"
    max_instance_count = "1"
    available_memory   = "256Mi"
    timeout_seconds    = "60"
    runtime_env_variables = var.CLOUD_FUNCTION.runtime_env_variables != null ? merge(var.CLOUD_FUNCTION.runtime_env_variables, {
      "GCP_PROJECT_ID" = var.GCP_PROJECT_ID
      }) : {
      "GCP_PROJECT_ID" = var.GCP_PROJECT_ID
    }
    runtime_secret_env_variables   = var.CLOUD_FUNCTION.runtime_secret_env_variables != null ? var.CLOUD_FUNCTION.runtime_secret_env_variables : null
    ingress_settings               = "ALLOW_INTERNAL_ONLY"
    service_account_email          = module.service-account.service_accounts_map[var.CLOUD_FUNCTION.sa_name].email
    all_traffic_on_latest_revision = true
  }

  labels = {}
}

###############################################

The pubsub attribute value looks like this:

pubsub_attributes = [
  {
    attribute_key   = "attributes.action"
    attribute_value = "do_this_foo"
  }
]

################################################

For Pub/Sub, I was using this module:

module "pubsub" {
  source     = "terraform-google-modules/pubsub/google"
  version    = "7.0.0"
  project_id = var.GCP_PROJECT_ID
  topic      = var.PUBSUB.topic_name

  message_storage_policy = {
    allowed_persistence_regions = [var.REGION]
  }

  grant_token_creator = false
}

The examples in the Pub/Sub module include implementations of filters at the subscription level. These can be taken as a reference.

@prabhu34
Collaborator

Thank you for sharing the code. Here is my understanding of your requirement and possible solution.

You want to filter messages based on their attributes and trigger the Cloud Function accordingly. Cloud Functions uses a Pub/Sub topic and triggers the function when a message is published. This relies on Eventarc triggers, which support a list of sources, event providers (the Pub/Sub API), and event types (direct or logging).

This module supports the event triggers and filters already and as per the API as well as Terraform Resource documentation, the event_filters block can be used to filter the event triggers that are supported.

The event_filters block supports: attribute - (Required) "The name of a CloudEvents attribute. Currently, only a subset of attributes are supported for filtering. Use the gcloud eventarc providers describe command to learn more about events and their attributes. Do not filter for the 'type' attribute here, as this is already achieved by the resource's event_type attribute."

The supported event types that can be used to filter event triggers are listed in the public documentation (for example, the list for Pub/Sub).

Here is how you can find the attributes of an event provider that can be used for filtering. Below are the supported attributes for Pub/Sub. The only attribute it supports is type, which is already covered by the event_type variable.

gcloud eventarc providers describe pubsub.googleapis.com --location=us-east4
displayName: Cloud Pub/Sub
eventTypes:
- description: A message is published to the specified Pub/Sub topic.
  filteringAttributes:
  - attribute: type
    required: true
  type: google.cloud.pubsub.topic.v1.messagePublished
name: projects/PROJECT_ID/locations/us-east4/providers/pubsub.googleapis.com

If you are choosing a different event type (say for Cloud Storage), there are supported attributes to filter like bucket.

- description: The metadata of an existing object changes.
  filteringAttributes:
  - attribute: bucket
    description: The bucket name being watched.
    required: true
  - attribute: type
    required: true
  type: google.cloud.storage.object.v1.metadataUpdated 
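
For comparison, a filter on a supported attribute such as bucket might look like the sketch below. It uses the underlying google_cloudfunctions2_function resource directly rather than the module, and every name, region, runtime, and bucket value is a placeholder assumed for illustration.

```hcl
# Hypothetical sketch of an event trigger filtered on a supported attribute.
# All names and values below are placeholders.
resource "google_cloudfunctions2_function" "storage_example" {
  name     = "storage-metadata-handler"
  location = "us-east4"

  build_config {
    runtime     = "python312"
    entry_point = "handle_event"
    source {
      storage_source {
        bucket = "example-source-bucket"
        object = "function-source.zip"
      }
    }
  }

  event_trigger {
    trigger_region = "us-east4"
    event_type     = "google.cloud.storage.object.v1.metadataUpdated"

    # "bucket" appears under filteringAttributes for this event type,
    # so it is accepted here, unlike a custom Pub/Sub message attribute.
    event_filters {
      attribute = "bucket"
      value     = "example-watched-bucket"
    }
  }
}
```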

Hope this answers your question and helps you use the module and filters correctly.

@selimacerbas
Author

selimacerbas commented Jan 27, 2025

Thank you for the detailed explanation. Since filtering is only possible on the attributes that each event type exposes, and google.cloud.pubsub.topic.v1.messagePublished has no attribute other than type, it is impossible to filter incoming messages by their custom attributes. When a function is configured as event-triggered, the push subscription is created automatically, and once that happens we cannot add custom filtering at the subscription level.

While researching further, I came across a Stack Overflow answer describing how to trigger a Cloud Function while filtering Pub/Sub attributes at the subscription level. It says the function has to be HTTP-triggered so that we can create our own push subscriptions with the necessary filters. The only downside is that the function's ingress has to be publicly open: instead of ALLOW_INTERNAL_ONLY, ingress has to be set to ALLOW_ALL. However, authentication with a service account can be configured on the push subscription, which can block unauthenticated external calls to the function. Of course, since the function is HTTP-triggered, the HTTP request handling has to be adjusted accordingly, and the Functions Framework comes in handy here. I found the documentation on the HTTP payload sent by push subscriptions very helpful.

After doing these steps, I am able to filter Pub/Sub attributes at the subscription level and trigger ONLY the function that expects a given attribute.
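
The workaround described above might look roughly like this in Terraform. This is a sketch under stated assumptions, not the exact configuration used: the variables are hypothetical, the filter reuses the action attribute from the earlier configuration, and the IAM binding assumes the 2nd gen function is backed by a Cloud Run service with the same name as the function.

```hcl
# Hypothetical inputs; none of these names come from the module.
variable "project_id" { type = string }
variable "region" { type = string }
variable "topic_name" { type = string }
variable "function_name" { type = string } # HTTP-triggered 2nd gen function
variable "function_uri" { type = string }  # HTTPS URL of that function
variable "push_sa_email" { type = string } # service account used by the push subscription

# Push subscription that only forwards messages with the matching attribute.
resource "google_pubsub_subscription" "filtered_push" {
  project = var.project_id
  name    = "do-this-foo-push"
  topic   = "projects/${var.project_id}/topics/${var.topic_name}"

  filter = "attributes.action = \"do_this_foo\""

  push_config {
    push_endpoint = var.function_uri

    # Pub/Sub attaches an OIDC token for this service account to each push request.
    oidc_token {
      service_account_email = var.push_sa_email
    }
  }
}

# Allow only the push service account to invoke the function's backing service
# (assumption: the Cloud Run service name equals the function name).
resource "google_cloud_run_service_iam_member" "push_invoker" {
  project  = var.project_id
  location = var.region
  service  = var.function_name
  role     = "roles/run.invoker"
  member   = "serviceAccount:${var.push_sa_email}"
}
```

Because the invoker role is granted only to the push service account, requests without a valid token for that account should be rejected even though ingress is set to ALLOW_ALL.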

I think a new feature that lets event-triggered functions be triggered based on custom Pub/Sub message attributes would be great.
