Skip to content

Commit

Permalink
HDFS-16690. Automatically format unformatted JNs with JournalNodeSync…
Browse files Browse the repository at this point in the history
…er (#6925). Contributed by Aswin M Prabhu.

Signed-off-by: He Xiaoqiao <[email protected]>
  • Loading branch information
aswinmprabhu authored Jul 23, 2024
1 parent e000cbf commit e2a0dca
Show file tree
Hide file tree
Showing 9 changed files with 202 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1471,6 +1471,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_JOURNALNODE_SYNC_INTERVAL_KEY =
"dfs.journalnode.sync.interval";
public static final long DFS_JOURNALNODE_SYNC_INTERVAL_DEFAULT = 2*60*1000L;
public static final String DFS_JOURNALNODE_ENABLE_SYNC_FORMAT_KEY =
"dfs.journalnode.enable.sync.format";
public static final boolean DFS_JOURNALNODE_ENABLE_SYNC_FORMAT_DEFAULT = false;
public static final String DFS_JOURNALNODE_EDIT_CACHE_SIZE_KEY =
"dfs.journalnode.edit-cache-size.bytes";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.StorageInfoProto;
import org.apache.hadoop.hdfs.qjournal.server.JournalNode;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
import org.apache.hadoop.security.KerberosInfo;
Expand Down Expand Up @@ -51,4 +52,13 @@ GetEditLogManifestResponseProto getEditLogManifestFromJournal(
String jid, String nameServiceId, long sinceTxId, boolean inProgressOk)
throws IOException;

/**
* Get the storage info for the specified journal.
* @param jid the journal identifier
* @param nameServiceId the name service id
* @return the storage info object
*/
StorageInfoProto getStorageInfo(String jid, String nameServiceId)
throws IOException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.qjournal.protocol.InterQJournalProtocol;
import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.StorageInfoProto;
import org.apache.hadoop.hdfs.qjournal.protocol.InterQJournalProtocolProtos.GetStorageInfoRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;

Expand Down Expand Up @@ -60,4 +62,18 @@ public GetEditLogManifestResponseProto getEditLogManifestFromJournal(
throw new ServiceException(e);
}
}

@Override
public StorageInfoProto getStorageInfo(
RpcController controller, GetStorageInfoRequestProto request)
throws ServiceException {
try {
return impl.getStorageInfo(
request.getJid().getIdentifier(),
request.hasNameServiceId() ? request.getNameServiceId() : null
);
} catch (IOException e) {
throw new ServiceException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.apache.hadoop.hdfs.qjournal.protocolPB;

import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.StorageInfoProto;
import org.apache.hadoop.hdfs.qjournal.protocol.InterQJournalProtocolProtos;
import org.apache.hadoop.thirdparty.protobuf.RpcController;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
Expand Down Expand Up @@ -75,6 +77,18 @@ public GetEditLogManifestResponseProto getEditLogManifestFromJournal(
req.build()));
}

@Override
public StorageInfoProto getStorageInfo(String jid, String nameServiceId)
throws IOException {
InterQJournalProtocolProtos.GetStorageInfoRequestProto.Builder req =
InterQJournalProtocolProtos.GetStorageInfoRequestProto.newBuilder()
.setJid(convertJournalId(jid));
if (nameServiceId != null) {
req.setNameServiceId(nameServiceId);
}
return ipc(() -> rpcProxy.getStorageInfo(NULL_CONTROLLER, req.build()));
}

private QJournalProtocolProtos.JournalIdProto convertJournalId(String jid) {
return QJournalProtocolProtos.JournalIdProto.newBuilder()
.setIdentifier(jid)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.qjournal.server;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.StorageInfoProto;
import org.apache.hadoop.thirdparty.protobuf.BlockingService;
import org.slf4j.Logger;
import org.apache.hadoop.classification.InterfaceAudience;
Expand Down Expand Up @@ -71,14 +72,14 @@ public class JournalNodeRpcServer implements QJournalProtocol,

JournalNodeRpcServer(Configuration conf, JournalNode jn) throws IOException {
this.jn = jn;

Configuration confCopy = new Configuration(conf);

// Ensure that nagling doesn't kick in, which could cause latency issues.
confCopy.setBoolean(
CommonConfigurationKeysPublic.IPC_SERVER_TCPNODELAY_KEY,
true);

InetSocketAddress addr = getAddress(confCopy);
String bindHost = conf.getTrimmed(DFS_JOURNALNODE_RPC_BIND_HOST_KEY, null);
if (bindHost == null) {
Expand All @@ -104,7 +105,7 @@ public class JournalNodeRpcServer implements QJournalProtocol,
this.handlerCount = confHandlerCount;
LOG.info("The number of JournalNodeRpcServer handlers is {}.",
this.handlerCount);

this.server = new RPC.Builder(confCopy)
.setProtocol(QJournalProtocolPB.class)
.setInstance(service)
Expand Down Expand Up @@ -149,15 +150,15 @@ void start() {
public InetSocketAddress getAddress() {
return server.getListenerAddress();
}

void join() throws InterruptedException {
this.server.join();
}

void stop() {
this.server.stop();
}

static InetSocketAddress getAddress(Configuration conf) {
String addr = conf.get(
DFSConfigKeys.DFS_JOURNALNODE_RPC_ADDRESS_KEY,
Expand Down Expand Up @@ -211,7 +212,7 @@ public void journal(RequestInfo reqInfo,
jn.getOrCreateJournal(reqInfo.getJournalId(), reqInfo.getNameServiceId())
.journal(reqInfo, segmentTxId, firstTxnId, numTxns, records);
}

@Override
public void heartbeat(RequestInfo reqInfo) throws IOException {
jn.getOrCreateJournal(reqInfo.getJournalId(), reqInfo.getNameServiceId())
Expand Down Expand Up @@ -245,17 +246,24 @@ public GetEditLogManifestResponseProto getEditLogManifest(
String jid, String nameServiceId,
long sinceTxId, boolean inProgressOk)
throws IOException {

RemoteEditLogManifest manifest = jn.getOrCreateJournal(jid, nameServiceId)
.getEditLogManifest(sinceTxId, inProgressOk);

return GetEditLogManifestResponseProto.newBuilder()
.setManifest(PBHelper.convert(manifest))
.setHttpPort(jn.getBoundHttpAddress().getPort())
.setFromURL(jn.getHttpServerURI())
.build();
}

@Override
public StorageInfoProto getStorageInfo(String jid,
String nameServiceId) throws IOException {
StorageInfo storage = jn.getOrCreateJournal(jid, nameServiceId).getStorage();
return PBHelper.convert(storage);
}

@Override
public GetJournaledEditsResponseProto getJournaledEdits(String jid,
String nameServiceId, long sinceTxId, int maxTxns) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
package org.apache.hadoop.hdfs.qjournal.server;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -79,6 +82,7 @@ public class JournalNodeSyncer {
private int numOtherJNs;
private int journalNodeIndexForSync = 0;
private final long journalSyncInterval;
private final boolean tryFormatting;
private final int logSegmentTransferTimeout;
private final DataTransferThrottler throttler;
private final JournalMetrics metrics;
Expand All @@ -98,6 +102,9 @@ public class JournalNodeSyncer {
logSegmentTransferTimeout = conf.getInt(
DFSConfigKeys.DFS_EDIT_LOG_TRANSFER_TIMEOUT_KEY,
DFSConfigKeys.DFS_EDIT_LOG_TRANSFER_TIMEOUT_DEFAULT);
tryFormatting = conf.getBoolean(
DFSConfigKeys.DFS_JOURNALNODE_ENABLE_SYNC_FORMAT_KEY,
DFSConfigKeys.DFS_JOURNALNODE_ENABLE_SYNC_FORMAT_DEFAULT);
throttler = getThrottler(conf);
metrics = journal.getMetrics();
journalSyncerStarted = false;
Expand Down Expand Up @@ -171,6 +178,8 @@ private void startSyncJournalsDaemon() {
// Wait for journal to be formatted to create edits.sync directory
while(!journal.isFormatted()) {
try {
// Format the journal with namespace info from the other JNs if it is not formatted
formatWithSyncer();
Thread.sleep(journalSyncInterval);
} catch (InterruptedException e) {
LOG.error("JournalNodeSyncer daemon received Runtime exception.", e);
Expand All @@ -187,7 +196,15 @@ private void startSyncJournalsDaemon() {
while(shouldSync) {
try {
if (!journal.isFormatted()) {
LOG.warn("Journal cannot sync. Not formatted.");
LOG.warn("Journal cannot sync. Not formatted. Trying to format with the syncer");
formatWithSyncer();
if (journal.isFormatted() && !createEditsSyncDir()) {
LOG.error("Failed to create directory for downloading log " +
"segments: {}. Stopping Journal Node Sync.",
journal.getStorage().getEditsSyncDir());
return;
}
continue;
} else {
syncJournals();
}
Expand Down Expand Up @@ -233,6 +250,68 @@ private void syncJournals() {
journalNodeIndexForSync = (journalNodeIndexForSync + 1) % numOtherJNs;
}

private void formatWithSyncer() {
if (!tryFormatting) {
return;
}
LOG.info("Trying to format the journal with the syncer");
try {
StorageInfo storage = null;
for (JournalNodeProxy jnProxy : otherJNProxies) {
if (!hasEditLogs(jnProxy)) {
// This avoids a race condition between `hdfs namenode -format` and
// JN syncer by checking if the other JN is not newly formatted.
continue;
}
try {
HdfsServerProtos.StorageInfoProto storageInfoResponse =
jnProxy.jnProxy.getStorageInfo(jid, nameServiceId);
storage = PBHelper.convert(
storageInfoResponse, HdfsServerConstants.NodeType.JOURNAL_NODE
);
if (storage.getNamespaceID() == 0) {
LOG.error("Got invalid StorageInfo from " + jnProxy);
storage = null;
continue;
}
LOG.info("Got StorageInfo " + storage + " from " + jnProxy);
break;
} catch (IOException e) {
LOG.error("Could not get StorageInfo from " + jnProxy, e);
}
}
if (storage == null) {
LOG.error("Could not get StorageInfo from any JournalNode. " +
"JournalNodeSyncer cannot format the journal.");
return;
}
NamespaceInfo nsInfo = new NamespaceInfo(storage);
journal.format(nsInfo, true);
} catch (IOException e) {
LOG.error("Exception in formatting the journal with the syncer", e);
}
}

private boolean hasEditLogs(JournalNodeProxy journalProxy) {
GetEditLogManifestResponseProto editLogManifest;
try {
editLogManifest = journalProxy.jnProxy.getEditLogManifestFromJournal(
jid, nameServiceId, 0, false);
} catch (IOException e) {
LOG.error("Could not get edit log manifest from " + journalProxy, e);
return false;
}

List<RemoteEditLog> otherJournalEditLogs = PBHelper.convert(
editLogManifest.getManifest()).getLogs();
if (otherJournalEditLogs == null || otherJournalEditLogs.isEmpty()) {
LOG.warn("Journal at " + journalProxy.jnAddr + " has no edit logs");
return false;
}

return true;
}

private void syncWithJournalAtIndex(int index) {
LOG.info("Syncing Journal " + jn.getBoundIpcAddress().getAddress() + ":"
+ jn.getBoundIpcAddress().getPort() + " with "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,15 @@ package hadoop.hdfs.qjournal;
import "HdfsServer.proto";
import "QJournalProtocol.proto";

message GetStorageInfoRequestProto {
required JournalIdProto jid = 1;
optional string nameServiceId = 2;
}

service InterQJournalProtocolService {
rpc getEditLogManifestFromJournal(GetEditLogManifestRequestProto)
returns (GetEditLogManifestResponseProto);

rpc getStorageInfo(GetStorageInfoRequestProto)
returns (StorageInfoProto);
}
Original file line number Diff line number Diff line change
Expand Up @@ -5071,6 +5071,16 @@
</description>
</property>

<property>
<name>dfs.journalnode.enable.sync.format</name>
<value>false</value>
<description>
If true, the journal node syncer daemon that tries to sync edit
logs between journal nodes will try to format its journal if it is not.
It will query the other journal nodes for the storage info required to format.
</description>
</property>

<property>
<name>dfs.journalnode.edit-cache-size.bytes</name>
<value></value>
Expand Down
Loading

0 comments on commit e2a0dca

Please sign in to comment.