Skip to content

Commit 1b8918c

Browse files
authored
[CAMEL-22401] Add retry strategy for Couchbase GET operation + consumerRetryPause (#19064)
1 parent f245d3a commit 1b8918c

File tree

9 files changed

+144
-89
lines changed

9 files changed

+144
-89
lines changed

catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/couchbase.json

Lines changed: 41 additions & 40 deletions
Large diffs are not rendered by default.

components/camel-couchbase/src/generated/java/org/apache/camel/component/couchbase/CouchbaseEndpointConfigurer.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ public boolean configure(CamelContext camelContext, Object obj, String name, Obj
4141
case "connectTimeout": target.setConnectTimeout(property(camelContext, java.time.Duration.class, value).toMillis()); return true;
4242
case "consumerprocessedstrategy":
4343
case "consumerProcessedStrategy": target.setConsumerProcessedStrategy(property(camelContext, java.lang.String.class, value)); return true;
44+
case "consumerretrypause":
45+
case "consumerRetryPause": target.setConsumerRetryPause(property(camelContext, int.class, value)); return true;
4446
case "delay": target.setDelay(property(camelContext, long.class, value)); return true;
4547
case "descending": target.setDescending(property(camelContext, boolean.class, value)); return true;
4648
case "designdocumentname":
@@ -125,6 +127,8 @@ public Class<?> getOptionType(String name, boolean ignoreCase) {
125127
case "connectTimeout": return long.class;
126128
case "consumerprocessedstrategy":
127129
case "consumerProcessedStrategy": return java.lang.String.class;
130+
case "consumerretrypause":
131+
case "consumerRetryPause": return int.class;
128132
case "delay": return long.class;
129133
case "descending": return boolean.class;
130134
case "designdocumentname":
@@ -210,6 +214,8 @@ public Object getOptionValue(Object obj, String name, boolean ignoreCase) {
210214
case "connectTimeout": return target.getConnectTimeout();
211215
case "consumerprocessedstrategy":
212216
case "consumerProcessedStrategy": return target.getConsumerProcessedStrategy();
217+
case "consumerretrypause":
218+
case "consumerRetryPause": return target.getConsumerRetryPause();
213219
case "delay": return target.getDelay();
214220
case "descending": return target.isDescending();
215221
case "designdocumentname":

components/camel-couchbase/src/generated/java/org/apache/camel/component/couchbase/CouchbaseEndpointUriFactory.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ public class CouchbaseEndpointUriFactory extends org.apache.camel.support.compon
2323
private static final Set<String> SECRET_PROPERTY_NAMES;
2424
private static final Set<String> MULTI_VALUE_PREFIXES;
2525
static {
26-
Set<String> props = new HashSet<>(48);
26+
Set<String> props = new HashSet<>(49);
2727
props.add("additionalHosts");
2828
props.add("autoStartIdForInserts");
2929
props.add("backoffErrorThreshold");
@@ -34,6 +34,7 @@ public class CouchbaseEndpointUriFactory extends org.apache.camel.support.compon
3434
props.add("collection");
3535
props.add("connectTimeout");
3636
props.add("consumerProcessedStrategy");
37+
props.add("consumerRetryPause");
3738
props.add("delay");
3839
props.add("descending");
3940
props.add("designDocumentName");

components/camel-couchbase/src/generated/resources/META-INF/org/apache/camel/component/couchbase/couchbase.json

Lines changed: 41 additions & 40 deletions
Large diffs are not rendered by default.

components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseCollectionOperation.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,9 +76,11 @@ protected static Boolean setDocument(
7676
* @param queryTimeout
7777
* @return
7878
*/
79-
protected static GetResult getDocument(Collection collection, String id, long queryTimeout) {
79+
protected static GetResult getDocument(Collection collection, String id, long queryTimeout, long retryPause) {
8080
GetOptions options = GetOptions.getOptions()
81-
.timeout(Duration.ofMillis(queryTimeout));
81+
.timeout(Duration.ofMillis(queryTimeout))
82+
.retryStrategy(BestEffortRetryStrategy.withExponentialBackoff(Duration.ofMillis(retryPause),
83+
Duration.ofMillis(retryPause), 1));
8284
return collection.get(id, options);
8385
}
8486

components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseConsumer.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,8 @@ protected int poll() throws Exception {
137137
Object doc;
138138
String id = row.id().get();
139139
if (endpoint.isFullDocument()) {
140-
doc = CouchbaseCollectionOperation.getDocument(collection, id, endpoint.getQueryTimeout());
140+
doc = CouchbaseCollectionOperation.getDocument(collection, id, endpoint.getQueryTimeout(),
141+
endpoint.getConsumerRetryPause());
141142
} else {
142143
doc = row.valueAs(Object.class);
143144
}
@@ -158,7 +159,7 @@ protected int poll() throws Exception {
158159
LOG.trace("Deleting doc with ID {}", id);
159160
}
160161
CouchbaseCollectionOperation.removeDocument(collection, id, endpoint.getWriteQueryTimeout(),
161-
endpoint.getProducerRetryPause());
162+
endpoint.getConsumerRetryPause());
162163
} else if ("filter".equalsIgnoreCase(consumerProcessedStrategy)) {
163164
if (LOG.isTraceEnabled()) {
164165
LOG.trace("Filtering out ID {}", id);

components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseEndpoint.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,8 @@ public class CouchbaseEndpoint extends ScheduledPollEndpoint implements Endpoint
131131
private String rangeEndKey = "";
132132
@UriParam(label = "consumer", defaultValue = "false")
133133
private boolean fullDocument = true;
134+
@UriParam(label = "consumer", defaultValue = "5000")
135+
private int consumerRetryPause = DEFAULT_PAUSE_BETWEEN_RETRIES;
134136

135137
// Consumer strategy
136138
@UriParam(label = "consumer", defaultValue = DEFAULT_CONSUME_PROCESSED_STRATEGY)
@@ -380,12 +382,23 @@ public int getProducerRetryPause() {
380382
}
381383

382384
/**
383-
* Define the retry pause between different attempts
385+
* Define the producer retry pause between different attempts
384386
*/
385387
public void setProducerRetryPause(int producerRetryPause) {
386388
this.producerRetryPause = producerRetryPause;
387389
}
388390

391+
public int getConsumerRetryPause() {
392+
return consumerRetryPause;
393+
}
394+
395+
/**
396+
* Define the consumer retry pause between different attempts
397+
*/
398+
public void setConsumerRetryPause(int consumerRetryPause) {
399+
this.consumerRetryPause = consumerRetryPause;
400+
}
401+
389402
public String getDesignDocumentName() {
390403
return designDocumentName;
391404
}

components/camel-couchbase/src/main/java/org/apache/camel/component/couchbase/CouchbaseProducer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ public void process(Exchange exchange) throws Exception {
143143
exchange.getMessage().setBody(result);
144144
} else if (endpoint.getOperation().equals(COUCHBASE_GET)) {
145145
LOG.trace("Type of operation: GET");
146-
GetResult result = CouchbaseCollectionOperation.getDocument(collection, id, queryTimeout);
146+
GetResult result = CouchbaseCollectionOperation.getDocument(collection, id, queryTimeout, producerRetryPause);
147147
exchange.getMessage().setBody(result);
148148
} else if (endpoint.getOperation().equals(COUCHBASE_DELETE)) {
149149
LOG.trace("Type of operation: DELETE");

dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/CouchbaseEndpointBuilderFactory.java

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,36 @@ default CouchbaseEndpointConsumerBuilder consumerProcessedStrategy(String consum
116116
doSetProperty("consumerProcessedStrategy", consumerProcessedStrategy);
117117
return this;
118118
}
119+
/**
120+
* Define the consumer retry pause between different attempts.
121+
*
122+
* The option is a: <code>int</code> type.
123+
*
124+
* Default: 5000
125+
* Group: consumer
126+
*
127+
* @param consumerRetryPause the value to set
128+
* @return the dsl builder
129+
*/
130+
default CouchbaseEndpointConsumerBuilder consumerRetryPause(int consumerRetryPause) {
131+
doSetProperty("consumerRetryPause", consumerRetryPause);
132+
return this;
133+
}
134+
/**
135+
* Define the consumer retry pause between different attempts.
136+
*
137+
* The option will be converted to a <code>int</code> type.
138+
*
139+
* Default: 5000
140+
* Group: consumer
141+
*
142+
* @param consumerRetryPause the value to set
143+
* @return the dsl builder
144+
*/
145+
default CouchbaseEndpointConsumerBuilder consumerRetryPause(String consumerRetryPause) {
146+
doSetProperty("consumerRetryPause", consumerRetryPause);
147+
return this;
148+
}
119149
/**
120150
* Define if this operation is descending or not.
121151
*
@@ -1228,7 +1258,7 @@ default CouchbaseEndpointProducerBuilder producerRetryAttempts(String producerRe
12281258
return this;
12291259
}
12301260
/**
1231-
* Define the retry pause between different attempts.
1261+
* Define the producer retry pause between different attempts.
12321262
*
12331263
* The option is a: <code>int</code> type.
12341264
*
@@ -1243,7 +1273,7 @@ default CouchbaseEndpointProducerBuilder producerRetryPause(int producerRetryPau
12431273
return this;
12441274
}
12451275
/**
1246-
* Define the retry pause between different attempts.
1276+
* Define the producer retry pause between different attempts.
12471277
*
12481278
* The option will be converted to a <code>int</code> type.
12491279
*

0 commit comments

Comments
 (0)