diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index c882e71e877a..f7768ac20654 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -1641,6 +1641,10 @@ public enum OperationStatusCode { */ public final static boolean REJECT_DECOMMISSIONED_HOSTS_DEFAULT = false; + public final static String REPLICATION_WAL_FILTER_BY_SCOPE_ENABLED = + "hbase.regionserver.replication.wal.scope.filter"; + public final static boolean REPLICATION_WAL_FILTER_BY_SCOPE_ENABLED_DEFAULT = false; + private HConstants() { // Can't be instantiated with this ctor. } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index a4105a31bfac..f142e7ca2cea 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -96,9 +96,11 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.YouAreDeadException; import org.apache.hadoop.hbase.ZNodeClearer; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; import org.apache.hadoop.hbase.client.ConnectionUtils; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionInfoBuilder; +import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.client.locking.EntityLock; import org.apache.hadoop.hbase.client.locking.LockServiceClient; import org.apache.hadoop.hbase.conf.ConfigurationObserver; @@ -2165,13 +2167,38 @@ public List getWALs() { @Override public WAL getWAL(RegionInfo regionInfo) throws IOException { - WAL wal = walFactory.getWAL(regionInfo); + WAL wal = regionBelongsToReplicatedTable(regionInfo) + ? 
walFactory.getWALInGlobalScope(regionInfo) + : walFactory.getWAL(regionInfo); if (this.walRoller != null) { this.walRoller.addWAL(wal); } return wal; } + private boolean regionBelongsToReplicatedTable(RegionInfo regionInfo) throws IOException { + if (regionInfo == null || tableDescriptors == null) { + return false; + } + + TableDescriptor desc = tableDescriptors.get(regionInfo.getTable()); + if (desc == null) { + return false; + } + + ColumnFamilyDescriptor[] columns = desc.getColumnFamilies(); + if (columns == null || columns.length == 0) { + return false; + } + + for (ColumnFamilyDescriptor cf : desc.getColumnFamilies()) { + if (cf.getScope() == HConstants.REPLICATION_SCOPE_GLOBAL) { + return true; + } + } + return false; + } + public LogRoller getWalRoller() { return walRoller; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALActionListener.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALActionListener.java index 9380c6b63050..547ea01860ce 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALActionListener.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALActionListener.java @@ -17,12 +17,16 @@ */ package org.apache.hadoop.hbase.replication.regionserver; +import static org.apache.hadoop.hbase.HConstants.REPLICATION_WAL_FILTER_BY_SCOPE_ENABLED; +import static org.apache.hadoop.hbase.HConstants.REPLICATION_WAL_FILTER_BY_SCOPE_ENABLED_DEFAULT; + import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; import org.apache.hadoop.hbase.replication.ReplicationUtils; +import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.hadoop.hbase.wal.WALEdit; import 
org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.hbase.wal.WALKeyImpl; @@ -38,14 +42,24 @@ class ReplicationSourceWALActionListener implements WALActionsListener { private final ReplicationSourceManager manager; + private final boolean filterWALByReplicationScope; + public ReplicationSourceWALActionListener(Configuration conf, ReplicationSourceManager manager) { this.conf = conf; this.manager = manager; + this.filterWALByReplicationScope = conf.getBoolean(REPLICATION_WAL_FILTER_BY_SCOPE_ENABLED, + REPLICATION_WAL_FILTER_BY_SCOPE_ENABLED_DEFAULT); } @Override public void postLogRoll(Path oldPath, Path newPath) throws IOException { - manager.postLogRoll(newPath); + if (filterWALByReplicationScope) { + if (AbstractFSWALProvider.isFileInReplicationScope(newPath)) { + manager.postLogRoll(newPath); + } + } else { + manager.postLogRoll(newPath); + } } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java index 13d2886182e4..5adca77ff7cc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java @@ -231,6 +231,7 @@ static void requestLogRoll(final WAL wal) { public static final String WAL_FILE_NAME_DELIMITER = "."; /** The hbase:meta region's WAL filename extension */ public static final String META_WAL_PROVIDER_ID = ".meta"; + public static final String REPLICATION_WAL_PROVIDER_ID = ".replication"; static final String DEFAULT_PROVIDER_ID = "default"; // Implementation details that currently leak in tests or elsewhere follow @@ -487,6 +488,14 @@ public static boolean isMetaFile(String p) { return p != null && p.endsWith(META_WAL_PROVIDER_ID); } + public static boolean isFileInReplicationScope(Path p) { + return isFileInReplicationScope(p.getName()); + } + + public static boolean 
isFileInReplicationScope(String p) { + return p != null && p.endsWith(REPLICATION_WAL_PROVIDER_ID); + } + /** * Comparator used to compare WAL files together based on their start time. Just compares start * times and nothing else. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java index 6b638cdda7ff..2d4ef7f2b641 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java @@ -17,6 +17,9 @@ */ package org.apache.hadoop.hbase.wal; +import static org.apache.hadoop.hbase.HConstants.REPLICATION_WAL_FILTER_BY_SCOPE_ENABLED; +import static org.apache.hadoop.hbase.HConstants.REPLICATION_WAL_FILTER_BY_SCOPE_ENABLED_DEFAULT; + import com.google.errorprone.annotations.RestrictedApi; import java.io.IOException; import java.io.InterruptedIOException; @@ -118,6 +121,7 @@ enum Providers { // This is for avoid hbase:replication itself keeps trigger unnecessary updates to WAL file and // generate a lot useless data, see HBASE-27775 for more details. private final LazyInitializedWALProvider replicationProvider; + private final WALProvider providerInReplicationScope; /** * Configuration-specified WAL Reader used when a custom reader is requested @@ -151,6 +155,7 @@ private WALFactory(Configuration conf) { // this instance can't create wals, just reader/writers. 
provider = null; + providerInReplicationScope = null; factoryId = SINGLETON_ID; this.abortable = null; this.excludeDatanodeManager = new ExcludeDatanodeManager(conf); @@ -260,14 +265,29 @@ private WALFactory(Configuration conf, String factoryId, Abortable abortable) th // end required early initialization if (conf.getBoolean(WAL_ENABLED, true)) { WALProvider provider = createProvider(getProviderClass(WAL_PROVIDER, DEFAULT_WAL_PROVIDER)); + WALProvider providerInReplicationScope = + createProvider(getProviderClass(WAL_PROVIDER, DEFAULT_WAL_PROVIDER)); provider.init(this, conf, null, this.abortable); provider.addWALActionsListener(new MetricsWAL()); this.provider = provider; + + if ( + conf.getBoolean(REPLICATION_WAL_FILTER_BY_SCOPE_ENABLED, + REPLICATION_WAL_FILTER_BY_SCOPE_ENABLED_DEFAULT) + ) { + providerInReplicationScope.init(this, conf, + AbstractFSWALProvider.REPLICATION_WAL_PROVIDER_ID, this.abortable); + providerInReplicationScope.addWALActionsListener(new MetricsWAL()); + this.providerInReplicationScope = providerInReplicationScope; + } else { + this.providerInReplicationScope = null; + } } else { // special handling of existing configuration behavior. 
LOG.warn("Running with WAL disabled."); provider = new DisabledWALProvider(); provider.init(this, conf, factoryId, null); + providerInReplicationScope = provider; } } @@ -353,7 +373,11 @@ public void shutdown() throws IOException { } public List getWALs() { - return provider.getWALs(); + List wals = provider.getWALs(); + if (providerInReplicationScope != null) { + wals.addAll(providerInReplicationScope.getWALs()); + } + return wals; } @RestrictedApi(explanation = "Should only be called in tests", link = "", @@ -383,6 +407,14 @@ public WAL getWAL(RegionInfo region) throws IOException { return provider.getWAL(region); } + public WAL getWALInGlobalScope(RegionInfo regionInfo) throws IOException { + if (providerInReplicationScope != null) { + return providerInReplicationScope.getWAL(regionInfo); + } + // we have this due to UT. + return provider.getWAL(regionInfo); + } + public WALStreamReader createStreamReader(FileSystem fs, Path path) throws IOException { return createStreamReader(fs, path, (CancelableProgressable) null); } @@ -578,6 +610,10 @@ public List getAllWALProviders() { if (replication != null) { providers.add(replication); } + WALProvider providerInScope = providerInReplicationScope; + if (providerInScope != null) { + providers.add(providerInScope); + } return providers; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicatorWithReplicationWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicatorWithReplicationWAL.java new file mode 100644 index 000000000000..49166f3445c7 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicatorWithReplicationWAL.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication.regionserver; + +import static org.apache.hadoop.hbase.HConstants.REPLICATION_WAL_FILTER_BY_SCOPE_ENABLED; + +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.ipc.RpcServer; +import org.apache.hadoop.hbase.replication.TestReplicationBase; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.experimental.categories.Category; + +@Category(MediumTests.class) +public class TestReplicatorWithReplicationWAL extends TestReplicator { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestReplicatorWithReplicationWAL.class); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + // Set RPC size limit to 10kb (will be applied to both source and sink clusters) + CONF1.setInt(RpcServer.MAX_REQUEST_SIZE, 1024 * 10); + CONF1.setBoolean(REPLICATION_WAL_FILTER_BY_SCOPE_ENABLED, true); + TestReplicationBase.setUpBeforeClass(); + } + +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestReplicationScopedWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestReplicationScopedWAL.java new file mode 100644 index 000000000000..268eb7433f5f --- /dev/null +++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestReplicationScopedWAL.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.wal; + +import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_GLOBAL; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.CommonFSUtils; +import org.junit.AfterClass; +import org.junit.Assert; +import 
org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ RegionServerTests.class, MediumTests.class }) +public class TestReplicationScopedWAL { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestReplicationScopedWAL.class); + + static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); + + private final byte[] family1 = Bytes.toBytes("f1"); + private final byte[] family2 = Bytes.toBytes("f2"); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.getConfiguration().setBoolean(HConstants.REPLICATION_WAL_FILTER_BY_SCOPE_ENABLED, + true); + TEST_UTIL.startMiniCluster(); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Test + public void testReplicationScopedWAL() throws Exception { + TableName tableName1 = TableName.valueOf("testReplicationScopedWAL1"); + TableName tableName2 = TableName.valueOf("testReplicationScopedWAL2"); + + ColumnFamilyDescriptor column1 = + ColumnFamilyDescriptorBuilder.newBuilder(family1).setScope(REPLICATION_SCOPE_GLOBAL).build(); + + ColumnFamilyDescriptor column2 = ColumnFamilyDescriptorBuilder.of(family2); + + TableDescriptor tableDescriptor1 = + TableDescriptorBuilder.newBuilder(tableName1).setColumnFamily(column1).build(); + TableDescriptor tableDescriptor2 = + TableDescriptorBuilder.newBuilder(tableName2).setColumnFamily(column2).build(); + + Table table1 = TEST_UTIL.createTable(tableDescriptor1, null); + Table table2 = TEST_UTIL.createTable(tableDescriptor2, null); + + Put put1 = new Put(Bytes.toBytes("row1")); + put1.addColumn(family1, Bytes.toBytes("qualifier1"), Bytes.toBytes("value1")); + + Put put2 = new Put(Bytes.toBytes("row2")); + put2.addColumn(family2, Bytes.toBytes("qualifier2"), Bytes.toBytes("value2")); + + table1.put(put1); + table2.put(put2); + + TEST_UTIL.flush(); + 
FileSystem fs = TEST_UTIL.getTestFileSystem(); + Path walRootDir = CommonFSUtils.getWALRootDir(TEST_UTIL.getConfiguration()); + FileStatus[] statusArray = fs.listStatus(walRootDir); + Assert.assertTrue(statusArray.length >= 2); + + boolean hasReplicated = false; + boolean hasNormal = false; + for (FileStatus status : statusArray) { + Path path = status.getPath(); + FileStatus[] subFiles = fs.listStatus(path); + for (FileStatus subStatus : subFiles) { + if (AbstractFSWALProvider.isFileInReplicationScope(subStatus.getPath())) { + hasReplicated = true; + } else if (!AbstractFSWALProvider.isMetaFile(subStatus.getPath())) { + hasNormal = true; + } + } + } + Assert.assertTrue(hasReplicated); + Assert.assertTrue(hasNormal); + + table1.close(); + table2.close(); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java index 2c994f091f32..645bdd9068d7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java @@ -710,6 +710,18 @@ public void testCustomReplicationProvider() throws IOException { assertEquals(IOTestProvider.class, replicationWALProvider.getClass()); } + @Test + public void testProviderInReplicationScope() throws IOException { + final Configuration config = new Configuration(); + config.setBoolean(HConstants.REPLICATION_WAL_FILTER_BY_SCOPE_ENABLED, true); + final WALFactory walFactory = new WALFactory(config, this.currentServername.toString()); + List providers = walFactory.getAllWALProviders(); + // As the meta & replication providers are lazy initialized, we should have two here. + // And these two provider have the same type. + assertEquals(2, providers.size()); + assertEquals(providers.get(0).getClass(), providers.get(1).getClass()); + } + /** * Confirm that we will use different WALs for hbase:meta and hbase:replication */