-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Adding Veeva Vault Objects example #56
base: main
Are you sure you want to change the base?
Changes from 1 commit
13879ef
150585d
f4b61cb
1e05115
04dcf46
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
{ | ||
"vaultDNS": "<YOUR_VEEVA_VAULT_DNS>.veevavault.com", | ||
orizwanft marked this conversation as resolved.
Show resolved
Hide resolved
|
||
"username": "<YOUR_VEEVA_VAULT_USERNAME>", | ||
"password": "<YOUR_VEEVA_VAULT_PASSWORD>" | ||
} |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -0,0 +1,171 @@ | ||||||
# This is a simple example for how to work with the fivetran_connector_sdk module. | ||||||
# This code is currently configured to Retrieve Details from All Object Types in Veeva Vault and then utilize VQL to retrieve all Object records, creating 1 table per object | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
# You will need to provide your own Veeva Vault credentials for this to work --> vaultDNS, username, and password variables in configuration.json | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
# Retrieve Details from All Object Types endpoint: https://developer.veevavault.com/api/24.2/#retrieve-details-from-all-object-types | ||||||
# VQL endpoint: https://developer.veevavault.com/api/24.2/#vault-query-language-vql | ||||||
# Can also add code to extract from other endpoints as needed | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
# See the Technical Reference documentation (https://fivetran.com/docs/connectors/connector-sdk/technical-reference#update) | ||||||
# and the Best Practices documentation (https://fivetran.com/docs/connectors/connector-sdk/best-practices) for details | ||||||
|
||||||
varundhall marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
from fivetran_connector_sdk import Connector | ||||||
from fivetran_connector_sdk import Operations as op | ||||||
from fivetran_connector_sdk import Logging as log | ||||||
import requests | ||||||
import json | ||||||
import datetime | ||||||
|
||||||
# Function to start a Veeva Session, inputs needed are: username, password, and vaultDNS -- all from the configuration.json file | ||||||
# Auth endpoint: https://developer.veevavault.com/api/24.2/#user-name-and-password | ||||||
# The startVeevaSession function takes 1 parameter: | ||||||
# - configuration: a dictionary that holds the configuration settings for the connector. | ||||||
def startVeevaSession(configuration: dict): | ||||||
username = configuration.get('username') | ||||||
password = configuration.get('password') | ||||||
auth_url = f"https://{configuration.get('vaultDNS')}/api/v24.2/auth" | ||||||
headers = { | ||||||
"Content-Type": "application/x-www-form-urlencoded", | ||||||
"Accept": "application/json" | ||||||
} | ||||||
data = { | ||||||
"username": username, | ||||||
"password": password | ||||||
} | ||||||
response = requests.post(auth_url, headers=headers, data=data) | ||||||
if response.status_code == 200: | ||||||
response_json = response.json() | ||||||
if response_json.get('responseStatus') == "SUCCESS": | ||||||
session_id = response_json.get('sessionId') | ||||||
else: | ||||||
log.severe(f"Failed to create session for user with username: {username}") | ||||||
session_id = None | ||||||
else: | ||||||
log.severe(f"Failed to get 200 response from Auth URL: {auth_url}, received response status code of {response.status_code}") | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. polish this log statement, and also throw the response.message, so that error is visible There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would this work? "Authentication failed: {auth_url} returned {response.status_code}. Expected 200." |
||||||
session_id = None | ||||||
log.info(f"Started session for user: {username}") | ||||||
return session_id | ||||||
|
||||||
# Function that ends a Veeva session, inputs needed are: active session_id and vaultDNS from configuration.json file | ||||||
# End session endpoint: https://developer.veevavault.com/api/24.2/#end-session | ||||||
# The endVeevaSession function takes 2 parameters: | ||||||
# - configuration: a dictionary that holds the configuration settings for the connector. | ||||||
# - session_id: a Veeva Vault session ID generated from the startVeevaSession function | ||||||
def endVeevaSession(configuration: dict, session_id): | ||||||
deactivate_session_url = f"https://{configuration.get('vaultDNS')}/api/v24.2/session" | ||||||
headers = { | ||||||
"Authorization": session_id | ||||||
} | ||||||
response_deact = requests.delete(deactivate_session_url, headers=headers) | ||||||
log.info(f"Deactivation of session for user: {configuration.get('username')} responded with code: {response_deact.status_code}: {response_deact.reason}") | ||||||
|
||||||
# Function to get list of available Veeva Vault objects | ||||||
# Endpoint documentation: https://developer.veevavault.com/api/24.2/#retrieve-details-from-all-object-types | ||||||
# The getVaultObjects function takes 2 parameters: | ||||||
# - configuration: a dictionary that holds the configuration settings for the connector. | ||||||
# - session_id: a Veeva Vault session ID generated from the startVeevaSession function | ||||||
def getVaultObjects(configuration: dict, session_id): | ||||||
base_url = f"https://{configuration.get('vaultDNS')}/api/v24.2/" | ||||||
headers = {'Authorization': session_id, | ||||||
'Accept': 'application/json'} | ||||||
get_objects_url = base_url + 'configuration/Objecttype' | ||||||
obj_response = requests.get(get_objects_url, headers = headers).json() | ||||||
objects = obj_response.get('data') | ||||||
return objects | ||||||
|
||||||
# Define the schema function which lets you configure the schema your connector delivers. | ||||||
# See the technical reference documentation for more details on the schema function: | ||||||
# https://fivetran.com/docs/connectors/connector-sdk/technical-reference#schema | ||||||
# The schema function takes 1 parameter: | ||||||
# - configuration: a dictionary that holds the configuration settings for the connector. | ||||||
def schema(configuration: dict): | ||||||
# Dynamically define schema based on getVaultObjects response | ||||||
# Note: Defining all tables to have a PK of "id" right now - need to verify that this will hold true for all objects | ||||||
session_id = startVeevaSession(configuration) | ||||||
objects = getVaultObjects(configuration, session_id) | ||||||
schema_def = [] | ||||||
for obj in objects: | ||||||
schema_def.append({"table": obj.get('label_plural'), "primary_key": ["id"]}) | ||||||
endVeevaSession(configuration, session_id) | ||||||
return schema_def | ||||||
|
||||||
# Define the update function, which is a required function, and is called by Fivetran during each sync. | ||||||
# See the technical reference documentation for more details on the update function | ||||||
# https://fivetran.com/docs/connectors/connector-sdk/technical-reference#update | ||||||
# The function takes 2 parameters: | ||||||
# - configuration: dictionary contains any secrets or payloads you configure when deploying the connector | ||||||
# - state: a dictionary contains whatever state you have chosen to checkpoint during the prior sync | ||||||
# The state dictionary is empty for the first sync or for any full re-sync | ||||||
# This function is designed to loop through and retrieve vault Object data using VQL endpoint: https://developer.veevavault.com/api/24.2/#submitting-a-query | ||||||
def update(configuration: dict, state: dict): | ||||||
varundhall marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
base_url = f"https://{configuration.get('vaultDNS')}/api/v24.2/" | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. define this as a constant and then use that for all occurrences There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Defined as a constant at the start of the code under #Constant |
||||||
# Batch size to query, this can be updated and will be placed in every query's PAGESIZE VQL clause | ||||||
page_size = 50 | ||||||
# Define current time (where to start subsequent sync) and extract current cursor from state | ||||||
current_time = datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%S.%fZ') | ||||||
cursor = state.get('cursor') | ||||||
# Start Veeva session and retrieve all object details for which to query | ||||||
session_id = startVeevaSession(configuration) | ||||||
objects = getVaultObjects(configuration, session_id) | ||||||
# Loop through all objects to define fields and compose query | ||||||
for obj in objects: | ||||||
# Get object name, label, and compose commma separated list of fields to query for | ||||||
object_name = obj.get('object') | ||||||
object_label = obj.get('label_plural') | ||||||
fields_dict = obj.get('type_fields') | ||||||
# May want to wrap some fields in TOLABEL() here depending on desired output | ||||||
fields = ', '.join(d.get('name') for d in fields_dict) | ||||||
# Query with date filter if incremental sync, without if initial sync | ||||||
if cursor: | ||||||
query = f"select {fields} from {object_name} WHERE modified_date__v > '{cursor}' PAGESIZE {page_size}" | ||||||
else: | ||||||
query = f"select {fields} from {object_name} PAGESIZE {page_size}" | ||||||
log.info(f"Next query is: {query}") | ||||||
# Define headers and %20 spaces for query payload | ||||||
vql_headers = { | ||||||
'Authorization': session_id, | ||||||
'Accept': 'application/json', | ||||||
'X-VaultAPI-DescribeQuery': 'true', | ||||||
'Content-Type': 'application/x-www-form-urlencoded' | ||||||
} | ||||||
payload = f"q={query.replace(' ', '%20')}" | ||||||
# Generate API url and send VQL endpoint | ||||||
# VQL endpoint: https://developer.veevavault.com/api/24.2/#submitting-a-query | ||||||
vql_url = base_url + 'query' | ||||||
obj_vql = requests.post(vql_url, headers = vql_headers, data=payload).json() | ||||||
# Get and analyze responseDetails, which includes: pagesize, pageoffset, size, total, next_page | ||||||
response_details = obj_vql.get('responseDetails') | ||||||
log.info(f"VQL query response details: {response_details}") | ||||||
# If records returned, turn them into dictionary and upsert to object_table | ||||||
if response_details.get('size') > 0: | ||||||
for record in obj_vql.get('data'): | ||||||
row = {k: v[0] if isinstance(v, list) else v for k,v in record.items()} | ||||||
yield op.upsert(object_label, row) | ||||||
# Until there is no next_page, keep calling next_page_url from API response to paginate through object | ||||||
while response_details.get('next_page'): | ||||||
next_page_url = f"https://{configuration.get('vaultDNS')}{response_details.get('next_page')}" | ||||||
obj_vql = requests.post(next_page_url, headers = vql_headers).json() | ||||||
response_details = obj_vql.get('responseDetails') | ||||||
log.info(f"VQL query response details: {response_details}") | ||||||
if response_details.get('size') > 0: | ||||||
for record in obj_vql.get('data'): | ||||||
row = {k: v[0] if isinstance(v, list) else v for k,v in record.items()} | ||||||
# Upsert row to object's table | ||||||
yield op.upsert(object_label, row) | ||||||
# End Veeva Session after all objects and pages are iterated through | ||||||
endVeevaSession(configuration, session_id) | ||||||
|
||||||
# Set cursor/filter time for next sync to be the start time of this current sync | ||||||
yield op.checkpoint({"cursor": current_time}) | ||||||
|
||||||
# This creates the connector object that will use the update function defined in this connector.py file. | ||||||
connector = Connector(update=update, schema=schema) | ||||||
|
||||||
# Check if the script is being run as the main module. | ||||||
# This is Python's standard entry method allowing your script to be run directly from the command line or IDE 'run' button. | ||||||
# This is useful for debugging while you write your code. Note this method is not called by Fivetran when executing your connector in production. | ||||||
# Please test using the Fivetran debug command prior to finalizing and deploying your connector. | ||||||
if __name__ == "__main__": | ||||||
# Open the configuration.json file and load its contents into a dictionary. | ||||||
with open("configuration.json", 'r') as f: | ||||||
configuration = json.load(f) | ||||||
# Adding this code to your `connector.py` allows you to test your connector by running your file directly from your IDE: | ||||||
connector.debug(configuration=configuration) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.