Issue/refactor filters #185

Merged
merged 1 commit into from
Aug 11, 2022
2 changes: 1 addition & 1 deletion config/test-fixtures/transform-mixed-filtered.hcl
@@ -8,7 +8,7 @@ transform {

transform {
use "spEnrichedFilter" {
field = "app_id"
atomic_field = "app_id"
regex = "wrong"
regex_timeout = 10
}
267 changes: 172 additions & 95 deletions pkg/transform/snowplow_enriched_filter.go
@@ -8,8 +8,8 @@ package transform

import (
"fmt"
"log"
"regexp"
"strconv"
"strings"
"time"

@@ -21,119 +21,55 @@ import (
"github.com/snowplow-devops/stream-replicator/pkg/models"
)

func findSpEnrichedFilterValue(queriedField, parsedEventName, eventVer, field string, parsedMessage analytics.ParsedEvent, path []interface{}) ([]interface{}, error) {
var vf interface{}
var valueFound []interface{}
var err error

switch {
case strings.HasPrefix(queriedField, `contexts_`):
vf, err = parsedMessage.GetContextValue(queriedField, path...)
valueFound = append(valueFound, vf.([]interface{})...)
case strings.HasPrefix(queriedField, `unstruct_event`):
eventNameFull := `unstruct_event_` + parsedEventName
if queriedField == eventNameFull || queriedField == eventNameFull+`_`+eventVer {
vf, err = parsedMessage.GetUnstructEventValue(path...)
valueFound = append(valueFound, vf)
}
default:
vf, err = parsedMessage.GetValue(field)
valueFound = append(valueFound, vf)
func evaluateSpEnrichedFilter(re *regexp2.Regexp, valuesFound []interface{}) bool {
// if valuesFound is nil, we found no value.
// Because negative matches are a thing, we still want to match against an empty string
if valuesFound == nil {
valuesFound = make([]interface{}, 1)
Collaborator Author

Previously, when there were no values, we never evaluated anything (just returned false).

This is important as a negative lookahead should evaluate to true when we have no value (because "" != "someValue")

}
if err != nil {
// GetValue returns an error if the field requested is empty. Check for that particular error before returning error
if err.Error() == analytics.EmptyFieldErr {
return nil, nil
for _, v := range valuesFound {
if v == nil {
v = "" // because nil gets cast to `<nil>`
Collaborator Author

@colmsnowplow, Aug 8, 2022

This is necessary because we're using fmt.Sprint() to cast everything to string when we evaluate the filter.

Really, this is a workaround to cover up some jank - I've opened an issue to do better here: #186

Contributor

It's my mistake here, I did not realise that nil does not work properly with Sprintf. We should indeed find a better solution here. My vote goes towards handling the different possible types in the existing filter.

}
return nil, err
}
return valueFound, nil
}

func evaluateSpEnrichedFilter(valuesFound []interface{}, regex string, regexTimeout int) bool {
re, err := regexp2.Compile(regex, 0)
re.MatchTimeout = time.Duration(regexTimeout) * time.Second
if err != nil {
log.Fatal(errors.Wrap(err, `error compiling regex for filter`))
}
for _, v := range valuesFound {
if ok, _ := re.MatchString(fmt.Sprintf("%v", v)); ok {
return true
}
}
return false
}

// createSpEnrichedFilterFunction returns a TransformationFunction which filters messages based on a field in the Snowplow enriched event
// and a regex declared by the user.
func createSpEnrichedFilterFunction(field, regex string, regexTimeout int, isUnstructEvent bool) (TransformationFunction, error) {
func createSpEnrichedFilterFunction(regex string, regexTimeout int, getFunc valueGetter) (TransformationFunction, error) {
Collaborator Author

Just to call out as it may make it easier to make sense of the changes:

This create function (and the make functions for Getters) run once, on startup. The filter function itself runs once per batch of events (which at the moment is usually batches of 1 event - so for every event).

The idea of this refactor is:

  • We only compile regexes once
  • We keep to the DRY principle
  • We outsource the part which gets the value to a separate function (the getter), which is independently unit testable.
  • The nuance of unstruct filters doing more work is also shifted to the getter function - which is appropriate to the design IMO: if the event name or version don't match, we should behave as if the desired field simply is nil.

Contributor

This is brilliant, I love the new structure both from a readability and from a technical view.

if regexTimeout == 0 {
// default timeout for regex is 10 seconds
regexTimeout = 10
}

// regexToMatch is what we use to evaluate the actual filter, once we have the value.
regexToMatch, err := regexp2.Compile(regex, 0)
regexToMatch.MatchTimeout = time.Duration(regexTimeout) * time.Second
if err != nil {
return nil, errors.Wrap(err, `error compiling regex for filter`)
}

return func(message *models.Message, intermediateState interface{}) (*models.Message, *models.Message, *models.Message, interface{}) {
if regexTimeout == 0 {
// default timeout for regex is 10 seconds
regexTimeout = 10
}

// Evaluate intermediateState to parsedEvent
parsedMessage, parseErr := IntermediateAsSpEnrichedParsed(intermediateState, message)
if parseErr != nil {
message.SetError(parseErr)
return nil, nil, message, nil
}

// This regex retrieves the path fields
// (e.g. field1.field2[0].field3 -> [field1, field2, 0, field3])
regexWords := `\w+`
re := regexp.MustCompile(regexWords)

// separate the path string into words using regex
path := re.FindAllString(field, -1)
separatedPath := make([]string, len(path)-1)
for idx, pathField := range path[1:] {
separatedPath[idx] = pathField
}

var parsedEventName string
var eventMajorVer string
var err error

// only call SDK functions if an unstruct_event is being filtered
if isUnstructEvent {
// get event name
eventName, err := parsedMessage.GetValue(`event_name`)
if err != nil {
message.SetError(err)
return nil, nil, message, nil
}
parsedEventName = eventName.(string)
// get event version
fullEventVer, err := parsedMessage.GetValue(`event_version`)
if err != nil {
message.SetError(err)
return nil, nil, message, nil
}
// get the major event version
eventMajorVer = strings.Split(fullEventVer.(string), `-`)[0]
if eventMajorVer == `` {
message.SetError(fmt.Errorf(`invalid schema version format: %s`, fullEventVer))
return nil, nil, message, nil
}
}

// find the value in the event
valueFound, err := findSpEnrichedFilterValue(
path[0],
parsedEventName,
eventMajorVer,
field,
parsedMessage,
convertPathToInterfaces(separatedPath),
)
// get the value
valueFound, err := getFunc(parsedMessage)
if err != nil {
message.SetError(err)
return nil, nil, message, nil
}

// evaluate whether the found value passes the filter, determining if the message should be kept
shouldKeepMessage := evaluateSpEnrichedFilter(valueFound, regex, regexTimeout)
shouldKeepMessage := evaluateSpEnrichedFilter(regexToMatch, valueFound)

// if message is not to be kept, return it as a filtered message to be acked in the main function
if !shouldKeepMessage {
@@ -145,17 +81,158 @@ func createSpEnrichedFilterFunction(field, regex string, regexTimeout int, isUns
}, nil
}
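
The startup-versus-per-event split described in the review comment on this function can be sketched as follows. This is a simplified illustration, not the PR's code: `event`, `getter`, `makeFieldGetter` and `newFilter` are hypothetical names, the stdlib `regexp` package stands in for `regexp2`, and a missing field simply yields no values. Note that the sketch checks the compile error before touching the compiled regex.

```go
package main

import (
	"fmt"
	"regexp"
)

// event and getter are hypothetical simplifications of
// analytics.ParsedEvent and valueGetter.
type event map[string]interface{}
type getter func(event) ([]interface{}, error)

// makeFieldGetter captures the field name once, at startup.
func makeFieldGetter(field string) getter {
	return func(e event) ([]interface{}, error) {
		v, ok := e[field]
		if !ok {
			return nil, nil // missing field: no values found
		}
		return []interface{}{v}, nil
	}
}

// newFilter compiles the pattern once and returns a closure that runs per event.
func newFilter(pattern string, get getter) (func(event) (bool, error), error) {
	re, err := regexp.Compile(pattern)
	if err != nil { // check the error before using re
		return nil, fmt.Errorf("error compiling regex for filter: %w", err)
	}
	return func(e event) (bool, error) {
		values, err := get(e)
		if err != nil {
			return false, err
		}
		for _, v := range values {
			if re.MatchString(fmt.Sprint(v)) {
				return true, nil
			}
		}
		return false, nil
	}, nil
}

func main() {
	keep, err := newFilter(`^prod`, makeFieldGetter("app_id"))
	if err != nil {
		panic(err)
	}
	ok, _ := keep(event{"app_id": "prod-web"})
	fmt.Println(ok) // true
	ok, _ = keep(event{"app_id": "test"})
	fmt.Println(ok) // false
}
```

Because the getter is an ordinary function value, it can be unit tested on its own and swapped for a context or unstruct variant without changing the filter.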

// valueGetter is a function that can hold the logic for getting values in the case of base, context, and unstruct fields,
// which respectively require different logic.
type valueGetter func(analytics.ParsedEvent) ([]interface{}, error)

// Because each type of value requires different arguments, we use these `make` functions to construct them.
// This allows us to unit test each one, plug them into the createSpEnrichedFilterFunction constructor,
// and to construct them so that field names/paths and regexes are handled only once, at startup.

// makeBaseValueGetter returns a valueGetter for base-level values.
func makeBaseValueGetter(field string) valueGetter {
return func(parsedMessage analytics.ParsedEvent) (value []interface{}, err error) {
// find the value in the event
valueFound, err := parsedMessage.GetValue(field)
// We don't return an error for empty field since this just means the value is nil.
if err != nil && err.Error() != analytics.EmptyFieldErr {
return nil, err
}
return []interface{}{valueFound}, nil
}
}

// NewSpEnrichedFilterFunction returns a TransformationFunction which filters messages based on a field in the Snowplow enriched event.
func NewSpEnrichedFilterFunction(field, regex string, regexTimeout int) (TransformationFunction, error) {
return createSpEnrichedFilterFunction(field, regex, regexTimeout, false)

// getBaseValueForMatch is responsible for retrieving data from the message for base fields
getBaseValueForMatch := makeBaseValueGetter(field)

return createSpEnrichedFilterFunction(regex, regexTimeout, getBaseValueForMatch)
}

// makeContextValueGetter creates a valueGetter for context data
func makeContextValueGetter(name string, path []interface{}) valueGetter {
return func(parsedMessage analytics.ParsedEvent) ([]interface{}, error) {
value, err := parsedMessage.GetContextValue(name, path...)
// We don't return an error for empty field since this just means the value is nil.
if err != nil && err.Error() != analytics.EmptyFieldErr {
return nil, err
}
// bug in analytics sdk requires the type casting below. https://github.com/snowplow/snowplow-golang-analytics-sdk/issues/36
// GetContextValue should always return []interface{} but instead it returns an interface{} which always contains type []interface{}

// if it's nil, return nil - we just didn't find any value.
if value == nil {
return nil, nil
}
// otherwise, type assertion.
valueFound, ok := value.([]interface{})
if !ok {
return nil, errors.New(fmt.Sprintf("Context filter encountered unexpected type in getting value for path %v", path))
}

return valueFound, nil
}
}

// NewSpEnrichedFilterFunctionContext returns a TransformationFunction for filtering a context
func NewSpEnrichedFilterFunctionContext(field, regex string, regexTimeout int) (TransformationFunction, error) {
return createSpEnrichedFilterFunction(field, regex, regexTimeout, false)
func NewSpEnrichedFilterFunctionContext(contextFullName, pathToField, regex string, regexTimeout int) (TransformationFunction, error) {

path, err := parsePathToArguments(pathToField)
if err != nil {
return nil, errors.Wrap(err, "error creating Context filter function")
}

// getContextValuesForMatch is responsible for retrieving data from the message for context fields
getContextValuesForMatch := makeContextValueGetter(contextFullName, path)

return createSpEnrichedFilterFunction(regex, regexTimeout, getContextValuesForMatch)
}

// makeUnstructValueGetter creates a valueGetter for unstruct data.
func makeUnstructValueGetter(eventName string, versionRegex *regexp.Regexp, path []interface{}) valueGetter {
return func(parsedMessage analytics.ParsedEvent) (value []interface{}, err error) {
eventNameFound, err := parsedMessage.GetValue(`event_name`)
if err != nil { // This field can't be empty for a valid event, so we return all errors here
return nil, err
}
if eventNameFound != eventName { // If we don't have an exact match on event name, we return nil value
return nil, nil
}
versionFound, err := parsedMessage.GetValue(`event_version`)
if err != nil { // This field can't be empty for a valid event, so we return all errors here
return nil, err
}
if !versionRegex.MatchString(versionFound.(string)) { // If we don't match the provided version regex, return nil value
return nil, nil
}

valueFound, err := parsedMessage.GetUnstructEventValue(path...)
// We don't return an error for empty field since this just means the value is nil.
if err != nil && err.Error() != analytics.EmptyFieldErr && !strings.Contains(err.Error(), "not found") {
Collaborator Author

Since fields can be optional, I've changed this to return nil when we can't find the field. Actually this is a problem with the API design in the analytics SDK so there's an issue to fix that also.

// This last clause exists because of this: https://github.com/snowplow/snowplow-golang-analytics-sdk/issues/37
// TODO: Fix that and remove it as soon as possible.
return nil, err
}

if valueFound == nil {
return nil, nil
}

return []interface{}{valueFound}, nil
}
}

// NewSpEnrichedFilterFunctionUnstructEvent returns a TransformationFunction for filtering an unstruct_event
func NewSpEnrichedFilterFunctionUnstructEvent(field, regex string, regexTimeout int) (TransformationFunction, error) {
return createSpEnrichedFilterFunction(field, regex, regexTimeout, true)
func NewSpEnrichedFilterFunctionUnstructEvent(eventNameToMatch, eventVersionToMatch, pathToField, regex string, regexTimeout int) (TransformationFunction, error) {

path, err := parsePathToArguments(pathToField)
if err != nil {
return nil, errors.Wrap(err, "error creating Unstruct filter function")
}

versionRegex, err := regexp.Compile(eventVersionToMatch)
if err != nil {
return nil, errors.Wrap(err, fmt.Sprint("Failed to compile regex: ", eventVersionToMatch))
}

// getUnstructValuesForMatch is responsible for retrieving data from the message for unstruct fields.
// It also checks that the correct event name and version are provided, and returns nil if not.
getUnstructValuesForMatch := makeUnstructValueGetter(eventNameToMatch, versionRegex, path)

return createSpEnrichedFilterFunction(regex, regexTimeout, getUnstructValuesForMatch)
}

// parsePathToArguments parses a string path to custom data (eg. `test1.test2[0].test3`)
// into the slice of interfaces expected by the analytics SDK's Get() methods.
func parsePathToArguments(pathToField string) ([]interface{}, error) {

Is there a neater implementation using something like:

	re := regexp.MustCompile(`\w+|\[\d+\]`)
	parts := re.FindAllString(pathToField, -1)

        convertedPath := make([]interface{}, 0)
	for _, s := range parts {
		// if it looks like `[42]` then extract the integer and treat it as an array index
                // otherwise treat it as an object key
                append(convertedPath, ???)
	}

That regex in my previous suggestion was flawed. This one might be better:

re := regexp.MustCompile(`\[\d+\]|[^\.\[]+`)

I'm not 100% sure this method will end up being neater than what you did. But I think it's worth following through with the effort, to see what the end result looks like and then to decide which is neater.

Collaborator Author

not sure why I didn't play with a different regex first, just pivoted to something else... 🤔 let me give it a test and see where I get to.

Collaborator Author

@colmsnowplow, Aug 10, 2022

@istreeter after experimenting with it, this way is definitely better, covers more cases, and leaves us with cleaner code.

The one edge case still present is when the string provided contains an unclosed opening brace (like the last case in our unit tests: test1.test[2.test3). Because the first regex strips the opening brace, it treats that as just a "2", which means it wouldn't fail, and since a non-existent path just means 'match against nil', the filter would silently evaluate against nil. It's an unlikely edge case but there's some downside.

I tried a different way of chopping it up to handle this explicitly, but it left us with less readable code and was quite awkward. But thinking about this case, I don't think it's likely that we have an unmatched [ in the path without it being an error. On that assumption, I just added a check that count([) == count(]) in the string before proceeding (and if the counts differ, we throw an error on startup).

I don't think we need to worry about such weird edge cases. If a user really has such weird field names, then they just have to accept that they cannot use stream replicator to perform transformations. That's OK, I think they'll get over it. Or they will re-design their schemas!

(What's important to me, is that nothing else breaks if a schema has weird fields. I mean, it would be bad if someone could generate an unexpected event that makes the replicator crash; but that's not what we're discussing here)

In other words, I think what you've done is good. It's a nice balance between trying to support all schemas, but not going overboard in unnecessarily supporting weird edge cases which don't deserve our support!

Hang on..... does this regexp work, and also fix the edge case you were worried about?

re := regexp.MustCompile(`\[\d+\]|[^\.]+`)

Collaborator Author

It leaves us with parts like test1[1], which then need to be handled again.

That's not really a problem but I prefer what we have in the PR because I think it's actually preferable to throw the error when there's a miscount of braces - it's far more likely that this is misconfiguration than not. Without the check this misconfiguration would lead to the app deploying successfully and incorrectly filtering the data - which is a headache.

So I think it's better to keep the check and throw the error - in which case, I don't think there's a benefit to changing the regex.

👍

// validate that an edge case (unmatched opening brace) isn't present
if strings.Count(pathToField, "[") != strings.Count(pathToField, "]") {
return nil, errors.New(fmt.Sprint("unmatched brace in path: ", pathToField))
}

// regex to separate path into components
re := regexp.MustCompile(`\[\d+\]|[^\.\[]+`)
parts := re.FindAllString(pathToField, -1)

// regex to identify arrays
arrayRegex := regexp.MustCompile(`\[\d+\]`)

convertedPath := make([]interface{}, 0)
for _, part := range parts {

if arrayRegex.MatchString(part) { // handle arrays first
intPart, err := strconv.Atoi(part[1 : len(part)-1]) // strip braces and convert to int
if err != nil {
return nil, errors.New(fmt.Sprint("error parsing path element: ", part))
}

convertedPath = append(convertedPath, intPart)
} else { // handle strings
convertedPath = append(convertedPath, part)
}

}
return convertedPath, nil
}
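
To make the path-parsing discussion concrete, here is a standalone sketch that mirrors `parsePathToArguments` using only the standard library. `parsePath` is a hypothetical name, and the stdlib `fmt.Errorf` is used for errors here, which is an assumption rather than what the PR does. It shows the result for the example path from the doc comment and for the unclosed-brace edge case raised in the review.

```go
package main

import (
	"fmt"
	"regexp"
	"strconv"
	"strings"
)

var (
	// splits a path into object keys and bracketed array indexes
	partsRegex = regexp.MustCompile(`\[\d+\]|[^\.\[]+`)
	// identifies a bracketed array index such as [0]
	arrayRegex = regexp.MustCompile(`\[\d+\]`)
)

// parsePath is a hypothetical stand-in for parsePathToArguments.
func parsePath(pathToField string) ([]interface{}, error) {
	// reject paths where opening and closing brace counts differ
	if strings.Count(pathToField, "[") != strings.Count(pathToField, "]") {
		return nil, fmt.Errorf("unmatched brace in path: %s", pathToField)
	}
	converted := make([]interface{}, 0)
	for _, part := range partsRegex.FindAllString(pathToField, -1) {
		if arrayRegex.MatchString(part) {
			// strip the braces and convert the index to an int
			idx, err := strconv.Atoi(part[1 : len(part)-1])
			if err != nil {
				return nil, fmt.Errorf("error parsing path element: %s", part)
			}
			converted = append(converted, idx)
		} else {
			converted = append(converted, part)
		}
	}
	return converted, nil
}

func main() {
	// yields the keys "test1", "test2", the int index 0, then "test3"
	path, err := parsePath("test1.test2[0].test3")
	fmt.Println(path, err)

	// the brace count check rejects the edge case at startup
	_, err = parsePath("test1.test[2.test3")
	fmt.Println(err)

	// without the check, the regex alone would quietly accept it
	fmt.Printf("%q\n", partsRegex.FindAllString("test1.test[2.test3", -1))
	// ["test1" "test" "2" "test3"]
}
```

The last call shows why the count check matters: the splitting regex by itself turns the malformed path into four ordinary-looking parts, with "2" treated as an object key.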