Skip to content

Commit

Permalink
feat: add support for middleware (#308)
Browse files Browse the repository at this point in the history
* feat: add support for middleware

* feat: add middleware logic and test

* feat: fixes based on comments

* fix: nits fix

* fix: interal function improve

* test: isolate middlewareRunner test from amplitude client
  • Loading branch information
yuhao900914 authored Dec 15, 2021
1 parent 841f9ce commit 781a3d0
Show file tree
Hide file tree
Showing 8 changed files with 289 additions and 15 deletions.
92 changes: 77 additions & 15 deletions src/main/java/com/amplitude/api/AmplitudeClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,10 @@ public class AmplitudeClient {
* The background event uploading worker thread instance.
*/
WorkerThread httpThread = new WorkerThread("httpThread");
/**
* The runner for middleware
* */
MiddlewareRunner middlewareRunner = new MiddlewareRunner();

/**
* Instantiates a new default instance AmplitudeClient and starts worker threads.
Expand Down Expand Up @@ -633,7 +637,7 @@ public AmplitudeClient setTrackingOptions(TrackingOptions trackingOptions) {
apiPropertiesTrackingOptions = appliedTrackingOptions.getApiPropertiesTrackingOptions();
return this;
}

/**
* Enable COPPA (Children's Online Privacy Protection Act) restrictions on ADID, city, IP address and location tracking.
* This can be used by any customer that does not want to collect ADID, city, IP address and location tracking.
Expand Down Expand Up @@ -826,6 +830,13 @@ void useForegroundTracking() {
*/
boolean isUsingForegroundTracking() { return usingForegroundTracking; }

/**
* Add middleware to the middleware runner
*/
void addEventMiddleware(Middleware middleware) {
middlewareRunner.add(middleware);
}

/**
* Whether app is in the foreground.
*
Expand Down Expand Up @@ -919,11 +930,15 @@ public void logEvent(String eventType, JSONObject eventProperties, JSONObject gr
* Tracking Sessions</a>
*/
public void logEvent(String eventType, JSONObject eventProperties, JSONObject groups, long timestamp, boolean outOfSession) {
logEvent(eventType, eventProperties, groups,
timestamp, outOfSession, null);
}

public void logEvent(String eventType, JSONObject eventProperties, JSONObject groups, long timestamp, boolean outOfSession, MiddlewareExtra extra) {
if (validateLogEvent(eventType)) {
logEventAsync(
eventType, eventProperties, null, null, groups, null,
timestamp, outOfSession
);
timestamp, outOfSession, extra);
}
}

Expand Down Expand Up @@ -1047,9 +1062,15 @@ protected boolean validateLogEvent(String eventType) {
* @param timestamp the timestamp
* @param outOfSession the out of session
*/
protected void logEventAsync(final String eventType, JSONObject eventProperties,
JSONObject apiProperties, JSONObject userProperties, JSONObject groups,
JSONObject groupProperties, final long timestamp, final boolean outOfSession) {
logEventAsync(eventType,eventProperties, apiProperties, userProperties, groups,groupProperties, timestamp, outOfSession, null);
};

protected void logEventAsync(final String eventType, JSONObject eventProperties,
JSONObject apiProperties, JSONObject userProperties, JSONObject groups,
JSONObject groupProperties, final long timestamp, final boolean outOfSession) {
JSONObject groupProperties, final long timestamp, final boolean outOfSession, MiddlewareExtra extra) {
// Clone the incoming eventProperties object before sending over
// to the log thread. Helps avoid ConcurrentModificationException
// if the caller starts mutating the object they passed in.
Expand Down Expand Up @@ -1088,7 +1109,7 @@ public void run() {
}
logEvent(
eventType, copyEventProperties, copyApiProperties,
copyUserProperties, copyGroups, copyGroupProperties, timestamp, outOfSession
copyUserProperties, copyGroups, copyGroupProperties, timestamp, outOfSession, extra
);
}
});
Expand All @@ -1107,9 +1128,16 @@ public void run() {
* @param outOfSession the out of session
* @return the event ID if succeeded, else -1.
*/
protected long logEvent(String eventType, JSONObject eventProperties, JSONObject apiProperties,
JSONObject userProperties, JSONObject groups, JSONObject groupProperties,
long timestamp, boolean outOfSession) {
return logEvent(eventType, eventProperties, apiProperties, userProperties, groups, groupProperties, timestamp,outOfSession, null);
}

protected long logEvent(String eventType, JSONObject eventProperties, JSONObject apiProperties,
JSONObject userProperties, JSONObject groups, JSONObject groupProperties,
long timestamp, boolean outOfSession) {
long timestamp, boolean outOfSession, MiddlewareExtra extra) {

logger.d(TAG, "Logged event to Amplitude: " + eventType);

if (optOut) {
Expand Down Expand Up @@ -1214,7 +1242,7 @@ protected long logEvent(String eventType, JSONObject eventProperties, JSONObject
event.put("groups", (groups == null) ? new JSONObject() : truncate(groups));
event.put("group_properties", (groupProperties == null) ? new JSONObject()
: truncate(groupProperties));
result = saveEvent(eventType, event);
result = saveEvent(eventType, event, extra);
} catch (JSONException e) {
logger.e(TAG, String.format(
"JSON Serialization of event type %s failed, skipping: %s", eventType, e.toString()
Expand All @@ -1231,7 +1259,9 @@ protected long logEvent(String eventType, JSONObject eventProperties, JSONObject
* @param event the event
* @return the event ID if succeeded, else -1
*/
protected long saveEvent(String eventType, JSONObject event) {
protected long saveEvent(String eventType, JSONObject event, MiddlewareExtra extra) {
if (!middlewareRunner.run(new MiddlewarePayload(event, extra))) return -1;

String eventString = event.toString();
if (Utils.isEmptyString(eventString)) {
logger.e(TAG, String.format(
Expand Down Expand Up @@ -1518,6 +1548,10 @@ public void logRevenue(String productId, int quantity, double price) {
logRevenue(productId, quantity, price, null, null);
}

public void logRevenue(String productId, int quantity, double price, String receipt,
String receiptSignature) {
logRevenue(productId, quantity, price, receipt, receiptSignature, null);
}
/**
* Log revenue with a productId, quantity, price, and receipt data for revenue verification.
*
Expand All @@ -1531,7 +1565,7 @@ public void logRevenue(String productId, int quantity, double price) {
* Tracking Revenue</a>
*/
public void logRevenue(String productId, int quantity, double price, String receipt,
String receiptSignature) {
String receiptSignature, MiddlewareExtra extra) {
if (!contextAndApiKeySet("logRevenue()")) {
return;
}
Expand All @@ -1550,7 +1584,7 @@ public void logRevenue(String productId, int quantity, double price, String rece
}

logEventAsync(
Constants.AMP_REVENUE_EVENT, null, apiProperties, null, null, null, getCurrentTimeMillis(), false
Constants.AMP_REVENUE_EVENT, null, apiProperties, null, null, null, getCurrentTimeMillis(), false, extra
);
}

Expand All @@ -1561,11 +1595,15 @@ Constants.AMP_REVENUE_EVENT, null, apiProperties, null, null, null, getCurrentTi
* @param revenue a {@link Revenue} object
*/
public void logRevenueV2(Revenue revenue) {
logRevenueV2(revenue, null);
}

public void logRevenueV2(Revenue revenue, MiddlewareExtra extra) {
if (!contextAndApiKeySet("logRevenueV2()") || revenue == null || !revenue.isValidRevenue()) {
return;
}

logEvent(Constants.AMP_REVENUE_EVENT, revenue.toJSONObject());
logEvent(Constants.AMP_REVENUE_EVENT, revenue.toJSONObject(), null, null, null, null, getCurrentTimeMillis(), false, extra);
}

/**
Expand All @@ -1589,6 +1627,18 @@ public void setUserProperties(final JSONObject userProperties, final boolean rep
* @param userProperties the user properties
*/
public void setUserProperties(final JSONObject userProperties) {
setUserProperties(userProperties, null);
}

/**
* Sets user properties. This is a convenience wrapper around the
* {@link Identify} API to set multiple user properties with a single
* command.
*
* @param userProperties the user properties
* @param extra the middleware extra object
*/
public void setUserProperties(final JSONObject userProperties, MiddlewareExtra extra) {
if (userProperties == null || userProperties.length() == 0 ||
!contextAndApiKeySet("setUserProperties")) {
return;
Expand All @@ -1610,7 +1660,7 @@ public void setUserProperties(final JSONObject userProperties) {
logger.e(TAG, e.toString());
}
}
identify(identify);
identify(identify, false, extra);
}

/**
Expand All @@ -1632,6 +1682,10 @@ public void identify(Identify identify) {
identify(identify, false);
}

public void identify(Identify identify, boolean outOfSession) {
identify(identify, outOfSession, null);
}

/**
* Identify. Use this to send an {@link com.amplitude.api.Identify} object containing
* user property operations to Amplitude server. If outOfSession is true, then the identify
Expand All @@ -1640,14 +1694,14 @@ public void identify(Identify identify) {
* @param identify an {@link Identify} object
* @param outOfSession whther to log the identify event out of session
*/
public void identify(Identify identify, boolean outOfSession) {
public void identify(Identify identify, boolean outOfSession, MiddlewareExtra extra) {
if (
identify == null || identify.userPropertiesOperations.length() == 0 ||
!contextAndApiKeySet("identify()")
) return;
logEventAsync(
Constants.IDENTIFY_EVENT, null, null, identify.userPropertiesOperations,
null, null, getCurrentTimeMillis(), outOfSession
null, null, getCurrentTimeMillis(), outOfSession, extra
);
}

Expand All @@ -1658,6 +1712,10 @@ null, null, getCurrentTimeMillis(), outOfSession
* @param groupName the group name (ex: 15)
*/
public void setGroup(String groupType, Object groupName) {
setGroup(groupType, groupName, null);
}

public void setGroup(String groupType, Object groupName, MiddlewareExtra extra) {
if (!contextAndApiKeySet("setGroup()") || Utils.isEmptyString(groupType)) {
return;
}
Expand All @@ -1671,14 +1729,18 @@ public void setGroup(String groupType, Object groupName) {

Identify identify = new Identify().setUserProperty(groupType, groupName);
logEventAsync(Constants.IDENTIFY_EVENT, null, null, identify.userPropertiesOperations,
group, null, getCurrentTimeMillis(), false);
group, null, getCurrentTimeMillis(), false, extra);
}

public void groupIdentify(String groupType, Object groupName, Identify groupIdentify) {
groupIdentify(groupType, groupName, groupIdentify, false);
}

public void groupIdentify(String groupType, Object groupName, Identify groupIdentify, boolean outOfSession) {
groupIdentify(groupType, groupName, groupIdentify, false, null);
}

public void groupIdentify(String groupType, Object groupName, Identify groupIdentify, boolean outOfSession, MiddlewareExtra extra) {
if (groupIdentify == null || groupIdentify.userPropertiesOperations.length() == 0 ||
!contextAndApiKeySet("groupIdentify()") || Utils.isEmptyString(groupType)) {

Expand Down
5 changes: 5 additions & 0 deletions src/main/java/com/amplitude/api/Middleware.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package com.amplitude.api;

public interface Middleware {
void run(MiddlewarePayload payload, MiddlewareNext next);
}
7 changes: 7 additions & 0 deletions src/main/java/com/amplitude/api/MiddlewareExtra.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.amplitude.api;

import org.json.JSONObject;

public class MiddlewareExtra extends JSONObject {

}
5 changes: 5 additions & 0 deletions src/main/java/com/amplitude/api/MiddlewareNext.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package com.amplitude.api;

public interface MiddlewareNext {
public void run(MiddlewarePayload curPayload);
}
17 changes: 17 additions & 0 deletions src/main/java/com/amplitude/api/MiddlewarePayload.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.amplitude.api;

import org.json.JSONObject;

public class MiddlewarePayload {
public JSONObject event;
public MiddlewareExtra extra;

public MiddlewarePayload(JSONObject event, MiddlewareExtra extra) {
this.event = event;
this.extra = extra;
}

public MiddlewarePayload(JSONObject event) {
this(event, null);
}
}
48 changes: 48 additions & 0 deletions src/main/java/com/amplitude/api/MiddlewareRunner.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package com.amplitude.api;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

public class MiddlewareRunner {
private final ConcurrentLinkedQueue<Middleware> middlewares;

public MiddlewareRunner() {
middlewares = new ConcurrentLinkedQueue<>();
}

public void add(Middleware middleware) {
this.middlewares.add(middleware);
}

private void runMiddlewares(List<Middleware> middlewares, MiddlewarePayload payload, MiddlewareNext next) {
if (middlewares.size() == 0 ){
next.run(payload);
return;
}
middlewares.get(0).run(payload, new MiddlewareNext() {
@Override
public void run(MiddlewarePayload curPayload) {
runMiddlewares((middlewares.subList(1, middlewares.size())), curPayload, next);
}
});
}

public boolean run(MiddlewarePayload payload) {
AtomicBoolean middlewareCompleted = new AtomicBoolean(false);
this.run(payload, new MiddlewareNext() {
@Override
public void run(MiddlewarePayload curPayload) {
middlewareCompleted.set(true);
}
});
return middlewareCompleted.get();
}

public void run(MiddlewarePayload payload, MiddlewareNext next) {
List<Middleware> middlewareList = new ArrayList<>(this.middlewares);
runMiddlewares(middlewareList, payload, next);
}
}
47 changes: 47 additions & 0 deletions src/test/java/com/amplitude/api/AmplitudeClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -2009,4 +2009,51 @@ public void testSetServerZoneAndUpdateServerUrl() {
assertEquals(euZone, getPrivateFieldValueFromClient(amplitude, "serverZone"));
assertEquals(Constants.EVENT_LOG_EU_URL, amplitude.url);
}

@Test
public void testMiddlewareSupport() throws JSONException {
ShadowLooper looper = Shadows.shadowOf(amplitude.logThread.getLooper());
looper.runToEndOfTasks();
MiddlewareExtra extra = new MiddlewareExtra();
extra.put("description", "extra description");
Middleware middleware = new Middleware() {
@Override
public void run(MiddlewarePayload payload, MiddlewareNext next) {
try {
payload.event.optJSONObject("event_properties").put("description", "extra description");
} catch (JSONException e) {
e.printStackTrace();
}

next.run(payload);
}
};
amplitude.addEventMiddleware(middleware);
amplitude.logEvent("middleware_event_type", new JSONObject().put("user_id", "middleware_user"), null, System.currentTimeMillis(), false, extra);
looper.runToEndOfTasks();
looper.runToEndOfTasks();

assertEquals(getUnsentEventCount(), 1);
JSONArray eventObject = getUnsentEvents(1);;
assertEquals(eventObject.optJSONObject(0).optString("event_type"), "middleware_event_type");
assertEquals(eventObject.optJSONObject(0).optJSONObject("event_properties").getString("description"), "extra description");
assertEquals(eventObject.optJSONObject(0).optJSONObject("event_properties").optString("user_id"), "middleware_user");
}

@Test
public void testWithSwallowMiddleware() throws JSONException {
ShadowLooper looper = Shadows.shadowOf(amplitude.logThread.getLooper());
looper.runToEndOfTasks();
Middleware middleware = new Middleware() {
@Override
public void run(MiddlewarePayload payload, MiddlewareNext next) {
}
};
amplitude.addEventMiddleware(middleware);
amplitude.logEvent("middleware_event_type", new JSONObject().put("user_id", "middleware_user"), null, System.currentTimeMillis(), false, null);
looper.runToEndOfTasks();
looper.runToEndOfTasks();

assertEquals(getUnsentEventCount(), 0);
}
}
Loading

0 comments on commit 781a3d0

Please sign in to comment.