
Commit f90725b

PHOENIX-7709 Index committer post writer lazy mode - Async RPC call for verified index mutations (#2297)
1 parent 3b01bdf commit f90725b

File tree (8 files changed: +156, -20 lines)

phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/parallel/BaseTaskRunner.java

Lines changed: 7 additions & 0 deletions

@@ -116,6 +116,13 @@ public <R> Pair<List<R>, List<Future<R>>> submitUninterruptible(TaskBatch<R> tasks)
     throw new EarlyExitFailure("Interrupted and stopped before computation was complete!");
   }
 
+  @Override
+  public <R> void submitOnly(TaskBatch<R> tasks) {
+    for (Task<R> task : tasks.getTasks()) {
+      this.writerPool.submit(task);
+    }
+  }
+
   @Override
   public void stop(String why) {
     if (this.stopped) {

phoenix-core-client/src/main/java/org/apache/phoenix/hbase/index/parallel/TaskRunner.java

Lines changed: 19 additions & 0 deletions

@@ -60,4 +60,23 @@ public <R> Pair<List<R>, List<Future<R>>> submit(TaskBatch<R> tasks)
    */
   public <R> Pair<List<R>, List<Future<R>>> submitUninterruptible(TaskBatch<R> tasks)
     throws EarlyExitFailure, ExecutionException;
+
+  /**
+   * Submit the given tasks to the pool without waiting for them to complete or collecting results.
+   * This is a fire-and-forget operation that allows tasks to run asynchronously in the background.
+   * <p>
+   * Unlike {@link #submit(TaskBatch)} and {@link #submitUninterruptible(TaskBatch)}, this method
+   * does not block waiting for task completion and does not return results or futures. It is useful
+   * for scenarios where you want to initiate background processing but don't need to wait for or
+   * collect the results.
+   * <p>
+   * Tasks are submitted to the underlying thread pool and will execute according to the pool's
+   * scheduling policy. If any task fails during execution, the failure will be handled internally
+   * and will not propagate back to the caller since no results are collected.
+   * @param <R> the type of result that would be returned by the tasks (unused since no results
+   *          are collected)
+   * @param tasks the batch of tasks to submit for asynchronous execution
+   * @throws ExecutionException if there is an error submitting the tasks to the thread pool
+   */
+  <R> void submitOnly(TaskBatch<R> tasks) throws ExecutionException;
 }
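
To make the new API concrete, here is a minimal caller-side sketch contrasting submitOnly with the blocking submitUninterruptible. It assumes the Task, TaskBatch, and QuickFailingTaskRunner signatures from the Phoenix source; the pool sizing and the no-op task body are illustrative, not part of this commit.

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.phoenix.hbase.index.parallel.QuickFailingTaskRunner;
import org.apache.phoenix.hbase.index.parallel.Task;
import org.apache.phoenix.hbase.index.parallel.TaskBatch;

public class SubmitOnlySketch {

  // Stand-in for "write the mutations of one index table".
  private static Task<Void> noopTask() {
    return new Task<Void>() {
      @Override
      public Void call() {
        return null;
      }
    };
  }

  public static void main(String[] args) throws Exception {
    QuickFailingTaskRunner runner = new QuickFailingTaskRunner(
      new ThreadPoolExecutor(2, 2, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>()));

    // Blocking: waits for every task and surfaces the first failure to the caller.
    TaskBatch<Void> blocking = new TaskBatch<>(1);
    blocking.add(noopTask());
    runner.submitUninterruptible(blocking);

    // New in this commit, fire-and-forget: returns as soon as the tasks are handed
    // to the pool; execution failures never reach the caller.
    TaskBatch<Void> lazy = new TaskBatch<>(1);
    lazy.add(noopTask());
    runner.submitOnly(lazy);

    runner.stop("done");
  }
}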

phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/write/AbstractParallelWriterIndexCommitter.java

Lines changed: 33 additions & 2 deletions

@@ -63,7 +63,6 @@ public abstract class AbstractParallelWriterIndexCommitter implements IndexCommitter {
   protected QuickFailingTaskRunner pool;
   protected KeyValueBuilder kvBuilder;
   protected RegionCoprocessorEnvironment env;
-  protected TaskBatch<Void> tasks;
   protected boolean disableIndexOnFailure = false;
 
   // This relies on Hadoop Configuration to handle warning about deprecated configs and

@@ -116,6 +115,39 @@ public void setup(HTableFactory factory, ExecutorService pool, Stoppable stop,
   public void write(Multimap<HTableInterfaceReference, Mutation> toWrite,
     final boolean allowLocalUpdates, final int clientVersion)
     throws SingleIndexWriteFailureException {
+    TaskBatch<Void> tasks = new TaskBatch<>(toWrite.asMap().size());
+    addTasks(toWrite, allowLocalUpdates, clientVersion, tasks);
+    submitTasks(tasks);
+  }
+
+  /**
+   * Submits the provided task batch for execution. This method defines the task submission strategy
+   * and must be implemented by concrete subclasses to specify whether tasks should be executed
+   * synchronously (blocking until completion) or asynchronously (fire-and-forget).
+   * @param tasks the batch of index write tasks to submit for execution
+   * @throws SingleIndexWriteFailureException if there is an error during task submission or
+   *           execution (implementation-dependent)
+   */
+  protected abstract void submitTasks(TaskBatch<Void> tasks)
+    throws SingleIndexWriteFailureException;
+
+  /**
+   * Adds parallel index write tasks to the provided task batch for execution across multiple index
+   * tables. Each index table gets its own task that will be executed in parallel to optimize write
+   * performance.
+   * @param toWrite a multimap containing index table references as keys and their
+   *          corresponding mutations as values. Each table will get its own
+   *          parallel task.
+   * @param allowLocalUpdates if false, skips creating tasks for writes to the same table as the
+   *          current region to prevent potential deadlocks
+   * @param clientVersion the Phoenix client version, used for compatibility checks and
+   *          version-specific behavior in the index write operations
+   * @param tasks the task batch to which the newly created index write tasks will be
+   *          added. This batch needs to be submitted for parallel execution by the
+   *          caller.
+   */
+  private void addTasks(Multimap<HTableInterfaceReference, Mutation> toWrite,
+    boolean allowLocalUpdates, int clientVersion, TaskBatch<Void> tasks) {
     /*
      * This bit here is a little odd, so let's explain what's going on. Basically, we want to do the
      * writes in parallel to each index table, so each table gets its own task and is submitted to

@@ -128,7 +160,6 @@ public void write(Multimap<HTableInterfaceReference, Mutation> toWrite,
      */
 
     Set<Entry<HTableInterfaceReference, Collection<Mutation>>> entries = toWrite.asMap().entrySet();
-    tasks = new TaskBatch<Void>(entries.size());
     for (Entry<HTableInterfaceReference, Collection<Mutation>> entry : entries) {
       // get the mutations for each table. We leak the implementation here a little bit to save
       // doing a complete copy over of all the index update for each table.
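
The net effect of this refactor is a template method: write() builds the per-table task batch via addTasks and delegates the submission strategy to submitTasks. A minimal, self-contained sketch of that shape, using illustrative names rather than the real Phoenix types:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

abstract class CommitterSketch {
  protected final ExecutorService pool = Executors.newFixedThreadPool(4);

  // Template method: build the per-table work, then let the subclass decide
  // whether submission blocks (strict) or returns immediately (lazy).
  final void write(List<Runnable> perTableWork) throws Exception {
    submitTasks(perTableWork);
  }

  protected abstract void submitTasks(List<Runnable> tasks) throws Exception;
}

// Strategy analogous to ParallelWriterIndexCommitter: submit and wait.
final class BlockingCommitterSketch extends CommitterSketch {
  @Override
  protected void submitTasks(List<Runnable> tasks) throws Exception {
    List<Future<?>> futures = new ArrayList<>();
    for (Runnable task : tasks) {
      futures.add(pool.submit(task));
    }
    for (Future<?> future : futures) {
      future.get(); // blocks; rethrows any task failure to the caller
    }
  }
}

// Strategy analogous to LazyParallelWriterIndexCommitter: fire-and-forget.
final class LazyCommitterSketch extends CommitterSketch {
  @Override
  protected void submitTasks(List<Runnable> tasks) {
    for (Runnable task : tasks) {
      pool.submit(task); // no waiting, no result collection
    }
  }
}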

phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/write/LazyParallelWriterIndexCommitter.java

Lines changed: 18 additions & 4 deletions

@@ -17,17 +17,31 @@
  */
 package org.apache.phoenix.hbase.index.write;
 
+import org.apache.phoenix.hbase.index.exception.SingleIndexWriteFailureException;
+import org.apache.phoenix.hbase.index.parallel.TaskBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * Like the {@link ParallelWriterIndexCommitter}, but does not block
  */
 public class LazyParallelWriterIndexCommitter extends AbstractParallelWriterIndexCommitter {
 
-  // for testing
-  public LazyParallelWriterIndexCommitter(String hbaseVersion) {
-    super(hbaseVersion);
-  }
+  private static final Logger LOGGER =
+    LoggerFactory.getLogger(LazyParallelWriterIndexCommitter.class);
 
   public LazyParallelWriterIndexCommitter() {
     super();
   }
+
+  @Override
+  protected void submitTasks(TaskBatch<Void> tasks) throws SingleIndexWriteFailureException {
+    try {
+      pool.submitOnly(tasks);
+    } catch (Exception e) {
+      LOGGER.error("Error while submitting the task.", e);
+      propagateFailure(e.getCause());
+    }
+  }
 }
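
Judging from the new integration test at the end of this commit, lazy mode is toggled through the IndexRegionObserver.INDEX_LAZY_POST_BATCH_WRITE configuration key. A hedged sketch of flipping it on in a server-side Configuration follows; the helper class and method names are illustrative, and whether this setting alone routes writes through LazyParallelWriterIndexCommitter is an assumption based on the tests below.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.phoenix.hbase.index.IndexRegionObserver;

public class LazyModeConfigSketch {

  // Assumption: setting this key to true in the region server configuration selects
  // the non-blocking LazyParallelWriterIndexCommitter for post-batch index writes.
  public static Configuration withLazyIndexWrites() {
    Configuration conf = HBaseConfiguration.create();
    conf.setBoolean(IndexRegionObserver.INDEX_LAZY_POST_BATCH_WRITE, true);
    return conf;
  }
}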

phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java

Lines changed: 2 additions & 11 deletions

@@ -18,15 +18,12 @@
 package org.apache.phoenix.hbase.index.write;
 
 import java.util.concurrent.ExecutionException;
-import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.phoenix.hbase.index.exception.SingleIndexWriteFailureException;
 import org.apache.phoenix.hbase.index.parallel.EarlyExitFailure;
-import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
+import org.apache.phoenix.hbase.index.parallel.TaskBatch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.phoenix.thirdparty.com.google.common.collect.Multimap;
-
 /**
  * Write index updates to the index tables in parallel. We attempt to early exit from the writes if
  * any of the index updates fails. Completion is determined by the following criteria: *

@@ -49,12 +46,7 @@ public ParallelWriterIndexCommitter(String hbaseVersion) {
   }
 
   @Override
-  public void write(Multimap<HTableInterfaceReference, Mutation> toWrite,
-    final boolean allowLocalUpdates, final int clientVersion)
-    throws SingleIndexWriteFailureException {
-
-    super.write(toWrite, allowLocalUpdates, clientVersion);
-    // actually submit the tasks to the pool and wait for them to finish/fail
+  protected void submitTasks(TaskBatch<Void> tasks) throws SingleIndexWriteFailureException {
     try {
       pool.submitUninterruptible(tasks);
     } catch (EarlyExitFailure e) {

@@ -63,6 +55,5 @@ public void write(Multimap<HTableInterfaceReference, Mutation> toWrite,
       LOGGER.error("Found a failed index update!");
       propagateFailure(e.getCause());
     }
-
   }
 }
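
After this refactor the two committers differ only in submitTasks: ParallelWriterIndexCommitter keeps the blocking pool.submitUninterruptible(tasks) call and fails fast on the first index write error, while LazyParallelWriterIndexCommitter above hands the same batch to pool.submitOnly(tasks) and returns immediately. Everything else, including task construction in AbstractParallelWriterIndexCommitter.addTasks, is shared.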

phoenix-core/src/it/java/org/apache/phoenix/end2end/Bson4IT.java

Lines changed: 23 additions & 2 deletions

@@ -44,6 +44,9 @@
 import org.apache.phoenix.compile.ExplainPlan;
 import org.apache.phoenix.compile.ExplainPlanAttributes;
 import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.hbase.index.metrics.GlobalIndexCheckerSource;
+import org.apache.phoenix.hbase.index.metrics.GlobalIndexCheckerSourceImpl;
+import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSourceFactory;
 import org.apache.phoenix.jdbc.PhoenixPreparedStatement;
 import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.schema.types.PDouble;

@@ -68,7 +71,7 @@
 /**
  * Tests for BSON.
  */
-@Category(ParallelStatsDisabledTest.class)
+@Category(NeedsOwnMiniClusterTest.class)
 @RunWith(Parameterized.class)
 public class Bson4IT extends ParallelStatsDisabledIT {
 

@@ -291,7 +294,8 @@ public void testBsonValueWithBinaryEncoded() throws Exception {
     try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
 
       String ddl = "CREATE TABLE " + tableName + " (PK1 VARCHAR NOT NULL, C1 VARCHAR, COL BSON"
-        + " CONSTRAINT pk PRIMARY KEY(PK1))";
+        + " CONSTRAINT pk PRIMARY KEY(PK1))"
+        + " \"org.apache.hadoop.hbase.index.lazy.post_batch.write\"=true";
 
       final String indexDdl1;
       if (!this.coveredIndex) {

@@ -367,6 +371,7 @@ public void testBsonValueWithBinaryEncoded() throws Exception {
       stmt.executeUpdate();
 
       conn.commit();
+      Thread.sleep(500);
 
       ResultSet rs = conn.createStatement().executeQuery("SELECT count(*) FROM " + tableName);
       assertTrue(rs.next());

@@ -450,9 +455,25 @@ public void testBsonValueWithBinaryEncoded() throws Exception {
       assertFalse(rs.next());
 
       validateExplainPlan(ps, tableName, "FULL SCAN ");
+      verifyNoReadRepair();
     }
   }
 
+  private void verifyNoReadRepair() {
+    GlobalIndexCheckerSourceImpl indexCheckerSource =
+      (GlobalIndexCheckerSourceImpl) MetricsIndexerSourceFactory.getInstance()
+        .getGlobalIndexCheckerSource();
+
+    long indexRepairs = indexCheckerSource.getMetricsRegistry()
+      .getCounter(GlobalIndexCheckerSource.INDEX_REPAIR, 0).value();
+    assertEquals("No index repairs should occur during test execution", 0, indexRepairs);
+
+    long indexRepairFailures = indexCheckerSource.getMetricsRegistry()
+      .getCounter(GlobalIndexCheckerSource.INDEX_REPAIR_FAILURE, 0).value();
+    assertEquals("No index repair failures should occur during test execution", 0,
+      indexRepairFailures);
+  }
+
   @Test
   public void testBsonValueFunctionWithBSONType() throws Exception {
     Assume.assumeTrue(this.coveredIndex && this.columnEncoded); // Since indexing on BSON not
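
Two details above are worth noting. The test now creates its table with the "org.apache.hadoop.hbase.index.lazy.post_batch.write"=true table property, so lazy mode can apparently be enabled per table as well as via server configuration, and the Thread.sleep(500) after conn.commit() presumably gives the fire-and-forget index RPCs time to land before the row count and the verifyNoReadRepair() metrics check run. A hedged sketch of the per-table property used outside the test; the URL, class, and table names are placeholders, while the property string is copied verbatim from the diff:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class LazyTablePropertySketch {

  public static void createLazyTable(String jdbcUrl) throws Exception {
    // jdbcUrl is a placeholder, e.g. "jdbc:phoenix:localhost:2181".
    try (Connection conn = DriverManager.getConnection(jdbcUrl);
         Statement stmt = conn.createStatement()) {
      stmt.execute("CREATE TABLE MY_TABLE (PK1 VARCHAR NOT NULL, C1 VARCHAR"
        + " CONSTRAINT pk PRIMARY KEY(PK1))"
        + " \"org.apache.hadoop.hbase.index.lazy.post_batch.write\"=true");
    }
  }
}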

phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java

Lines changed: 1 addition & 1 deletion

@@ -68,7 +68,7 @@ public class ConcurrentMutationsExtendedIT extends ParallelStatsDisabledIT {
   private static final String MVCC_LOCK_TEST_TABLE_PREFIX = "MVCCLOCKTEST_";
   private static final String LOCK_TEST_TABLE_PREFIX = "LOCKTEST_";
   private static final int ROW_LOCK_WAIT_TIME = 10000;
-  private static final int MAX_LOOKBACK_AGE = 1000000;
+  protected static final int MAX_LOOKBACK_AGE = 1000000;
   private final Object lock = new Object();
 
   public ConcurrentMutationsExtendedIT(boolean uncovered) {

phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsLazyPostBatchWriteIT.java

Lines changed: 53 additions & 0 deletions

@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import java.util.Map;
+import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
+import org.apache.phoenix.hbase.index.IndexRegionObserver;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+
+/**
+ * Test class that extends ConcurrentMutationsExtendedIT with lazy post batch write enabled.
+ */
+@Category(NeedsOwnMiniClusterTest.class)
+public class ConcurrentMutationsLazyPostBatchWriteIT extends ConcurrentMutationsExtendedIT {
+
+  public ConcurrentMutationsLazyPostBatchWriteIT(boolean uncovered) {
+    super(uncovered);
+    Assume.assumeFalse("Only covered index supports lazy post batch write mode", uncovered);
+  }
+
+  @BeforeClass
+  public static synchronized void doSetup() throws Exception {
+    Map<String, String> props = Maps.newHashMapWithExpectedSize(4);
+    props.put(QueryServices.GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB, Long.toString(0));
+    props.put(IndexRegionObserver.INDEX_LAZY_POST_BATCH_WRITE, "true");
+    props.put(BaseScannerRegionObserverConstants.PHOENIX_MAX_LOOKBACK_AGE_CONF_KEY,
+      Integer.toString(MAX_LOOKBACK_AGE));
+    props.put("hbase.rowlock.wait.duration", "100");
+    props.put("phoenix.index.concurrent.wait.duration.ms", "10");
+    setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
+  }
+}
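
The new suite reruns every ConcurrentMutationsExtendedIT scenario with lazy post-batch writes enabled, restricted to covered indexes by the Assume in the constructor. The aggressive hbase.rowlock.wait.duration (100 ms) and phoenix.index.concurrent.wait.duration.ms (10 ms) settings appear intended to stress concurrent-mutation handling while index writes complete asynchronously.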
