🎉 Source Mixpanel low code migration #36724

Merged
merged 60 commits into from
May 8, 2024
492dfe4
Initialization of low code migration
midavadim Mar 27, 2024
0ba0485
cohort_members_stream
midavadim Mar 27, 2024
f64abec
cohorts and revenue streams custom components
midavadim Mar 27, 2024
672d7fc
fixed records for engage streams
midavadim Mar 29, 2024
b931bc2
auth
midavadim Apr 1, 2024
95a220a
added project secret auth method
midavadim Apr 1, 2024
3f223b1
error handling
midavadim Apr 1, 2024
6786628
removed comments
midavadim Apr 1, 2024
dc28438
funnels with partition
midavadim Apr 2, 2024
38ff0e7
funnels with partition
midavadim Apr 5, 2024
e628c0b
add custom FunnelsSubstreamPartitionRouter
midavadim Apr 9, 2024
ee81cbd
added custom pagination for engage stream
midavadim Apr 10, 2024
b6095c2
added dynamic schema for engage
midavadim Apr 10, 2024
43a620b
added caching for schema
midavadim Apr 10, 2024
a52051f
added comments
midavadim Apr 10, 2024
2b8e9dc
added export stream
midavadim Apr 11, 2024
9f7f29a
work around for timezone mismatch problem
midavadim Apr 11, 2024
554ea83
integrated config params
midavadim Apr 11, 2024
2d12e72
added export stream
midavadim Apr 11, 2024
ed70354
code clean up
midavadim Apr 11, 2024
32954f2
updated doc
midavadim Apr 12, 2024
9f31d43
format
midavadim Apr 12, 2024
2d43bc4
add option_title attr to config
midavadim Apr 12, 2024
5ffaa96
removed old code
midavadim Apr 12, 2024
3c6978d
updated unit test
midavadim Apr 15, 2024
fddbd47
format
midavadim Apr 15, 2024
4f0aab7
added reqs_per_hour_limit
midavadim Apr 15, 2024
e4f6911
tweaked reqs_per_hour_limit
midavadim Apr 15, 2024
159c023
updated unit tests
midavadim Apr 16, 2024
70a7edd
updated unit tests
midavadim Apr 17, 2024
52fcc67
removed tag
midavadim Apr 17, 2024
ac896dc
added test_export_get_json_schema
midavadim Apr 17, 2024
1e03761
updated test_funnels_stream
midavadim Apr 17, 2024
752a925
updated test_export_stream_fail
midavadim Apr 17, 2024
9dbdc41
fix review comments
midavadim Apr 23, 2024
ed162c8
annotations update
midavadim Apr 23, 2024
a81c117
format
midavadim Apr 23, 2024
3ca9583
update unit test
midavadim Apr 23, 2024
eaae7d1
Merge branch 'master' into midavadim/36356-mixpanel-low-code-migration
midavadim Apr 24, 2024
d0df96b
added pagination for cohort_members, fixed delay, added unit get_requ…
midavadim Apr 25, 2024
bad3e16
disabled incremental sync for annotations
midavadim Apr 25, 2024
65ddbc7
set reqs_per_hour_limit = 60
midavadim Apr 25, 2024
980f593
added semi-incremental support
midavadim Apr 26, 2024
77ff468
Merge branch 'master' into midavadim/36356-mixpanel-low-code-migration
midavadim Apr 26, 2024
1df4398
fixed unit tests
midavadim Apr 26, 2024
d9f7b90
added default start_date value
midavadim Apr 30, 2024
d9972f6
add browser_version transformation to string
midavadim May 2, 2024
c93faf4
Merge branch 'master' into midavadim/36356-mixpanel-low-code-migration
midavadim May 2, 2024
b49aeb2
removed comments
midavadim May 2, 2024
7a3e613
added state migration
midavadim May 5, 2024
fe17df1
format
midavadim May 6, 2024
6cd1009
Merge branch 'master' into midavadim/36356-mixpanel-low-code-migration
girarda May 6, 2024
0de9682
updated cdk version, removed odd incremental_sync for annotations stream
midavadim May 7, 2024
3a897d4
Merge remote-tracking branch 'origin/midavadim/36356-mixpanel-low-cod…
midavadim May 7, 2024
fecb825
updated manifest version
midavadim May 7, 2024
3fb1860
moved get_url_base to manifest
midavadim May 7, 2024
776250f
added doc
midavadim May 7, 2024
e37672f
Merge branch 'master' into midavadim/36356-mixpanel-low-code-migration
midavadim May 7, 2024
e510be1
fix after merge
midavadim May 7, 2024
92655a8
update abnormal test
midavadim May 7, 2024
@@ -3,8 +3,10 @@
"type": "STREAM",
"stream": {
"stream_state": {
"36152117": { "date": "2030-01-01" },
"41833532": { "date": "2030-01-01" },
"36152117": { "date": "2030-01-01" }
"41833755": { "date": "2030-01-01" },
"41833700": { "date": "2030-01-01" }
},
"stream_descriptor": { "name": "funnels" }
}
4 changes: 2 additions & 2 deletions airbyte-integrations/connectors/source-mixpanel/metadata.yaml
@@ -11,7 +11,7 @@ data:
connectorSubtype: api
connectorType: source
definitionId: 12928b32-bf0a-4f1e-964f-07e12e37153a
-dockerImageTag: 2.2.2
+dockerImageTag: 2.3.0
dockerRepository: airbyte/source-mixpanel
documentationUrl: https://docs.airbyte.com/integrations/sources/mixpanel
githubIssueLabel: source-mixpanel
@@ -58,5 +58,5 @@ data:
supportLevel: certified
tags:
- language:python
-- cdk:python
+- cdk:low-code
metadataSpecVersion: "1.0"
@@ -3,7 +3,7 @@ requires = [ "poetry-core>=1.0.0",]
build-backend = "poetry.core.masonry.api"

[tool.poetry]
-version = "2.2.2"
+version = "2.3.0"
name = "source-mixpanel"
description = "Source implementation for Mixpanel."
authors = [ "Airbyte <[email protected]>",]
@@ -0,0 +1,339 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.

import time
from dataclasses import dataclass
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional

import dpath.util
import requests
from airbyte_cdk.models import AirbyteMessage, SyncMode, Type
from airbyte_cdk.sources.declarative.extractors import DpathExtractor
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
from airbyte_cdk.sources.declarative.migrations.legacy_to_per_partition_state_migration import LegacyToPerPartitionStateMigration
from airbyte_cdk.sources.declarative.models import DatetimeBasedCursor
from airbyte_cdk.sources.declarative.partition_routers import SubstreamPartitionRouter
from airbyte_cdk.sources.declarative.requesters import HttpRequester
from airbyte_cdk.sources.declarative.requesters.paginators.strategies.page_increment import PageIncrement
from airbyte_cdk.sources.declarative.schema import JsonFileSchemaLoader
from airbyte_cdk.sources.declarative.schema.json_file_schema_loader import _default_file_path
from airbyte_cdk.sources.declarative.transformations import RecordTransformation
from airbyte_cdk.sources.declarative.types import Config, Record, StreamSlice, StreamState

from .source import SourceMixpanel
from .streams.engage import EngageSchema


class MixpanelHttpRequester(HttpRequester):
Contributor

Could methods get_url_base and get_request_params be moved to the manifest?

Contributor

bump

Contributor Author

get_url_base moved to manifest

get_request_params has to stay declared here because of this issue: #38021

Contributor

Is it possible the values aren't passed because the fields are not defined on the custom requester? The fields must be defined as dataclass params to be accessible. Here's the doc

The class must also be a dataclass where each field represents an argument to configure from the yaml file, and an InitVar named parameters.

Contributor Author

I am not sure what exactly goes wrong, but when I add request_parameters/request_headers to the manifest, they have no effect:

  requester:
    type: CustomRequester
    class_name: "source_mixpanel.components.MixpanelHttpRequester"
    request_parameters:
      project_id: "{{ config['credentials']['project_id'] }}"
      XXXX: XXXXXX
    request_headers:
      XXXXX: XXXXXX
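
One way to probe the reviewer's hypothesis, sketched in plain Python: a subclass of a dataclass only gains new constructor arguments if it is itself decorated with `@dataclass` and declares them as annotated fields. `BaseRequester`, `PlainSubclass`, `DataclassSubclass` and `field_names` below are hypothetical stand-ins, not CDK classes, and whether this is the actual cause of #38021 is not confirmed here.

```python
from dataclasses import InitVar, dataclass, fields
from typing import Any, List, Mapping


@dataclass
class BaseRequester:
    """Hypothetical stand-in for a CDK component; not the real HttpRequester."""

    name: str
    parameters: InitVar[Mapping[str, Any]]

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        self._parameters = dict(parameters)


class PlainSubclass(BaseRequester):
    # Plain class attribute: a shared default, but NOT a dataclass field,
    # so it cannot be passed to the constructor.
    reqs_per_hour_limit = 60


@dataclass
class DataclassSubclass(BaseRequester):
    # Annotated field on a dataclass: becomes a constructor argument.
    reqs_per_hour_limit: int = 60


def field_names(cls: type) -> List[str]:
    """List the dataclass fields that the generated __init__ knows about."""
    return [f.name for f in fields(cls)]
```

In this sketch only `DataclassSubclass` accepts `reqs_per_hour_limit` as a keyword argument; passing it to `PlainSubclass` raises `TypeError`.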

    reqs_per_hour_limit = 60
    is_first_request = True

    def get_request_headers(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:

        return {"Accept": "application/json"}

    def get_request_params(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> MutableMapping[str, Any]:
        project_id = self.config.get("credentials", {}).get("project_id")
        return {"project_id": project_id} if project_id else {}

    def _request_params(
        self,
        stream_state: Optional[StreamState],
        stream_slice: Optional[StreamSlice],
        next_page_token: Optional[Mapping[str, Any]],
        extra_params: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """
        Flatten extra_params if it contains pagination information
        """
        next_page_token = None  # reset it, pagination data is in extra_params
        if extra_params:
            page = extra_params.pop("page", {})
            extra_params.update(page)
        return super()._request_params(stream_state, stream_slice, next_page_token, extra_params)

    def send_request(self, **kwargs) -> Optional[requests.Response]:

        if self.reqs_per_hour_limit:
            if self.is_first_request:
                self.is_first_request = False
            else:
                # we skip this block, if self.reqs_per_hour_limit = 0,
                # in all other cases wait for X seconds to match API limitations
                # https://help.mixpanel.com/hc/en-us/articles/115004602563-Rate-Limits-for-Export-API-Endpoints#api-export-endpoint-rate-limits
                self.logger.info(
                    f"Sleep for {3600 / self.reqs_per_hour_limit} seconds to match API limitations after reading from {self.name}"
                )
                time.sleep(3600 / self.reqs_per_hour_limit)

        return super().send_request(**kwargs)
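
The throttling in `send_request` spaces requests evenly: with a limit of N requests per hour, every request after the first waits 3600 / N seconds, and a limit of 0 disables the wait. A minimal standalone sketch of that behavior follows; the `Throttle` class, the `simulate` helper and the injectable `sleep` argument are illustrative assumptions, not part of the connector, and exist only so the delays can be observed without actually waiting.

```python
import time
from typing import Callable, List


class Throttle:
    """Hypothetical helper mirroring the first-request/sleep logic above."""

    def __init__(self, reqs_per_hour_limit: int, sleep: Callable[[float], None] = time.sleep) -> None:
        self.reqs_per_hour_limit = reqs_per_hour_limit
        self.is_first_request = True
        self._sleep = sleep

    def wait(self) -> None:
        # A limit of 0 disables throttling entirely.
        if not self.reqs_per_hour_limit:
            return
        if self.is_first_request:
            # The first request goes out immediately.
            self.is_first_request = False
            return
        self._sleep(3600 / self.reqs_per_hour_limit)


def simulate(limit: int, requests_count: int) -> List[float]:
    """Collect the delays that would be slept before each of `requests_count` requests."""
    delays: List[float] = []
    throttle = Throttle(limit, sleep=delays.append)
    for _ in range(requests_count):
        throttle.wait()
    return delays
```

With the value of 60 set in this PR, that works out to one request per minute after the first.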


class AnnotationsHttpRequester(MixpanelHttpRequester):
    def get_request_params(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> MutableMapping[str, Any]:
        return {}


class FunnelsHttpRequester(MixpanelHttpRequester):
    def get_request_params(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> MutableMapping[str, Any]:
        params = super().get_request_params(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token)
        params["unit"] = "day"
        return params


class CohortMembersSubstreamPartitionRouter(SubstreamPartitionRouter):
    def get_request_body_json(
        self,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        # https://developer.mixpanel.com/reference/engage-query
        cohort_id = stream_slice["id"]
        return {"filter_by_cohort": f'{{"id":{cohort_id}}}'}
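
The `filter_by_cohort` value is a JSON document embedded as a string. For integer cohort ids, the f-string above produces the same text as serializing `{"id": cohort_id}` with compact separators, as this small sketch illustrates (the `cohort_filter` helper and the sample id are made up for the example):

```python
import json


def cohort_filter(cohort_id: int) -> str:
    """Build the filter string; compact separators match the f-string output."""
    return json.dumps({"id": cohort_id}, separators=(",", ":"))


# Hypothetical cohort id, used only to compare both spellings.
sample_id = 1478097
assert cohort_filter(sample_id) == f'{{"id":{sample_id}}}'
```

The two forms diverge for string ids, where `json.dumps` would add quotes and the f-string would not.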


class EngageTransformation(RecordTransformation):
    def transform(
        self,
        record: Record,
        config: Optional[Config] = None,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
    ) -> Record:
        """
        - flatten $properties fields
        - remove leading '$'
        """
        record["distinct_id"] = record.pop("$distinct_id")
        properties = record.pop("$properties")
        for property_name in properties:
            this_property_name = property_name
            if property_name.startswith("$"):
                # Just remove leading '$' for 'reserved' mixpanel properties name, example:
                # from API: '$browser'
                # to stream: 'browser'
                this_property_name = this_property_name[1:]
            record[this_property_name] = properties[property_name]

        return record
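
To make the effect of this transformation concrete, here is a standalone sketch of the same flattening applied to a plain dict. The `flatten_engage_record` function and the sample record are illustrative assumptions; the real component operates on CDK `Record` objects.

```python
from typing import Any, Dict


def flatten_engage_record(record: Dict[str, Any]) -> Dict[str, Any]:
    """Standalone copy of the flattening logic, for illustration only."""
    record["distinct_id"] = record.pop("$distinct_id")
    properties = record.pop("$properties")
    for name, value in properties.items():
        # Reserved Mixpanel properties start with '$'; drop that prefix.
        record[name[1:] if name.startswith("$") else name] = value
    return record


# Made-up sample record in the shape the transformation expects.
sample = {"$distinct_id": "u1", "$properties": {"$browser": "Chrome", "plan": "pro"}}
flattened = flatten_engage_record(sample)
```

Here `flattened` ends up with the top-level keys `distinct_id`, `browser` and `plan`.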


class RevenueDpathExtractor(DpathExtractor):
    def extract_records(self, response: requests.Response) -> List[Mapping[str, Any]]:
        """
        response.json() example:
        {
            'computed_at': '2021-07-03T12:43:48.889421+00:00',
            'results': {
                '$overall': {       <-- should be skipped
                    'amount': 0.0,
                    'count': 124,
                    'paid_count': 0
                },
                '2021-06-01': {
                    'amount': 0.0,
                    'count': 124,
                    'paid_count': 0
                },
                '2021-06-02': {
                    'amount': 0.0,
                    'count': 124,
                    'paid_count': 0
                },
                ...
            },
            'session_id': '162...',
            'status': 'ok'
        }
        """
        new_records = []
        for record in super().extract_records(response):
            for date_entry in record:
                if date_entry != "$overall":
                    new_records.append({"date": date_entry, **record[date_entry]})
        return new_records
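
The reshaping done by this extractor can be tried on the docstring's sample data without the CDK. The sketch below assumes the `results` mapping has already been extracted; `flatten_revenue` is a hypothetical helper name.

```python
from typing import Any, Dict, List


def flatten_revenue(results: Dict[str, Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Turn a date-keyed mapping into one record per date, skipping '$overall'."""
    return [{"date": date, **values} for date, values in results.items() if date != "$overall"]


# Sample values taken from the docstring above.
results = {
    "$overall": {"amount": 0.0, "count": 124, "paid_count": 0},
    "2021-06-01": {"amount": 0.0, "count": 124, "paid_count": 0},
    "2021-06-02": {"amount": 0.0, "count": 124, "paid_count": 0},
}
records = flatten_revenue(results)
```

Each resulting record carries its date as a regular field, which is what lets the stream use `date` as a cursor.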


class FunnelsDpathExtractor(DpathExtractor):
    def extract_records(self, response: requests.Response) -> List[Mapping[str, Any]]:
        """
        response.json() example:
        {
            'computed_at': '2021-07-03T12:43:48.889421+00:00',
            'results': {
                '$overall': {       <-- should be skipped
                    'amount': 0.0,
                    'count': 124,
                    'paid_count': 0
                },
                '2021-06-01': {
                    'amount': 0.0,
                    'count': 124,
                    'paid_count': 0
                },
                ...
            },
            'session_id': '162...',
            'status': 'ok'
        }
        """
        new_records = []
        for record in super().extract_records(response):
            for date_entry in record:
                new_records.append({"date": date_entry, **record[date_entry]})
        return new_records


class FunnelsSubstreamPartitionRouter(SubstreamPartitionRouter):
    def stream_slices(self) -> Iterable[StreamSlice]:
        """
        Add 'funnel_name' to the slice; the rest of the code is exactly the same as in super().stream_slices(...)
        Remove empty 'parent_slice' attribute to be compatible with LegacyToPerPartitionStateMigration
        """
        if not self.parent_stream_configs:
            yield from []
        else:
            for parent_stream_config in self.parent_stream_configs:
                parent_stream = parent_stream_config.stream
                parent_field = parent_stream_config.parent_key.eval(self.config)  # type: ignore # parent_key is always cast to an interpolated string
                partition_field = parent_stream_config.partition_field.eval(self.config)  # type: ignore # partition_field is always cast to an interpolated string
                for parent_stream_slice in parent_stream.stream_slices(
                    sync_mode=SyncMode.full_refresh, cursor_field=None, stream_state=None
                ):
                    empty_parent_slice = True
                    parent_partition = parent_stream_slice.partition if parent_stream_slice else {}

                    for parent_record in parent_stream.read_records(
                        sync_mode=SyncMode.full_refresh, cursor_field=None, stream_slice=parent_stream_slice, stream_state=None
                    ):
                        # Skip non-records (eg AirbyteLogMessage)
                        if isinstance(parent_record, AirbyteMessage):
                            if parent_record.type == Type.RECORD:
                                parent_record = parent_record.record.data
                            else:
                                continue
                        elif isinstance(parent_record, Record):
                            parent_record = parent_record.data
                        try:
                            partition_value = dpath.util.get(parent_record, parent_field)
                        except KeyError:
                            pass
                        else:
                            empty_parent_slice = False
                            yield StreamSlice(
                                partition={partition_field: partition_value},
                                cursor_slice={"funnel_name": parent_record.get("name")},
                            )
                    # If the parent slice contains no records,
                    if empty_parent_slice:
                        yield from []
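
The docstring above mentions compatibility with `LegacyToPerPartitionStateMigration`. As a rough sketch of what such a migration does to the funnels state shown in this PR, the snippet below converts a legacy mapping keyed by funnel id into a list of per-partition entries. The output layout (a `states` list of `partition`/`cursor` pairs) and the `funnel_id` partition field are assumptions made for illustration; `to_per_partition_state` is a hypothetical function, not the CDK implementation.

```python
from typing import Any, Dict, List, Mapping


def to_per_partition_state(
    legacy_state: Mapping[str, Any], partition_field: str = "funnel_id"
) -> Dict[str, List[Dict[str, Any]]]:
    """Convert {partition_value: cursor} into a list of partition/cursor pairs (assumed layout)."""
    return {
        "states": [
            {"partition": {partition_field: key}, "cursor": cursor}
            for key, cursor in legacy_state.items()
        ]
    }


# Legacy state entry taken from the test fixture in this PR.
legacy = {"36152117": {"date": "2030-01-01"}}
migrated = to_per_partition_state(legacy)
```

A partition that also carried an empty `parent_slice` key would not line up with entries shaped like these, which is presumably why the router drops it.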


@dataclass
class EngagePaginationStrategy(PageIncrement):
    """
    Engage stream uses 2 params for pagination:
    session_id - returned after first request
    page - incremental page number
    """

    def next_page_token(self, response, last_records: List[Mapping[str, Any]]) -> Optional[Mapping[str, Any]]:
        """
        Determines the next page token for the Engage stream

        Attributes:
            response: Engage API response; carries `page`, `session_id` and, on the first page only, `total`
            last_records: Records parsed from the response
        """
        decoded_response = response.json()
        page_number = decoded_response.get("page")
        total = decoded_response.get("total")  # exist only on first page
        if total:
            self._total = total

        if self._total and page_number is not None and self._total > self.page_size * (page_number + 1):
            return {"session_id": decoded_response.get("session_id"), "page": page_number + 1}
        else:
            self._total = None
            return None
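
The stop condition above is easiest to follow with numbers. The sketch below isolates the arithmetic in a hypothetical pure function, `next_engage_page`, with made-up totals and page sizes; it leaves out the `_total` caching that the real strategy needs because `total` is only present on the first page.

```python
from typing import Any, Dict, Optional


def next_engage_page(
    page_number: Optional[int], total: Optional[int], page_size: int, session_id: str
) -> Optional[Dict[str, Any]]:
    """Return the next page token, or None once all `total` records are covered."""
    # Pages are zero-based, so (page_number + 1) pages have been read so far.
    if total and page_number is not None and total > page_size * (page_number + 1):
        return {"session_id": session_id, "page": page_number + 1}
    return None


# Made-up example: 2500 profiles, 1000 per page, so pages 0, 1 and 2 are needed.
tokens = [next_engage_page(page, 2500, 1000, "abc") for page in range(3)]
```

In this example the first two calls yield tokens for pages 1 and 2, and the third returns `None` because three pages already cover 2500 records.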


class EngageJsonFileSchemaLoader(JsonFileSchemaLoader):
    """Engage schema combines static and dynamic approaches"""

    schema: Mapping[str, Any]

    def __post_init__(self, parameters: Mapping[str, Any]):
        if not self.file_path:
            self.file_path = _default_file_path()
        self.file_path = InterpolatedString.create(self.file_path, parameters=parameters)
        self.schema = {}

    def get_json_schema(self) -> Mapping[str, Any]:
        """
        Dynamically load additional properties from API
        Add cache to reduce a number of API calls because get_json_schema()
        is called for each extracted record
        """

        if self.schema:
            return self.schema

        schema = super().get_json_schema()

        types = {
            "boolean": {"type": ["null", "boolean"]},
            "number": {"type": ["null", "number"], "multipleOf": 1e-20},
            # no format specified as values can be "2021-12-16T00:00:00", "1638298874", "15/08/53895"
            "datetime": {"type": ["null", "string"]},
            "object": {"type": ["null", "object"], "additionalProperties": True},
            "list": {"type": ["null", "array"], "required": False, "items": {}},
            "string": {"type": ["null", "string"]},
        }

        params = {"authenticator": SourceMixpanel.get_authenticator(self.config), "region": self.config.get("region")}
        project_id = self.config.get("credentials", {}).get("project_id")
        if project_id:
            params["project_id"] = project_id

        schema["additionalProperties"] = self.config.get("select_properties_by_default", True)

        # read existing Engage schema from API
        schema_properties = EngageSchema(**params).read_records(sync_mode=SyncMode.full_refresh)
        for property_entry in schema_properties:
            property_name: str = property_entry["name"]
            property_type: str = property_entry["type"]
            if property_name.startswith("$"):
                # Just remove leading '$' for 'reserved' mixpanel properties name, example:
                # from API: '$browser'
                # to stream: 'browser'
                property_name = property_name[1:]
            # Do not overwrite 'standard' hard-coded properties, add 'custom' properties
            if property_name not in schema["properties"]:
                schema["properties"][property_name] = types.get(property_type, {"type": ["null", "string"]})
        self.schema = schema
        return schema
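
The merge step at the end of `get_json_schema` can be illustrated without any API calls. In the sketch below, `merge_properties`, the reduced `TYPE_MAP` and the sample property entries are all illustrative assumptions; only the merge rules (strip a leading `$`, never overwrite a hard-coded property, fall back to a nullable string for unknown types) are taken from the code above.

```python
from typing import Any, Dict, Iterable, Mapping

# Reduced type table for the example; the loader above defines more entries.
TYPE_MAP: Dict[str, Dict[str, Any]] = {
    "boolean": {"type": ["null", "boolean"]},
    "string": {"type": ["null", "string"]},
}
FALLBACK: Dict[str, Any] = {"type": ["null", "string"]}


def merge_properties(schema: Dict[str, Any], entries: Iterable[Mapping[str, str]]) -> Dict[str, Any]:
    """Add API-reported properties to a static schema without overwriting existing ones."""
    for entry in entries:
        name = entry["name"]
        if name.startswith("$"):
            name = name[1:]  # '$browser' from the API becomes 'browser' in the stream
        if name not in schema["properties"]:
            schema["properties"][name] = TYPE_MAP.get(entry["type"], FALLBACK)
    return schema


# Made-up static schema and API entries.
static_schema = {"properties": {"browser": {"type": ["null", "string"]}}}
api_entries = [
    {"name": "$browser", "type": "boolean"},  # already hard-coded: kept as string
    {"name": "is_vip", "type": "boolean"},    # custom property: added
    {"name": "legacy_code", "type": "mystery"},  # unknown type: falls back to string
]
merged = merge_properties(static_schema, api_entries)
```

Keeping the hard-coded definitions authoritative means a type reported differently by the API cannot silently change the schema of a standard column.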