Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-29027 Introduce a new WALProvider to generate WAL files consumed by Replication #6532

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions hbase-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,14 @@
<groupId>net.revelc.code</groupId>
<artifactId>warbucks-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why introduce it, and still use source 8?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The IDE added this import by mistake. Reverted the change.

<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1641,6 +1641,10 @@ public enum OperationStatusCode {
*/
public final static boolean REJECT_DECOMMISSIONED_HOSTS_DEFAULT = false;

/**
 * Config key: when true, edits for tables that have at least one column family with
 * global replication scope are written to a dedicated, replication-scoped WAL.
 */
// NOTE(review): key spelling fixed from "replicataion" — this key is new in this
// patch, so correct it before it ships and becomes a compatibility burden.
public final static String REPLICATION_WAL_FILTER_BY_SCOPE_ENABLED =
  "hbase.regionserver.replication.wal.scope.filter";
/** By default the replication-scoped WAL is disabled. */
public final static boolean REPLICATION_WAL_FILTER_BY_SCOPE_ENABLED_DEFAULT = false;

// Constants-only holder class: private constructor prevents instantiation.
private HConstants() {
// Can't be instantiated with this ctor.
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,11 @@
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.YouAreDeadException;
import org.apache.hadoop.hbase.ZNodeClearer;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.locking.EntityLock;
import org.apache.hadoop.hbase.client.locking.LockServiceClient;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
Expand Down Expand Up @@ -2165,13 +2167,37 @@ public List<WAL> getWALs() {

@Override
public WAL getWAL(RegionInfo regionInfo) throws IOException {
  // Regions of tables with a globally replication-scoped column family write to
  // the dedicated replication WAL provider; all others use the default provider.
  final WAL wal;
  if (regionBelongsToReplicatedTable(regionInfo)) {
    wal = walFactory.getWALInGlobalScope(regionInfo);
  } else {
    wal = walFactory.getWAL(regionInfo);
  }
  if (this.walRoller != null) {
    this.walRoller.addWAL(wal);
  }
  return wal;
}

/**
 * Returns whether the given region belongs to a table that participates in
 * replication, i.e. at least one of its column families is declared with
 * {@link HConstants#REPLICATION_SCOPE_GLOBAL}.
 * @param regionInfo region to check, may be null
 * @return true if any column family of the region's table has global scope
 * @throws IOException if the table descriptor cannot be loaded
 */
private boolean regionBelongsToReplicatedTable(RegionInfo regionInfo) throws IOException {
  if (regionInfo == null) {
    return false;
  }

  TableDescriptor desc = tableDescriptors.get(regionInfo.getTable());
  if (desc == null) {
    // No descriptor available (e.g. table being created/deleted): treat as
    // not replicated rather than failing WAL selection.
    return false;
  }

  // Fetch the families once and iterate that array; the original fetched them
  // a second time in the loop header.
  ColumnFamilyDescriptor[] columns = desc.getColumnFamilies();
  if (columns == null) {
    return false;
  }
  for (ColumnFamilyDescriptor cf : columns) {
    if (cf.getScope() == HConstants.REPLICATION_SCOPE_GLOBAL) {
      return true;
    }
  }
  return false;
}

/** Returns the roller responsible for rolling this server's WAL files. */
public LogRoller getWalRoller() {
return walRoller;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@
*/
package org.apache.hadoop.hbase.replication.regionserver;

import static org.apache.hadoop.hbase.HConstants.REPLICATION_WAL_FILTER_BY_SCOPE_ENABLED;
import static org.apache.hadoop.hbase.HConstants.REPLICATION_WAL_FILTER_BY_SCOPE_ENABLED_DEFAULT;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
Expand All @@ -38,14 +41,24 @@ class ReplicationSourceWALActionListener implements WALActionsListener {

// Manager notified when a log roll produces a new WAL file to replicate.
private final ReplicationSourceManager manager;

// Cached REPLICATION_WAL_FILTER_BY_SCOPE_ENABLED value: when true, only WAL
// files produced by the replication-scoped provider are tracked.
private final boolean filterWALByReplicationScope;

/**
 * @param conf server configuration, used to read the scope-filter flag once
 * @param manager replication source manager to notify on log rolls
 */
public ReplicationSourceWALActionListener(Configuration conf, ReplicationSourceManager manager) {
this.conf = conf;
this.manager = manager;
this.filterWALByReplicationScope =
conf.getBoolean(REPLICATION_WAL_FILTER_BY_SCOPE_ENABLED, REPLICATION_WAL_FILTER_BY_SCOPE_ENABLED_DEFAULT);
}

@Override
public void postLogRoll(Path oldPath, Path newPath) throws IOException {
  // When scope filtering is enabled, only WAL files written by the
  // replication-scoped provider are handed to the replication sources;
  // otherwise every rolled WAL is tracked, as before.
  boolean track = !filterWALByReplicationScope
    || AbstractFSWALProvider.isFileInReplicationScope(newPath);
  if (track) {
    manager.postLogRoll(newPath);
  }
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ static void requestLogRoll(final WAL wal) {
public static final String WAL_FILE_NAME_DELIMITER = ".";
/** The hbase:meta region's WAL filename extension */
public static final String META_WAL_PROVIDER_ID = ".meta";
/** Filename extension of WAL files produced by the replication-scoped provider. */
public static final String REPLICATION_WAL_PROVIDER_ID = ".replication";
static final String DEFAULT_PROVIDER_ID = "default";

// Implementation details that currently leak in tests or elsewhere follow
Expand Down Expand Up @@ -487,6 +488,14 @@ public static boolean isMetaFile(String p) {
return p != null && p.endsWith(META_WAL_PROVIDER_ID);
}

/**
 * Checks whether the given WAL file was written by the replication-scoped
 * provider, i.e. its name ends with {@link #REPLICATION_WAL_PROVIDER_ID}.
 * @param p WAL file path, may be null
 * @return true if the file belongs to the replication-scoped provider
 */
public static boolean isFileInReplicationScope(Path p) {
  // Null-safe for consistency with the String overload (the original would
  // have thrown NPE on a null Path).
  return p != null && isFileInReplicationScope(p.getName());
}

/**
 * Checks whether the given WAL file name ends with
 * {@link #REPLICATION_WAL_PROVIDER_ID}.
 * @param p WAL file name, may be null
 * @return true if the name marks a replication-scoped WAL
 */
public static boolean isFileInReplicationScope(String p) {
  return p != null && p.endsWith(REPLICATION_WAL_PROVIDER_ID);
}

/**
* Comparator used to compare WAL files together based on their start time. Just compares start
* times and nothing else.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.wal;

import static org.apache.hadoop.hbase.HConstants.REPLICATION_WAL_FILTER_BY_SCOPE_ENABLED;
import static org.apache.hadoop.hbase.HConstants.REPLICATION_WAL_FILTER_BY_SCOPE_ENABLED_DEFAULT;
import com.google.errorprone.annotations.RestrictedApi;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -118,6 +120,7 @@ enum Providers {
// This is for avoid hbase:replication itself keeps trigger unnecessary updates to WAL file and
// generate a lot useless data, see HBASE-27775 for more details.
private final LazyInitializedWALProvider replicationProvider;
private final WALProvider providerInReplicationScope;

/**
* Configuration-specified WAL Reader used when a custom reader is requested
Expand Down Expand Up @@ -151,6 +154,7 @@ private WALFactory(Configuration conf) {

// this instance can't create wals, just reader/writers.
provider = null;
providerInReplicationScope = null;
factoryId = SINGLETON_ID;
this.abortable = null;
this.excludeDatanodeManager = new ExcludeDatanodeManager(conf);
Expand Down Expand Up @@ -260,14 +264,27 @@ private WALFactory(Configuration conf, String factoryId, Abortable abortable) th
// end required early initialization
if (conf.getBoolean(WAL_ENABLED, true)) {
WALProvider provider = createProvider(getProviderClass(WAL_PROVIDER, DEFAULT_WAL_PROVIDER));
WALProvider providerInReplicationScope =
createProvider(getProviderClass(WAL_PROVIDER, DEFAULT_WAL_PROVIDER));
provider.init(this, conf, null, this.abortable);
provider.addWALActionsListener(new MetricsWAL());
this.provider = provider;

if (conf.getBoolean(REPLICATION_WAL_FILTER_BY_SCOPE_ENABLED,
REPLICATION_WAL_FILTER_BY_SCOPE_ENABLED_DEFAULT)) {
providerInReplicationScope.init(this, conf,
AbstractFSWALProvider.REPLICATION_WAL_PROVIDER_ID, this.abortable);
providerInReplicationScope.addWALActionsListener(new MetricsWAL());
this.providerInReplicationScope = providerInReplicationScope;
} else {
this.providerInReplicationScope = null;
}
} else {
// special handling of existing configuration behavior.
LOG.warn("Running with WAL disabled.");
provider = new DisabledWALProvider();
provider.init(this, conf, factoryId, null);
providerInReplicationScope = provider;
}
}

Expand Down Expand Up @@ -353,7 +370,11 @@ public void shutdown() throws IOException {
}

/**
 * Returns all WALs managed by this factory, including those of the
 * replication-scoped provider when it is enabled.
 * @return a freshly allocated list; mutating it does not affect provider state
 */
public List<WAL> getWALs() {
  // Copy defensively: the provider may return an unmodifiable or internally
  // shared list, so calling addAll on it directly could either throw
  // UnsupportedOperationException or corrupt the provider's own WAL list.
  List<WAL> wals = new ArrayList<>(provider.getWALs());
  if (providerInReplicationScope != null) {
    wals.addAll(providerInReplicationScope.getWALs());
  }
  return wals;
}

@RestrictedApi(explanation = "Should only be called in tests", link = "",
Expand Down Expand Up @@ -383,6 +404,14 @@ public WAL getWAL(RegionInfo region) throws IOException {
return provider.getWAL(region);
}

/**
 * Returns a WAL from the replication-scoped provider for the given region.
 * Only usable when the replication-scoped provider was initialized (see
 * {@link org.apache.hadoop.hbase.HConstants#REPLICATION_WAL_FILTER_BY_SCOPE_ENABLED}).
 * @param regionInfo region requesting a WAL
 * @return a WAL backed by the replication-scoped provider
 * @throws IllegalStateException if the replication-scoped provider is not initialized
 * @throws IOException if the provider fails to create/return the WAL
 */
public WAL getWALInGlobalScope(RegionInfo regionInfo) throws IOException {
  if (providerInReplicationScope == null) {
    // Grammar fixed ("correct" -> "correctly") and made actionable.
    throw new IllegalStateException("WAL provider for replication is not correctly initialized; "
      + "replication scope filtering may be disabled.");
  }
  return providerInReplicationScope.getWAL(regionInfo);
}

/**
 * Creates a stream reader over the given WAL file with no progress reporting;
 * delegates to the three-argument overload with a null CancelableProgressable.
 */
public WALStreamReader createStreamReader(FileSystem fs, Path path) throws IOException {
return createStreamReader(fs, path, (CancelableProgressable) null);
}
Expand Down Expand Up @@ -578,6 +607,10 @@ public List<WALProvider> getAllWALProviders() {
if (replication != null) {
providers.add(replication);
}
WALProvider providerInScope = providerInReplicationScope;
if (providerInScope != null) {
providers.add(providerInScope);
}
return providers;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;

import static org.apache.hadoop.hbase.HConstants.REPLICATION_WAL_FILTER_BY_SCOPE_ENABLED;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.replication.TestReplicationBase;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;

/**
 * Re-runs the {@link TestReplicator} suite with the replication-scoped WAL
 * enabled (REPLICATION_WAL_FILTER_BY_SCOPE_ENABLED = true) to verify that
 * replication still works when edits go through the dedicated WAL provider.
 */
@Category(MediumTests.class)
public class TestReplicatorWithReplicationWAL extends TestReplicator {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestReplicatorWithReplicationWAL.class);

@BeforeClass
public static void setUpBeforeClass() throws Exception {
// Set RPC size limit to 10kb (will be applied to both source and sink clusters)
CONF1.setInt(RpcServer.MAX_REQUEST_SIZE, 1024 * 10);
// Route globally-scoped edits through the replication-scoped WAL provider.
CONF1.setBoolean(REPLICATION_WAL_FILTER_BY_SCOPE_ENABLED, true);
TestReplicationBase.setUpBeforeClass();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.wal;

import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_GLOBAL;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Verifies that with REPLICATION_WAL_FILTER_BY_SCOPE_ENABLED set, a table with
 * a globally replication-scoped family writes to a ".replication" WAL while a
 * non-replicated table writes to a normal WAL, and both appear under the WAL
 * root directory.
 */
@Category({ RegionServerTests.class, MediumTests.class })
public class TestReplicationScopedWAL {
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestReplicationScopedWAL.class);

  static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();

  private final byte[] family1 = Bytes.toBytes("f1");
  private final byte[] family2 = Bytes.toBytes("f2");

  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    TEST_UTIL.getConfiguration()
      .setBoolean(HConstants.REPLICATION_WAL_FILTER_BY_SCOPE_ENABLED, true);
    TEST_UTIL.startMiniCluster();
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  @Test
  public void testReplicationScopedWAL() throws Exception {
    TableName tableName1 = TableName.valueOf("testReplicationScopedWAL1");
    TableName tableName2 = TableName.valueOf("testReplicationScopedWAL2");

    // family1 has global replication scope; family2 uses the default (local) scope.
    ColumnFamilyDescriptor column1 =
      ColumnFamilyDescriptorBuilder.newBuilder(family1).setScope(REPLICATION_SCOPE_GLOBAL).build();
    ColumnFamilyDescriptor column2 = ColumnFamilyDescriptorBuilder.of(family2);

    TableDescriptor tableDescriptor1 =
      TableDescriptorBuilder.newBuilder(tableName1).setColumnFamily(column1).build();
    TableDescriptor tableDescriptor2 =
      TableDescriptorBuilder.newBuilder(tableName2).setColumnFamily(column2).build();

    // try-with-resources: the original leaked both tables when an assertion
    // failed before the trailing close() calls were reached.
    try (Table table1 = TEST_UTIL.createTable(tableDescriptor1, null);
      Table table2 = TEST_UTIL.createTable(tableDescriptor2, null)) {

      Put put1 = new Put(Bytes.toBytes("row1"));
      put1.addColumn(family1, Bytes.toBytes("qualifier1"), Bytes.toBytes("value1"));

      Put put2 = new Put(Bytes.toBytes("row2"));
      put2.addColumn(family2, Bytes.toBytes("qulifier2"), Bytes.toBytes("value2"));

      table1.put(put1);
      table2.put(put2);

      TEST_UTIL.flush();
      FileSystem fs = TEST_UTIL.getTestFileSystem();
      Path walRootDir = CommonFSUtils.getWALRootDir(TEST_UTIL.getConfiguration());
      FileStatus[] statusArray = fs.listStatus(walRootDir);
      // Expect at least one replication-scoped and one normal WAL directory/file.
      Assert.assertTrue(statusArray.length >= 2);

      boolean hasReplicated = false;
      boolean hasNormal = false;
      for (FileStatus status : statusArray) {
        Path path = status.getPath();
        if (AbstractFSWALProvider.isFileInReplicationScope(path)) {
          hasReplicated = true;
        } else if (!AbstractFSWALProvider.isMetaFile(path)) {
          hasNormal = true;
        }
      }
      Assert.assertTrue(hasReplicated);
      Assert.assertTrue(hasNormal);
    }
  }
}
Loading