Skip to content

Commit 39a60b0

Browse files
authored
Merge pull request #1 from Azure-Samples/groceryStoreExampmle
In SampleGroceryStore changefeed triggers three core functionalities
2 parents aa65b13 + 36ea327 commit 39a60b0

File tree

1 file changed

+323
-0
lines changed

1 file changed

+323
-0
lines changed
Lines changed: 323 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,323 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
package com.azure.cosmos.examples.workedappexample;
4+
5+
import com.azure.cosmos.ChangeFeedProcessor;
6+
import com.azure.cosmos.ConnectionPolicy;
7+
import com.azure.cosmos.ConsistencyLevel;
8+
import com.azure.cosmos.CosmosAsyncContainer;
9+
import com.azure.cosmos.CosmosAsyncContainerResponse;
10+
import com.azure.cosmos.CosmosAsyncDatabase;
11+
import com.azure.cosmos.CosmosClientBuilder;
12+
import com.azure.cosmos.CosmosClientException;
13+
import com.azure.cosmos.CosmosContainerProperties;
14+
import com.azure.cosmos.CosmosContainerRequestOptions;
15+
import com.azure.cosmos.CosmosAsyncClient;
16+
import com.azure.cosmos.examples.common.AccountSettings;
17+
import com.azure.cosmos.implementation.Utils;
18+
import com.fasterxml.jackson.core.JsonProcessingException;
19+
import com.fasterxml.jackson.databind.JsonNode;
20+
import com.fasterxml.jackson.databind.ObjectMapper;
21+
import com.fasterxml.jackson.databind.node.ObjectNode;
22+
import org.apache.commons.lang3.RandomStringUtils;
23+
import org.apache.commons.lang3.StringUtils;
24+
import org.slf4j.Logger;
25+
import org.slf4j.LoggerFactory;
26+
import reactor.core.scheduler.Schedulers;
27+
28+
import java.text.DateFormat;
29+
import java.text.ParseException;
30+
import java.text.SimpleDateFormat;
31+
import java.time.Duration;
32+
import java.util.Date;
33+
import java.util.List;
34+
import java.util.concurrent.TimeUnit;
35+
36+
/**
37+
* Sample for Change Feed Processor.
38+
*
39+
*/
40+
public class SampleGroceryStore {
41+
42+
public static int WAIT_FOR_WORK = 60000;
43+
public static final String DATABASE_NAME = "db_" + RandomStringUtils.randomAlphabetic(7);
44+
public static final String COLLECTION_NAME = "coll_" + RandomStringUtils.randomAlphabetic(7);
45+
private static final ObjectMapper OBJECT_MAPPER = Utils.getSimpleObjectMapper();
46+
protected static Logger logger = LoggerFactory.getLogger(SampleGroceryStore.class.getSimpleName());
47+
48+
49+
private static ChangeFeedProcessor changeFeedProcessorInstance;
50+
private static boolean isWorkCompleted = false;
51+
52+
private static CosmosAsyncContainer typeContainer;
53+
private static CosmosAsyncContainer expiryDateContainer;
54+
55+
public static void main (String[]args) {
56+
logger.info("BEGIN Sample");
57+
58+
try {
59+
60+
System.out.println("-->CREATE DocumentClient");
61+
CosmosAsyncClient client = getCosmosClient();
62+
63+
System.out.println("-->CREATE Contoso Grocery Store database: " + DATABASE_NAME);
64+
CosmosAsyncDatabase cosmosDatabase = createNewDatabase(client, DATABASE_NAME);
65+
66+
System.out.println("-->CREATE container for store inventory: " + COLLECTION_NAME);
67+
CosmosAsyncContainer feedContainer = createNewCollection(client, DATABASE_NAME, COLLECTION_NAME, "/id");
68+
69+
System.out.println("-->CREATE container for lease: " + COLLECTION_NAME + "-leases");
70+
CosmosAsyncContainer leaseContainer = createNewLeaseCollection(client, DATABASE_NAME, COLLECTION_NAME + "-leases");
71+
72+
System.out.println("-->CREATE container for materialized view partitioned by 'type': " + COLLECTION_NAME + "-leases");
73+
typeContainer = createNewCollection(client, DATABASE_NAME, COLLECTION_NAME + "-pktype", "/type");
74+
75+
System.out.println("-->CREATE container for materialized view with aggregation rule based on days until expiration " + COLLECTION_NAME + "-leases");
76+
expiryDateContainer = createNewCollection(client, DATABASE_NAME, COLLECTION_NAME + "-pkexpiryDate", "/expiryDaysRemaining");
77+
78+
changeFeedProcessorInstance = getChangeFeedProcessor("SampleHost_1", feedContainer, leaseContainer);
79+
changeFeedProcessorInstance.start()
80+
.subscribeOn(Schedulers.elastic())
81+
.doOnSuccess(aVoid -> {
82+
//Insert 10 documents into the feed container
83+
//createNewDocumentsJSON demonstrates how to insert a JSON object into a Cosmos DB container as an item
84+
createNewDocumentsJSON(feedContainer, 10, Duration.ofSeconds(3));
85+
isWorkCompleted = true;
86+
})
87+
.subscribe();
88+
89+
long remainingWork = WAIT_FOR_WORK;
90+
while (!isWorkCompleted && remainingWork > 0) {
91+
Thread.sleep(100);
92+
remainingWork -= 100;
93+
}
94+
95+
if (isWorkCompleted) {
96+
if (changeFeedProcessorInstance != null) {
97+
changeFeedProcessorInstance.stop().subscribe();
98+
}
99+
} else {
100+
throw new RuntimeException("The change feed processor initialization and automatic create document feeding process did not complete in the expected time");
101+
}
102+
103+
System.out.println("-->DELETE sample's database: " + DATABASE_NAME);
104+
deleteDatabase(cosmosDatabase);
105+
106+
Thread.sleep(500);
107+
108+
} catch (Exception e) {
109+
e.printStackTrace();
110+
}
111+
112+
System.out.println("END Sample");
113+
}
114+
115+
public static ChangeFeedProcessor getChangeFeedProcessor(String hostName, CosmosAsyncContainer feedContainer, CosmosAsyncContainer leaseContainer) {
116+
return ChangeFeedProcessor.changeFeedProcessorBuilder()
117+
.setHostName(hostName)
118+
.setFeedContainer(feedContainer)
119+
.setLeaseContainer(leaseContainer)
120+
.setHandleChanges((List<JsonNode> docs) -> {
121+
//System.out.println("--->setHandleChanges() START");
122+
123+
for (JsonNode document : docs) {
124+
/* System.out.println("---->DOCUMENT RECEIVED: " + OBJECT_MAPPER.writerWithDefaultPrettyPrinter()
125+
.writeValueAsString(document)); */
126+
//Each document update from the feed container branches out to all three user services
127+
updateInventoryAlertService(document);
128+
updateInventoryTypeMaterializedView(document);
129+
updateInventoryExpiryDateAggregationPolicyMaterializedView(document);
130+
131+
//Forward document =>
132+
}
133+
//System.out.println("--->handleChanges() END");
134+
135+
})
136+
.build();
137+
}
138+
139+
private static void updateInventoryAlertService(JsonNode document) {
140+
System.out.println("Alert: Added new item of type " + document.get("type") + "\n");
141+
}
142+
143+
private static void updateInventoryTypeMaterializedView(JsonNode document) {
144+
typeContainer.createItem(document).subscribe();
145+
}
146+
147+
private static void updateInventoryExpiryDateAggregationPolicyMaterializedView(JsonNode document) {
148+
ObjectMapper mapper = new ObjectMapper();
149+
JsonNode transformed_document = null;
150+
151+
//Deep-copy the input document
152+
try {
153+
transformed_document = document.deepCopy();
154+
} catch (Exception e) {
155+
e.printStackTrace();
156+
}
157+
158+
DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd");
159+
160+
try {
161+
long days_passed = TimeUnit.MILLISECONDS.toDays
162+
(
163+
((Date) formatter.parse("2020-03-30")).getTime() - ((Date) formatter.parse(document.get("expiryDate").textValue())).getTime()
164+
);
165+
166+
((ObjectNode)transformed_document).remove("expiryDate");
167+
((ObjectNode) transformed_document).put("expiryDaysRemaining", String.format("%d", days_passed));
168+
169+
expiryDateContainer.createItem(transformed_document).subscribe();
170+
} catch (ParseException e) {
171+
e.printStackTrace();
172+
}
173+
typeContainer.createItem(document).subscribe();
174+
}
175+
176+
public static CosmosAsyncClient getCosmosClient() {
177+
178+
return new CosmosClientBuilder()
179+
.setEndpoint(AccountSettings.HOST)
180+
.setKey(AccountSettings.MASTER_KEY)
181+
.setConnectionPolicy(ConnectionPolicy.getDefaultPolicy())
182+
.setConsistencyLevel(ConsistencyLevel.EVENTUAL)
183+
.buildAsyncClient();
184+
}
185+
186+
public static CosmosAsyncDatabase createNewDatabase(CosmosAsyncClient client, String databaseName) {
187+
return client.createDatabaseIfNotExists(databaseName).block().getDatabase();
188+
}
189+
190+
public static void deleteDatabase(CosmosAsyncDatabase cosmosDatabase) {
191+
cosmosDatabase.delete().block();
192+
}
193+
194+
public static CosmosAsyncContainer createNewCollection(CosmosAsyncClient client, String databaseName, String collectionName, String partitionKey) {
195+
CosmosAsyncDatabase databaseLink = client.getDatabase(databaseName);
196+
CosmosAsyncContainer collectionLink = databaseLink.getContainer(collectionName);
197+
CosmosAsyncContainerResponse containerResponse = null;
198+
199+
try {
200+
containerResponse = collectionLink.read().block();
201+
202+
if (containerResponse != null) {
203+
throw new IllegalArgumentException(String.format("Collection %s already exists in database %s.", collectionName, databaseName));
204+
}
205+
} catch (RuntimeException ex) {
206+
if (ex instanceof CosmosClientException) {
207+
CosmosClientException cosmosClientException = (CosmosClientException) ex;
208+
209+
if (cosmosClientException.getStatusCode() != 404) {
210+
throw ex;
211+
}
212+
} else {
213+
throw ex;
214+
}
215+
}
216+
217+
CosmosContainerProperties containerSettings = new CosmosContainerProperties(collectionName, partitionKey);
218+
CosmosContainerRequestOptions requestOptions = new CosmosContainerRequestOptions();
219+
containerResponse = databaseLink.createContainer(containerSettings, 10000, requestOptions).block();
220+
221+
if (containerResponse == null) {
222+
throw new RuntimeException(String.format("Failed to create collection %s in database %s.", collectionName, databaseName));
223+
}
224+
225+
return containerResponse.getContainer();
226+
}
227+
228+
public static CosmosAsyncContainer createNewLeaseCollection(CosmosAsyncClient client, String databaseName, String leaseCollectionName) {
229+
CosmosAsyncDatabase databaseLink = client.getDatabase(databaseName);
230+
CosmosAsyncContainer leaseCollectionLink = databaseLink.getContainer(leaseCollectionName);
231+
CosmosAsyncContainerResponse leaseContainerResponse = null;
232+
233+
try {
234+
leaseContainerResponse = leaseCollectionLink.read().block();
235+
236+
if (leaseContainerResponse != null) {
237+
leaseCollectionLink.delete().block();
238+
239+
try {
240+
Thread.sleep(1000);
241+
} catch (InterruptedException ex) {
242+
ex.printStackTrace();
243+
}
244+
}
245+
} catch (RuntimeException ex) {
246+
if (ex instanceof CosmosClientException) {
247+
CosmosClientException cosmosClientException = (CosmosClientException) ex;
248+
249+
if (cosmosClientException.getStatusCode() != 404) {
250+
throw ex;
251+
}
252+
} else {
253+
throw ex;
254+
}
255+
}
256+
257+
CosmosContainerProperties containerSettings = new CosmosContainerProperties(leaseCollectionName, "/id");
258+
CosmosContainerRequestOptions requestOptions = new CosmosContainerRequestOptions();
259+
260+
leaseContainerResponse = databaseLink.createContainer(containerSettings, 400,requestOptions).block();
261+
262+
if (leaseContainerResponse == null) {
263+
throw new RuntimeException(String.format("Failed to create collection %s in database %s.", leaseCollectionName, databaseName));
264+
}
265+
266+
return leaseContainerResponse.getContainer();
267+
}
268+
269+
public static void createNewDocumentsJSON(CosmosAsyncContainer containerClient, int count, Duration delay) {
270+
System.out.println("Creating documents\n");
271+
String suffix = RandomStringUtils.randomAlphabetic(10);
272+
for (int i = 0; i <= count; i++) {
273+
274+
String jsonString = "{\"id\" : \"" + String.format("0%d-%s", i, suffix) + "\""
275+
+ ","
276+
+ "\"brand\" : \"" + ((char)(65+i)) + "\""
277+
+ ","
278+
+ "\"type\" : \"" + ((char)(69+i)) + "\""
279+
+ ","
280+
+ "\"expiryDate\" : \"" + "2020-03-" + StringUtils.leftPad(String.valueOf(5+i), 2, "0") + "\""
281+
+ "}";
282+
283+
ObjectMapper mapper = new ObjectMapper();
284+
JsonNode document = null;
285+
286+
try {
287+
document = mapper.readTree(jsonString);
288+
} catch (Exception e) {
289+
e.printStackTrace();
290+
}
291+
292+
containerClient.createItem(document).subscribe(doc -> {
293+
System.out.println(".\n");
294+
});
295+
296+
long remainingWork = delay.toMillis();
297+
try {
298+
while (remainingWork > 0) {
299+
Thread.sleep(100);
300+
remainingWork -= 100;
301+
}
302+
} catch (InterruptedException iex) {
303+
// exception caught
304+
break;
305+
}
306+
}
307+
}
308+
309+
public static boolean ensureWorkIsDone(Duration delay) {
310+
long remainingWork = delay.toMillis();
311+
try {
312+
while (!isWorkCompleted && remainingWork > 0) {
313+
Thread.sleep(100);
314+
remainingWork -= 100;
315+
}
316+
} catch (InterruptedException iex) {
317+
return false;
318+
}
319+
320+
return remainingWork > 0;
321+
}
322+
323+
}

0 commit comments

Comments
 (0)