Skip to content

Commit

Permalink
Merge pull request #84 from treasure-data/support_cursor_based_increm…
Browse files Browse the repository at this point in the history
…ental

Support cursor based incremental
  • Loading branch information
hieudion authored Aug 7, 2023
2 parents 602fbbb + ba1871f commit f37e5ce
Show file tree
Hide file tree
Showing 11 changed files with 1,143 additions and 5 deletions.
10 changes: 8 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
## 0.4.2 - 2022-23-05
## 0.4.4 - 2023-07-21
* [enhancement] Support cursor based incremental [#84](https://github.com/treasure-data/embulk-input-zendesk/pull/84)

## 0.4.3 - 2022-10-21
* [enhancement] Bump up to v0.4.3, built with the Gradle plugin v0.5.5 [#78](https://github.com/treasure-data/embulk-input-zendesk/pull/78)

## 0.4.2 - 2022-05-23
* [enhancement] Catchup embulk v0.10.32 [#77](https://github.com/treasure-data/embulk-input-zendesk/pull/77)

## 0.4.1 - 2022-29-03
## 0.4.1 - 2022-03-29
* [enhancement] Remove deprecated functions [#76](https://github.com/treasure-data/embulk-input-zendesk/pull/76)

## 0.4.0 - 2022-03-03
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ Required Embulk version >= 0.9.6.
- **profile_source**: Profile source of user event, required if `target` is `user_events`.
- **user_event_source**: Source of user event, required if `target` is `user_events`.
- **user_event_type**: Type of user event, required if `target` is `user_events`.
- **enable_cursor_based_api**: Enable to use cursor based api endpoint for tickets and users target (boolean, default: `false`)

## Example

Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ repositories {
def embulkVersion = '0.10.31'

group = "com.treasuredata.embulk.plugins"
version = "0.4.3-SNAPSHOT"
version = "0.4.4-SNAPSHOT"
description = "Loads records From Zendesk"

sourceCompatibility = 1.8
Expand Down
13 changes: 12 additions & 1 deletion src/main/java/org/embulk/input/zendesk/ZendeskInputPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.embulk.input.zendesk.models.AuthenticationMethod;
import org.embulk.input.zendesk.models.Target;
import org.embulk.input.zendesk.services.ZendeskChatService;
import org.embulk.input.zendesk.services.ZendeskCursorBasedService;
import org.embulk.input.zendesk.services.ZendeskCustomObjectService;
import org.embulk.input.zendesk.services.ZendeskNPSService;
import org.embulk.input.zendesk.services.ZendeskService;
Expand Down Expand Up @@ -157,6 +158,10 @@ public interface PluginTask
@ConfigDefault("null")
Optional<String> getUserEventSource();

@Config("enable_cursor_based_api")
@ConfigDefault("false")
boolean getEnableCursorBasedApi();

@Config("columns")
SchemaConfig getColumns();
}
Expand Down Expand Up @@ -301,7 +306,7 @@ private JsonNode addAllColumnsToSchema(final JsonNode jsonNode, final Target tar
ConfigDiff configDiff = guessData(jsonNode, target.getJsonName());
ConfigDiff parser = configDiff.getNested("parser");
if (parser.has("columns")) {
JsonNode columns = parser.get(JsonNode.class, "columns");
JsonNode columns = parser.get(JsonNode.class, "columns");
final Iterator<JsonNode> ite = columns.elements();

while (ite.hasNext()) {
Expand Down Expand Up @@ -444,6 +449,12 @@ protected ZendeskService dispatchPerTarget(ZendeskInputPlugin.PluginTask task)
switch (task.getTarget()) {
case TICKETS:
case USERS:
/*
The cursor based incremental API is enabled only tickets and users targets
It allows to fetch more than 10.000 records which is now the limitation of the old incremental api
https://developer.zendesk.com/documentation/ticketing/managing-tickets/using-the-incremental-export-api/#cursor-based-incremental-exports
*/
return task.getEnableCursorBasedApi() ? new ZendeskCursorBasedService(task) : new ZendeskSupportAPIService(task);
case ORGANIZATIONS:
case TICKET_METRICS:
case TICKET_EVENTS:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
package org.embulk.input.zendesk.services;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.annotations.VisibleForTesting;
import org.apache.http.HttpStatus;
import org.apache.http.client.utils.URIBuilder;
import org.embulk.config.ConfigException;
import org.embulk.config.TaskReport;
import org.embulk.input.zendesk.RecordImporter;
import org.embulk.input.zendesk.ZendeskInputPlugin;
import org.embulk.input.zendesk.clients.ZendeskRestClient;
import org.embulk.input.zendesk.models.ZendeskException;
import org.embulk.input.zendesk.utils.ZendeskConstants;
import org.embulk.input.zendesk.utils.ZendeskDateUtils;
import org.embulk.input.zendesk.utils.ZendeskUtils;
import org.embulk.spi.DataException;
import org.embulk.spi.Exec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.URISyntaxException;
import java.util.Iterator;

import static org.embulk.input.zendesk.ZendeskInputPlugin.CONFIG_MAPPER_FACTORY;

public class ZendeskCursorBasedService
implements ZendeskService
{
private static final Logger logger = LoggerFactory.getLogger(ZendeskNormalServices.class);

protected ZendeskInputPlugin.PluginTask task;

private ZendeskRestClient zendeskRestClient;

public ZendeskCursorBasedService(final ZendeskInputPlugin.PluginTask task)
{
this.task = task;
}

@Override
public boolean isSupportIncremental()
{
return true;
}

@Override
public TaskReport addRecordToImporter(int taskIndex, RecordImporter recordImporter)
{
TaskReport taskReport = CONFIG_MAPPER_FACTORY.newTaskReport();
importData(task, recordImporter, taskReport);

return taskReport;
}

@Override
public JsonNode getDataFromPath(String path, int page, boolean isPreview, long startTime)
{
try {
String buildPath = buildPath(0);
final String response = getZendeskRestClient().doGet(buildPath, task, Exec.isPreview());
return ZendeskUtils.parseJsonObject(response);
}
catch (URISyntaxException e) {
throw new ConfigException(e);
}
}

@VisibleForTesting
protected ZendeskRestClient getZendeskRestClient()
{
if (zendeskRestClient == null) {
zendeskRestClient = new ZendeskRestClient();
}
return zendeskRestClient;
}

private void importData(final ZendeskInputPlugin.PluginTask task, final RecordImporter recordImporter, final TaskReport taskReport)
{
long initStartTime = 0;

if (task.getStartTime().isPresent()) {
initStartTime = ZendeskDateUtils.getStartTime(task.getStartTime().get());
}

long nextStartTime = initStartTime;
long totalRecords = 0;
try {
String path = buildPath(initStartTime);

while (true) {
final JsonNode result = fetchResultFromPath(path);

final Iterator<JsonNode> iterator = ZendeskUtils.getListRecords(result, task.getTarget().getJsonName());

int numberOfRecords = 0;

while (iterator.hasNext()) {
final JsonNode recordJsonNode = iterator.next();
fetchSubResourceAndAddToImporter(recordJsonNode, task, recordImporter);
numberOfRecords++;
// Store nextStartTime of last item
if (!iterator.hasNext() && task.getIncremental()) {
nextStartTime = ZendeskDateUtils.isoToEpochSecond(recordJsonNode.get(ZendeskConstants.Field.UPDATED_AT).asText());
}
}

totalRecords = totalRecords + numberOfRecords;
if (result.has(ZendeskConstants.Field.END_OF_STREAM)) {
if (result.get(ZendeskConstants.Field.END_OF_STREAM).asBoolean()) {
break;
}
}
else {
throw new DataException("Missing end of stream, please double-check the endpoint");
}
if (Exec.isPreview()) {
break;
}

path = result.get(ZendeskConstants.Field.AFTER_URL).asText();
}

logger.info("import records total " + totalRecords);

if (!Exec.isPreview() && task.getIncremental()) {
storeStartTimeForConfigDiff(taskReport, nextStartTime);
}
}
catch (Exception e) {
throw new DataException(e);
}
}

private String buildPath(long startTime)
throws URISyntaxException
{
return ZendeskUtils.getURIBuilder(task.getLoginUrl()).setPath(ZendeskConstants.Url.API + "/" + "incremental" + "/" + task.getTarget().toString() + "/" + "cursor.json").build().toString() + "?start_time=" + startTime;
}

private JsonNode fetchResultFromPath(String path)
{
final String response = getZendeskRestClient().doGet(path, task, Exec.isPreview());
return ZendeskUtils.parseJsonObject(response);
}

private void fetchSubResourceAndAddToImporter(final JsonNode jsonNode, final ZendeskInputPlugin.PluginTask task, final RecordImporter recordImporter)
{
task.getIncludes().forEach(include -> {
final String relatedObjectName = include.trim();

final URIBuilder uriBuilder = ZendeskUtils.getURIBuilder(task.getLoginUrl()).setPath(ZendeskConstants.Url.API + "/" + task.getTarget().toString() + "/" + jsonNode.get(ZendeskConstants.Field.ID).asText() + "/" + relatedObjectName + ".json");
try {
final JsonNode result = getDataFromPath(uriBuilder.toString(), 0, false, 0);
if (result != null && result.has(relatedObjectName)) {
((ObjectNode) jsonNode).set(include, result.get(relatedObjectName));
}
}
catch (final ConfigException e) {
// Sometimes we get 404 when having invalid endpoint, so ignore when we get 404 InvalidEndpoint
if (!(e.getCause() instanceof ZendeskException && ((ZendeskException) e.getCause()).getStatusCode() == HttpStatus.SC_NOT_FOUND)) {
throw e;
}
}
});

recordImporter.addRecord(jsonNode);
}

private void storeStartTimeForConfigDiff(final TaskReport taskReport, final long nextStartTime)
{
taskReport.set(ZendeskConstants.Field.START_TIME, nextStartTime);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ public static class Field
public static final String GENERATED_TIMESTAMP = "generated_timestamp";
public static final String UPDATED_AT = "updated_at";
public static final String ID = "id";
public static final String END_OF_STREAM = "end_of_stream";
public static final String AFTER_URL = "after_url";
}

public static class Url
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public static String convertToDateTimeFormat(String datetime, String dateTimeFor
}

// start_time should be start from 0
public static long getStartTime(final String time)
public static long getStartTime(final String time)
{
try {
return isoToEpochSecond(time);
Expand Down
20 changes: 20 additions & 0 deletions src/test/java/org/embulk/input/zendesk/TestZendeskInputPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.embulk.config.TaskSource;
import org.embulk.input.zendesk.models.Target;
import org.embulk.input.zendesk.services.ZendeskChatService;
import org.embulk.input.zendesk.services.ZendeskCursorBasedService;
import org.embulk.input.zendesk.services.ZendeskCustomObjectService;
import org.embulk.input.zendesk.services.ZendeskNPSService;
import org.embulk.input.zendesk.services.ZendeskService;
Expand Down Expand Up @@ -205,6 +206,25 @@ public void testDispatchPerTargetShouldReturnSupportAPIService()
testReturnSupportAPIService(Target.ORGANIZATIONS);
}

@Test
public void testDispatchPerTargetShouldReturn()
{
zendeskInputPlugin = spy(new ZendeskInputPlugin());

final ConfigSource src = ZendeskTestHelper.getConfigSource("base.yml");
src.set("target", Target.TICKETS.name().toLowerCase());
src.set("columns", Collections.EMPTY_LIST);
src.set("enable_cursor_based_api", true);
ZendeskInputPlugin.PluginTask task = CONFIG_MAPPER.map(src, ZendeskInputPlugin.PluginTask.class);
ZendeskService zendeskService = zendeskInputPlugin.dispatchPerTarget(task);
assertTrue(zendeskService instanceof ZendeskCursorBasedService);

src.set("target", Target.USERS.name().toLowerCase());
task = CONFIG_MAPPER.map(src, ZendeskInputPlugin.PluginTask.class);
zendeskService = zendeskInputPlugin.dispatchPerTarget(task);
assertTrue(zendeskService instanceof ZendeskCursorBasedService);
}

@Test
public void testDispatchPerTargetShouldReturnNPSService()
{
Expand Down
Loading

0 comments on commit f37e5ce

Please sign in to comment.