data-platform team mailing list archive
Message #00325
[Merge] soss/+source/hadoop:release-3.3.6-ubuntu4-protobuf2 into soss/+source/hadoop:lp-3.3.6
Enrico Deusebio has proposed merging soss/+source/hadoop:release-3.3.6-ubuntu4-protobuf2 into soss/+source/hadoop:lp-3.3.6.
Requested reviews:
Canonical Data Platform (data-platform)
For more details, see:
https://code.launchpad.net/~data-platform/soss/+source/hadoop/+git/hadoop/+merge/480055
--
Your team Canonical Data Platform is requested to review the proposed merge of soss/+source/hadoop:release-3.3.6-ubuntu4-protobuf2 into soss/+source/hadoop:lp-3.3.6.
diff --git a/BUILDING.txt b/BUILDING.txt
index 42ec263..6394532 100644
--- a/BUILDING.txt
+++ b/BUILDING.txt
@@ -293,6 +293,30 @@ Maven build goals:
package. This option requires that -Dpmdk.lib is specified. With -Dbundle.pmdk provided,
the build will fail if -Dpmdk.lib is not specified.
+Controlling the redistribution of the protobuf-2.5 dependency
+
+ The protobuf 2.5.0 library is used at compile time to compile the class
+ org.apache.hadoop.ipc.ProtobufHelper; this class is known to have been used by
+ external projects in the past. Protobuf 2.5 is not used elsewhere in
+ the Hadoop codebase; alongside the move to Protobuf 3.x a private successor
+ class, org.apache.hadoop.ipc.internal.ShadedProtobufHelper, is now used.
+
+ The hadoop-common JAR still declares a dependency on protobuf-2.5, but this
+ is likely to change in the future. The maven scope of the dependency can be
+ set with the common.protobuf2.scope option.
+ It can be set to "provided" in a build:
+ -Dcommon.protobuf2.scope=provided
+ If this is done then protobuf-2.5.0.jar will no longer be exported as a dependency,
+ and will then be omitted from the share/hadoop/common/lib/ directory of
+ any Hadoop distribution built. Any application declaring a dependency on hadoop-common
+ will no longer get the dependency; if it needs the JAR it must explicitly declare it:
+
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ <version>2.5.0</version>
+ </dependency>
+
----------------------------------------------------------------------------------
Building components separately
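
For illustration, a distribution build which leaves protobuf-2.5.0.jar out of
share/hadoop/common/lib/ could be invoked as below. Only the
-Dcommon.protobuf2.scope property comes from this patch; the other flags are
ordinary Hadoop build options:

    mvn clean install -DskipTests -Dcommon.protobuf2.scope=provided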
diff --git a/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml b/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml
index b885891..fdc90ed 100644
--- a/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml
+++ b/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml
@@ -451,8 +451,7 @@
</Match>
<Match>
- <Class name="org.apache.hadoop.ipc.ProtobufHelper" />
- <Method name="getFixedByteString" />
+ <Class name="org.apache.hadoop.ipc.internal.ShadedProtobufHelper" />
<Bug pattern="AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION" />
</Match>
</FindBugsFilter>
diff --git a/hadoop-common-project/hadoop-common/pom.xml b/hadoop-common-project/hadoop-common/pom.xml
index 1cd9f0c..4470a55 100644
--- a/hadoop-common-project/hadoop-common/pom.xml
+++ b/hadoop-common-project/hadoop-common/pom.xml
@@ -262,10 +262,11 @@
<artifactId>re2j</artifactId>
<scope>compile</scope>
</dependency>
+ <!-- Needed for compilation, though no longer used in production. -->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
- <scope>compile</scope>
+ <scope>${common.protobuf2.scope}</scope>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
@@ -496,11 +497,11 @@
<!--These classes have direct Protobuf references for backward compatibility reasons-->
<excludes>
<exclude>**/ProtobufHelper.java</exclude>
- <exclude>**/RpcWritable.java</exclude>
<exclude>**/ProtobufRpcEngineCallback.java</exclude>
<exclude>**/ProtobufRpcEngine.java</exclude>
<exclude>**/ProtobufRpcEngine2.java</exclude>
<exclude>**/ProtobufRpcEngineProtos.java</exclude>
+ <exclude>**/ProtobufWrapperLegacy.java</exclude>
</excludes>
</configuration>
</execution>
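
The common.protobuf2.scope property referenced above presumably defaults to
"compile" in a parent pom not shown in this diff. As a sketch, assuming only
the standard Maven property mechanism, a local rebuild could pin the scope in
a properties block instead of on the command line:

    <properties>
      <!-- hypothetical override; equivalent to -Dcommon.protobuf2.scope=provided -->
      <common.protobuf2.scope>provided</common.protobuf2.scope>
    </properties>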
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/HAServiceProtocolClientSideTranslatorPB.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/HAServiceProtocolClientSideTranslatorPB.java
index 2cbfd0d..f035c1c 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/HAServiceProtocolClientSideTranslatorPB.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/HAServiceProtocolClientSideTranslatorPB.java
@@ -37,14 +37,13 @@ import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.MonitorHealthRequestPr
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.TransitionToActiveRequestProto;
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.TransitionToStandbyRequestProto;
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.TransitionToObserverRequestProto;
-import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.ipc.ProtocolTranslator;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.UserGroupInformation;
-
import org.apache.hadoop.thirdparty.protobuf.RpcController;
-import org.apache.hadoop.thirdparty.protobuf.ServiceException;
+
+import static org.apache.hadoop.ipc.internal.ShadedProtobufHelper.ipc;
/**
* This class is the client side translator to translate the requests made on
@@ -84,60 +83,39 @@ public class HAServiceProtocolClientSideTranslatorPB implements
@Override
public void monitorHealth() throws IOException {
- try {
- rpcProxy.monitorHealth(NULL_CONTROLLER, MONITOR_HEALTH_REQ);
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ ipc(() -> rpcProxy.monitorHealth(NULL_CONTROLLER, MONITOR_HEALTH_REQ));
}
@Override
public void transitionToActive(StateChangeRequestInfo reqInfo) throws IOException {
- try {
- TransitionToActiveRequestProto req =
- TransitionToActiveRequestProto.newBuilder()
+ TransitionToActiveRequestProto req =
+ TransitionToActiveRequestProto.newBuilder()
.setReqInfo(convert(reqInfo)).build();
-
- rpcProxy.transitionToActive(NULL_CONTROLLER, req);
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ ipc(() -> rpcProxy.transitionToActive(NULL_CONTROLLER, req));
}
@Override
public void transitionToStandby(StateChangeRequestInfo reqInfo) throws IOException {
- try {
- TransitionToStandbyRequestProto req =
+ TransitionToStandbyRequestProto req =
TransitionToStandbyRequestProto.newBuilder()
- .setReqInfo(convert(reqInfo)).build();
- rpcProxy.transitionToStandby(NULL_CONTROLLER, req);
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ .setReqInfo(convert(reqInfo)).build();
+ ipc(() -> rpcProxy.transitionToStandby(NULL_CONTROLLER, req));
}
@Override
public void transitionToObserver(StateChangeRequestInfo reqInfo)
throws IOException {
- try {
- TransitionToObserverRequestProto req =
- TransitionToObserverRequestProto.newBuilder()
- .setReqInfo(convert(reqInfo)).build();
- rpcProxy.transitionToObserver(NULL_CONTROLLER, req);
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ TransitionToObserverRequestProto req =
+ TransitionToObserverRequestProto.newBuilder()
+ .setReqInfo(convert(reqInfo)).build();
+ ipc(() -> rpcProxy.transitionToObserver(NULL_CONTROLLER, req));
}
@Override
public HAServiceStatus getServiceStatus() throws IOException {
GetServiceStatusResponseProto status;
- try {
- status = rpcProxy.getServiceStatus(NULL_CONTROLLER,
- GET_SERVICE_STATUS_REQ);
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ status = ipc(() -> rpcProxy.getServiceStatus(NULL_CONTROLLER,
+ GET_SERVICE_STATUS_REQ));
HAServiceStatus ret = new HAServiceStatus(
convert(status.getState()));
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/ZKFCProtocolClientSideTranslatorPB.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/ZKFCProtocolClientSideTranslatorPB.java
index 3777207..307629f 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/ZKFCProtocolClientSideTranslatorPB.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/ZKFCProtocolClientSideTranslatorPB.java
@@ -27,15 +27,14 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.ZKFCProtocol;
import org.apache.hadoop.ha.proto.ZKFCProtocolProtos.CedeActiveRequestProto;
import org.apache.hadoop.ha.proto.ZKFCProtocolProtos.GracefulFailoverRequestProto;
-import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.ipc.ProtocolTranslator;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
-
import org.apache.hadoop.thirdparty.protobuf.RpcController;
-import org.apache.hadoop.thirdparty.protobuf.ServiceException;
+
+import static org.apache.hadoop.ipc.internal.ShadedProtobufHelper.ipc;
public class ZKFCProtocolClientSideTranslatorPB implements
@@ -57,24 +56,16 @@ public class ZKFCProtocolClientSideTranslatorPB implements
@Override
public void cedeActive(int millisToCede) throws IOException,
AccessControlException {
- try {
- CedeActiveRequestProto req = CedeActiveRequestProto.newBuilder()
- .setMillisToCede(millisToCede)
- .build();
- rpcProxy.cedeActive(NULL_CONTROLLER, req);
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ CedeActiveRequestProto req = CedeActiveRequestProto.newBuilder()
+ .setMillisToCede(millisToCede)
+ .build();
+ ipc(() -> rpcProxy.cedeActive(NULL_CONTROLLER, req));
}
@Override
public void gracefulFailover() throws IOException, AccessControlException {
- try {
- rpcProxy.gracefulFailover(NULL_CONTROLLER,
- GracefulFailoverRequestProto.getDefaultInstance());
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ ipc(() -> rpcProxy.gracefulFailover(NULL_CONTROLLER,
+ GracefulFailoverRequestProto.getDefaultInstance()));
}
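
The rewrite in the two translators above repeats mechanically across every
protocolPB class in this proposal: a try/catch that unwraps ServiceException
via ProtobufHelper.getRemoteException() collapses into one ipc() call. A
minimal self-contained sketch of the pattern (ExampleProxy and echo() are
illustrative, not part of the patch):

    import java.io.IOException;

    import org.apache.hadoop.thirdparty.protobuf.ServiceException;

    import static org.apache.hadoop.ipc.internal.ShadedProtobufHelper.ipc;

    public class ExampleTranslator {
      /** Stand-in for a generated protobuf service stub. */
      interface ExampleProxy {
        String echo(String msg) throws ServiceException;
      }

      private final ExampleProxy rpcProxy;

      ExampleTranslator(ExampleProxy rpcProxy) {
        this.rpcProxy = rpcProxy;
      }

      /**
       * Before: try { return rpcProxy.echo(msg); }
       * catch (ServiceException e) { throw ProtobufHelper.getRemoteException(e); }
       */
      public String echo(String msg) throws IOException {
        // ipc() invokes the lambda and converts any ServiceException
        // into the IOException it wraps (or a new IOException).
        return ipc(() -> rpcProxy.echo(msg));
      }
    }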
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufHelper.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufHelper.java
index 9ed0640..da4dfe6 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufHelper.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufHelper.java
@@ -18,10 +18,10 @@
package org.apache.hadoop.ipc;
import java.io.IOException;
-import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.internal.ShadedProtobufHelper;
import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
@@ -30,31 +30,37 @@ import org.apache.hadoop.thirdparty.protobuf.ByteString;
import org.apache.hadoop.thirdparty.protobuf.ServiceException;
/**
- * Helper methods for protobuf related RPC implementation
+ * Helper methods for protobuf related RPC implementation.
+ * This is deprecated because it references protobuf 2.5 classes
+ * as well as the shaded ones, and so needs an unshaded protobuf-2.5
+ * JAR on the classpath during execution.
+ * It MUST NOT be used internally; it is retained in case existing,
+ * external applications already use it.
+ * @deprecated hadoop code MUST use {@link ShadedProtobufHelper}.
*/
@InterfaceAudience.Private
+@Deprecated
public class ProtobufHelper {
+
private ProtobufHelper() {
// Hidden constructor for class with only static helper methods
}
/**
- * Return the IOException thrown by the remote server wrapped in
+ * Return the IOException thrown by the remote server wrapped in
* ServiceException as cause.
* @param se ServiceException that wraps IO exception thrown by the server
* @return Exception wrapped in ServiceException or
* a new IOException that wraps the unexpected ServiceException.
*/
public static IOException getRemoteException(ServiceException se) {
- Throwable e = se.getCause();
- if (e == null) {
- return new IOException(se);
- }
- return e instanceof IOException ? (IOException) e : new IOException(se);
+ return ShadedProtobufHelper.getRemoteException(se);
}
/**
- * Kept for backward compatible.
+ * Extract the remote exception from an unshaded version of the protobuf
+ * libraries.
+ * Kept for backward compatibility.
* Return the IOException thrown by the remote server wrapped in
* ServiceException as cause.
* @param se ServiceException that wraps IO exception thrown by the server
@@ -72,28 +78,12 @@ public class ProtobufHelper {
}
/**
- * Map used to cache fixed strings to ByteStrings. Since there is no
- * automatic expiration policy, only use this for strings from a fixed, small
- * set.
- * <p/>
- * This map should not be accessed directly. Used the getFixedByteString
- * methods instead.
- */
- private final static ConcurrentHashMap<Object, ByteString>
- FIXED_BYTESTRING_CACHE = new ConcurrentHashMap<>();
-
- /**
* Get the ByteString for frequently used fixed and small set strings.
* @param key string
* @return the ByteString for frequently used fixed and small set strings.
*/
public static ByteString getFixedByteString(Text key) {
- ByteString value = FIXED_BYTESTRING_CACHE.get(key);
- if (value == null) {
- value = ByteString.copyFromUtf8(key.toString());
- FIXED_BYTESTRING_CACHE.put(new Text(key.copyBytes()), value);
- }
- return value;
+ return ShadedProtobufHelper.getFixedByteString(key);
}
/**
@@ -102,34 +92,40 @@ public class ProtobufHelper {
* @return ByteString for frequently used fixed and small set strings.
*/
public static ByteString getFixedByteString(String key) {
- ByteString value = FIXED_BYTESTRING_CACHE.get(key);
- if (value == null) {
- value = ByteString.copyFromUtf8(key);
- FIXED_BYTESTRING_CACHE.put(key, value);
- }
- return value;
+ return ShadedProtobufHelper.getFixedByteString(key);
}
+ /**
+ * Get the byte string of a non-null byte array.
+ * If the array is 0 bytes long, return a singleton to reduce object allocation.
+ * @param bytes bytes to convert.
+ * @return a value
+ */
public static ByteString getByteString(byte[] bytes) {
// return singleton to reduce object allocation
- return (bytes.length == 0) ? ByteString.EMPTY : ByteString.copyFrom(bytes);
+ return ShadedProtobufHelper.getByteString(bytes);
}
+ /**
+ * Get a token from a TokenProto payload.
+ * @param tokenProto marshalled token
+ * @return the token.
+ */
public static Token<? extends TokenIdentifier> tokenFromProto(
TokenProto tokenProto) {
- Token<? extends TokenIdentifier> token = new Token<>(
- tokenProto.getIdentifier().toByteArray(),
- tokenProto.getPassword().toByteArray(), new Text(tokenProto.getKind()),
- new Text(tokenProto.getService()));
- return token;
+ return ShadedProtobufHelper.tokenFromProto(tokenProto);
}
+ /**
+ * Create a {@code TokenProto} instance
+ * from a hadoop token.
+ * This builds and caches the fields
+ * (identifier, password, kind, service) but not
+ * renewer or any payload.
+ * @param tok token
+ * @return a marshallable protobuf class.
+ */
public static TokenProto protoFromToken(Token<?> tok) {
- TokenProto.Builder builder = TokenProto.newBuilder().
- setIdentifier(getByteString(tok.getIdentifier())).
- setPassword(getByteString(tok.getPassword())).
- setKindBytes(getFixedByteString(tok.getKind())).
- setServiceBytes(getFixedByteString(tok.getService()));
- return builder.build();
+ return ShadedProtobufHelper.protoFromToken(tok);
}
}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufWrapperLegacy.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufWrapperLegacy.java
new file mode 100644
index 0000000..0f264e0
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufWrapperLegacy.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ipc;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.util.Preconditions;
+
+/**
+ * An RpcWritable wrapper for unshaded protobuf messages.
+ * This class isolates unshaded protobuf classes from
+ * the rest of the RPC codebase, so that code can operate
+ * without needing the protobuf-2.5 JAR on the classpath <i>at runtime</i>.
+ * The classes are needed at compile time; and if
+ * unshaded protobuf messages are to be marshalled, they
+ * will need to be on the classpath then.
+ * That is implicit: it is impossible to pass in a class
+ * which is a protobuf message unless that condition is met.
+ */
+@InterfaceAudience.Private
+public class ProtobufWrapperLegacy extends RpcWritable {
+
+ private com.google.protobuf.Message message;
+
+ /**
+ * Construct.
+ * The type of the parameter is Object so as to keep the casting internal
+ * to this class.
+ * @param message message to wrap.
+ * @throws IllegalArgumentException if the class is not a protobuf message.
+ */
+ public ProtobufWrapperLegacy(Object message) {
+ Preconditions.checkArgument(isUnshadedProtobufMessage(message),
+ "message class is not an unshaded protobuf message %s",
+ message.getClass());
+ this.message = (com.google.protobuf.Message) message;
+ }
+
+ public com.google.protobuf.Message getMessage() {
+ return message;
+ }
+
+
+ @Override
+ public void writeTo(ResponseBuffer out) throws IOException {
+ int length = message.getSerializedSize();
+ length += com.google.protobuf.CodedOutputStream.
+ computeUInt32SizeNoTag(length);
+ out.ensureCapacity(length);
+ message.writeDelimitedTo(out);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ protected <T> T readFrom(ByteBuffer bb) throws IOException {
+ // using the parser with a byte[]-backed coded input stream is the
+ // most efficient way to deserialize a protobuf. it has a direct
+ // path to the PB ctor that doesn't create multi-layered streams
+ // that internally buffer.
+ com.google.protobuf.CodedInputStream cis =
+ com.google.protobuf.CodedInputStream.newInstance(
+ bb.array(), bb.position() + bb.arrayOffset(), bb.remaining());
+ try {
+ cis.pushLimit(cis.readRawVarint32());
+ message = message.getParserForType().parseFrom(cis);
+ cis.checkLastTagWas(0);
+ } finally {
+ // advance over the bytes read.
+ bb.position(bb.position() + cis.getTotalBytesRead());
+ }
+ return (T) message;
+ }
+
+ /**
+ * Has protobuf been looked for and is known as absent?
+ * Saves a check on every message.
+ */
+ private static final AtomicBoolean PROTOBUF_KNOWN_NOT_FOUND =
+ new AtomicBoolean(false);
+
+ /**
+ * Is a message an unshaded protobuf message?
+ * @param payload payload
+ * @return true if protobuf.jar is on the classpath and the payload is a Message
+ */
+ public static boolean isUnshadedProtobufMessage(Object payload) {
+ if (PROTOBUF_KNOWN_NOT_FOUND.get()) {
+ // protobuf is known to be absent. fail fast without examining
+ // jars or generating exceptions.
+ return false;
+ }
+ // load the protobuf message class.
+ // if it does not load, then the payload is guaranteed not to be
+ // an unshaded protobuf message
+ // this relies on classloader caching for performance
+ try {
+ Class<?> protobufMessageClazz =
+ Class.forName("com.google.protobuf.Message");
+ return protobufMessageClazz.isAssignableFrom(payload.getClass());
+ } catch (ClassNotFoundException e) {
+ PROTOBUF_KNOWN_NOT_FOUND.set(true);
+ return false;
+ }
+
+ }
+}
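
The isUnshadedProtobufMessage() guard probes for com.google.protobuf.Message
once and caches a negative result, so the reflection cost is paid at most once
when the JAR is absent. A reduced sketch of that idiom, independent of Hadoop
(class and method names here are illustrative):

    import java.util.concurrent.atomic.AtomicBoolean;

    public final class ClassProbe {

      /** Set once a probe has failed; later calls fail fast. */
      private static final AtomicBoolean KNOWN_ABSENT = new AtomicBoolean(false);

      private ClassProbe() {
      }

      /** Is the payload an instance of the (possibly absent) class? */
      public static boolean isInstanceOf(String className, Object payload) {
        if (KNOWN_ABSENT.get()) {
          // an earlier probe failed: skip the classloader lookup entirely
          return false;
        }
        try {
          // relies on classloader caching once the class has been found
          return Class.forName(className).isAssignableFrom(payload.getClass());
        } catch (ClassNotFoundException e) {
          KNOWN_ABSENT.set(true);
          return false;
        }
      }
    }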
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcClientUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcClientUtil.java
index 4af35ad..3f0b5e4 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcClientUtil.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcClientUtil.java
@@ -31,9 +31,9 @@ import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureReq
import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.GetProtocolSignatureResponseProto;
import org.apache.hadoop.ipc.protobuf.ProtocolInfoProtos.ProtocolSignatureProto;
import org.apache.hadoop.net.NetUtils;
-
import org.apache.hadoop.thirdparty.protobuf.RpcController;
-import org.apache.hadoop.thirdparty.protobuf.ServiceException;
+
+import static org.apache.hadoop.ipc.internal.ShadedProtobufHelper.ipc;
/**
* This class maintains a cache of protocol versions and corresponding protocol
@@ -122,12 +122,8 @@ public class RpcClientUtil {
builder.setProtocol(protocol.getName());
builder.setRpcKind(rpcKind.toString());
GetProtocolSignatureResponseProto resp;
- try {
- resp = protocolInfoProxy.getProtocolSignature(NULL_CONTROLLER,
- builder.build());
- } catch (ServiceException se) {
- throw ProtobufHelper.getRemoteException(se);
- }
+ resp = ipc(() -> protocolInfoProxy.getProtocolSignature(NULL_CONTROLLER,
+ builder.build()));
versionMap = convertProtocolSignatureProtos(resp
.getProtocolSignatureList());
putVersionSignatureMap(serverAddress, protocol.getName(),
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcWritable.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcWritable.java
index f5f0d07..612c9b0 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcWritable.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcWritable.java
@@ -41,9 +41,11 @@ public abstract class RpcWritable implements Writable {
if (o instanceof RpcWritable) {
return (RpcWritable)o;
} else if (o instanceof Message) {
+ // hadoop shaded protobuf
return new ProtobufWrapper((Message)o);
- } else if (o instanceof com.google.protobuf.Message) {
- return new ProtobufWrapperLegacy((com.google.protobuf.Message) o);
+ } else if (ProtobufWrapperLegacy.isUnshadedProtobufMessage(o)) {
+ // unshaded protobuf
+ return new ProtobufWrapperLegacy(o);
} else if (o instanceof Writable) {
return new WritableWrapper((Writable)o);
}
@@ -134,49 +136,6 @@ public abstract class RpcWritable implements Writable {
}
}
- // adapter for Protobufs.
- static class ProtobufWrapperLegacy extends RpcWritable {
- private com.google.protobuf.Message message;
-
- ProtobufWrapperLegacy(com.google.protobuf.Message message) {
- this.message = message;
- }
-
- com.google.protobuf.Message getMessage() {
- return message;
- }
-
- @Override
- void writeTo(ResponseBuffer out) throws IOException {
- int length = message.getSerializedSize();
- length += com.google.protobuf.CodedOutputStream.
- computeUInt32SizeNoTag(length);
- out.ensureCapacity(length);
- message.writeDelimitedTo(out);
- }
-
- @SuppressWarnings("unchecked")
- @Override
- <T> T readFrom(ByteBuffer bb) throws IOException {
- // using the parser with a byte[]-backed coded input stream is the
- // most efficient way to deserialize a protobuf. it has a direct
- // path to the PB ctor that doesn't create multi-layered streams
- // that internally buffer.
- com.google.protobuf.CodedInputStream cis =
- com.google.protobuf.CodedInputStream.newInstance(
- bb.array(), bb.position() + bb.arrayOffset(), bb.remaining());
- try {
- cis.pushLimit(cis.readRawVarint32());
- message = message.getParserForType().parseFrom(cis);
- cis.checkLastTagWas(0);
- } finally {
- // advance over the bytes read.
- bb.position(bb.position() + cis.getTotalBytesRead());
- }
- return (T)message;
- }
- }
-
/**
* adapter to allow decoding of writables and protobufs from a byte buffer.
*/
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/internal/ShadedProtobufHelper.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/internal/ShadedProtobufHelper.java
new file mode 100644
index 0000000..c0dcddd
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/internal/ShadedProtobufHelper.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ipc.internal;
+
+import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.thirdparty.protobuf.ByteString;
+import org.apache.hadoop.thirdparty.protobuf.ServiceException;
+
+/**
+ * Helper methods for protobuf related RPC implementation using the
+ * hadoop {@code org.apache.hadoop.thirdparty.protobuf} shaded version.
+ * This is <i>absolutely private to hadoop-* modules</i>.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public final class ShadedProtobufHelper {
+
+ private ShadedProtobufHelper() {
+ // Hidden constructor for class with only static helper methods
+ }
+
+ /**
+ * Return the IOException thrown by the remote server wrapped in
+ * ServiceException as cause.
+ * The signature of this method changes with updates to the hadoop-thirdparty
+ * shaded protobuf library.
+ * @param se ServiceException that wraps IO exception thrown by the server
+ * @return Exception wrapped in ServiceException or
+ * a new IOException that wraps the unexpected ServiceException.
+ */
+ @InterfaceAudience.Private
+ @InterfaceStability.Unstable
+ public static IOException getRemoteException(ServiceException se) {
+ Throwable e = se.getCause();
+ if (e == null) {
+ return new IOException(se);
+ }
+ return e instanceof IOException
+ ? (IOException) e
+ : new IOException(se);
+ }
+
+ /**
+ * Map used to cache fixed strings to ByteStrings. Since there is no
+ * automatic expiration policy, only use this for strings from a fixed, small
+ * set.
+ * <p>
+ * This map should not be accessed directly. Use the getFixedByteString
+ * methods instead.
+ */
+ private static final ConcurrentHashMap<Object, ByteString>
+ FIXED_BYTESTRING_CACHE = new ConcurrentHashMap<>();
+
+ /**
+ * Get the ByteString for frequently used fixed and small set strings.
+ * @param key Hadoop Writable Text string
+ * @return the ByteString for frequently used fixed and small set strings.
+ */
+ public static ByteString getFixedByteString(Text key) {
+ ByteString value = FIXED_BYTESTRING_CACHE.get(key);
+ if (value == null) {
+ value = ByteString.copyFromUtf8(key.toString());
+ FIXED_BYTESTRING_CACHE.put(new Text(key.copyBytes()), value);
+ }
+ return value;
+ }
+
+ /**
+ * Get the ByteString for frequently used fixed and small set strings.
+ * @param key string
+ * @return ByteString for frequently used fixed and small set strings.
+ */
+ public static ByteString getFixedByteString(String key) {
+ ByteString value = FIXED_BYTESTRING_CACHE.get(key);
+ if (value == null) {
+ value = ByteString.copyFromUtf8(key);
+ FIXED_BYTESTRING_CACHE.put(key, value);
+ }
+ return value;
+ }
+
+ /**
+ * Get the byte string of a non-null byte array.
+ * If the array is 0 bytes long, return a singleton to reduce object allocation.
+ * @param bytes bytes to convert.
+ * @return the protobuf byte string representation of the array.
+ */
+ public static ByteString getByteString(byte[] bytes) {
+ // return singleton to reduce object allocation
+ return (bytes.length == 0)
+ ? ByteString.EMPTY
+ : ByteString.copyFrom(bytes);
+ }
+
+ /**
+ * Create a hadoop token from a protobuf token.
+ * @param tokenProto token
+ * @return a new token
+ */
+ public static Token<? extends TokenIdentifier> tokenFromProto(
+ TokenProto tokenProto) {
+ Token<? extends TokenIdentifier> token = new Token<>(
+ tokenProto.getIdentifier().toByteArray(),
+ tokenProto.getPassword().toByteArray(),
+ new Text(tokenProto.getKind()),
+ new Text(tokenProto.getService()));
+ return token;
+ }
+
+ /**
+ * Create a {@code TokenProto} instance
+ * from a hadoop token.
+ * This builds and caches the fields
+ * (identifier, password, kind, service) but not
+ * renewer or any payload.
+ * @param tok token
+ * @return a marshallable protobuf class.
+ */
+ public static TokenProto protoFromToken(Token<?> tok) {
+ TokenProto.Builder builder = TokenProto.newBuilder().
+ setIdentifier(getByteString(tok.getIdentifier())).
+ setPassword(getByteString(tok.getPassword())).
+ setKindBytes(getFixedByteString(tok.getKind())).
+ setServiceBytes(getFixedByteString(tok.getService()));
+ return builder.build();
+ }
+
+ /**
+ * Evaluate a protobuf call, converting any ServiceException to an IOException.
+ * @param call invocation to make
+ * @return the result of the call
+ * @param <T> type of the result
+ * @throws IOException any translated protobuf exception
+ */
+ public static <T> T ipc(IpcCall<T> call) throws IOException {
+ try {
+ return call.call();
+ } catch (ServiceException e) {
+ throw getRemoteException(e);
+ }
+ }
+
+ @FunctionalInterface
+ public interface IpcCall<T> {
+ T call() throws ServiceException;
+ }
+}
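
A small usage sketch of the fixed-string cache (the token kind value is
illustrative): in a single thread, repeated lookups return the cached
ByteString instance. The get/put sequence is racy under concurrency, which is
what the FindBugs exclusion earlier in this diff tolerates; at worst a
duplicate ByteString is built once.

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.ipc.internal.ShadedProtobufHelper;
    import org.apache.hadoop.thirdparty.protobuf.ByteString;

    public class FixedByteStringDemo {
      public static void main(String[] args) {
        Text kind = new Text("HDFS_DELEGATION_TOKEN");
        ByteString first = ShadedProtobufHelper.getFixedByteString(kind);
        // second lookup hits FIXED_BYTESTRING_CACHE and returns the same object
        ByteString second = ShadedProtobufHelper.getFixedByteString(kind);
        System.out.println("cached instance reused: " + (first == second));
      }
    }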
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/internal/package-info.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/internal/package-info.java
new file mode 100644
index 0000000..0ff2ba9
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/internal/package-info.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * IPC internal classes not for any use by libraries outside
+ * the apache hadoop source tree.
+ */
+@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce", "YARN"})
+@InterfaceStability.Unstable
+package org.apache.hadoop.ipc.internal;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/package-info.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/package-info.java
index cb35e93..f91fa23 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/package-info.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/package-info.java
@@ -18,8 +18,12 @@
/**
* Tools to help define network clients and servers.
+ * Other ASF projects use this package, often with their own shaded/unshaded
+ * versions of protobuf messages.
+ * Changes to the API signatures will break things, especially changes to
+ * {@link org.apache.hadoop.ipc.RPC} and {@link org.apache.hadoop.ipc.RpcEngine}.
*/
-@InterfaceAudience.LimitedPrivate({"HBase", "HDFS", "MapReduce"})
+@InterfaceAudience.LimitedPrivate({"HBase", "HDFS", "MapReduce", "YARN", "Hive", "Ozone"})
@InterfaceStability.Evolving
package org.apache.hadoop.ipc;
import org.apache.hadoop.classification.InterfaceAudience;
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/protocolPB/GenericRefreshProtocolClientSideTranslatorPB.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/protocolPB/GenericRefreshProtocolClientSideTranslatorPB.java
index f8a3e25..78413be 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/protocolPB/GenericRefreshProtocolClientSideTranslatorPB.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/protocolPB/GenericRefreshProtocolClientSideTranslatorPB.java
@@ -25,7 +25,6 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.List;
-import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtocolMetaInterface;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RefreshResponse;
@@ -34,9 +33,9 @@ import org.apache.hadoop.ipc.GenericRefreshProtocol;
import org.apache.hadoop.ipc.proto.GenericRefreshProtocolProtos.GenericRefreshRequestProto;
import org.apache.hadoop.ipc.proto.GenericRefreshProtocolProtos.GenericRefreshResponseProto;
import org.apache.hadoop.ipc.proto.GenericRefreshProtocolProtos.GenericRefreshResponseCollectionProto;
-
import org.apache.hadoop.thirdparty.protobuf.RpcController;
-import org.apache.hadoop.thirdparty.protobuf.ServiceException;
+
+import static org.apache.hadoop.ipc.internal.ShadedProtobufHelper.ipc;
public class GenericRefreshProtocolClientSideTranslatorPB implements
ProtocolMetaInterface, GenericRefreshProtocol, Closeable {
@@ -59,17 +58,13 @@ public class GenericRefreshProtocolClientSideTranslatorPB implements
public Collection<RefreshResponse> refresh(String identifier, String[] args) throws IOException {
List<String> argList = Arrays.asList(args);
- try {
- GenericRefreshRequestProto request = GenericRefreshRequestProto.newBuilder()
- .setIdentifier(identifier)
- .addAllArgs(argList)
- .build();
+ GenericRefreshRequestProto request = GenericRefreshRequestProto.newBuilder()
+ .setIdentifier(identifier).addAllArgs(argList).build();
+
+ GenericRefreshResponseCollectionProto resp = ipc(() ->
+ rpcProxy.refresh(NULL_CONTROLLER, request));
+ return unpack(resp);
- GenericRefreshResponseCollectionProto resp = rpcProxy.refresh(NULL_CONTROLLER, request);
- return unpack(resp);
- } catch (ServiceException se) {
- throw ProtobufHelper.getRemoteException(se);
- }
}
private Collection<RefreshResponse> unpack(GenericRefreshResponseCollectionProto collection) {
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/protocolPB/RefreshCallQueueProtocolClientSideTranslatorPB.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/protocolPB/RefreshCallQueueProtocolClientSideTranslatorPB.java
index e378a93..46182ca 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/protocolPB/RefreshCallQueueProtocolClientSideTranslatorPB.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/protocolPB/RefreshCallQueueProtocolClientSideTranslatorPB.java
@@ -21,16 +21,14 @@ package org.apache.hadoop.ipc.protocolPB;
import java.io.Closeable;
import java.io.IOException;
-import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtocolMetaInterface;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RpcClientUtil;
import org.apache.hadoop.ipc.RefreshCallQueueProtocol;
import org.apache.hadoop.ipc.proto.RefreshCallQueueProtocolProtos.RefreshCallQueueRequestProto;
-import org.apache.hadoop.ipc.protocolPB.RefreshCallQueueProtocolPB;
-
import org.apache.hadoop.thirdparty.protobuf.RpcController;
-import org.apache.hadoop.thirdparty.protobuf.ServiceException;
+
+import static org.apache.hadoop.ipc.internal.ShadedProtobufHelper.ipc;
public class RefreshCallQueueProtocolClientSideTranslatorPB implements
ProtocolMetaInterface, RefreshCallQueueProtocol, Closeable {
@@ -55,12 +53,8 @@ public class RefreshCallQueueProtocolClientSideTranslatorPB implements
@Override
public void refreshCallQueue() throws IOException {
- try {
- rpcProxy.refreshCallQueue(NULL_CONTROLLER,
- VOID_REFRESH_CALL_QUEUE_REQUEST);
- } catch (ServiceException se) {
- throw ProtobufHelper.getRemoteException(se);
- }
+ ipc(() -> rpcProxy.refreshCallQueue(NULL_CONTROLLER,
+ VOID_REFRESH_CALL_QUEUE_REQUEST));
}
@Override
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/Credentials.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/Credentials.java
index ef309cb..bbbcc95 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/Credentials.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/Credentials.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.security;
+import org.apache.hadoop.ipc.internal.ShadedProtobufHelper;
import org.apache.hadoop.thirdparty.protobuf.ByteString;
import java.io.BufferedInputStream;
@@ -46,7 +47,6 @@ import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.security.proto.SecurityProtos.CredentialsKVProto;
@@ -382,7 +382,7 @@ public class Credentials implements Writable {
CredentialsKVProto.Builder kv = CredentialsKVProto.newBuilder().
setAliasBytes(ByteString.copyFrom(
e.getKey().getBytes(), 0, e.getKey().getLength())).
- setToken(ProtobufHelper.protoFromToken(e.getValue()));
+ setToken(ShadedProtobufHelper.protoFromToken(e.getValue()));
storage.addTokens(kv.build());
}
@@ -404,7 +404,7 @@ public class Credentials implements Writable {
CredentialsProto storage = CredentialsProto.parseDelimitedFrom((DataInputStream)in);
for (CredentialsKVProto kv : storage.getTokensList()) {
addToken(new Text(kv.getAliasBytes().toByteArray()),
- ProtobufHelper.tokenFromProto(kv.getToken()));
+ ShadedProtobufHelper.tokenFromProto(kv.getToken()));
}
for (CredentialsKVProto kv : storage.getSecretsList()) {
addSecretKey(new Text(kv.getAliasBytes().toByteArray()),
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/protocolPB/RefreshAuthorizationPolicyProtocolClientSideTranslatorPB.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/protocolPB/RefreshAuthorizationPolicyProtocolClientSideTranslatorPB.java
index 6e7a856..49887bd 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/protocolPB/RefreshAuthorizationPolicyProtocolClientSideTranslatorPB.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/protocolPB/RefreshAuthorizationPolicyProtocolClientSideTranslatorPB.java
@@ -21,16 +21,14 @@ package org.apache.hadoop.security.protocolPB;
import java.io.Closeable;
import java.io.IOException;
-import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtocolMetaInterface;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RpcClientUtil;
import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
import org.apache.hadoop.security.proto.RefreshAuthorizationPolicyProtocolProtos.RefreshServiceAclRequestProto;
-import org.apache.hadoop.security.protocolPB.RefreshAuthorizationPolicyProtocolPB;
-
import org.apache.hadoop.thirdparty.protobuf.RpcController;
-import org.apache.hadoop.thirdparty.protobuf.ServiceException;
+
+import static org.apache.hadoop.ipc.internal.ShadedProtobufHelper.ipc;
public class RefreshAuthorizationPolicyProtocolClientSideTranslatorPB implements
ProtocolMetaInterface, RefreshAuthorizationPolicyProtocol, Closeable {
@@ -55,12 +53,8 @@ public class RefreshAuthorizationPolicyProtocolClientSideTranslatorPB implements
@Override
public void refreshServiceAcl() throws IOException {
- try {
- rpcProxy.refreshServiceAcl(NULL_CONTROLLER,
- VOID_REFRESH_SERVICE_ACL_REQUEST);
- } catch (ServiceException se) {
- throw ProtobufHelper.getRemoteException(se);
- }
+ ipc(() -> rpcProxy.refreshServiceAcl(NULL_CONTROLLER,
+ VOID_REFRESH_SERVICE_ACL_REQUEST));
}
@Override
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/protocolPB/RefreshUserMappingsProtocolClientSideTranslatorPB.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/protocolPB/RefreshUserMappingsProtocolClientSideTranslatorPB.java
index ac40038..cb80d06 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/protocolPB/RefreshUserMappingsProtocolClientSideTranslatorPB.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/protocolPB/RefreshUserMappingsProtocolClientSideTranslatorPB.java
@@ -21,16 +21,15 @@ package org.apache.hadoop.security.protocolPB;
import java.io.Closeable;
import java.io.IOException;
-import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtocolMetaInterface;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RpcClientUtil;
import org.apache.hadoop.security.RefreshUserMappingsProtocol;
import org.apache.hadoop.security.proto.RefreshUserMappingsProtocolProtos.RefreshSuperUserGroupsConfigurationRequestProto;
import org.apache.hadoop.security.proto.RefreshUserMappingsProtocolProtos.RefreshUserToGroupsMappingsRequestProto;
-
import org.apache.hadoop.thirdparty.protobuf.RpcController;
-import org.apache.hadoop.thirdparty.protobuf.ServiceException;
+
+import static org.apache.hadoop.ipc.internal.ShadedProtobufHelper.ipc;
public class RefreshUserMappingsProtocolClientSideTranslatorPB implements
ProtocolMetaInterface, RefreshUserMappingsProtocol, Closeable {
@@ -59,22 +58,14 @@ public class RefreshUserMappingsProtocolClientSideTranslatorPB implements
@Override
public void refreshUserToGroupsMappings() throws IOException {
- try {
- rpcProxy.refreshUserToGroupsMappings(NULL_CONTROLLER,
- VOID_REFRESH_USER_TO_GROUPS_MAPPING_REQUEST);
- } catch (ServiceException se) {
- throw ProtobufHelper.getRemoteException(se);
- }
+ ipc(() -> rpcProxy.refreshUserToGroupsMappings(NULL_CONTROLLER,
+ VOID_REFRESH_USER_TO_GROUPS_MAPPING_REQUEST));
}
@Override
public void refreshSuperUserGroupsConfiguration() throws IOException {
- try {
- rpcProxy.refreshSuperUserGroupsConfiguration(NULL_CONTROLLER,
- VOID_REFRESH_SUPERUSER_GROUPS_CONFIGURATION_REQUEST);
- } catch (ServiceException se) {
- throw ProtobufHelper.getRemoteException(se);
- }
+ ipc(() -> rpcProxy.refreshSuperUserGroupsConfiguration(NULL_CONTROLLER,
+ VOID_REFRESH_SUPERUSER_GROUPS_CONFIGURATION_REQUEST));
}
@Override
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/tools/protocolPB/GetUserMappingsProtocolClientSideTranslatorPB.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/tools/protocolPB/GetUserMappingsProtocolClientSideTranslatorPB.java
index b0c0fda..8d8885a 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/tools/protocolPB/GetUserMappingsProtocolClientSideTranslatorPB.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/tools/protocolPB/GetUserMappingsProtocolClientSideTranslatorPB.java
@@ -20,7 +20,7 @@ package org.apache.hadoop.tools.protocolPB;
import java.io.Closeable;
import java.io.IOException;
-import org.apache.hadoop.ipc.ProtobufHelper;
+
import org.apache.hadoop.ipc.ProtocolMetaInterface;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RpcClientUtil;
@@ -29,7 +29,8 @@ import org.apache.hadoop.tools.proto.GetUserMappingsProtocolProtos.GetGroupsForU
import org.apache.hadoop.tools.proto.GetUserMappingsProtocolProtos.GetGroupsForUserResponseProto;
import org.apache.hadoop.thirdparty.protobuf.RpcController;
-import org.apache.hadoop.thirdparty.protobuf.ServiceException;
+
+import static org.apache.hadoop.ipc.internal.ShadedProtobufHelper.ipc;
public class GetUserMappingsProtocolClientSideTranslatorPB implements
ProtocolMetaInterface, GetUserMappingsProtocol, Closeable {
@@ -53,11 +54,7 @@ public class GetUserMappingsProtocolClientSideTranslatorPB implements
GetGroupsForUserRequestProto request = GetGroupsForUserRequestProto
.newBuilder().setUser(user).build();
GetGroupsForUserResponseProto resp;
- try {
- resp = rpcProxy.getGroupsForUser(NULL_CONTROLLER, request);
- } catch (ServiceException se) {
- throw ProtobufHelper.getRemoteException(se);
- }
+ resp = ipc(() -> rpcProxy.getGroupsForUser(NULL_CONTROLLER, request));
return resp.getGroupsList().toArray(new String[resp.getGroupsCount()]);
}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestShadedProtobufHelper.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestShadedProtobufHelper.java
new file mode 100644
index 0000000..fb4e831
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestShadedProtobufHelper.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ipc;
+
+import java.io.IOException;
+
+import org.junit.Test;
+
+import org.apache.hadoop.ipc.internal.ShadedProtobufHelper;
+import org.apache.hadoop.test.AbstractHadoopTestBase;
+import org.apache.hadoop.thirdparty.protobuf.ServiceException;
+
+import static org.apache.hadoop.ipc.internal.ShadedProtobufHelper.ipc;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+import static org.apache.hadoop.test.LambdaTestUtils.verifyCause;
+
+/**
+ * Test methods in {@link ShadedProtobufHelper}.
+ */
+public class TestShadedProtobufHelper extends AbstractHadoopTestBase {
+
+ @Test
+ public void testExtractRemoteExceptionNoCause() throws Throwable {
+ ServiceException source = new ServiceException("empty");
+
+ IOException ex = ShadedProtobufHelper.getRemoteException(source);
+ verifyCause(ServiceException.class, ex);
+ }
+
+ @Test
+ public void testExtractRemoteExceptionIOECause() throws Throwable {
+ IOException source = new IOException("ioe");
+
+ IOException ex = ShadedProtobufHelper.getRemoteException(
+ new ServiceException(source));
+ // if not the same, throw
+ if (ex != source) {
+ throw ex;
+ }
+ }
+
+ @Test
+ public void testExtractRemoteExceptionOtherCause() throws Throwable {
+ NullPointerException source = new NullPointerException("npe");
+
+ IOException ex = ShadedProtobufHelper.getRemoteException(
+ new ServiceException(source));
+ // verify the cause chain: ServiceException wrapping the NPE
+ ServiceException c1 = verifyCause(ServiceException.class, ex);
+ verifyCause(NullPointerException.class, c1);
+ }
+
+ @Test
+ public void testIPCWrapperServiceException() throws Throwable {
+ intercept(IOException.class, "expected", () -> {
+ ipc(() -> {
+ throw new ServiceException("expected");
+ });
+ });
+ }
+
+ @Test
+ public void testIPCWrapperNPE() throws Throwable {
+ final IOException ex = intercept(IOException.class, "npe", () -> {
+ ipc(() -> {
+ throw new ServiceException(new NullPointerException("npe"));
+ });
+ });
+ ServiceException c1 = verifyCause(ServiceException.class, ex);
+ verifyCause(NullPointerException.class, c1);
+ }
+
+}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/LambdaTestUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/LambdaTestUtils.java
index 3e0d31d..da6abc5 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/LambdaTestUtils.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/LambdaTestUtils.java
@@ -819,7 +819,7 @@ public final class LambdaTestUtils {
if (cause == null || !clazz.isAssignableFrom(cause.getClass())) {
throw caught;
} else {
- return (E) caught;
+ return (E) cause;
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java
index 47234e8..9aa3413 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java
@@ -67,7 +67,6 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.DiskBa
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus;
import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus.Result;
-import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.ipc.ProtocolMetaInterface;
import org.apache.hadoop.ipc.ProtocolTranslator;
@@ -76,13 +75,13 @@ import org.apache.hadoop.ipc.RpcClientUtil;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
-
import org.apache.hadoop.thirdparty.protobuf.RpcController;
-import org.apache.hadoop.thirdparty.protobuf.ServiceException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.hadoop.ipc.internal.ShadedProtobufHelper.ipc;
+
/**
* This class is the client side translator to translate the requests made on
* {@link ClientDatanodeProtocol} interfaces to the RPC server implementing
@@ -197,31 +196,19 @@ public class ClientDatanodeProtocolTranslatorPB implements
GetReplicaVisibleLengthRequestProto req =
GetReplicaVisibleLengthRequestProto.newBuilder()
.setBlock(PBHelperClient.convert(b)).build();
- try {
- return rpcProxy.getReplicaVisibleLength(NULL_CONTROLLER, req).getLength();
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ return ipc(() -> rpcProxy.getReplicaVisibleLength(NULL_CONTROLLER, req).getLength());
}
@Override
public void refreshNamenodes() throws IOException {
- try {
- rpcProxy.refreshNamenodes(NULL_CONTROLLER, VOID_REFRESH_NAMENODES);
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ ipc(() -> rpcProxy.refreshNamenodes(NULL_CONTROLLER, VOID_REFRESH_NAMENODES));
}
@Override
public void deleteBlockPool(String bpid, boolean force) throws IOException {
DeleteBlockPoolRequestProto req = DeleteBlockPoolRequestProto.newBuilder()
.setBlockPool(bpid).setForce(force).build();
- try {
- rpcProxy.deleteBlockPool(NULL_CONTROLLER, req);
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ ipc(() -> rpcProxy.deleteBlockPool(NULL_CONTROLLER, req));
}
@Override
@@ -232,11 +219,7 @@ public class ClientDatanodeProtocolTranslatorPB implements
.setBlock(PBHelperClient.convert(block))
.setToken(PBHelperClient.convert(token)).build();
GetBlockLocalPathInfoResponseProto resp;
- try {
- resp = rpcProxy.getBlockLocalPathInfo(NULL_CONTROLLER, req);
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ resp = ipc(() -> rpcProxy.getBlockLocalPathInfo(NULL_CONTROLLER, req));
return new BlockLocalPathInfo(PBHelperClient.convert(resp.getBlock()),
resp.getLocalPath(), resp.getLocalMetaPath());
}
@@ -257,94 +240,61 @@ public class ClientDatanodeProtocolTranslatorPB implements
public void shutdownDatanode(boolean forUpgrade) throws IOException {
ShutdownDatanodeRequestProto request = ShutdownDatanodeRequestProto
.newBuilder().setForUpgrade(forUpgrade).build();
- try {
- rpcProxy.shutdownDatanode(NULL_CONTROLLER, request);
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ ipc(() -> rpcProxy.shutdownDatanode(NULL_CONTROLLER, request));
}
@Override
public void evictWriters() throws IOException {
- try {
- rpcProxy.evictWriters(NULL_CONTROLLER, VOID_EVICT_WRITERS);
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ ipc(() -> rpcProxy.evictWriters(NULL_CONTROLLER, VOID_EVICT_WRITERS));
}
@Override
public DatanodeLocalInfo getDatanodeInfo() throws IOException {
GetDatanodeInfoResponseProto response;
- try {
- response = rpcProxy.getDatanodeInfo(NULL_CONTROLLER,
- VOID_GET_DATANODE_INFO);
- return PBHelperClient.convert(response.getLocalInfo());
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ response = ipc(() -> rpcProxy.getDatanodeInfo(NULL_CONTROLLER,
+ VOID_GET_DATANODE_INFO));
+ return PBHelperClient.convert(response.getLocalInfo());
}
@Override
public void startReconfiguration() throws IOException {
- try {
- rpcProxy.startReconfiguration(NULL_CONTROLLER, VOID_START_RECONFIG);
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ ipc(() -> rpcProxy.startReconfiguration(NULL_CONTROLLER, VOID_START_RECONFIG));
}
@Override
public ReconfigurationTaskStatus getReconfigurationStatus()
throws IOException {
- try {
- return ReconfigurationProtocolUtils.getReconfigurationStatus(
- rpcProxy
- .getReconfigurationStatus(
- NULL_CONTROLLER,
- VOID_GET_RECONFIG_STATUS));
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ return ReconfigurationProtocolUtils.getReconfigurationStatus(
+ ipc(() -> rpcProxy.getReconfigurationStatus(
+ NULL_CONTROLLER,
+ VOID_GET_RECONFIG_STATUS)));
}
@Override
public List<String> listReconfigurableProperties() throws IOException {
ListReconfigurablePropertiesResponseProto response;
- try {
- response = rpcProxy.listReconfigurableProperties(NULL_CONTROLLER,
- VOID_LIST_RECONFIGURABLE_PROPERTIES);
- return response.getNameList();
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ response = ipc(() -> rpcProxy.listReconfigurableProperties(NULL_CONTROLLER,
+ VOID_LIST_RECONFIGURABLE_PROPERTIES));
+ return response.getNameList();
}
@Override
public void triggerBlockReport(BlockReportOptions options)
throws IOException {
- try {
- TriggerBlockReportRequestProto.Builder builder = TriggerBlockReportRequestProto.newBuilder().
- setIncremental(options.isIncremental());
- if (options.getNamenodeAddr() != null) {
- builder.setNnAddress(NetUtils.getHostPortString(options.getNamenodeAddr()));
- }
- rpcProxy.triggerBlockReport(NULL_CONTROLLER, builder.build());
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
+ TriggerBlockReportRequestProto.Builder builder = TriggerBlockReportRequestProto.newBuilder().
+ setIncremental(options.isIncremental());
+ if (options.getNamenodeAddr() != null) {
+ builder.setNnAddress(NetUtils.getHostPortString(options.getNamenodeAddr()));
}
+ ipc(() -> rpcProxy.triggerBlockReport(NULL_CONTROLLER, builder.build()));
}
@Override
public long getBalancerBandwidth() throws IOException {
GetBalancerBandwidthResponseProto response;
- try {
- response = rpcProxy.getBalancerBandwidth(NULL_CONTROLLER,
- VOID_GET_BALANCER_BANDWIDTH);
- return response.getBandwidth();
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ response = ipc(() -> rpcProxy.getBalancerBandwidth(NULL_CONTROLLER,
+ VOID_GET_BALANCER_BANDWIDTH));
+ return response.getBandwidth();
}
/**
@@ -363,19 +313,15 @@ public class ClientDatanodeProtocolTranslatorPB implements
public void submitDiskBalancerPlan(String planID, long planVersion,
String planFile, String planData, boolean skipDateCheck)
throws IOException {
- try {
- SubmitDiskBalancerPlanRequestProto request =
- SubmitDiskBalancerPlanRequestProto.newBuilder()
- .setPlanID(planID)
- .setPlanVersion(planVersion)
- .setPlanFile(planFile)
- .setPlan(planData)
- .setIgnoreDateCheck(skipDateCheck)
- .build();
- rpcProxy.submitDiskBalancerPlan(NULL_CONTROLLER, request);
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ SubmitDiskBalancerPlanRequestProto request =
+ SubmitDiskBalancerPlanRequestProto.newBuilder()
+ .setPlanID(planID)
+ .setPlanVersion(planVersion)
+ .setPlanFile(planFile)
+ .setPlan(planData)
+ .setIgnoreDateCheck(skipDateCheck)
+ .build();
+ ipc(() -> rpcProxy.submitDiskBalancerPlan(NULL_CONTROLLER, request));
}
/**
@@ -387,13 +333,9 @@ public class ClientDatanodeProtocolTranslatorPB implements
@Override
public void cancelDiskBalancePlan(String planID)
throws IOException {
- try {
- CancelPlanRequestProto request = CancelPlanRequestProto.newBuilder()
- .setPlanID(planID).build();
- rpcProxy.cancelDiskBalancerPlan(NULL_CONTROLLER, request);
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ CancelPlanRequestProto request = CancelPlanRequestProto.newBuilder()
+ .setPlanID(planID).build();
+ ipc(() -> rpcProxy.cancelDiskBalancerPlan(NULL_CONTROLLER, request));
}
/**
@@ -401,56 +343,44 @@ public class ClientDatanodeProtocolTranslatorPB implements
*/
@Override
public DiskBalancerWorkStatus queryDiskBalancerPlan() throws IOException {
- try {
- QueryPlanStatusRequestProto request =
- QueryPlanStatusRequestProto.newBuilder().build();
- QueryPlanStatusResponseProto response =
- rpcProxy.queryDiskBalancerPlan(NULL_CONTROLLER, request);
- DiskBalancerWorkStatus.Result result = Result.NO_PLAN;
- if(response.hasResult()) {
- result = DiskBalancerWorkStatus.Result.values()[
- response.getResult()];
- }
-
- return new DiskBalancerWorkStatus(result,
- response.hasPlanID() ? response.getPlanID() : null,
- response.hasPlanFile() ? response.getPlanFile() : null,
- response.hasCurrentStatus() ? response.getCurrentStatus() : null);
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
+ QueryPlanStatusRequestProto request =
+ QueryPlanStatusRequestProto.newBuilder().build();
+ QueryPlanStatusResponseProto response =
+ ipc(() -> rpcProxy.queryDiskBalancerPlan(NULL_CONTROLLER, request));
+ DiskBalancerWorkStatus.Result result = Result.NO_PLAN;
+ if(response.hasResult()) {
+ result = DiskBalancerWorkStatus.Result.values()[
+ response.getResult()];
}
+
+ return new DiskBalancerWorkStatus(result,
+ response.hasPlanID() ? response.getPlanID() : null,
+ response.hasPlanFile() ? response.getPlanFile() : null,
+ response.hasCurrentStatus() ? response.getCurrentStatus() : null);
}
@Override
public String getDiskBalancerSetting(String key) throws IOException {
- try {
- DiskBalancerSettingRequestProto request =
- DiskBalancerSettingRequestProto.newBuilder().setKey(key).build();
- DiskBalancerSettingResponseProto response =
- rpcProxy.getDiskBalancerSetting(NULL_CONTROLLER, request);
- return response.hasValue() ? response.getValue() : null;
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ DiskBalancerSettingRequestProto request =
+ DiskBalancerSettingRequestProto.newBuilder().setKey(key).build();
+ DiskBalancerSettingResponseProto response =
+ ipc(() -> rpcProxy.getDiskBalancerSetting(NULL_CONTROLLER, request));
+ return response.hasValue() ? response.getValue() : null;
}
@Override
public List<DatanodeVolumeInfo> getVolumeReport() throws IOException {
- try {
- List<DatanodeVolumeInfo> volumeInfoList = new ArrayList<>();
- GetVolumeReportResponseProto volumeReport = rpcProxy.getVolumeReport(
- NULL_CONTROLLER, VOID_GET_DATANODE_STORAGE_INFO);
- List<DatanodeVolumeInfoProto> volumeProtoList = volumeReport
- .getVolumeInfoList();
- for (DatanodeVolumeInfoProto proto : volumeProtoList) {
- volumeInfoList.add(new DatanodeVolumeInfo(proto.getPath(), proto
- .getUsedSpace(), proto.getFreeSpace(), proto.getReservedSpace(),
- proto.getReservedSpaceForReplicas(), proto.getNumBlocks(),
- PBHelperClient.convertStorageType(proto.getStorageType())));
- }
- return volumeInfoList;
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
+ List<DatanodeVolumeInfo> volumeInfoList = new ArrayList<>();
+ GetVolumeReportResponseProto volumeReport = ipc(() -> rpcProxy.getVolumeReport(
+ NULL_CONTROLLER, VOID_GET_DATANODE_STORAGE_INFO));
+ List<DatanodeVolumeInfoProto> volumeProtoList = volumeReport
+ .getVolumeInfoList();
+ for (DatanodeVolumeInfoProto proto : volumeProtoList) {
+ volumeInfoList.add(new DatanodeVolumeInfo(proto.getPath(), proto
+ .getUsedSpace(), proto.getFreeSpace(), proto.getReservedSpace(),
+ proto.getReservedSpaceForReplicas(), proto.getNumBlocks(),
+ PBHelperClient.convertStorageType(proto.getStorageType())));
}
+ return volumeInfoList;
}
}
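
The change repeated throughout this file, and in ClientNamenodeProtocolTranslatorPB below, is mechanical: every try/catch that converted the shaded protobuf ServiceException via ProtobufHelper.getRemoteException is collapsed into one call to ShadedProtobufHelper.ipc(), which performs the catch-and-rethrow in a single place. A minimal sketch of the helper's shape, assuming only the names used by the static imports added in this patch (see org.apache.hadoop.ipc.internal.ShadedProtobufHelper in hadoop-common for the authoritative version):

    import java.io.IOException;
    import org.apache.hadoop.thirdparty.protobuf.ServiceException;

    public final class ShadedProtobufHelperSketch {
      @FunctionalInterface
      public interface IpcCall<T> {
        T call() throws ServiceException;   // the shaded protobuf exception
      }

      // Run the RPC call, unwrapping the shaded ServiceException into the
      // IOException (typically a RemoteException) that HDFS callers expect.
      public static <T> T ipc(IpcCall<T> call) throws IOException {
        try {
          return call.call();
        } catch (ServiceException e) {
          throw getRemoteException(e);
        }
      }

      // Sketch of the unwrapping: surface an IOException cause directly,
      // otherwise wrap the ServiceException itself.
      public static IOException getRemoteException(ServiceException e) {
        Throwable cause = e.getCause();
        return cause instanceof IOException
            ? (IOException) cause
            : new IOException(e);
      }
    }

Centralising the handling in one private helper keeps the shaded exception type out of roughly two hundred method bodies across the translator classes.
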
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
index ca5d978..52f057f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
@@ -237,7 +237,6 @@ import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.retry.AsyncCallHandler;
import org.apache.hadoop.ipc.Client;
-import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.ipc.ProtocolMetaInterface;
import org.apache.hadoop.ipc.ProtocolTranslator;
@@ -256,6 +255,9 @@ import org.apache.hadoop.thirdparty.protobuf.ServiceException;
import org.apache.hadoop.util.concurrent.AsyncGet;
+import static org.apache.hadoop.ipc.internal.ShadedProtobufHelper.getRemoteException;
+import static org.apache.hadoop.ipc.internal.ShadedProtobufHelper.ipc;
+
/**
* This class forwards NN's ClientProtocol calls as RPC calls to the NN server
* while translating from the parameter types used in ClientProtocol to the
@@ -330,25 +332,17 @@ public class ClientNamenodeProtocolTranslatorPB implements
.setOffset(offset)
.setLength(length)
.build();
- try {
- GetBlockLocationsResponseProto resp = rpcProxy.getBlockLocations(null,
- req);
- return resp.hasLocations() ?
- PBHelperClient.convert(resp.getLocations()) : null;
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ GetBlockLocationsResponseProto resp = ipc(() -> rpcProxy.getBlockLocations(null,
+ req));
+ return resp.hasLocations() ?
+ PBHelperClient.convert(resp.getLocations()) : null;
}
@Override
public FsServerDefaults getServerDefaults() throws IOException {
GetServerDefaultsRequestProto req = VOID_GET_SERVER_DEFAULT_REQUEST;
- try {
- return PBHelperClient
- .convert(rpcProxy.getServerDefaults(null, req).getServerDefaults());
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ return PBHelperClient
+ .convert(ipc(() -> rpcProxy.getServerDefaults(null, req).getServerDefaults()));
}
@Override
@@ -379,13 +373,8 @@ public class ClientNamenodeProtocolTranslatorPB implements
builder.addAllCryptoProtocolVersion(
PBHelperClient.convert(supportedVersions));
CreateRequestProto req = builder.build();
- try {
- CreateResponseProto res = rpcProxy.create(null, req);
- return res.hasFs() ? PBHelperClient.convert(res.getFs()) : null;
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
-
+ CreateResponseProto res = ipc(() -> rpcProxy.create(null, req));
+ return res.hasFs() ? PBHelperClient.convert(res.getFs()) : null;
}
@Override
@@ -396,11 +385,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
.setNewLength(newLength)
.setClientName(clientName)
.build();
- try {
- return rpcProxy.truncate(null, req).getResult();
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ return ipc(() -> rpcProxy.truncate(null, req).getResult());
}
@Override
@@ -410,16 +395,12 @@ public class ClientNamenodeProtocolTranslatorPB implements
.setClientName(clientName).setFlag(
PBHelperClient.convertCreateFlag(flag))
.build();
- try {
- AppendResponseProto res = rpcProxy.append(null, req);
- LocatedBlock lastBlock = res.hasBlock() ? PBHelperClient
- .convertLocatedBlockProto(res.getBlock()) : null;
- HdfsFileStatus stat = (res.hasStat()) ?
- PBHelperClient.convert(res.getStat()) : null;
- return new LastBlockWithStatus(lastBlock, stat);
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ AppendResponseProto res = ipc(() -> rpcProxy.append(null, req));
+ LocatedBlock lastBlock = res.hasBlock() ? PBHelperClient
+ .convertLocatedBlockProto(res.getBlock()) : null;
+ HdfsFileStatus stat = (res.hasStat()) ?
+ PBHelperClient.convert(res.getStat()) : null;
+ return new LastBlockWithStatus(lastBlock, stat);
}
@Override
@@ -429,11 +410,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
.setSrc(src)
.setReplication(replication)
.build();
- try {
- return rpcProxy.setReplication(null, req).getResult();
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ return ipc(() -> rpcProxy.setReplication(null, req)).getResult();
}
@Override
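
Two equivalent shapes of the wrapper appear in the hunks above: truncate() extracts the response field inside the lambda, while setReplication() extracts it from the value ipc() returns:

    return ipc(() -> rpcProxy.truncate(null, req).getResult());
    return ipc(() -> rpcProxy.setReplication(null, req)).getResult();

Generated protobuf getters do not throw ServiceException, so the two forms are interchangeable, and the patch uses both.
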
@@ -443,15 +420,11 @@ public class ClientNamenodeProtocolTranslatorPB implements
.setSrc(src)
.setPermission(PBHelperClient.convert(permission))
.build();
- try {
- if (Client.isAsynchronousMode()) {
- rpcProxy.setPermission(null, req);
- setAsyncReturnValue();
- } else {
- rpcProxy.setPermission(null, req);
- }
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
+ if (Client.isAsynchronousMode()) {
+ ipc(() -> rpcProxy.setPermission(null, req));
+ setAsyncReturnValue();
+ } else {
+ ipc(() -> rpcProxy.setPermission(null, req));
}
}
@@ -483,15 +456,11 @@ public class ClientNamenodeProtocolTranslatorPB implements
req.setUsername(username);
if (groupname != null)
req.setGroupname(groupname);
- try {
- if (Client.isAsynchronousMode()) {
- rpcProxy.setOwner(null, req.build());
- setAsyncReturnValue();
- } else {
- rpcProxy.setOwner(null, req.build());
- }
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
+ if (Client.isAsynchronousMode()) {
+ ipc(() -> rpcProxy.setOwner(null, req.build()));
+ setAsyncReturnValue();
+ } else {
+ ipc(() -> rpcProxy.setOwner(null, req.build()));
}
}
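
The asynchronous call sites (setPermission and setOwner here, rename2 and setAcl further down) deliberately keep two branches that issue the same ipc() call: in async mode the proxy returns immediately and setAsyncReturnValue() records the pending response, while the other branch blocks. Only the surrounding try/catch is removed; the branching itself is unchanged by this patch.
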
@@ -501,11 +470,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
AbandonBlockRequestProto req = AbandonBlockRequestProto.newBuilder()
.setB(PBHelperClient.convert(b)).setSrc(src).setHolder(holder)
.setFileId(fileId).build();
- try {
- rpcProxy.abandonBlock(null, req);
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ ipc(() -> rpcProxy.abandonBlock(null, req));
}
@Override
@@ -526,12 +491,8 @@ public class ClientNamenodeProtocolTranslatorPB implements
req.addAllFlags(PBHelperClient.convertAddBlockFlags(
addBlockFlags));
}
- try {
- return PBHelperClient.convertLocatedBlockProto(
- rpcProxy.addBlock(null, req.build()).getBlock());
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ return PBHelperClient.convertLocatedBlockProto(
+ ipc(() -> rpcProxy.addBlock(null, req.build())).getBlock());
}
@Override
@@ -550,12 +511,8 @@ public class ClientNamenodeProtocolTranslatorPB implements
.setNumAdditionalNodes(numAdditionalNodes)
.setClientName(clientName)
.build();
- try {
- return PBHelperClient.convertLocatedBlockProto(
- rpcProxy.getAdditionalDatanode(null, req).getBlock());
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ return PBHelperClient.convertLocatedBlockProto(
+ ipc(() -> rpcProxy.getAdditionalDatanode(null, req)).getBlock());
}
@Override
@@ -567,11 +524,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
.setFileId(fileId);
if (last != null)
req.setLast(PBHelperClient.convert(last));
- try {
- return rpcProxy.complete(null, req.build()).getResult();
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ return ipc(() -> rpcProxy.complete(null, req.build())).getResult();
}
@Override
@@ -580,11 +533,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
.addAllBlocks(Arrays.asList(
PBHelperClient.convertLocatedBlocks(blocks)))
.build();
- try {
- rpcProxy.reportBadBlocks(null, req);
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ ipc(() -> rpcProxy.reportBadBlocks(null, req));
}
@Override
@@ -593,11 +542,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
.setSrc(src)
.setDst(dst).build();
- try {
- return rpcProxy.rename(null, req).getResult();
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ return ipc(() -> rpcProxy.rename(null, req)).getResult();
}
@@ -622,17 +567,12 @@ public class ClientNamenodeProtocolTranslatorPB implements
setOverwriteDest(overwrite).
setMoveToTrash(toTrash).
build();
- try {
- if (Client.isAsynchronousMode()) {
- rpcProxy.rename2(null, req);
- setAsyncReturnValue();
- } else {
- rpcProxy.rename2(null, req);
- }
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
+ if (Client.isAsynchronousMode()) {
+ ipc(() -> rpcProxy.rename2(null, req));
+ setAsyncReturnValue();
+ } else {
+ ipc(() -> rpcProxy.rename2(null, req));
}
-
}
@Override
@@ -640,11 +580,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
ConcatRequestProto req = ConcatRequestProto.newBuilder().
setTrg(trg).
addAllSrcs(Arrays.asList(srcs)).build();
- try {
- rpcProxy.concat(null, req);
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ ipc(() -> rpcProxy.concat(null, req));
}
@@ -652,11 +588,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
public boolean delete(String src, boolean recursive) throws IOException {
DeleteRequestProto req = DeleteRequestProto.newBuilder().setSrc(src)
.setRecursive(recursive).build();
- try {
- return rpcProxy.delete(null, req).getResult();
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ return ipc(() -> rpcProxy.delete(null, req).getResult());
}
@Override
@@ -671,11 +603,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
builder.setUnmasked(PBHelperClient.convert(unmasked));
}
MkdirsRequestProto req = builder.build();
- try {
- return rpcProxy.mkdirs(null, req).getResult();
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ return ipc(() -> rpcProxy.mkdirs(null, req)).getResult();
}
@Override
@@ -685,16 +613,11 @@ public class ClientNamenodeProtocolTranslatorPB implements
.setSrc(src)
.setStartAfter(ByteString.copyFrom(startAfter))
.setNeedLocation(needLocation).build();
- try {
- GetListingResponseProto result = rpcProxy.getListing(null, req);
-
- if (result.hasDirList()) {
- return PBHelperClient.convert(result.getDirList());
- }
- return null;
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
+ GetListingResponseProto result = ipc(() -> rpcProxy.getListing(null, req));
+ if (result.hasDirList()) {
+ return PBHelperClient.convert(result.getDirList());
}
+ return null;
}
@Override
@@ -706,50 +629,43 @@ public class ClientNamenodeProtocolTranslatorPB implements
.addAllPaths(Arrays.asList(srcs))
.setStartAfter(ByteString.copyFrom(startAfter))
.setNeedLocation(needLocation).build();
- try {
- GetBatchedListingResponseProto result =
- rpcProxy.getBatchedListing(null, req);
-
- if (result.getListingsCount() > 0) {
- HdfsPartialListing[] listingArray =
- new HdfsPartialListing[result.getListingsCount()];
- int listingIdx = 0;
- for (BatchedDirectoryListingProto proto : result.getListingsList()) {
- HdfsPartialListing listing;
- if (proto.hasException()) {
- HdfsProtos.RemoteExceptionProto reProto = proto.getException();
- RemoteException ex = new RemoteException(
- reProto.getClassName(), reProto.getMessage());
- listing = new HdfsPartialListing(proto.getParentIdx(), ex);
- } else {
- List<HdfsFileStatus> statuses =
- PBHelperClient.convertHdfsFileStatus(
- proto.getPartialListingList());
- listing = new HdfsPartialListing(proto.getParentIdx(), statuses);
- }
- listingArray[listingIdx++] = listing;
+ GetBatchedListingResponseProto result =
+ ipc(() -> rpcProxy.getBatchedListing(null, req));
+
+ if (result.getListingsCount() > 0) {
+ HdfsPartialListing[] listingArray =
+ new HdfsPartialListing[result.getListingsCount()];
+ int listingIdx = 0;
+ for (BatchedDirectoryListingProto proto : result.getListingsList()) {
+ HdfsPartialListing listing;
+ if (proto.hasException()) {
+ HdfsProtos.RemoteExceptionProto reProto = proto.getException();
+ RemoteException ex = new RemoteException(
+ reProto.getClassName(), reProto.getMessage());
+ listing = new HdfsPartialListing(proto.getParentIdx(), ex);
+ } else {
+ List<HdfsFileStatus> statuses =
+ PBHelperClient.convertHdfsFileStatus(
+ proto.getPartialListingList());
+ listing = new HdfsPartialListing(proto.getParentIdx(), statuses);
}
- BatchedDirectoryListing batchedListing =
- new BatchedDirectoryListing(listingArray, result.getHasMore(),
- result.getStartAfter().toByteArray());
- return batchedListing;
+ listingArray[listingIdx++] = listing;
}
- return null;
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
+ BatchedDirectoryListing batchedListing =
+ new BatchedDirectoryListing(listingArray, result.getHasMore(),
+ result.getStartAfter().toByteArray());
+ return batchedListing;
}
+ return null;
}
@Override
- public void renewLease(String clientName) throws IOException {
- RenewLeaseRequestProto req = RenewLeaseRequestProto.newBuilder()
- .setClientName(clientName).build();
- try {
- rpcProxy.renewLease(null, req);
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ public void renewLease(String clientName)
+ throws IOException {
+ RenewLeaseRequestProto.Builder builder = RenewLeaseRequestProto
+ .newBuilder().setClientName(clientName);
+ ipc(() -> rpcProxy.renewLease(null, builder.build()));
}
@Override
@@ -758,41 +674,25 @@ public class ClientNamenodeProtocolTranslatorPB implements
RecoverLeaseRequestProto req = RecoverLeaseRequestProto.newBuilder()
.setSrc(src)
.setClientName(clientName).build();
- try {
- return rpcProxy.recoverLease(null, req).getResult();
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ return ipc(() -> rpcProxy.recoverLease(null, req)).getResult();
}
@Override
public long[] getStats() throws IOException {
- try {
- return PBHelperClient.convert(rpcProxy.getFsStats(null,
- VOID_GET_FSSTATUS_REQUEST));
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ return PBHelperClient.convert(ipc(() -> rpcProxy.getFsStats(null,
+ VOID_GET_FSSTATUS_REQUEST)));
}
@Override
public ReplicatedBlockStats getReplicatedBlockStats() throws IOException {
- try {
- return PBHelperClient.convert(rpcProxy.getFsReplicatedBlockStats(null,
- VOID_GET_FS_REPLICATED_BLOCK_STATS_REQUEST));
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ return PBHelperClient.convert(ipc(() -> rpcProxy.getFsReplicatedBlockStats(null,
+ VOID_GET_FS_REPLICATED_BLOCK_STATS_REQUEST)));
}
@Override
public ECBlockGroupStats getECBlockGroupStats() throws IOException {
- try {
- return PBHelperClient.convert(rpcProxy.getFsECBlockGroupStats(null,
- VOID_GET_FS_ECBLOCKGROUP_STATS_REQUEST));
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ return PBHelperClient.convert(ipc(() -> rpcProxy.getFsECBlockGroupStats(null,
+ VOID_GET_FS_ECBLOCKGROUP_STATS_REQUEST)));
}
@Override
@@ -801,12 +701,8 @@ public class ClientNamenodeProtocolTranslatorPB implements
GetDatanodeReportRequestProto req = GetDatanodeReportRequestProto
.newBuilder()
.setType(PBHelperClient.convert(type)).build();
- try {
- return PBHelperClient.convert(
- rpcProxy.getDatanodeReport(null, req).getDiList());
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ return PBHelperClient.convert(
+ ipc(() -> rpcProxy.getDatanodeReport(null, req)).getDiList());
}
@Override
@@ -815,13 +711,9 @@ public class ClientNamenodeProtocolTranslatorPB implements
final GetDatanodeStorageReportRequestProto req
= GetDatanodeStorageReportRequestProto.newBuilder()
.setType(PBHelperClient.convert(type)).build();
- try {
- return PBHelperClient.convertDatanodeStorageReports(
- rpcProxy.getDatanodeStorageReport(null, req)
- .getDatanodeStorageReportsList());
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ return PBHelperClient.convertDatanodeStorageReports(
+ ipc(() -> rpcProxy.getDatanodeStorageReport(null, req)
+ .getDatanodeStorageReportsList()));
}
@Override
@@ -830,11 +722,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
.newBuilder()
.setFilename(filename)
.build();
- try {
- return rpcProxy.getPreferredBlockSize(null, req).getBsize();
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ return ipc(() -> rpcProxy.getPreferredBlockSize(null, req)).getBsize();
}
@Override
@@ -843,33 +731,21 @@ public class ClientNamenodeProtocolTranslatorPB implements
SetSafeModeRequestProto req = SetSafeModeRequestProto.newBuilder()
.setAction(PBHelperClient.convert(action))
.setChecked(isChecked).build();
- try {
- return rpcProxy.setSafeMode(null, req).getResult();
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ return ipc(() -> rpcProxy.setSafeMode(null, req)).getResult();
}
@Override
public boolean saveNamespace(long timeWindow, long txGap) throws IOException {
- try {
- SaveNamespaceRequestProto req = SaveNamespaceRequestProto.newBuilder()
- .setTimeWindow(timeWindow).setTxGap(txGap).build();
- return rpcProxy.saveNamespace(null, req).getSaved();
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ SaveNamespaceRequestProto req = SaveNamespaceRequestProto.newBuilder()
+ .setTimeWindow(timeWindow).setTxGap(txGap).build();
+ return ipc(() -> rpcProxy.saveNamespace(null, req)).getSaved();
}
@Override
public long rollEdits() throws IOException {
- try {
- RollEditsResponseProto resp = rpcProxy.rollEdits(null,
- VOID_ROLLEDITS_REQUEST);
- return resp.getNewSegmentTxId();
- } catch (ServiceException se) {
- throw ProtobufHelper.getRemoteException(se);
- }
+ RollEditsResponseProto resp = ipc(() -> rpcProxy.rollEdits(null,
+ VOID_ROLLEDITS_REQUEST));
+ return resp.getNewSegmentTxId();
}
@Override
@@ -877,40 +753,24 @@ public class ClientNamenodeProtocolTranslatorPB implements
RestoreFailedStorageRequestProto req = RestoreFailedStorageRequestProto
.newBuilder()
.setArg(arg).build();
- try {
- return rpcProxy.restoreFailedStorage(null, req).getResult();
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ return ipc(() -> rpcProxy.restoreFailedStorage(null, req)).getResult();
}
@Override
public void refreshNodes() throws IOException {
- try {
- rpcProxy.refreshNodes(null, VOID_REFRESH_NODES_REQUEST);
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ ipc(() -> rpcProxy.refreshNodes(null, VOID_REFRESH_NODES_REQUEST));
}
@Override
public void finalizeUpgrade() throws IOException {
- try {
- rpcProxy.finalizeUpgrade(null, VOID_FINALIZE_UPGRADE_REQUEST);
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ ipc(() -> rpcProxy.finalizeUpgrade(null, VOID_FINALIZE_UPGRADE_REQUEST));
}
@Override
public boolean upgradeStatus() throws IOException {
- try {
- final UpgradeStatusResponseProto proto = rpcProxy.upgradeStatus(
- null, VOID_UPGRADE_STATUS_REQUEST);
- return proto.getUpgradeFinalized();
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ final UpgradeStatusResponseProto proto = ipc(() -> rpcProxy.upgradeStatus(
+ null, VOID_UPGRADE_STATUS_REQUEST));
+ return proto.getUpgradeFinalized();
}
@Override
@@ -918,16 +778,12 @@ public class ClientNamenodeProtocolTranslatorPB implements
throws IOException {
final RollingUpgradeRequestProto r = RollingUpgradeRequestProto.newBuilder()
.setAction(PBHelperClient.convert(action)).build();
- try {
- final RollingUpgradeResponseProto proto =
- rpcProxy.rollingUpgrade(null, r);
- if (proto.hasRollingUpgradeInfo()) {
- return PBHelperClient.convert(proto.getRollingUpgradeInfo());
- }
- return null;
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
+ final RollingUpgradeResponseProto proto =
+ ipc(() -> rpcProxy.rollingUpgrade(null, r));
+ if (proto.hasRollingUpgradeInfo()) {
+ return PBHelperClient.convert(proto.getRollingUpgradeInfo());
}
+ return null;
}
@Override
@@ -937,24 +793,15 @@ public class ClientNamenodeProtocolTranslatorPB implements
ListCorruptFileBlocksRequestProto.newBuilder().setPath(path);
if (cookie != null)
req.setCookie(cookie);
- try {
- return PBHelperClient.convert(
- rpcProxy.listCorruptFileBlocks(null, req.build()).getCorrupt());
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ return PBHelperClient.convert(
+ ipc(() -> rpcProxy.listCorruptFileBlocks(null, req.build())).getCorrupt());
}
@Override
public void metaSave(String filename) throws IOException {
MetaSaveRequestProto req = MetaSaveRequestProto.newBuilder()
.setFilename(filename).build();
- try {
- rpcProxy.metaSave(null, req);
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
-
+ ipc(() -> rpcProxy.metaSave(null, req));
}
@Override
@@ -962,12 +809,8 @@ public class ClientNamenodeProtocolTranslatorPB implements
GetFileInfoRequestProto req = GetFileInfoRequestProto.newBuilder()
.setSrc(src)
.build();
- try {
- GetFileInfoResponseProto res = rpcProxy.getFileInfo(null, req);
- return res.hasFs() ? PBHelperClient.convert(res.getFs()) : null;
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ GetFileInfoResponseProto res = ipc(() -> rpcProxy.getFileInfo(null, req));
+ return res.hasFs() ? PBHelperClient.convert(res.getFs()) : null;
}
@Override
@@ -978,27 +821,19 @@ public class ClientNamenodeProtocolTranslatorPB implements
.setSrc(src)
.setNeedBlockToken(needBlockToken)
.build();
- try {
- GetLocatedFileInfoResponseProto res =
- rpcProxy.getLocatedFileInfo(null, req);
- return (HdfsLocatedFileStatus) (res.hasFs()
- ? PBHelperClient.convert(res.getFs())
- : null);
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ GetLocatedFileInfoResponseProto res =
+ ipc(() -> rpcProxy.getLocatedFileInfo(null, req));
+ return (HdfsLocatedFileStatus) (res.hasFs()
+ ? PBHelperClient.convert(res.getFs())
+ : null);
}
@Override
public HdfsFileStatus getFileLinkInfo(String src) throws IOException {
GetFileLinkInfoRequestProto req = GetFileLinkInfoRequestProto.newBuilder()
.setSrc(src).build();
- try {
- GetFileLinkInfoResponseProto result = rpcProxy.getFileLinkInfo(null, req);
- return result.hasFs() ? PBHelperClient.convert(result.getFs()) : null;
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ GetFileLinkInfoResponseProto result = ipc(() -> rpcProxy.getFileLinkInfo(null, req));
+ return result.hasFs() ? PBHelperClient.convert(result.getFs()) : null;
}
@Override
@@ -1007,12 +842,8 @@ public class ClientNamenodeProtocolTranslatorPB implements
.newBuilder()
.setPath(path)
.build();
- try {
- return PBHelperClient.convert(rpcProxy.getContentSummary(null, req)
- .getSummary());
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ return PBHelperClient.convert(ipc(() -> rpcProxy.getContentSummary(null, req))
+ .getSummary());
}
@Override
@@ -1027,11 +858,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
builder.setStorageType(PBHelperClient.convertStorageType(type));
}
final SetQuotaRequestProto req = builder.build();
- try {
- rpcProxy.setQuota(null, req);
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ ipc(() -> rpcProxy.setQuota(null, req));
}
@Override
@@ -1040,11 +867,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
FsyncRequestProto req = FsyncRequestProto.newBuilder().setSrc(src)
.setClient(client).setLastBlockLength(lastBlockLength)
.setFileId(fileId).build();
- try {
- rpcProxy.fsync(null, req);
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ ipc(() -> rpcProxy.fsync(null, req));
}
@Override
@@ -1054,11 +877,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
.setMtime(mtime)
.setAtime(atime)
.build();
- try {
- rpcProxy.setTimes(null, req);
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ ipc(() -> rpcProxy.setTimes(null, req));
}
@Override
@@ -1070,23 +889,15 @@ public class ClientNamenodeProtocolTranslatorPB implements
.setDirPerm(PBHelperClient.convert(dirPerm))
.setCreateParent(createParent)
.build();
- try {
- rpcProxy.createSymlink(null, req);
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ ipc(() -> rpcProxy.createSymlink(null, req));
}
@Override
public String getLinkTarget(String path) throws IOException {
GetLinkTargetRequestProto req = GetLinkTargetRequestProto.newBuilder()
.setPath(path).build();
- try {
- GetLinkTargetResponseProto rsp = rpcProxy.getLinkTarget(null, req);
- return rsp.hasTargetPath() ? rsp.getTargetPath() : null;
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ GetLinkTargetResponseProto rsp = ipc(() -> rpcProxy.getLinkTarget(null, req));
+ return rsp.hasTargetPath() ? rsp.getTargetPath() : null;
}
@Override
@@ -1097,12 +908,8 @@ public class ClientNamenodeProtocolTranslatorPB implements
.setBlock(PBHelperClient.convert(block))
.setClientName(clientName)
.build();
- try {
- return PBHelperClient.convertLocatedBlockProto(
- rpcProxy.updateBlockForPipeline(null, req).getBlock());
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ return PBHelperClient.convertLocatedBlockProto(
+ ipc(() -> rpcProxy.updateBlockForPipeline(null, req)).getBlock());
}
@Override
@@ -1116,11 +923,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
.addAllNewNodes(Arrays.asList(PBHelperClient.convert(newNodes)))
.addAllStorageIDs(storageIDs == null ? null : Arrays.asList(storageIDs))
.build();
- try {
- rpcProxy.updatePipeline(null, req);
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ ipc(() -> rpcProxy.updatePipeline(null, req));
}
@Override
@@ -1130,14 +933,10 @@ public class ClientNamenodeProtocolTranslatorPB implements
.newBuilder()
.setRenewer(renewer == null ? "" : renewer.toString())
.build();
- try {
- GetDelegationTokenResponseProto resp =
- rpcProxy.getDelegationToken(null, req);
- return resp.hasToken() ?
- PBHelperClient.convertDelegationToken(resp.getToken()) : null;
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ GetDelegationTokenResponseProto resp =
+ ipc(() -> rpcProxy.getDelegationToken(null, req));
+ return resp.hasToken() ?
+ PBHelperClient.convertDelegationToken(resp.getToken()) : null;
}
@Override
@@ -1147,11 +946,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
RenewDelegationTokenRequestProto.newBuilder().
setToken(PBHelperClient.convert(token)).
build();
- try {
- return rpcProxy.renewDelegationToken(null, req).getNewExpiryTime();
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ return ipc(() -> rpcProxy.renewDelegationToken(null, req)).getNewExpiryTime();
}
@Override
@@ -1161,11 +956,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
.newBuilder()
.setToken(PBHelperClient.convert(token))
.build();
- try {
- rpcProxy.cancelDelegationToken(null, req);
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ ipc(() -> rpcProxy.cancelDelegationToken(null, req));
}
@Override
@@ -1174,11 +965,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
SetBalancerBandwidthRequestProto.newBuilder()
.setBandwidth(bandwidth)
.build();
- try {
- rpcProxy.setBalancerBandwidth(null, req);
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ ipc(() -> rpcProxy.setBalancerBandwidth(null, req));
}
@Override
@@ -1190,14 +977,10 @@ public class ClientNamenodeProtocolTranslatorPB implements
@Override
public DataEncryptionKey getDataEncryptionKey() throws IOException {
- try {
- GetDataEncryptionKeyResponseProto rsp = rpcProxy.getDataEncryptionKey(
- null, VOID_GET_DATA_ENCRYPTIONKEY_REQUEST);
- return rsp.hasDataEncryptionKey() ?
- PBHelperClient.convert(rsp.getDataEncryptionKey()) : null;
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ GetDataEncryptionKeyResponseProto rsp = ipc(() -> rpcProxy.getDataEncryptionKey(
+ null, VOID_GET_DATA_ENCRYPTIONKEY_REQUEST));
+ return rsp.hasDataEncryptionKey() ?
+ PBHelperClient.convert(rsp.getDataEncryptionKey()) : null;
}
@@ -1205,11 +988,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
public boolean isFileClosed(String src) throws IOException {
IsFileClosedRequestProto req = IsFileClosedRequestProto.newBuilder()
.setSrc(src).build();
- try {
- return rpcProxy.isFileClosed(null, req).getResult();
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ return ipc(() -> rpcProxy.isFileClosed(null, req)).getResult();
}
@Override
@@ -1226,11 +1005,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
builder.setSnapshotName(snapshotName);
}
final CreateSnapshotRequestProto req = builder.build();
- try {
- return rpcProxy.createSnapshot(null, req).getSnapshotPath();
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ return ipc(() -> rpcProxy.createSnapshot(null, req)).getSnapshotPath();
}
@Override
@@ -1238,33 +1013,21 @@ public class ClientNamenodeProtocolTranslatorPB implements
throws IOException {
DeleteSnapshotRequestProto req = DeleteSnapshotRequestProto.newBuilder()
.setSnapshotRoot(snapshotRoot).setSnapshotName(snapshotName).build();
- try {
- rpcProxy.deleteSnapshot(null, req);
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ ipc(() -> rpcProxy.deleteSnapshot(null, req));
}
@Override
public void allowSnapshot(String snapshotRoot) throws IOException {
AllowSnapshotRequestProto req = AllowSnapshotRequestProto.newBuilder()
.setSnapshotRoot(snapshotRoot).build();
- try {
- rpcProxy.allowSnapshot(null, req);
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ ipc(() -> rpcProxy.allowSnapshot(null, req));
}
@Override
public void disallowSnapshot(String snapshotRoot) throws IOException {
DisallowSnapshotRequestProto req = DisallowSnapshotRequestProto
.newBuilder().setSnapshotRoot(snapshotRoot).build();
- try {
- rpcProxy.disallowSnapshot(null, req);
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ ipc(() -> rpcProxy.disallowSnapshot(null, req));
}
@Override
@@ -1273,11 +1036,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
RenameSnapshotRequestProto req = RenameSnapshotRequestProto.newBuilder()
.setSnapshotRoot(snapshotRoot).setSnapshotOldName(snapshotOldName)
.setSnapshotNewName(snapshotNewName).build();
- try {
- rpcProxy.renameSnapshot(null, req);
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ ipc(() -> rpcProxy.renameSnapshot(null, req));
}
@Override
@@ -1285,17 +1044,13 @@ public class ClientNamenodeProtocolTranslatorPB implements
throws IOException {
GetSnapshottableDirListingRequestProto req =
GetSnapshottableDirListingRequestProto.newBuilder().build();
-    try {
-      GetSnapshottableDirListingResponseProto result = rpcProxy
-          .getSnapshottableDirListing(null, req);
-      if (result.hasSnapshottableDirList()) {
-        return PBHelperClient.convert(result.getSnapshottableDirList());
-      }
-      return null;
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
+    GetSnapshottableDirListingResponseProto result = ipc(() -> rpcProxy
+        .getSnapshottableDirListing(null, req));
+    if (result.hasSnapshottableDirList()) {
+      return PBHelperClient.convert(result.getSnapshottableDirList());
    }
+    return null;
}
@Override
@@ -1304,14 +1059,10 @@ public class ClientNamenodeProtocolTranslatorPB implements
GetSnapshotDiffReportRequestProto req = GetSnapshotDiffReportRequestProto
.newBuilder().setSnapshotRoot(snapshotRoot)
.setFromSnapshot(fromSnapshot).setToSnapshot(toSnapshot).build();
-    try {
-      GetSnapshotDiffReportResponseProto result =
-          rpcProxy.getSnapshotDiffReport(null, req);
-      return PBHelperClient.convert(result.getDiffReport());
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
+    GetSnapshotDiffReportResponseProto result =
+        ipc(() -> rpcProxy.getSnapshotDiffReport(null, req));
+    return PBHelperClient.convert(result.getDiffReport());
}
@Override
@@ -1325,58 +1076,42 @@ public class ClientNamenodeProtocolTranslatorPB implements
HdfsProtos.SnapshotDiffReportCursorProto.newBuilder()
.setStartPath(PBHelperClient.getByteString(startPath))
.setIndex(index).build()).build();
-    try {
-      GetSnapshotDiffReportListingResponseProto result =
-          rpcProxy.getSnapshotDiffReportListing(null, req);
-      return PBHelperClient.convert(result.getDiffReport());
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
+    GetSnapshotDiffReportListingResponseProto result =
+        ipc(() -> rpcProxy.getSnapshotDiffReportListing(null, req));
+    return PBHelperClient.convert(result.getDiffReport());
}
@Override
public long addCacheDirective(CacheDirectiveInfo directive,
EnumSet<CacheFlag> flags) throws IOException {
- try {
- AddCacheDirectiveRequestProto.Builder builder =
- AddCacheDirectiveRequestProto.newBuilder().
- setInfo(PBHelperClient.convert(directive));
- if (!flags.isEmpty()) {
- builder.setCacheFlags(PBHelperClient.convertCacheFlags(flags));
- }
- return rpcProxy.addCacheDirective(null, builder.build()).getId();
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
+ AddCacheDirectiveRequestProto.Builder builder =
+ AddCacheDirectiveRequestProto.newBuilder().
+ setInfo(PBHelperClient.convert(directive));
+ if (!flags.isEmpty()) {
+ builder.setCacheFlags(PBHelperClient.convertCacheFlags(flags));
}
+ return ipc(() -> rpcProxy.addCacheDirective(null, builder.build())).getId();
}
@Override
public void modifyCacheDirective(CacheDirectiveInfo directive,
EnumSet<CacheFlag> flags) throws IOException {
- try {
- ModifyCacheDirectiveRequestProto.Builder builder =
- ModifyCacheDirectiveRequestProto.newBuilder().
- setInfo(PBHelperClient.convert(directive));
- if (!flags.isEmpty()) {
- builder.setCacheFlags(PBHelperClient.convertCacheFlags(flags));
- }
- rpcProxy.modifyCacheDirective(null, builder.build());
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
+ ModifyCacheDirectiveRequestProto.Builder builder =
+ ModifyCacheDirectiveRequestProto.newBuilder().
+ setInfo(PBHelperClient.convert(directive));
+ if (!flags.isEmpty()) {
+ builder.setCacheFlags(PBHelperClient.convertCacheFlags(flags));
}
+ ipc(() -> rpcProxy.modifyCacheDirective(null, builder.build()));
}
@Override
public void removeCacheDirective(long id)
throws IOException {
- try {
- rpcProxy.removeCacheDirective(null,
- RemoveCacheDirectiveRequestProto.newBuilder().
- setId(id).build());
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ ipc(() -> rpcProxy.removeCacheDirective(null,
+ RemoveCacheDirectiveRequestProto.newBuilder().
+ setId(id).build()));
}
private static class BatchedCacheEntries
@@ -1410,16 +1145,13 @@ public class ClientNamenodeProtocolTranslatorPB implements
if (filter == null) {
filter = new CacheDirectiveInfo.Builder().build();
}
- try {
- return new BatchedCacheEntries(
- rpcProxy.listCacheDirectives(null,
- ListCacheDirectivesRequestProto.newBuilder().
- setPrevId(prevId).
- setFilter(PBHelperClient.convert(filter)).
- build()));
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ CacheDirectiveInfo f = filter;
+ return new BatchedCacheEntries(
+ ipc(() -> rpcProxy.listCacheDirectives(null,
+ ListCacheDirectivesRequestProto.newBuilder().
+ setPrevId(prevId).
+ setFilter(PBHelperClient.convert(f)).
+ build())));
}
@Override
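
listCacheDirectives() is the one call site that needs a new local before the lambda: the filter parameter is reassigned when null, so it is no longer effectively final and cannot be captured, hence the copy into f. A standalone illustration of the language rule (hypothetical names, not from the patch):

    import java.util.function.Supplier;

    class EffectivelyFinalDemo {
      static Supplier<String> capture(String key) {
        if (key == null) {
          key = "default";    // reassignment: 'key' is not effectively final
        }
        final String k = key; // copy it, as the patch does with 'filter'
        return () -> k;       // '() -> key' would not compile here
      }
    }
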
@@ -1427,11 +1159,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
AddCachePoolRequestProto.Builder builder =
AddCachePoolRequestProto.newBuilder();
builder.setInfo(PBHelperClient.convert(info));
- try {
- rpcProxy.addCachePool(null, builder.build());
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ ipc(() -> rpcProxy.addCachePool(null, builder.build()));
}
@Override
@@ -1439,22 +1167,14 @@ public class ClientNamenodeProtocolTranslatorPB implements
ModifyCachePoolRequestProto.Builder builder =
ModifyCachePoolRequestProto.newBuilder();
builder.setInfo(PBHelperClient.convert(req));
- try {
- rpcProxy.modifyCachePool(null, builder.build());
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ ipc(() -> rpcProxy.modifyCachePool(null, builder.build()));
}
@Override
public void removeCachePool(String cachePoolName) throws IOException {
- try {
- rpcProxy.removeCachePool(null,
- RemoveCachePoolRequestProto.newBuilder().
- setPoolName(cachePoolName).build());
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ ipc(() -> rpcProxy.removeCachePool(null,
+ RemoveCachePoolRequestProto.newBuilder().
+ setPoolName(cachePoolName).build()));
}
private static class BatchedCachePoolEntries
@@ -1485,14 +1205,10 @@ public class ClientNamenodeProtocolTranslatorPB implements
@Override
public BatchedEntries<CachePoolEntry> listCachePools(String prevKey)
throws IOException {
- try {
- return new BatchedCachePoolEntries(
- rpcProxy.listCachePools(null,
- ListCachePoolsRequestProto.newBuilder().
- setPrevPoolName(prevKey).build()));
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ return new BatchedCachePoolEntries(
+ ipc(() -> rpcProxy.listCachePools(null,
+ ListCachePoolsRequestProto.newBuilder().
+ setPrevPoolName(prevKey).build())));
}
@Override
@@ -1501,11 +1217,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
ModifyAclEntriesRequestProto req = ModifyAclEntriesRequestProto
.newBuilder().setSrc(src)
.addAllAclSpec(PBHelperClient.convertAclEntryProto(aclSpec)).build();
- try {
- rpcProxy.modifyAclEntries(null, req);
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ ipc(() -> rpcProxy.modifyAclEntries(null, req));
}
@Override
@@ -1514,33 +1226,21 @@ public class ClientNamenodeProtocolTranslatorPB implements
RemoveAclEntriesRequestProto req = RemoveAclEntriesRequestProto
.newBuilder().setSrc(src)
.addAllAclSpec(PBHelperClient.convertAclEntryProto(aclSpec)).build();
- try {
- rpcProxy.removeAclEntries(null, req);
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ ipc(() -> rpcProxy.removeAclEntries(null, req));
}
@Override
public void removeDefaultAcl(String src) throws IOException {
RemoveDefaultAclRequestProto req = RemoveDefaultAclRequestProto
.newBuilder().setSrc(src).build();
- try {
- rpcProxy.removeDefaultAcl(null, req);
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ ipc(() -> rpcProxy.removeDefaultAcl(null, req));
}
@Override
public void removeAcl(String src) throws IOException {
RemoveAclRequestProto req = RemoveAclRequestProto.newBuilder()
.setSrc(src).build();
- try {
- rpcProxy.removeAcl(null, req);
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ ipc(() -> rpcProxy.removeAcl(null, req));
}
@Override
@@ -1549,15 +1249,11 @@ public class ClientNamenodeProtocolTranslatorPB implements
.setSrc(src)
.addAllAclSpec(PBHelperClient.convertAclEntryProto(aclSpec))
.build();
- try {
- if (Client.isAsynchronousMode()) {
- rpcProxy.setAcl(null, req);
- setAsyncReturnValue();
- } else {
- rpcProxy.setAcl(null, req);
- }
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
+ if (Client.isAsynchronousMode()) {
+ ipc(() -> rpcProxy.setAcl(null, req));
+ setAsyncReturnValue();
+ } else {
+ ipc(() -> rpcProxy.setAcl(null, req));
}
}
@@ -1589,7 +1285,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
return PBHelperClient.convert(rpcProxy.getAclStatus(null, req));
}
} catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
+ throw getRemoteException(e);
}
}
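
getAclStatus() is the one method in this translator that keeps its explicit try/catch, presumably because its async-mode branching does not reduce to a single expression; the hunk above only swaps ProtobufHelper.getRemoteException for the statically imported form. This is also why the patch adds a static import for getRemoteException alongside ipc at the top of the file.
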
@@ -1603,11 +1299,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
builder.setKeyName(keyName);
}
CreateEncryptionZoneRequestProto req = builder.build();
- try {
- rpcProxy.createEncryptionZone(null, req);
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ ipc(() -> rpcProxy.createEncryptionZone(null, req));
}
@Override
@@ -1616,16 +1308,12 @@ public class ClientNamenodeProtocolTranslatorPB implements
GetEZForPathRequestProto.newBuilder();
builder.setSrc(src);
final GetEZForPathRequestProto req = builder.build();
- try {
- final EncryptionZonesProtos.GetEZForPathResponseProto response =
- rpcProxy.getEZForPath(null, req);
- if (response.hasZone()) {
- return PBHelperClient.convert(response.getZone());
- } else {
- return null;
- }
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
+ final EncryptionZonesProtos.GetEZForPathResponseProto response =
+ ipc(() -> rpcProxy.getEZForPath(null, req));
+ if (response.hasZone()) {
+ return PBHelperClient.convert(response.getZone());
+ } else {
+ return null;
}
}
@@ -1636,18 +1324,14 @@ public class ClientNamenodeProtocolTranslatorPB implements
ListEncryptionZonesRequestProto.newBuilder()
.setId(id)
.build();
- try {
- EncryptionZonesProtos.ListEncryptionZonesResponseProto response =
- rpcProxy.listEncryptionZones(null, req);
- List<EncryptionZone> elements =
- Lists.newArrayListWithCapacity(response.getZonesCount());
- for (EncryptionZoneProto p : response.getZonesList()) {
- elements.add(PBHelperClient.convert(p));
- }
- return new BatchedListEntries<>(elements, response.getHasMore());
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
+ EncryptionZonesProtos.ListEncryptionZonesResponseProto response =
+ ipc(() -> rpcProxy.listEncryptionZones(null, req));
+ List<EncryptionZone> elements =
+ Lists.newArrayListWithCapacity(response.getZonesCount());
+ for (EncryptionZoneProto p : response.getZonesList()) {
+ elements.add(PBHelperClient.convert(p));
}
+ return new BatchedListEntries<>(elements, response.getHasMore());
}
@Override
@@ -1660,11 +1344,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
builder.setEcPolicyName(ecPolicyName);
}
SetErasureCodingPolicyRequestProto req = builder.build();
- try {
- rpcProxy.setErasureCodingPolicy(null, req);
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ ipc(() -> rpcProxy.setErasureCodingPolicy(null, req));
}
@Override
@@ -1673,11 +1353,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
UnsetErasureCodingPolicyRequestProto.newBuilder();
builder.setSrc(src);
UnsetErasureCodingPolicyRequestProto req = builder.build();
- try {
- rpcProxy.unsetErasureCodingPolicy(null, req);
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ ipc(() -> rpcProxy.unsetErasureCodingPolicy(null, req));
}
@Override
@@ -1687,14 +1363,10 @@ public class ClientNamenodeProtocolTranslatorPB implements
GetECTopologyResultForPoliciesRequestProto.newBuilder();
builder.addAllPolicies(Arrays.asList(policyNames));
GetECTopologyResultForPoliciesRequestProto req = builder.build();
- try {
- GetECTopologyResultForPoliciesResponseProto response =
- rpcProxy.getECTopologyResultForPolicies(null, req);
- return PBHelperClient
- .convertECTopologyVerifierResultProto(response.getResponse());
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ GetECTopologyResultForPoliciesResponseProto response =
+ ipc(() -> rpcProxy.getECTopologyResultForPolicies(null, req));
+ return PBHelperClient
+ .convertECTopologyVerifierResultProto(response.getResponse());
}
@Override
@@ -1704,11 +1376,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
ReencryptEncryptionZoneRequestProto.newBuilder();
builder.setZone(zone).setAction(PBHelperClient.convert(action));
ReencryptEncryptionZoneRequestProto req = builder.build();
- try {
- rpcProxy.reencryptEncryptionZone(null, req);
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ ipc(() -> rpcProxy.reencryptEncryptionZone(null, req));
}
@Override
@@ -1716,18 +1384,14 @@ public class ClientNamenodeProtocolTranslatorPB implements
throws IOException {
final ListReencryptionStatusRequestProto req =
ListReencryptionStatusRequestProto.newBuilder().setId(id).build();
- try {
- ListReencryptionStatusResponseProto response =
- rpcProxy.listReencryptionStatus(null, req);
- List<ZoneReencryptionStatus> elements =
- Lists.newArrayListWithCapacity(response.getStatusesCount());
- for (ZoneReencryptionStatusProto p : response.getStatusesList()) {
- elements.add(PBHelperClient.convert(p));
- }
- return new BatchedListEntries<>(elements, response.getHasMore());
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
+ ListReencryptionStatusResponseProto response =
+ ipc(() -> rpcProxy.listReencryptionStatus(null, req));
+ List<ZoneReencryptionStatus> elements =
+ Lists.newArrayListWithCapacity(response.getStatusesCount());
+ for (ZoneReencryptionStatusProto p : response.getStatusesList()) {
+ elements.add(PBHelperClient.convert(p));
}
+ return new BatchedListEntries<>(elements, response.getHasMore());
}
@Override
@@ -1738,11 +1402,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
.setXAttr(PBHelperClient.convertXAttrProto(xAttr))
.setFlag(PBHelperClient.convert(flag))
.build();
- try {
- rpcProxy.setXAttr(null, req);
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ ipc(() -> rpcProxy.setXAttr(null, req));
}
@Override
@@ -1754,11 +1414,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
builder.addAllXAttrs(PBHelperClient.convertXAttrProto(xAttrs));
}
GetXAttrsRequestProto req = builder.build();
- try {
- return PBHelperClient.convert(rpcProxy.getXAttrs(null, req));
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ return PBHelperClient.convert(ipc(() -> rpcProxy.getXAttrs(null, req)));
}
@Override
@@ -1767,11 +1423,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
ListXAttrsRequestProto.newBuilder();
builder.setSrc(src);
ListXAttrsRequestProto req = builder.build();
- try {
- return PBHelperClient.convert(rpcProxy.listXAttrs(null, req));
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ return PBHelperClient.convert(ipc(() -> rpcProxy.listXAttrs(null, req)));
}
@Override
@@ -1779,22 +1431,14 @@ public class ClientNamenodeProtocolTranslatorPB implements
RemoveXAttrRequestProto req = RemoveXAttrRequestProto
.newBuilder().setSrc(src)
.setXAttr(PBHelperClient.convertXAttrProto(xAttr)).build();
- try {
- rpcProxy.removeXAttr(null, req);
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ ipc(() -> rpcProxy.removeXAttr(null, req));
}
@Override
public void checkAccess(String path, FsAction mode) throws IOException {
CheckAccessRequestProto req = CheckAccessRequestProto.newBuilder()
.setPath(path).setMode(PBHelperClient.convert(mode)).build();
- try {
- rpcProxy.checkAccess(null, req);
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ ipc(() -> rpcProxy.checkAccess(null, req));
}
@Override
@@ -1802,66 +1446,42 @@ public class ClientNamenodeProtocolTranslatorPB implements
throws IOException {
SetStoragePolicyRequestProto req = SetStoragePolicyRequestProto
.newBuilder().setSrc(src).setPolicyName(policyName).build();
- try {
- rpcProxy.setStoragePolicy(null, req);
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ ipc(() -> rpcProxy.setStoragePolicy(null, req));
}
@Override
public void unsetStoragePolicy(String src) throws IOException {
UnsetStoragePolicyRequestProto req = UnsetStoragePolicyRequestProto
.newBuilder().setSrc(src).build();
- try {
- rpcProxy.unsetStoragePolicy(null, req);
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ ipc(() -> rpcProxy.unsetStoragePolicy(null, req));
}
@Override
public BlockStoragePolicy getStoragePolicy(String path) throws IOException {
GetStoragePolicyRequestProto request = GetStoragePolicyRequestProto
.newBuilder().setPath(path).build();
- try {
- return PBHelperClient.convert(rpcProxy.getStoragePolicy(null, request)
- .getStoragePolicy());
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ return PBHelperClient.convert(ipc(() -> rpcProxy.getStoragePolicy(null, request))
+ .getStoragePolicy());
}
@Override
public BlockStoragePolicy[] getStoragePolicies() throws IOException {
- try {
- GetStoragePoliciesResponseProto response = rpcProxy
- .getStoragePolicies(null, VOID_GET_STORAGE_POLICIES_REQUEST);
- return PBHelperClient.convertStoragePolicies(response.getPoliciesList());
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ GetStoragePoliciesResponseProto response = ipc(() -> rpcProxy
+ .getStoragePolicies(null, VOID_GET_STORAGE_POLICIES_REQUEST));
+ return PBHelperClient.convertStoragePolicies(response.getPoliciesList());
}
public long getCurrentEditLogTxid() throws IOException {
GetCurrentEditLogTxidRequestProto req = GetCurrentEditLogTxidRequestProto
.getDefaultInstance();
- try {
- return rpcProxy.getCurrentEditLogTxid(null, req).getTxid();
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ return ipc(() -> rpcProxy.getCurrentEditLogTxid(null, req)).getTxid();
}
@Override
public EventBatchList getEditsFromTxid(long txid) throws IOException {
GetEditsFromTxidRequestProto req = GetEditsFromTxidRequestProto.newBuilder()
.setTxid(txid).build();
- try {
- return PBHelperClient.convert(rpcProxy.getEditsFromTxid(null, req));
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ return PBHelperClient.convert(ipc(() -> rpcProxy.getEditsFromTxid(null, req)));
}
@Override
@@ -1873,17 +1493,13 @@ public class ClientNamenodeProtocolTranslatorPB implements
AddErasureCodingPoliciesRequestProto req =
AddErasureCodingPoliciesRequestProto.newBuilder()
.addAllEcPolicies(protos).build();
- try {
- AddErasureCodingPoliciesResponseProto rep = rpcProxy
- .addErasureCodingPolicies(null, req);
- AddErasureCodingPolicyResponse[] responses =
- rep.getResponsesList().stream()
- .map(PBHelperClient::convertAddErasureCodingPolicyResponse)
- .toArray(AddErasureCodingPolicyResponse[]::new);
- return responses;
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ AddErasureCodingPoliciesResponseProto rep = ipc(() -> rpcProxy
+ .addErasureCodingPolicies(null, req));
+ AddErasureCodingPolicyResponse[] responses =
+ rep.getResponsesList().stream()
+ .map(PBHelperClient::convertAddErasureCodingPolicyResponse)
+ .toArray(AddErasureCodingPolicyResponse[]::new);
+ return responses;
}
@Override
@@ -1893,11 +1509,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
RemoveErasureCodingPolicyRequestProto.newBuilder();
builder.setEcPolicyName(ecPolicyName);
RemoveErasureCodingPolicyRequestProto req = builder.build();
- try {
- rpcProxy.removeErasureCodingPolicy(null, req);
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ ipc(() -> rpcProxy.removeErasureCodingPolicy(null, req));
}
@Override
@@ -1907,11 +1519,7 @@ public class ClientNamenodeProtocolTranslatorPB implements
EnableErasureCodingPolicyRequestProto.newBuilder();
builder.setEcPolicyName(ecPolicyName);
EnableErasureCodingPolicyRequestProto req = builder.build();
- try {
- rpcProxy.enableErasureCodingPolicy(null, req);
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ ipc(() -> rpcProxy.enableErasureCodingPolicy(null, req));
}
@Override
@@ -1921,45 +1529,33 @@ public class ClientNamenodeProtocolTranslatorPB implements
DisableErasureCodingPolicyRequestProto.newBuilder();
builder.setEcPolicyName(ecPolicyName);
DisableErasureCodingPolicyRequestProto req = builder.build();
- try {
- rpcProxy.disableErasureCodingPolicy(null, req);
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ ipc(() -> rpcProxy.disableErasureCodingPolicy(null, req));
}
@Override
public ErasureCodingPolicyInfo[] getErasureCodingPolicies()
throws IOException {
- try {
- GetErasureCodingPoliciesResponseProto response = rpcProxy
- .getErasureCodingPolicies(null, VOID_GET_EC_POLICIES_REQUEST);
- ErasureCodingPolicyInfo[] ecPolicies =
- new ErasureCodingPolicyInfo[response.getEcPoliciesCount()];
- int i = 0;
- for (ErasureCodingPolicyProto proto : response.getEcPoliciesList()) {
- ecPolicies[i++] =
- PBHelperClient.convertErasureCodingPolicyInfo(proto);
- }
- return ecPolicies;
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
+ GetErasureCodingPoliciesResponseProto response = ipc(() -> rpcProxy
+ .getErasureCodingPolicies(null, VOID_GET_EC_POLICIES_REQUEST));
+ ErasureCodingPolicyInfo[] ecPolicies =
+ new ErasureCodingPolicyInfo[response.getEcPoliciesCount()];
+ int i = 0;
+ for (ErasureCodingPolicyProto proto : response.getEcPoliciesList()) {
+ ecPolicies[i++] =
+ PBHelperClient.convertErasureCodingPolicyInfo(proto);
}
+ return ecPolicies;
}
@Override
public Map<String, String> getErasureCodingCodecs() throws IOException {
- try {
- GetErasureCodingCodecsResponseProto response = rpcProxy
- .getErasureCodingCodecs(null, VOID_GET_EC_CODEC_REQUEST);
- Map<String, String> ecCodecs = new HashMap<>();
- for (CodecProto codec : response.getCodecList()) {
- ecCodecs.put(codec.getCodec(), codec.getCoders());
- }
- return ecCodecs;
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
+ GetErasureCodingCodecsResponseProto response = ipc(() -> rpcProxy
+ .getErasureCodingCodecs(null, VOID_GET_EC_CODEC_REQUEST));
+ Map<String, String> ecCodecs = new HashMap<>();
+ for (CodecProto codec : response.getCodecList()) {
+ ecCodecs.put(codec.getCodec(), codec.getCoders());
}
+ return ecCodecs;
}
@Override
@@ -1967,29 +1563,21 @@ public class ClientNamenodeProtocolTranslatorPB implements
throws IOException {
GetErasureCodingPolicyRequestProto req =
GetErasureCodingPolicyRequestProto.newBuilder().setSrc(src).build();
- try {
- GetErasureCodingPolicyResponseProto response =
- rpcProxy.getErasureCodingPolicy(null, req);
- if (response.hasEcPolicy()) {
- return PBHelperClient.convertErasureCodingPolicy(
- response.getEcPolicy());
- }
- return null;
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
+ GetErasureCodingPolicyResponseProto response =
+ ipc(() -> rpcProxy.getErasureCodingPolicy(null, req));
+ if (response.hasEcPolicy()) {
+ return PBHelperClient.convertErasureCodingPolicy(
+ response.getEcPolicy());
}
+ return null;
}
@Override
public QuotaUsage getQuotaUsage(String path) throws IOException {
GetQuotaUsageRequestProto req =
GetQuotaUsageRequestProto.newBuilder().setPath(path).build();
- try {
- return PBHelperClient.convert(rpcProxy.getQuotaUsage(null, req)
- .getUsage());
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ return PBHelperClient.convert(ipc(() -> rpcProxy.getQuotaUsage(null, req))
+ .getUsage());
}
@Deprecated
@@ -2010,51 +1598,35 @@ public class ClientNamenodeProtocolTranslatorPB implements
}
req.setPath(path);
- try {
- ListOpenFilesResponseProto response =
- rpcProxy.listOpenFiles(null, req.build());
- List<OpenFileEntry> openFileEntries =
- Lists.newArrayListWithCapacity(response.getEntriesCount());
- for (OpenFilesBatchResponseProto p : response.getEntriesList()) {
- openFileEntries.add(PBHelperClient.convert(p));
- }
- return new BatchedListEntries<>(openFileEntries, response.getHasMore());
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
+ ListOpenFilesResponseProto response =
+ ipc(() -> rpcProxy.listOpenFiles(null, req.build()));
+ List<OpenFileEntry> openFileEntries =
+ Lists.newArrayListWithCapacity(response.getEntriesCount());
+ for (OpenFilesBatchResponseProto p : response.getEntriesList()) {
+ openFileEntries.add(PBHelperClient.convert(p));
}
+ return new BatchedListEntries<>(openFileEntries, response.getHasMore());
}
@Override
public void msync() throws IOException {
MsyncRequestProto.Builder req = MsyncRequestProto.newBuilder();
- try {
- rpcProxy.msync(null, req.build());
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ ipc(() -> rpcProxy.msync(null, req.build()));
}
@Override
public void satisfyStoragePolicy(String src) throws IOException {
SatisfyStoragePolicyRequestProto req =
SatisfyStoragePolicyRequestProto.newBuilder().setSrc(src).build();
- try {
- rpcProxy.satisfyStoragePolicy(null, req);
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ ipc(() -> rpcProxy.satisfyStoragePolicy(null, req));
}
@Override
public DatanodeInfo[] getSlowDatanodeReport() throws IOException {
GetSlowDatanodeReportRequestProto req =
GetSlowDatanodeReportRequestProto.newBuilder().build();
- try {
- return PBHelperClient.convert(
- rpcProxy.getSlowDatanodeReport(null, req).getDatanodeInfoProtoList());
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ return PBHelperClient.convert(
+ ipc(() -> rpcProxy.getSlowDatanodeReport(null, req)).getDatanodeInfoProtoList());
}
@Override
@@ -2062,22 +1634,18 @@ public class ClientNamenodeProtocolTranslatorPB implements
throws IOException {
HAServiceStateRequestProto req =
HAServiceStateRequestProto.newBuilder().build();
- try {
- HAServiceStateProto res =
- rpcProxy.getHAServiceState(null, req).getState();
- switch(res) {
- case ACTIVE:
- return HAServiceProtocol.HAServiceState.ACTIVE;
- case STANDBY:
- return HAServiceProtocol.HAServiceState.STANDBY;
- case OBSERVER:
- return HAServiceProtocol.HAServiceState.OBSERVER;
- case INITIALIZING:
- default:
- return HAServiceProtocol.HAServiceState.INITIALIZING;
- }
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
+ HAServiceStateProto res =
+ ipc(() -> rpcProxy.getHAServiceState(null, req)).getState();
+ switch(res) {
+ case ACTIVE:
+ return HAServiceProtocol.HAServiceState.ACTIVE;
+ case STANDBY:
+ return HAServiceProtocol.HAServiceState.STANDBY;
+ case OBSERVER:
+ return HAServiceProtocol.HAServiceState.OBSERVER;
+ case INITIALIZING:
+ default:
+ return HAServiceProtocol.HAServiceState.INITIALIZING;
}
}
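Every hunk in this translator follows the same shape: the boilerplate try/catch on the shaded ServiceException is collapsed into a single ipc() call that takes the raw RPC invocation as a lambda. A minimal sketch of the helper these call sites imply is below; it is reconstructed from usage rather than copied from ShadedProtobufHelper, so the class name (IpcSketch) and the exact unwrapping logic are assumptions and the upstream definition may differ in detail.

    import java.io.IOException;

    import org.apache.hadoop.thirdparty.protobuf.ServiceException;

    // Hypothetical stand-in for the relevant parts of ShadedProtobufHelper.
    public final class IpcSketch {
      private IpcSketch() {
      }

      @FunctionalInterface
      public interface IpcCall<T> {
        T call() throws ServiceException;   // the raw protobuf invocation
      }

      // Run the RPC, translating the shaded ServiceException into the
      // IOException contract declared by the translator interfaces.
      public static <T> T ipc(IpcCall<T> call) throws IOException {
        try {
          return call.call();
        } catch (ServiceException e) {
          Throwable cause = e.getCause();
          throw (cause instanceof IOException)
              ? (IOException) cause          // unwrap the remote exception
              : new IOException(e);          // assumed fallback wrapping
        }
      }
    }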
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
index b3932f9..fb0fce7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
@@ -207,7 +207,7 @@ import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.erasurecode.ECSchema;
-import org.apache.hadoop.ipc.ProtobufHelper;
+import org.apache.hadoop.ipc.internal.ShadedProtobufHelper;
import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.ChunkedArrayList;
@@ -234,7 +234,7 @@ public class PBHelperClient {
FsAction.values();
private static ByteString getFixedByteString(String key) {
- return ProtobufHelper.getFixedByteString(key);
+ return ShadedProtobufHelper.getFixedByteString(key);
}
/**
@@ -257,7 +257,8 @@ public class PBHelperClient {
public static ByteString getByteString(byte[] bytes) {
// return singleton to reduce object allocation
- return ProtobufHelper.getByteString(bytes);
+    return ShadedProtobufHelper.getByteString(bytes);
}
public static ShmId convert(ShortCircuitShmIdProto shmId) {
@@ -325,7 +326,7 @@ public class PBHelperClient {
}
public static TokenProto convert(Token<?> tok) {
- return ProtobufHelper.protoFromToken(tok);
+ return ShadedProtobufHelper.protoFromToken(tok);
}
public static ShortCircuitShmIdProto convert(ShmId shmId) {
@@ -803,8 +804,8 @@ public class PBHelperClient {
public static Token<BlockTokenIdentifier> convert(
TokenProto blockToken) {
- return (Token<BlockTokenIdentifier>) ProtobufHelper
- .tokenFromProto(blockToken);
+ return (Token<BlockTokenIdentifier>) ShadedProtobufHelper.tokenFromProto(
+ blockToken);
}
// DatanodeId
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ReconfigurationProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ReconfigurationProtocolTranslatorPB.java
index ce8a89b..eadee12 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ReconfigurationProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ReconfigurationProtocolTranslatorPB.java
@@ -33,7 +33,6 @@ import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.GetRe
import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.ListReconfigurablePropertiesRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.ListReconfigurablePropertiesResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.StartReconfigurationRequestProto;
-import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.ipc.ProtocolMetaInterface;
import org.apache.hadoop.ipc.ProtocolTranslator;
@@ -44,7 +43,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.thirdparty.protobuf.RpcController;
-import org.apache.hadoop.thirdparty.protobuf.ServiceException;
+
+import static org.apache.hadoop.ipc.internal.ShadedProtobufHelper.ipc;
/**
* This class is the client side translator to translate the requests made on
@@ -102,37 +102,25 @@ public class ReconfigurationProtocolTranslatorPB implements
@Override
public void startReconfiguration() throws IOException {
- try {
- rpcProxy.startReconfiguration(NULL_CONTROLLER, VOID_START_RECONFIG);
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ ipc(() -> rpcProxy.startReconfiguration(NULL_CONTROLLER, VOID_START_RECONFIG));
}
@Override
public ReconfigurationTaskStatus getReconfigurationStatus()
throws IOException {
- try {
- return ReconfigurationProtocolUtils.getReconfigurationStatus(
- rpcProxy
- .getReconfigurationStatus(
- NULL_CONTROLLER,
- VOID_GET_RECONFIG_STATUS));
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ return ReconfigurationProtocolUtils.getReconfigurationStatus(
+ ipc(() -> rpcProxy
+ .getReconfigurationStatus(
+ NULL_CONTROLLER,
+ VOID_GET_RECONFIG_STATUS)));
}
@Override
public List<String> listReconfigurableProperties() throws IOException {
ListReconfigurablePropertiesResponseProto response;
- try {
- response = rpcProxy.listReconfigurableProperties(NULL_CONTROLLER,
- VOID_LIST_RECONFIGURABLE_PROPERTIES);
- return response.getNameList();
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ response = ipc(() -> rpcProxy.listReconfigurableProperties(NULL_CONTROLLER,
+ VOID_LIST_RECONFIGURABLE_PROPERTIES));
+ return response.getNameList();
}
@Override
diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/pom.xml b/hadoop-hdfs-project/hadoop-hdfs-nfs/pom.xml
index 6faa174..18ab613 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-nfs/pom.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/pom.xml
@@ -151,7 +151,7 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
- <scope>compile</scope>
+ <scope>${transient.protobuf2.scope}</scope>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
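In this module (and in hadoop-hdfs below) the protobuf-2.5 dependency scope is no longer hard-coded to compile but taken from the ${transient.protobuf2.scope} property. Assuming the property is given a compile default in a parent pom outside this excerpt, a build can demote the dependency on the command line, e.g.:

    -Dtransient.protobuf2.scope=provided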
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/RouterAdminProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/RouterAdminProtocolTranslatorPB.java
index 6c26217..987b3ba 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/RouterAdminProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/protocolPB/RouterAdminProtocolTranslatorPB.java
@@ -97,7 +97,6 @@ import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.RemoveMou
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.RemoveMountTableEntryResponsePBImpl;
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.UpdateMountTableEntryRequestPBImpl;
import org.apache.hadoop.hdfs.server.federation.store.protocol.impl.pb.UpdateMountTableEntryResponsePBImpl;
-import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtocolMetaInterface;
import org.apache.hadoop.ipc.ProtocolTranslator;
import org.apache.hadoop.ipc.RPC;
@@ -105,6 +104,8 @@ import org.apache.hadoop.ipc.RpcClientUtil;
import org.apache.hadoop.thirdparty.protobuf.ServiceException;
+import static org.apache.hadoop.ipc.internal.ShadedProtobufHelper.getRemoteException;
+
/**
* This class forwards NN's ClientProtocol calls as RPC calls to the NN server
* while translating from the parameter types used in ClientProtocol to the
@@ -150,7 +151,8 @@ public class RouterAdminProtocolTranslatorPB
rpcProxy.addMountTableEntry(null, proto);
return new AddMountTableEntryResponsePBImpl(response);
} catch (ServiceException e) {
- throw new IOException(ProtobufHelper.getRemoteException(e).getMessage());
+      throw new IOException(getRemoteException(e).getMessage());
}
}
@@ -165,7 +167,8 @@ public class RouterAdminProtocolTranslatorPB
rpcProxy.updateMountTableEntry(null, proto);
return new UpdateMountTableEntryResponsePBImpl(response);
} catch (ServiceException e) {
- throw new IOException(ProtobufHelper.getRemoteException(e).getMessage());
+      throw new IOException(getRemoteException(e).getMessage());
}
}
@@ -180,7 +183,8 @@ public class RouterAdminProtocolTranslatorPB
rpcProxy.removeMountTableEntry(null, proto);
return new RemoveMountTableEntryResponsePBImpl(responseProto);
} catch (ServiceException e) {
- throw new IOException(ProtobufHelper.getRemoteException(e).getMessage());
+      throw new IOException(getRemoteException(e).getMessage());
}
}
@@ -195,7 +199,8 @@ public class RouterAdminProtocolTranslatorPB
rpcProxy.getMountTableEntries(null, proto);
return new GetMountTableEntriesResponsePBImpl(response);
} catch (ServiceException e) {
- throw new IOException(ProtobufHelper.getRemoteException(e).getMessage());
+      throw new IOException(getRemoteException(e).getMessage());
}
}
@@ -209,7 +214,8 @@ public class RouterAdminProtocolTranslatorPB
rpcProxy.enterSafeMode(null, proto);
return new EnterSafeModeResponsePBImpl(response);
} catch (ServiceException e) {
- throw new IOException(ProtobufHelper.getRemoteException(e).getMessage());
+      throw new IOException(getRemoteException(e).getMessage());
}
}
@@ -223,7 +229,8 @@ public class RouterAdminProtocolTranslatorPB
rpcProxy.leaveSafeMode(null, proto);
return new LeaveSafeModeResponsePBImpl(response);
} catch (ServiceException e) {
- throw new IOException(ProtobufHelper.getRemoteException(e).getMessage());
+      throw new IOException(getRemoteException(e).getMessage());
}
}
@@ -237,7 +244,8 @@ public class RouterAdminProtocolTranslatorPB
rpcProxy.getSafeMode(null, proto);
return new GetSafeModeResponsePBImpl(response);
} catch (ServiceException e) {
- throw new IOException(ProtobufHelper.getRemoteException(e).getMessage());
+      throw new IOException(getRemoteException(e).getMessage());
}
}
@@ -252,7 +260,8 @@ public class RouterAdminProtocolTranslatorPB
rpcProxy.disableNameservice(null, proto);
return new DisableNameserviceResponsePBImpl(response);
} catch (ServiceException e) {
- throw new IOException(ProtobufHelper.getRemoteException(e).getMessage());
+      throw new IOException(getRemoteException(e).getMessage());
}
}
@@ -267,7 +276,8 @@ public class RouterAdminProtocolTranslatorPB
rpcProxy.enableNameservice(null, proto);
return new EnableNameserviceResponsePBImpl(response);
} catch (ServiceException e) {
- throw new IOException(ProtobufHelper.getRemoteException(e).getMessage());
+      throw new IOException(getRemoteException(e).getMessage());
}
}
@@ -281,7 +291,8 @@ public class RouterAdminProtocolTranslatorPB
rpcProxy.getDisabledNameservices(null, proto);
return new GetDisabledNameservicesResponsePBImpl(response);
} catch (ServiceException e) {
- throw new IOException(ProtobufHelper.getRemoteException(e).getMessage());
+      throw new IOException(getRemoteException(e).getMessage());
}
}
@@ -296,7 +307,8 @@ public class RouterAdminProtocolTranslatorPB
rpcProxy.refreshMountTableEntries(null, proto);
return new RefreshMountTableEntriesResponsePBImpl(response);
} catch (ServiceException e) {
- throw new IOException(ProtobufHelper.getRemoteException(e).getMessage());
+      throw new IOException(getRemoteException(e).getMessage());
}
}
@@ -311,7 +323,8 @@ public class RouterAdminProtocolTranslatorPB
rpcProxy.getDestination(null, proto);
return new GetDestinationResponsePBImpl(response);
} catch (ServiceException e) {
- throw new IOException(ProtobufHelper.getRemoteException(e).getMessage());
+      throw new IOException(getRemoteException(e).getMessage());
}
}
@@ -325,7 +338,8 @@ public class RouterAdminProtocolTranslatorPB
return new RefreshSuperUserGroupsConfigurationResponsePBImpl(response)
.getStatus();
} catch (ServiceException e) {
- throw new IOException(ProtobufHelper.getRemoteException(e).getMessage());
+      throw new IOException(getRemoteException(e).getMessage());
}
}
}
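Unlike the translators above, this class keeps its explicit try/catch blocks: each handler deliberately flattens the remote failure into a new IOException carrying only the message text, so switching to ipc() (which rethrows the unwrapped remote exception itself) would change observable behaviour. Only the import moves, to a static import of ShadedProtobufHelper.getRemoteException. A sketch of the mapping these call sites rely on, inferred from usage rather than copied from the helper, and shown without its enclosing class for brevity:

    import java.io.IOException;

    import org.apache.hadoop.thirdparty.protobuf.ServiceException;

    // Inferred shape of getRemoteException(): prefer the IOException the
    // server side raised, otherwise wrap the whole ServiceException.
    public static IOException getRemoteException(ServiceException e) {
      Throwable cause = e.getCause();
      return (cause instanceof IOException)
          ? (IOException) cause
          : new IOException(e);
    }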
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java
index 4dd0693..5b42700 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java
@@ -194,9 +194,9 @@ public class RouterAdminServer extends AbstractService
RefreshCallQueueProtocolProtos.RefreshCallQueueProtocolService.
newReflectiveBlockingService(refreshCallQueueXlator);
- DFSUtil.addPBProtocol(conf, GenericRefreshProtocolPB.class,
+ DFSUtil.addInternalPBProtocol(conf, GenericRefreshProtocolPB.class,
genericRefreshService, adminServer);
- DFSUtil.addPBProtocol(conf, RefreshCallQueueProtocolPB.class,
+ DFSUtil.addInternalPBProtocol(conf, RefreshCallQueueProtocolPB.class,
refreshCallQueueService, adminServer);
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
index 289848d..39b7a67 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
@@ -305,11 +305,11 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
.build();
// Add all the RPC protocols that the Router implements
- DFSUtil.addPBProtocol(
+ DFSUtil.addInternalPBProtocol(
conf, NamenodeProtocolPB.class, nnPbService, this.rpcServer);
- DFSUtil.addPBProtocol(conf, RefreshUserMappingsProtocolPB.class,
+ DFSUtil.addInternalPBProtocol(conf, RefreshUserMappingsProtocolPB.class,
refreshUserMappingService, this.rpcServer);
- DFSUtil.addPBProtocol(conf, GetUserMappingsProtocolPB.class,
+ DFSUtil.addInternalPBProtocol(conf, GetUserMappingsProtocolPB.class,
getUserMappingService, this.rpcServer);
// Set service-level authorization security policy
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockNamenode.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockNamenode.java
index a4755c2..9b62e9f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockNamenode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockNamenode.java
@@ -199,7 +199,7 @@ public class MockNamenode {
BlockingService nnProtoPbService =
NamenodeProtocolService.newReflectiveBlockingService(
nnProtoXlator);
- DFSUtil.addPBProtocol(
+ DFSUtil.addInternalPBProtocol(
conf, NamenodeProtocolPB.class, nnProtoPbService, rpcServer);
DatanodeProtocolServerSideTranslatorPB dnProtoPbXlator =
@@ -207,7 +207,7 @@ public class MockNamenode {
BlockingService dnProtoPbService =
DatanodeProtocolService.newReflectiveBlockingService(
dnProtoPbXlator);
- DFSUtil.addPBProtocol(
+ DFSUtil.addInternalPBProtocol(
conf, DatanodeProtocolPB.class, dnProtoPbService, rpcServer);
HAServiceProtocolServerSideTranslatorPB haServiceProtoXlator =
@@ -215,7 +215,7 @@ public class MockNamenode {
BlockingService haProtoPbService =
HAServiceProtocolService.newReflectiveBlockingService(
haServiceProtoXlator);
- DFSUtil.addPBProtocol(
+ DFSUtil.addInternalPBProtocol(
conf, HAServiceProtocolPB.class, haProtoPbService, rpcServer);
this.rpcServer.addTerseExceptions(
diff --git a/hadoop-hdfs-project/hadoop-hdfs/pom.xml b/hadoop-hdfs-project/hadoop-hdfs/pom.xml
index 22dba01..0d8a8e0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/pom.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/pom.xml
@@ -135,7 +135,7 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
- <scope>compile</scope>
+ <scope>${transient.protobuf2.scope}</scope>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
index 8f5e05f..3a354cc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
@@ -67,6 +67,7 @@ import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.PosixParser;
import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
@@ -1312,7 +1313,30 @@ public class DFSUtil {
}
/**
- * Add protobuf based protocol to the {@link org.apache.hadoop.ipc.RPC.Server}
+ * Add protobuf based protocol to the {@link org.apache.hadoop.ipc.RPC.Server}.
+ * This method is for exclusive use by the hadoop libraries, as its signature
+ * changes with the version of the shaded protobuf library it has been built with.
+ * @param conf configuration
+ * @param protocol Protocol interface
+ * @param service service that implements the protocol
+ * @param server RPC server to which the protocol and implementation are
+ *               added
+ * @throws IOException failure
+ */
+ @InterfaceAudience.Private
+ @InterfaceStability.Unstable
+ public static void addInternalPBProtocol(Configuration conf,
+ Class<?> protocol,
+ BlockingService service,
+ RPC.Server server) throws IOException {
+ RPC.setProtocolEngine(conf, protocol, ProtobufRpcEngine2.class);
+ server.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, protocol, service);
+ }
+
+ /**
+ * Add protobuf based protocol to the {@link org.apache.hadoop.ipc.RPC.Server}.
+ * Deprecated as it will only reliably compile if an unshaded protobuf library
+ * is also on the classpath.
* @param conf configuration
* @param protocol Protocol interface
* @param service service that implements the protocol
@@ -1320,17 +1344,17 @@ public class DFSUtil {
* added to
* @throws IOException
*/
+ @Deprecated
public static void addPBProtocol(Configuration conf, Class<?> protocol,
BlockingService service, RPC.Server server) throws IOException {
- RPC.setProtocolEngine(conf, protocol, ProtobufRpcEngine2.class);
- server.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, protocol, service);
+ addInternalPBProtocol(conf, protocol, service, server);
}
/**
* Add protobuf based protocol to the {@link RPC.Server}.
* This engine uses Protobuf 2.5.0. Recommended to upgrade to
* Protobuf 3.x from hadoop-thirdparty and use
- * {@link DFSUtil#addPBProtocol(Configuration, Class, BlockingService,
+ * {@link DFSUtil#addInternalPBProtocol(Configuration, Class, BlockingService,
* RPC.Server)}.
* @param conf configuration
* @param protocol Protocol interface
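Call sites elsewhere in this diff (RouterAdminServer, RouterRpcServer, MockNamenode) switch to the new name mechanically. A self-contained sketch of the registration pattern they share follows; the wrapper class and method are illustrative only and not part of the patch:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSUtil;
    import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.NamenodeProtocolService;
    import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
    import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolServerSideTranslatorPB;
    import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
    import org.apache.hadoop.ipc.RPC;
    import org.apache.hadoop.thirdparty.protobuf.BlockingService;

    final class ProtocolRegistration {
      // Register a protobuf-based protocol with a running RPC server, as
      // the updated call sites do; impl is any NamenodeProtocol instance.
      static void registerNamenodeProtocol(Configuration conf,
          NamenodeProtocol impl, RPC.Server rpcServer) throws IOException {
        NamenodeProtocolServerSideTranslatorPB xlator =
            new NamenodeProtocolServerSideTranslatorPB(impl);
        BlockingService service =
            NamenodeProtocolService.newReflectiveBlockingService(xlator);
        DFSUtil.addInternalPBProtocol(
            conf, NamenodeProtocolPB.class, service, rpcServer);
      }
    }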
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeLifelineProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeLifelineProtocolClientSideTranslatorPB.java
index 220e9e2..cdeb827 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeLifelineProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeLifelineProtocolClientSideTranslatorPB.java
@@ -29,7 +29,6 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeLifelineProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
-import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.ipc.ProtocolMetaInterface;
import org.apache.hadoop.ipc.RPC;
@@ -38,7 +37,8 @@ import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.thirdparty.protobuf.RpcController;
-import org.apache.hadoop.thirdparty.protobuf.ServiceException;
+
+import static org.apache.hadoop.ipc.internal.ShadedProtobufHelper.ipc;
/**
* This class is the client side translator to translate the requests made on
@@ -96,11 +96,7 @@ public class DatanodeLifelineProtocolClientSideTranslatorPB implements
builder.setVolumeFailureSummary(PBHelper.convertVolumeFailureSummary(
volumeFailureSummary));
}
- try {
- rpcProxy.sendLifeline(NULL_CONTROLLER, builder.build());
- } catch (ServiceException se) {
- throw ProtobufHelper.getRemoteException(se);
- }
+ ipc(() -> rpcProxy.sendLifeline(NULL_CONTROLLER, builder.build()));
}
@Override // ProtocolMetaInterface
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
index add19e9..e9df4df 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
@@ -61,7 +61,6 @@ import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
-import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.ipc.ProtocolMetaInterface;
import org.apache.hadoop.ipc.RPC;
@@ -71,10 +70,11 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.thirdparty.protobuf.RpcController;
-import org.apache.hadoop.thirdparty.protobuf.ServiceException;
import javax.annotation.Nonnull;
+import static org.apache.hadoop.ipc.internal.ShadedProtobufHelper.ipc;
+
/**
* This class is the client side translator to translate the requests made on
* {@link DatanodeProtocol} interfaces to the RPC server implementing
@@ -123,11 +123,8 @@ public class DatanodeProtocolClientSideTranslatorPB implements
RegisterDatanodeRequestProto.Builder builder = RegisterDatanodeRequestProto
.newBuilder().setRegistration(PBHelper.convert(registration));
RegisterDatanodeResponseProto resp;
- try {
- resp = rpcProxy.registerDatanode(NULL_CONTROLLER, builder.build());
- } catch (ServiceException se) {
- throw ProtobufHelper.getRemoteException(se);
- }
+ resp = ipc(() -> rpcProxy.registerDatanode(NULL_CONTROLLER, builder.build()));
+
return PBHelper.convert(resp.getRegistration());
}
@@ -164,11 +161,8 @@ public class DatanodeProtocolClientSideTranslatorPB implements
}
HeartbeatResponseProto resp;
- try {
- resp = rpcProxy.sendHeartbeat(NULL_CONTROLLER, builder.build());
- } catch (ServiceException se) {
- throw ProtobufHelper.getRemoteException(se);
- }
+ resp = ipc(() -> rpcProxy.sendHeartbeat(NULL_CONTROLLER, builder.build()));
+
DatanodeCommand[] cmds = new DatanodeCommand[resp.getCmdsList().size()];
int index = 0;
for (DatanodeCommandProto p : resp.getCmdsList()) {
@@ -214,11 +208,7 @@ public class DatanodeProtocolClientSideTranslatorPB implements
}
builder.setContext(PBHelper.convert(context));
BlockReportResponseProto resp;
- try {
- resp = rpcProxy.blockReport(NULL_CONTROLLER, builder.build());
- } catch (ServiceException se) {
- throw ProtobufHelper.getRemoteException(se);
- }
+ resp = ipc(() -> rpcProxy.blockReport(NULL_CONTROLLER, builder.build()));
return resp.hasCmd() ? PBHelper.convert(resp.getCmd()) : null;
}
@@ -234,11 +224,7 @@ public class DatanodeProtocolClientSideTranslatorPB implements
}
CacheReportResponseProto resp;
- try {
- resp = rpcProxy.cacheReport(NULL_CONTROLLER, builder.build());
- } catch (ServiceException se) {
- throw ProtobufHelper.getRemoteException(se);
- }
+ resp = ipc(() -> rpcProxy.cacheReport(NULL_CONTROLLER, builder.build()));
if (resp.hasCmd()) {
return PBHelper.convert(resp.getCmd());
}
@@ -263,11 +249,7 @@ public class DatanodeProtocolClientSideTranslatorPB implements
}
builder.addBlocks(repBuilder.build());
}
- try {
- rpcProxy.blockReceivedAndDeleted(NULL_CONTROLLER, builder.build());
- } catch (ServiceException se) {
- throw ProtobufHelper.getRemoteException(se);
- }
+ ipc(() -> rpcProxy.blockReceivedAndDeleted(NULL_CONTROLLER, builder.build()));
}
@Override
@@ -276,21 +258,13 @@ public class DatanodeProtocolClientSideTranslatorPB implements
ErrorReportRequestProto req = ErrorReportRequestProto.newBuilder()
.setRegistartion(PBHelper.convert(registration))
.setErrorCode(errorCode).setMsg(msg).build();
- try {
- rpcProxy.errorReport(NULL_CONTROLLER, req);
- } catch (ServiceException se) {
- throw ProtobufHelper.getRemoteException(se);
- }
+ ipc(() -> rpcProxy.errorReport(NULL_CONTROLLER, req));
}
@Override
public NamespaceInfo versionRequest() throws IOException {
- try {
- return PBHelper.convert(rpcProxy.versionRequest(NULL_CONTROLLER,
- VOID_VERSION_REQUEST).getInfo());
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ return PBHelper.convert(ipc(() -> rpcProxy.versionRequest(NULL_CONTROLLER,
+ VOID_VERSION_REQUEST).getInfo()));
}
@Override
@@ -301,11 +275,7 @@ public class DatanodeProtocolClientSideTranslatorPB implements
builder.addBlocks(i, PBHelperClient.convertLocatedBlock(blocks[i]));
}
ReportBadBlocksRequestProto req = builder.build();
- try {
- rpcProxy.reportBadBlocks(NULL_CONTROLLER, req);
- } catch (ServiceException se) {
- throw ProtobufHelper.getRemoteException(se);
- }
+ ipc(() -> rpcProxy.reportBadBlocks(NULL_CONTROLLER, req));
}
@Override
@@ -326,11 +296,7 @@ public class DatanodeProtocolClientSideTranslatorPB implements
builder.addNewTargetStorages(newtargetstorages[i]);
}
CommitBlockSynchronizationRequestProto req = builder.build();
- try {
- rpcProxy.commitBlockSynchronization(NULL_CONTROLLER, req);
- } catch (ServiceException se) {
- throw ProtobufHelper.getRemoteException(se);
- }
+ ipc(() -> rpcProxy.commitBlockSynchronization(NULL_CONTROLLER, req));
}
@Override // ProtocolMetaInterface
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InMemoryAliasMapProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InMemoryAliasMapProtocolClientSideTranslatorPB.java
index f00cfd4..9d29e09 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InMemoryAliasMapProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InMemoryAliasMapProtocolClientSideTranslatorPB.java
@@ -16,7 +16,6 @@
*/
package org.apache.hadoop.hdfs.protocolPB;
-import org.apache.hadoop.thirdparty.protobuf.ServiceException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
@@ -30,7 +29,6 @@ import org.apache.hadoop.hdfs.server.aliasmap.InMemoryAliasMapProtocol;
import org.apache.hadoop.hdfs.server.common.FileRegion;
import org.apache.hadoop.hdfs.server.namenode.ha.AbstractNNFailoverProxyProvider;
import org.apache.hadoop.hdfs.server.namenode.ha.InMemoryAliasMapFailoverProxyProvider;
-import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.slf4j.Logger;
@@ -54,6 +52,7 @@ import static org.apache.hadoop.hdfs.DFSUtilClient.getNameServiceIds;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX;
import static org.apache.hadoop.hdfs.protocol.proto.AliasMapProtocolProtos.*;
import static org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.*;
+import static org.apache.hadoop.ipc.internal.ShadedProtobufHelper.ipc;
/**
* This class is the client side translator to translate requests made to the
@@ -136,29 +135,24 @@ public class InMemoryAliasMapProtocolClientSideTranslatorPB
builder.setMarker(PBHelperClient.convert(marker.get()));
}
ListRequestProto request = builder.build();
- try {
- ListResponseProto response = rpcProxy.list(null, request);
- List<KeyValueProto> fileRegionsList = response.getFileRegionsList();
-
- List<FileRegion> fileRegions = fileRegionsList
- .stream()
- .map(kv -> new FileRegion(
- PBHelperClient.convert(kv.getKey()),
- PBHelperClient.convert(kv.getValue())
- ))
- .collect(Collectors.toList());
- BlockProto nextMarker = response.getNextMarker();
-
- if (nextMarker.isInitialized()) {
- return new InMemoryAliasMap.IterationResult(fileRegions,
- Optional.of(PBHelperClient.convert(nextMarker)));
- } else {
- return new InMemoryAliasMap.IterationResult(fileRegions,
- Optional.empty());
- }
-
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
+ ListResponseProto response = ipc(() -> rpcProxy.list(null, request));
+ List<KeyValueProto> fileRegionsList = response.getFileRegionsList();
+
+ List<FileRegion> fileRegions = fileRegionsList
+ .stream()
+ .map(kv -> new FileRegion(
+ PBHelperClient.convert(kv.getKey()),
+ PBHelperClient.convert(kv.getValue())
+ ))
+ .collect(Collectors.toList());
+ BlockProto nextMarker = response.getNextMarker();
+
+ if (nextMarker.isInitialized()) {
+ return new InMemoryAliasMap.IterationResult(fileRegions,
+ Optional.of(PBHelperClient.convert(nextMarker)));
+ } else {
+ return new InMemoryAliasMap.IterationResult(fileRegions,
+ Optional.empty());
}
}
@@ -175,19 +169,15 @@ public class InMemoryAliasMapProtocolClientSideTranslatorPB
.newBuilder()
.setKey(PBHelperClient.convert(block))
.build();
- try {
- ReadResponseProto response = rpcProxy.read(null, request);
-
- ProvidedStorageLocationProto providedStorageLocation =
- response.getValue();
- if (providedStorageLocation.isInitialized()) {
- return Optional.of(PBHelperClient.convert(providedStorageLocation));
- }
- return Optional.empty();
+ ReadResponseProto response = ipc(() -> rpcProxy.read(null, request));
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
+ ProvidedStorageLocationProto providedStorageLocation =
+ response.getValue();
+ if (providedStorageLocation.isInitialized()) {
+ return Optional.of(PBHelperClient.convert(providedStorageLocation));
}
+    return Optional.empty();
}
@Override
@@ -206,22 +196,14 @@ public class InMemoryAliasMapProtocolClientSideTranslatorPB
.build())
.build();
- try {
- rpcProxy.write(null, request);
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ ipc(() -> rpcProxy.write(null, request));
}
@Override
public String getBlockPoolId() throws IOException {
- try {
- BlockPoolResponseProto response = rpcProxy.getBlockPoolId(null,
- BlockPoolRequestProto.newBuilder().build());
- return response.getBlockPoolId();
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ BlockPoolResponseProto response = ipc(() -> rpcProxy.getBlockPoolId(null,
+ BlockPoolRequestProto.newBuilder().build()));
+ return response.getBlockPoolId();
}
@Override
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolTranslatorPB.java
index 031b0e4..ae81be9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/InterDatanodeProtocolTranslatorPB.java
@@ -34,7 +34,6 @@ import org.apache.hadoop.hdfs.protocol.proto.InterDatanodeProtocolProtos.UpdateR
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
-import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.ipc.ProtocolMetaInterface;
import org.apache.hadoop.ipc.RPC;
@@ -42,7 +41,8 @@ import org.apache.hadoop.ipc.RpcClientUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.thirdparty.protobuf.RpcController;
-import org.apache.hadoop.thirdparty.protobuf.ServiceException;
+
+import static org.apache.hadoop.ipc.internal.ShadedProtobufHelper.ipc;
/**
* This class is the client side translator to translate the requests made on
@@ -79,11 +79,7 @@ public class InterDatanodeProtocolTranslatorPB implements
InitReplicaRecoveryRequestProto req = InitReplicaRecoveryRequestProto
.newBuilder().setBlock(PBHelper.convert(rBlock)).build();
InitReplicaRecoveryResponseProto resp;
- try {
- resp = rpcProxy.initReplicaRecovery(NULL_CONTROLLER, req);
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ resp = ipc(() -> rpcProxy.initReplicaRecovery(NULL_CONTROLLER, req));
if (!resp.getReplicaFound()) {
// No replica found on the remote node.
return null;
@@ -108,12 +104,9 @@ public class InterDatanodeProtocolTranslatorPB implements
.setBlock(PBHelperClient.convert(oldBlock))
.setNewLength(newLength).setNewBlockId(newBlockId)
.setRecoveryId(recoveryId).build();
- try {
- return rpcProxy.updateReplicaUnderRecovery(NULL_CONTROLLER, req
- ).getStorageUuid();
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+    return ipc(() -> rpcProxy.updateReplicaUnderRecovery(NULL_CONTROLLER, req)
+        .getStorageUuid());
}
@Override
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolTranslatorPB.java
index 9310dd3..b0a326e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolTranslatorPB.java
@@ -29,13 +29,12 @@ import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.StartLogSegme
import org.apache.hadoop.hdfs.server.protocol.FenceResponse;
import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
-import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtocolMetaInterface;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RpcClientUtil;
-
import org.apache.hadoop.thirdparty.protobuf.RpcController;
-import org.apache.hadoop.thirdparty.protobuf.ServiceException;
+
+import static org.apache.hadoop.ipc.internal.ShadedProtobufHelper.ipc;
/**
* This class is the client side translator to translate the requests made on
@@ -69,11 +68,17 @@ public class JournalProtocolTranslatorPB implements ProtocolMetaInterface,
.setNumTxns(numTxns)
.setRecords(PBHelperClient.getByteString(records))
.build();
- try {
- rpcProxy.journal(NULL_CONTROLLER, req);
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ ipc(() -> rpcProxy.journal(NULL_CONTROLLER, req));
+ }
+
+ @Override
+ public FenceResponse fence(JournalInfo journalInfo, long epoch,
+ String fencerInfo) throws IOException {
+ FenceRequestProto req = FenceRequestProto.newBuilder().setEpoch(epoch)
+ .setJournalInfo(PBHelper.convert(journalInfo)).build();
+ FenceResponseProto resp = ipc(() -> rpcProxy.fence(NULL_CONTROLLER, req));
+ return new FenceResponse(resp.getPreviousEpoch(),
+ resp.getLastTransactionId(), resp.getInSync());
}
@Override
@@ -84,25 +89,7 @@ public class JournalProtocolTranslatorPB implements ProtocolMetaInterface,
.setEpoch(epoch)
.setTxid(txid)
.build();
- try {
- rpcProxy.startLogSegment(NULL_CONTROLLER, req);
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
- }
-
- @Override
- public FenceResponse fence(JournalInfo journalInfo, long epoch,
- String fencerInfo) throws IOException {
- FenceRequestProto req = FenceRequestProto.newBuilder().setEpoch(epoch)
- .setJournalInfo(PBHelper.convert(journalInfo)).build();
- try {
- FenceResponseProto resp = rpcProxy.fence(NULL_CONTROLLER, req);
- return new FenceResponse(resp.getPreviousEpoch(),
- resp.getLastTransactionId(), resp.getInSync());
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ ipc(() -> rpcProxy.startLogSegment(NULL_CONTROLLER, req));
}
@Override
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java
index 603e14d..60056b2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/NamenodeProtocolTranslatorPB.java
@@ -51,14 +51,13 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
-import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtocolMetaInterface;
import org.apache.hadoop.ipc.ProtocolTranslator;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RpcClientUtil;
-
import org.apache.hadoop.thirdparty.protobuf.RpcController;
-import org.apache.hadoop.thirdparty.protobuf.ServiceException;
+
+import static org.apache.hadoop.ipc.internal.ShadedProtobufHelper.ipc;
/**
* This class is the client side translator to translate the requests made on
@@ -107,63 +106,39 @@ public class NamenodeProtocolTranslatorPB implements NamenodeProtocol,
GetBlocksRequestProto req = GetBlocksRequestProto.newBuilder()
.setDatanode(PBHelperClient.convert((DatanodeID)datanode)).setSize(size)
.setMinBlockSize(minBlockSize).build();
- try {
- return PBHelper.convert(rpcProxy.getBlocks(NULL_CONTROLLER, req)
- .getBlocks());
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ return PBHelper.convert(ipc(() -> rpcProxy.getBlocks(NULL_CONTROLLER, req)
+ .getBlocks()));
}
@Override
public ExportedBlockKeys getBlockKeys() throws IOException {
- try {
- GetBlockKeysResponseProto rsp = rpcProxy.getBlockKeys(NULL_CONTROLLER,
- VOID_GET_BLOCKKEYS_REQUEST);
- return rsp.hasKeys() ? PBHelper.convert(rsp.getKeys()) : null;
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ GetBlockKeysResponseProto rsp = ipc(() -> rpcProxy.getBlockKeys(NULL_CONTROLLER,
+ VOID_GET_BLOCKKEYS_REQUEST));
+ return rsp.hasKeys() ? PBHelper.convert(rsp.getKeys()) : null;
}
@Override
public long getTransactionID() throws IOException {
- try {
- return rpcProxy.getTransactionId(NULL_CONTROLLER,
- VOID_GET_TRANSACTIONID_REQUEST).getTxId();
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ return ipc(() -> rpcProxy.getTransactionId(NULL_CONTROLLER,
+ VOID_GET_TRANSACTIONID_REQUEST).getTxId());
}
@Override
public long getMostRecentCheckpointTxId() throws IOException {
- try {
- return rpcProxy.getMostRecentCheckpointTxId(NULL_CONTROLLER,
- GetMostRecentCheckpointTxIdRequestProto.getDefaultInstance()).getTxId();
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ return ipc(() -> rpcProxy.getMostRecentCheckpointTxId(NULL_CONTROLLER,
+ GetMostRecentCheckpointTxIdRequestProto.getDefaultInstance()).getTxId());
}
@Override
public CheckpointSignature rollEditLog() throws IOException {
- try {
- return PBHelper.convert(rpcProxy.rollEditLog(NULL_CONTROLLER,
- VOID_ROLL_EDITLOG_REQUEST).getSignature());
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ return PBHelper.convert(ipc(() -> rpcProxy.rollEditLog(NULL_CONTROLLER,
+ VOID_ROLL_EDITLOG_REQUEST).getSignature()));
}
@Override
public NamespaceInfo versionRequest() throws IOException {
- try {
- return PBHelper.convert(rpcProxy.versionRequest(NULL_CONTROLLER,
- VOID_VERSION_REQUEST).getInfo());
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ return PBHelper.convert(ipc(() -> rpcProxy.versionRequest(NULL_CONTROLLER,
+ VOID_VERSION_REQUEST).getInfo()));
}
@Override
@@ -172,11 +147,7 @@ public class NamenodeProtocolTranslatorPB implements NamenodeProtocol,
ErrorReportRequestProto req = ErrorReportRequestProto.newBuilder()
.setErrorCode(errorCode).setMsg(msg)
.setRegistration(PBHelper.convert(registration)).build();
- try {
- rpcProxy.errorReport(NULL_CONTROLLER, req);
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ ipc(() -> rpcProxy.errorReport(NULL_CONTROLLER, req));
}
@Override
@@ -184,13 +155,9 @@ public class NamenodeProtocolTranslatorPB implements NamenodeProtocol,
NamenodeRegistration registration) throws IOException {
RegisterRequestProto req = RegisterRequestProto.newBuilder()
.setRegistration(PBHelper.convert(registration)).build();
- try {
- return PBHelper.convert(
- rpcProxy.registerSubordinateNamenode(NULL_CONTROLLER, req)
- .getRegistration());
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ return PBHelper.convert(
+ ipc(() -> rpcProxy.registerSubordinateNamenode(NULL_CONTROLLER, req)
+ .getRegistration()));
}
@Override
@@ -199,11 +166,7 @@ public class NamenodeProtocolTranslatorPB implements NamenodeProtocol,
StartCheckpointRequestProto req = StartCheckpointRequestProto.newBuilder()
.setRegistration(PBHelper.convert(registration)).build();
NamenodeCommandProto cmd;
- try {
- cmd = rpcProxy.startCheckpoint(NULL_CONTROLLER, req).getCommand();
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ cmd = ipc(() -> rpcProxy.startCheckpoint(NULL_CONTROLLER, req).getCommand());
return PBHelper.convert(cmd);
}
@@ -213,11 +176,7 @@ public class NamenodeProtocolTranslatorPB implements NamenodeProtocol,
EndCheckpointRequestProto req = EndCheckpointRequestProto.newBuilder()
.setRegistration(PBHelper.convert(registration))
.setSignature(PBHelper.convert(sig)).build();
- try {
- rpcProxy.endCheckpoint(NULL_CONTROLLER, req);
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ ipc(() -> rpcProxy.endCheckpoint(NULL_CONTROLLER, req));
}
@Override
@@ -225,12 +184,8 @@ public class NamenodeProtocolTranslatorPB implements NamenodeProtocol,
throws IOException {
GetEditLogManifestRequestProto req = GetEditLogManifestRequestProto
.newBuilder().setSinceTxId(sinceTxId).build();
- try {
- return PBHelper.convert(rpcProxy.getEditLogManifest(NULL_CONTROLLER, req)
- .getManifest());
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ return PBHelper.convert(ipc(() -> rpcProxy.getEditLogManifest(NULL_CONTROLLER, req)
+ .getManifest()));
}
@Override
@@ -244,38 +199,26 @@ public class NamenodeProtocolTranslatorPB implements NamenodeProtocol,
public boolean isUpgradeFinalized() throws IOException {
IsUpgradeFinalizedRequestProto req = IsUpgradeFinalizedRequestProto
.newBuilder().build();
- try {
- IsUpgradeFinalizedResponseProto response = rpcProxy.isUpgradeFinalized(
- NULL_CONTROLLER, req);
- return response.getIsUpgradeFinalized();
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ IsUpgradeFinalizedResponseProto response = ipc(() -> rpcProxy.isUpgradeFinalized(
+ NULL_CONTROLLER, req));
+ return response.getIsUpgradeFinalized();
}
@Override
public boolean isRollingUpgrade() throws IOException {
IsRollingUpgradeRequestProto req = IsRollingUpgradeRequestProto
.newBuilder().build();
- try {
- IsRollingUpgradeResponseProto response = rpcProxy.isRollingUpgrade(
- NULL_CONTROLLER, req);
- return response.getIsRollingUpgrade();
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ IsRollingUpgradeResponseProto response = ipc(() -> rpcProxy.isRollingUpgrade(
+ NULL_CONTROLLER, req));
+ return response.getIsRollingUpgrade();
}
@Override
public Long getNextSPSPath() throws IOException {
GetNextSPSPathRequestProto req =
GetNextSPSPathRequestProto.newBuilder().build();
- try {
- GetNextSPSPathResponseProto nextSPSPath =
- rpcProxy.getNextSPSPath(NULL_CONTROLLER, req);
- return nextSPSPath.hasSpsPath() ? nextSPSPath.getSpsPath() : null;
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ GetNextSPSPathResponseProto nextSPSPath =
+ ipc(() -> rpcProxy.getNextSPSPath(NULL_CONTROLLER, req));
+ return nextSPSPath.hasSpsPath() ? nextSPSPath.getSpsPath() : null;
}
}
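Two equivalent shapes recur across these hunks: some call sites keep the generated accessor inside the lambda (getTransactionID above), while others apply it to the value ipc() returns (getCurrentEditLogTxid earlier in this diff, getBlockKeys above). Because the generated getters cannot throw, the placement does not matter. A small sketch; the wrapper class and method are hypothetical:

    import java.io.IOException;

    import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetTransactionIdRequestProto;
    import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;

    import static org.apache.hadoop.ipc.internal.ShadedProtobufHelper.ipc;

    final class TxIdFetcher {
      // Accessor applied outside the lambda; moving getTxId() inside the
      // ipc() call would behave identically since it cannot throw.
      static long currentTxId(NamenodeProtocolPB rpcProxy) throws IOException {
        GetTransactionIdRequestProto req =
            GetTransactionIdRequestProto.getDefaultInstance();
        return ipc(() -> rpcProxy.getTransactionId(null, req)).getTxId();
      }
    }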
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/InterQJournalProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/InterQJournalProtocolTranslatorPB.java
index 79a133a..4544308 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/InterQJournalProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/InterQJournalProtocolTranslatorPB.java
@@ -20,14 +20,12 @@
package org.apache.hadoop.hdfs.qjournal.protocolPB;
import org.apache.hadoop.thirdparty.protobuf.RpcController;
-import org.apache.hadoop.thirdparty.protobuf.ServiceException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.qjournal.protocol.InterQJournalProtocol;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos;
-import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtocolMetaInterface;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RpcClientUtil;
@@ -35,6 +33,8 @@ import org.apache.hadoop.ipc.RpcClientUtil;
import java.io.Closeable;
import java.io.IOException;
+import static org.apache.hadoop.ipc.internal.ShadedProtobufHelper.ipc;
+
/**
* This class is the client side translator to translate the requests made on
* {@link InterQJournalProtocol} interfaces to the RPC server implementing
@@ -63,21 +63,16 @@ public class InterQJournalProtocolTranslatorPB implements ProtocolMetaInterface,
public GetEditLogManifestResponseProto getEditLogManifestFromJournal(
String jid, String nameServiceId, long sinceTxId, boolean inProgressOk)
throws IOException {
- try {
- GetEditLogManifestRequestProto.Builder req;
- req = GetEditLogManifestRequestProto.newBuilder()
- .setJid(convertJournalId(jid))
- .setSinceTxId(sinceTxId)
- .setInProgressOk(inProgressOk);
- if (nameServiceId !=null) {
- req.setNameServiceId(nameServiceId);
- }
- return rpcProxy.getEditLogManifestFromJournal(NULL_CONTROLLER,
- req.build()
- );
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
+ GetEditLogManifestRequestProto.Builder req;
+ req = GetEditLogManifestRequestProto.newBuilder()
+ .setJid(convertJournalId(jid))
+ .setSinceTxId(sinceTxId)
+ .setInProgressOk(inProgressOk);
+      if (nameServiceId != null) {
+ req.setNameServiceId(nameServiceId);
}
+ return ipc(() -> rpcProxy.getEditLogManifestFromJournal(NULL_CONTROLLER,
+ req.build()));
}
private QJournalProtocolProtos.JournalIdProto convertJournalId(String jid) {
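A side effect of the lambda rewrite is visible throughout this file: locals captured by a lambda must be effectively final, so each method finishes mutating the request Builder first and defers build() into the ipc() call. Reduced to the pattern, with the lines mirroring the getJournalState hunk above rather than adding anything new:

    // req is assigned once (effectively final); only its state is mutated
    // before the RPC, and build() runs inside the captured lambda.
    GetJournalStateRequestProto.Builder req = GetJournalStateRequestProto
        .newBuilder()
        .setJid(convertJournalId(jid));
    if (nameServiceId != null) {
      req.setNameServiceId(nameServiceId);
    }
    return ipc(() -> rpcProxy.getJournalState(NULL_CONTROLLER, req.build()));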
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java
index 7da009c..54b9a8d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java
@@ -63,13 +63,12 @@ import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
-import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtocolMetaInterface;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RpcClientUtil;
-
import org.apache.hadoop.thirdparty.protobuf.RpcController;
-import org.apache.hadoop.thirdparty.protobuf.ServiceException;
+
+import static org.apache.hadoop.ipc.internal.ShadedProtobufHelper.ipc;
/**
* This class is the client side translator to translate the requests made on
@@ -97,36 +96,28 @@ public class QJournalProtocolTranslatorPB implements ProtocolMetaInterface,
@Override
public boolean isFormatted(String journalId,
String nameServiceId) throws IOException {
- try {
- IsFormattedRequestProto.Builder req = IsFormattedRequestProto.newBuilder()
- .setJid(convertJournalId(journalId));
- if (nameServiceId != null) {
- req.setNameServiceId(nameServiceId);
- }
-
- IsFormattedResponseProto resp = rpcProxy.isFormatted(
- NULL_CONTROLLER, req.build());
- return resp.getIsFormatted();
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
+ IsFormattedRequestProto.Builder req = IsFormattedRequestProto.newBuilder()
+ .setJid(convertJournalId(journalId));
+ if (nameServiceId != null) {
+ req.setNameServiceId(nameServiceId);
}
+
+ IsFormattedResponseProto resp = ipc(() -> rpcProxy.isFormatted(
+ NULL_CONTROLLER, req.build()));
+ return resp.getIsFormatted();
}
@Override
public GetJournalStateResponseProto getJournalState(String jid,
String nameServiceId)
throws IOException {
- try {
- GetJournalStateRequestProto.Builder req = GetJournalStateRequestProto
- .newBuilder()
- .setJid(convertJournalId(jid));
- if (nameServiceId != null) {
- req.setNameServiceId(nameServiceId);
- }
- return rpcProxy.getJournalState(NULL_CONTROLLER, req.build());
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
+ GetJournalStateRequestProto.Builder req = GetJournalStateRequestProto
+ .newBuilder()
+ .setJid(convertJournalId(jid));
+ if (nameServiceId != null) {
+ req.setNameServiceId(nameServiceId);
}
+ return ipc(() -> rpcProxy.getJournalState(NULL_CONTROLLER, req.build()));
}
private JournalIdProto convertJournalId(String jid) {
@@ -140,19 +131,15 @@ public class QJournalProtocolTranslatorPB implements ProtocolMetaInterface,
String nameServiceId,
NamespaceInfo nsInfo,
boolean force) throws IOException {
- try {
- FormatRequestProto.Builder req = FormatRequestProto.newBuilder()
- .setJid(convertJournalId(jid))
- .setNsInfo(PBHelper.convert(nsInfo))
- .setForce(force);
- if(nameServiceId != null) {
- req.setNameServiceId(nameServiceId);
- }
-
- rpcProxy.format(NULL_CONTROLLER, req.build());
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
+ FormatRequestProto.Builder req = FormatRequestProto.newBuilder()
+ .setJid(convertJournalId(jid))
+ .setNsInfo(PBHelper.convert(nsInfo))
+ .setForce(force);
+ if(nameServiceId != null) {
+ req.setNameServiceId(nameServiceId);
}
+
+ ipc(() -> rpcProxy.format(NULL_CONTROLLER, req.build()));
}
@Override
@@ -160,20 +147,16 @@ public class QJournalProtocolTranslatorPB implements ProtocolMetaInterface,
String nameServiceId,
NamespaceInfo nsInfo,
long epoch) throws IOException {
- try {
- NewEpochRequestProto.Builder req = NewEpochRequestProto.newBuilder()
- .setJid(convertJournalId(jid))
- .setNsInfo(PBHelper.convert(nsInfo))
- .setEpoch(epoch);
-
- if(nameServiceId != null) {
- req.setNameServiceId(nameServiceId);
- }
+ NewEpochRequestProto.Builder req = NewEpochRequestProto.newBuilder()
+ .setJid(convertJournalId(jid))
+ .setNsInfo(PBHelper.convert(nsInfo))
+ .setEpoch(epoch);
- return rpcProxy.newEpoch(NULL_CONTROLLER, req.build());
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
+ if(nameServiceId != null) {
+ req.setNameServiceId(nameServiceId);
}
+
+ return ipc(() -> rpcProxy.newEpoch(NULL_CONTROLLER, req.build()));
}
@Override
@@ -187,22 +170,14 @@ public class QJournalProtocolTranslatorPB implements ProtocolMetaInterface,
.setNumTxns(numTxns)
.setRecords(PBHelperClient.getByteString(records))
.build();
- try {
- rpcProxy.journal(NULL_CONTROLLER, req);
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ ipc(() -> rpcProxy.journal(NULL_CONTROLLER, req));
}
@Override
public void heartbeat(RequestInfo reqInfo) throws IOException {
- try {
- rpcProxy.heartbeat(NULL_CONTROLLER, HeartbeatRequestProto.newBuilder()
- .setReqInfo(convert(reqInfo))
- .build());
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ ipc(() -> rpcProxy.heartbeat(NULL_CONTROLLER, HeartbeatRequestProto.newBuilder()
+ .setReqInfo(convert(reqInfo))
+ .build()));
}
private QJournalProtocolProtos.RequestInfoProto convert(
@@ -227,11 +202,7 @@ public class QJournalProtocolTranslatorPB implements ProtocolMetaInterface,
.setReqInfo(convert(reqInfo))
.setTxid(txid).setLayoutVersion(layoutVersion)
.build();
- try {
- rpcProxy.startLogSegment(NULL_CONTROLLER, req);
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ ipc(() -> rpcProxy.startLogSegment(NULL_CONTROLLER, req));
}
@Override
@@ -243,11 +214,7 @@ public class QJournalProtocolTranslatorPB implements ProtocolMetaInterface,
.setStartTxId(startTxId)
.setEndTxId(endTxId)
.build();
- try {
- rpcProxy.finalizeLogSegment(NULL_CONTROLLER, req);
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ ipc(() -> rpcProxy.finalizeLogSegment(NULL_CONTROLLER, req));
}
@Override
@@ -257,79 +224,58 @@ public class QJournalProtocolTranslatorPB implements ProtocolMetaInterface,
.setReqInfo(convert(reqInfo))
.setMinTxIdToKeep(minTxIdToKeep)
.build();
- try {
- rpcProxy.purgeLogs(NULL_CONTROLLER, req);
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ ipc(() -> rpcProxy.purgeLogs(NULL_CONTROLLER, req));
}
@Override
public GetEditLogManifestResponseProto getEditLogManifest(
String jid, String nameServiceId,
long sinceTxId, boolean inProgressOk) throws IOException {
- try {
- GetEditLogManifestRequestProto.Builder req;
- req = GetEditLogManifestRequestProto.newBuilder()
- .setJid(convertJournalId(jid))
- .setSinceTxId(sinceTxId)
- .setInProgressOk(inProgressOk);
- if (nameServiceId !=null) {
- req.setNameServiceId(nameServiceId);
- }
- return rpcProxy.getEditLogManifest(NULL_CONTROLLER,
- req.build()
- );
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
+ GetEditLogManifestRequestProto.Builder req;
+ req = GetEditLogManifestRequestProto.newBuilder()
+ .setJid(convertJournalId(jid))
+ .setSinceTxId(sinceTxId)
+ .setInProgressOk(inProgressOk);
+ if (nameServiceId !=null) {
+ req.setNameServiceId(nameServiceId);
}
+ return ipc(() -> rpcProxy.getEditLogManifest(NULL_CONTROLLER,
+ req.build()));
}
@Override
public GetJournaledEditsResponseProto getJournaledEdits(String jid,
String nameServiceId, long sinceTxId, int maxTxns) throws IOException {
- try {
- GetJournaledEditsRequestProto.Builder req =
- GetJournaledEditsRequestProto.newBuilder()
- .setJid(convertJournalId(jid))
- .setSinceTxId(sinceTxId)
- .setMaxTxns(maxTxns);
- if (nameServiceId != null) {
- req.setNameServiceId(nameServiceId);
- }
- return rpcProxy.getJournaledEdits(NULL_CONTROLLER, req.build());
- } catch (ServiceException se) {
- throw ProtobufHelper.getRemoteException(se);
+ GetJournaledEditsRequestProto.Builder req =
+ GetJournaledEditsRequestProto.newBuilder()
+ .setJid(convertJournalId(jid))
+ .setSinceTxId(sinceTxId)
+ .setMaxTxns(maxTxns);
+ if (nameServiceId != null) {
+ req.setNameServiceId(nameServiceId);
}
+ return ipc(() -> rpcProxy.getJournaledEdits(NULL_CONTROLLER, req.build()));
}
@Override
public PrepareRecoveryResponseProto prepareRecovery(RequestInfo reqInfo,
long segmentTxId) throws IOException {
- try {
- return rpcProxy.prepareRecovery(NULL_CONTROLLER,
- PrepareRecoveryRequestProto.newBuilder()
+ return ipc(() -> rpcProxy.prepareRecovery(NULL_CONTROLLER,
+ PrepareRecoveryRequestProto.newBuilder()
.setReqInfo(convert(reqInfo))
.setSegmentTxId(segmentTxId)
- .build());
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ .build()));
}
@Override
public void acceptRecovery(RequestInfo reqInfo,
SegmentStateProto stateToAccept, URL fromUrl) throws IOException {
- try {
- rpcProxy.acceptRecovery(NULL_CONTROLLER,
- AcceptRecoveryRequestProto.newBuilder()
+ ipc(() -> rpcProxy.acceptRecovery(NULL_CONTROLLER,
+ AcceptRecoveryRequestProto.newBuilder()
.setReqInfo(convert(reqInfo))
.setStateToAccept(stateToAccept)
.setFromURL(fromUrl.toExternalForm())
- .build());
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ .build()));
}
public boolean isMethodSupported(String methodName) throws IOException {
@@ -340,42 +286,30 @@ public class QJournalProtocolTranslatorPB implements ProtocolMetaInterface,
@Override
public void doPreUpgrade(String jid) throws IOException {
- try {
- DoPreUpgradeRequestProto.Builder req;
- req = DoPreUpgradeRequestProto.newBuilder()
- .setJid(convertJournalId(jid));
- rpcProxy.doPreUpgrade(NULL_CONTROLLER, req.build());
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ DoPreUpgradeRequestProto.Builder req;
+ req = DoPreUpgradeRequestProto.newBuilder()
+ .setJid(convertJournalId(jid));
+ ipc(() -> rpcProxy.doPreUpgrade(NULL_CONTROLLER, req.build()));
}
@Override
public void doUpgrade(String journalId, StorageInfo sInfo) throws IOException {
- try {
- rpcProxy.doUpgrade(NULL_CONTROLLER,
- DoUpgradeRequestProto.newBuilder()
+ ipc(() -> rpcProxy.doUpgrade(NULL_CONTROLLER,
+ DoUpgradeRequestProto.newBuilder()
.setJid(convertJournalId(journalId))
.setSInfo(PBHelper.convert(sInfo))
- .build());
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
- }
+ .build()));
}
@Override
public void doFinalize(String jid, String nameServiceId) throws IOException {
- try {
- DoFinalizeRequestProto.Builder req = DoFinalizeRequestProto
- .newBuilder()
- .setJid(convertJournalId(jid));
- if (nameServiceId != null) {
- req.setNameServiceId(nameServiceId);
- }
- rpcProxy.doFinalize(NULL_CONTROLLER, req.build());
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
+ DoFinalizeRequestProto.Builder req = DoFinalizeRequestProto
+ .newBuilder()
+ .setJid(convertJournalId(jid));
+ if (nameServiceId != null) {
+ req.setNameServiceId(nameServiceId);
}
+ ipc(() -> rpcProxy.doFinalize(NULL_CONTROLLER, req.build()));
}
@Override
@@ -384,37 +318,29 @@ public class QJournalProtocolTranslatorPB implements ProtocolMetaInterface,
StorageInfo storage,
StorageInfo prevStorage,
int targetLayoutVersion) throws IOException {
- try {
- CanRollBackRequestProto.Builder req = CanRollBackRequestProto.newBuilder()
- .setJid(convertJournalId(journalId))
- .setStorage(PBHelper.convert(storage))
- .setPrevStorage(PBHelper.convert(prevStorage))
- .setTargetLayoutVersion(targetLayoutVersion);
- if (nameServiceId != null) {
- req.setNameServiceId(nameServiceId);
- }
- CanRollBackResponseProto response = rpcProxy.canRollBack(
- NULL_CONTROLLER, req.build());
- return response.getCanRollBack();
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
+ CanRollBackRequestProto.Builder req = CanRollBackRequestProto.newBuilder()
+ .setJid(convertJournalId(journalId))
+ .setStorage(PBHelper.convert(storage))
+ .setPrevStorage(PBHelper.convert(prevStorage))
+ .setTargetLayoutVersion(targetLayoutVersion);
+ if (nameServiceId != null) {
+ req.setNameServiceId(nameServiceId);
}
+ CanRollBackResponseProto response = ipc(() -> rpcProxy.canRollBack(
+ NULL_CONTROLLER, req.build()));
+ return response.getCanRollBack();
}
@Override
public void doRollback(String journalId,
String nameServiceId) throws IOException {
- try {
- DoRollbackRequestProto.Builder req = DoRollbackRequestProto.newBuilder()
- .setJid(convertJournalId(journalId));
+ DoRollbackRequestProto.Builder req = DoRollbackRequestProto.newBuilder()
+ .setJid(convertJournalId(journalId));
- if (nameServiceId != null) {
- req.setNameserviceId(nameServiceId);
- }
- rpcProxy.doRollback(NULL_CONTROLLER, req.build());
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
+ if (nameServiceId != null) {
+ req.setNameserviceId(nameServiceId);
}
+ ipc(() -> rpcProxy.doRollback(NULL_CONTROLLER, req.build()));
}
@Override
@@ -422,37 +348,28 @@ public class QJournalProtocolTranslatorPB implements ProtocolMetaInterface,
String nameServiceId,
long startTxId)
throws IOException {
- try {
- DiscardSegmentsRequestProto.Builder req = DiscardSegmentsRequestProto
- .newBuilder()
- .setJid(convertJournalId(journalId)).setStartTxId(startTxId);
+ DiscardSegmentsRequestProto.Builder req = DiscardSegmentsRequestProto
+ .newBuilder()
+ .setJid(convertJournalId(journalId)).setStartTxId(startTxId);
- if (nameServiceId != null) {
- req.setNameServiceId(nameServiceId);
- }
- rpcProxy.discardSegments(NULL_CONTROLLER, req.build());
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
+ if (nameServiceId != null) {
+ req.setNameServiceId(nameServiceId);
}
+ ipc(() -> rpcProxy.discardSegments(NULL_CONTROLLER, req.build()));
}
@Override
public Long getJournalCTime(String journalId,
String nameServiceId) throws IOException {
- try {
-
- GetJournalCTimeRequestProto.Builder req = GetJournalCTimeRequestProto
- .newBuilder()
- .setJid(convertJournalId(journalId));
- if(nameServiceId !=null) {
- req.setNameServiceId(nameServiceId);
- }
- GetJournalCTimeResponseProto response = rpcProxy.getJournalCTime(
- NULL_CONTROLLER, req.build());
- return response.getResultCTime();
- } catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
+ GetJournalCTimeRequestProto.Builder req = GetJournalCTimeRequestProto
+ .newBuilder()
+ .setJid(convertJournalId(journalId));
+ if(nameServiceId !=null) {
+ req.setNameServiceId(nameServiceId);
}
+ GetJournalCTimeResponseProto response = ipc(() -> rpcProxy.getJournalCTime(
+ NULL_CONTROLLER, req.build()));
+ return response.getResultCTime();
}
}
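Moving the RPC call into a lambda has one side effect worth noting for
review: every local the lambda references must be effectively final. That is
why the converted methods finish mutating the request builder before calling
ipc() and only invoke req.build() inside the lambda. A self-contained
illustration of the capture rule (plain Java, no Hadoop types):

    import java.io.IOException;

    public class CaptureExample {

      interface Call<T> {
        T call() throws Exception;
      }

      static <T> T ipc(Call<T> c) throws IOException {
        try {
          return c.call();
        } catch (Exception e) {
          throw new IOException(e);
        }
      }

      public static void main(String[] args) throws IOException {
        // 'req' is effectively final: mutated, but never reassigned,
        // so the lambda below is allowed to capture it.
        StringBuilder req = new StringBuilder("jid=1");
        if (args.length > 0) {
          req.append(";ns=").append(args[0]);
        }
        System.out.println(ipc(() -> req.toString()));
      }
    }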
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java
index d13c98f..a5ed0d6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java
@@ -109,7 +109,7 @@ public class JournalNodeRpcServer implements QJournalProtocol,
BlockingService interQJournalProtocolService = InterQJournalProtocolService
.newReflectiveBlockingService(qJournalProtocolServerSideTranslatorPB);
- DFSUtil.addPBProtocol(confCopy, InterQJournalProtocolPB.class,
+ DFSUtil.addInternalPBProtocol(confCopy, InterQJournalProtocolPB.class,
interQJournalProtocolService, server);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 96c4ad9..d047a4d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -1350,14 +1350,14 @@ public class DataNode extends ReconfigurableBase
= new ReconfigurationProtocolServerSideTranslatorPB(this);
service = ReconfigurationProtocolService
.newReflectiveBlockingService(reconfigurationProtocolXlator);
- DFSUtil.addPBProtocol(getConf(), ReconfigurationProtocolPB.class, service,
+ DFSUtil.addInternalPBProtocol(getConf(), ReconfigurationProtocolPB.class, service,
ipcServer);
InterDatanodeProtocolServerSideTranslatorPB interDatanodeProtocolXlator =
new InterDatanodeProtocolServerSideTranslatorPB(this);
service = InterDatanodeProtocolService
.newReflectiveBlockingService(interDatanodeProtocolXlator);
- DFSUtil.addPBProtocol(getConf(), InterDatanodeProtocolPB.class, service,
+ DFSUtil.addInternalPBProtocol(getConf(), InterDatanodeProtocolPB.class, service,
ipcServer);
LOG.info("Opened IPC server at {}", ipcServer.getListenerAddress());
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java
index dab227f..9b57d3e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java
@@ -244,7 +244,7 @@ public class BackupNode extends NameNode {
new JournalProtocolServerSideTranslatorPB(this);
BlockingService service = JournalProtocolService
.newReflectiveBlockingService(journalProtocolTranslator);
- DFSUtil.addPBProtocol(conf, JournalProtocolPB.class, service,
+ DFSUtil.addInternalPBProtocol(conf, JournalProtocolPB.class, service,
this.clientRpcServer);
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index badbc64..c593dce 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -371,24 +371,24 @@ public class NameNodeRpcServer implements NamenodeProtocols {
.build();
// Add all the RPC protocols that the namenode implements
- DFSUtil.addPBProtocol(conf, HAServiceProtocolPB.class, haPbService,
+ DFSUtil.addInternalPBProtocol(conf, HAServiceProtocolPB.class, haPbService,
serviceRpcServer);
- DFSUtil.addPBProtocol(conf, ReconfigurationProtocolPB.class,
+ DFSUtil.addInternalPBProtocol(conf, ReconfigurationProtocolPB.class,
reconfigurationPbService, serviceRpcServer);
- DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, NNPbService,
+ DFSUtil.addInternalPBProtocol(conf, NamenodeProtocolPB.class, NNPbService,
serviceRpcServer);
- DFSUtil.addPBProtocol(conf, DatanodeProtocolPB.class, dnProtoPbService,
+ DFSUtil.addInternalPBProtocol(conf, DatanodeProtocolPB.class, dnProtoPbService,
serviceRpcServer);
- DFSUtil.addPBProtocol(conf, RefreshAuthorizationPolicyProtocolPB.class,
+ DFSUtil.addInternalPBProtocol(conf, RefreshAuthorizationPolicyProtocolPB.class,
refreshAuthService, serviceRpcServer);
- DFSUtil.addPBProtocol(conf, RefreshUserMappingsProtocolPB.class,
+ DFSUtil.addInternalPBProtocol(conf, RefreshUserMappingsProtocolPB.class,
refreshUserMappingService, serviceRpcServer);
// We support Refreshing call queue here in case the client RPC queue is full
- DFSUtil.addPBProtocol(conf, RefreshCallQueueProtocolPB.class,
+ DFSUtil.addInternalPBProtocol(conf, RefreshCallQueueProtocolPB.class,
refreshCallQueueService, serviceRpcServer);
- DFSUtil.addPBProtocol(conf, GenericRefreshProtocolPB.class,
+ DFSUtil.addInternalPBProtocol(conf, GenericRefreshProtocolPB.class,
genericRefreshService, serviceRpcServer);
- DFSUtil.addPBProtocol(conf, GetUserMappingsProtocolPB.class,
+ DFSUtil.addInternalPBProtocol(conf, GetUserMappingsProtocolPB.class,
getUserMappingService, serviceRpcServer);
// Update the address with the correct port
@@ -431,7 +431,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
.setSecretManager(namesystem.getDelegationTokenSecretManager())
.build();
- DFSUtil.addPBProtocol(conf, DatanodeLifelineProtocolPB.class,
+ DFSUtil.addInternalPBProtocol(conf, DatanodeLifelineProtocolPB.class,
lifelineProtoPbService, lifelineRpcServer);
// Update the address with the correct port
@@ -474,23 +474,23 @@ public class NameNodeRpcServer implements NamenodeProtocols {
.build();
// Add all the RPC protocols that the namenode implements
- DFSUtil.addPBProtocol(conf, HAServiceProtocolPB.class, haPbService,
+ DFSUtil.addInternalPBProtocol(conf, HAServiceProtocolPB.class, haPbService,
clientRpcServer);
- DFSUtil.addPBProtocol(conf, ReconfigurationProtocolPB.class,
+ DFSUtil.addInternalPBProtocol(conf, ReconfigurationProtocolPB.class,
reconfigurationPbService, clientRpcServer);
- DFSUtil.addPBProtocol(conf, NamenodeProtocolPB.class, NNPbService,
+ DFSUtil.addInternalPBProtocol(conf, NamenodeProtocolPB.class, NNPbService,
clientRpcServer);
- DFSUtil.addPBProtocol(conf, DatanodeProtocolPB.class, dnProtoPbService,
+ DFSUtil.addInternalPBProtocol(conf, DatanodeProtocolPB.class, dnProtoPbService,
clientRpcServer);
- DFSUtil.addPBProtocol(conf, RefreshAuthorizationPolicyProtocolPB.class,
+ DFSUtil.addInternalPBProtocol(conf, RefreshAuthorizationPolicyProtocolPB.class,
refreshAuthService, clientRpcServer);
- DFSUtil.addPBProtocol(conf, RefreshUserMappingsProtocolPB.class,
+ DFSUtil.addInternalPBProtocol(conf, RefreshUserMappingsProtocolPB.class,
refreshUserMappingService, clientRpcServer);
- DFSUtil.addPBProtocol(conf, RefreshCallQueueProtocolPB.class,
+ DFSUtil.addInternalPBProtocol(conf, RefreshCallQueueProtocolPB.class,
refreshCallQueueService, clientRpcServer);
- DFSUtil.addPBProtocol(conf, GenericRefreshProtocolPB.class,
+ DFSUtil.addInternalPBProtocol(conf, GenericRefreshProtocolPB.class,
genericRefreshService, clientRpcServer);
- DFSUtil.addPBProtocol(conf, GetUserMappingsProtocolPB.class,
+ DFSUtil.addInternalPBProtocol(conf, GetUserMappingsProtocolPB.class,
getUserMappingService, clientRpcServer);
// set service-level authorization security policy
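The repeated rename from DFSUtil.addPBProtocol to addInternalPBProtocol in
the server classes above is mechanical; the new name records that these
registrations go through the shaded protobuf 3.x RPC engine and have no tie
to protobuf 2.5. Assuming the helper keeps the body of the method it
replaces, it amounts to the following (presumed shape, not copied from the
patched source):

    public static void addInternalPBProtocol(Configuration conf,
        Class<?> protocol, BlockingService service, RPC.Server server)
        throws IOException {
      // Bind the protocol class to the shaded protobuf RPC engine...
      RPC.setProtocolEngine(conf, protocol, ProtobufRpcEngine2.class);
      // ...then expose the generated blocking service on that server.
      server.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, protocol, service);
    }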
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/pom.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/pom.xml
index 76441e5..86a59ac 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/pom.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/pom.xml
@@ -38,6 +38,10 @@
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
</dependency>
<dependency>
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/protocolPB/HSAdminRefreshProtocolClientSideTranslatorPB.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/protocolPB/HSAdminRefreshProtocolClientSideTranslatorPB.java
index ea8c17d..305e776 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/protocolPB/HSAdminRefreshProtocolClientSideTranslatorPB.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/protocolPB/HSAdminRefreshProtocolClientSideTranslatorPB.java
@@ -22,7 +22,6 @@ import java.io.Closeable;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtocolMetaInterface;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RpcClientUtil;
@@ -34,7 +33,8 @@ import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.Refr
import org.apache.hadoop.mapreduce.v2.hs.proto.HSAdminRefreshProtocolProtos.RefreshLogRetentionSettingsRequestProto;
import org.apache.hadoop.thirdparty.protobuf.RpcController;
-import org.apache.hadoop.thirdparty.protobuf.ServiceException;
+
+import static org.apache.hadoop.ipc.internal.ShadedProtobufHelper.ipc;
@Private
public class HSAdminRefreshProtocolClientSideTranslatorPB implements
@@ -73,43 +73,27 @@ public class HSAdminRefreshProtocolClientSideTranslatorPB implements
@Override
public void refreshAdminAcls() throws IOException {
- try {
- rpcProxy.refreshAdminAcls(NULL_CONTROLLER,
- VOID_REFRESH_ADMIN_ACLS_REQUEST);
- } catch (ServiceException se) {
- throw ProtobufHelper.getRemoteException(se);
- }
+ ipc(() -> rpcProxy.refreshAdminAcls(NULL_CONTROLLER,
+ VOID_REFRESH_ADMIN_ACLS_REQUEST));
}
@Override
public void refreshLoadedJobCache() throws IOException {
- try {
- rpcProxy.refreshLoadedJobCache(NULL_CONTROLLER,
- VOID_REFRESH_LOADED_JOB_CACHE_REQUEST);
- } catch (ServiceException se) {
- throw ProtobufHelper.getRemoteException(se);
- }
+ ipc(() -> rpcProxy.refreshLoadedJobCache(NULL_CONTROLLER,
+ VOID_REFRESH_LOADED_JOB_CACHE_REQUEST));
}
@Override
public void refreshJobRetentionSettings() throws IOException {
- try {
- rpcProxy.refreshJobRetentionSettings(NULL_CONTROLLER,
- VOID_REFRESH_JOB_RETENTION_SETTINGS_REQUEST);
- } catch (ServiceException se) {
- throw ProtobufHelper.getRemoteException(se);
- }
+ ipc(() -> rpcProxy.refreshJobRetentionSettings(NULL_CONTROLLER,
+ VOID_REFRESH_JOB_RETENTION_SETTINGS_REQUEST));
}
@Override
public void refreshLogRetentionSettings() throws IOException {
- try {
- rpcProxy.refreshLogRetentionSettings(NULL_CONTROLLER,
- VOID_REFRESH_LOG_RETENTION_SETTINGS_REQUEST);
- } catch (ServiceException se) {
- throw ProtobufHelper.getRemoteException(se);
- }
+ ipc(() -> rpcProxy.refreshLogRetentionSettings(NULL_CONTROLLER,
+ VOID_REFRESH_LOG_RETENTION_SETTINGS_REQUEST));
}
@Override
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/pom.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/pom.xml
index a5bffce..78411ce 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/pom.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/pom.xml
@@ -39,6 +39,7 @@
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
+ <scope>${transient.protobuf2.scope}</scope>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml
index e21a91f..5cae4c0 100644
--- a/hadoop-project/pom.xml
+++ b/hadoop-project/pom.xml
@@ -86,8 +86,14 @@
<!-- com.google.re2j version -->
<re2j.version>1.1</re2j.version>
- <!--Protobuf version for backward compatibility-->
+ <!-- Protobuf version for backward compatibility -->
+ <!-- This is used in hadoop-common for compilation only -->
<protobuf.version>2.5.0</protobuf.version>
+ <!-- Protobuf scope in hadoop common -->
+ <!-- set to "provided" and protobuf2 will no longer be exported as a dependency -->
+ <common.protobuf2.scope>provided</common.protobuf2.scope>
+ <!-- Protobuf scope in other modules which explicitly import the library -->
+ <transient.protobuf2.scope>${common.protobuf2.scope}</transient.protobuf2.scope>
<!-- ProtocolBuffer version, actually used in Hadoop -->
<hadoop.protobuf.version>3.23.4</hadoop.protobuf.version>
<protoc.path>${env.HADOOP_PROTOC_PATH}</protoc.path>
@@ -1340,7 +1346,7 @@
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
- <version>${protobuf.version}</version>
+ <version>${hadoop.protobuf.version}</version>
</dependency>
<dependency>
<groupId>commons-daemon</groupId>
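These two properties are the control point for the rest of the patch:
common.protobuf2.scope sets the Maven scope of protobuf-2.5 inside
hadoop-common, and transient.protobuf2.scope, defaulting to the same value,
is what the module POMs below apply to their own protobuf-java dependency.
Since both are ordinary Maven properties, a build that still needs the
protobuf-2.5 jar shipped in the distribution can override the default from
the command line, for example:

    mvn clean install -DskipTests -Dcommon.protobuf2.scope=compile

or set the two scopes independently:

    mvn clean install -DskipTests \
        -Dcommon.protobuf2.scope=provided \
        -Dtransient.protobuf2.scope=compile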
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/pom.xml
index 425cde0..85b37aa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/pom.xml
@@ -185,6 +185,7 @@
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
+ <scope>${transient.protobuf2.scope}</scope>
</dependency>
<dependency>
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml
index 2bafd46..55ff5fd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml
@@ -132,6 +132,7 @@
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
+ <scope>${transient.protobuf2.scope}</scope>
</dependency>
<dependency>
<groupId>org.bouncycastle</groupId>
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/ResourceManagerAdministrationProtocolPBClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/ResourceManagerAdministrationProtocolPBClientImpl.java
index 20729a3..df4beb5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/ResourceManagerAdministrationProtocolPBClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/ResourceManagerAdministrationProtocolPBClientImpl.java
@@ -24,7 +24,6 @@ import java.net.InetSocketAddress;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtobufRpcEngine2;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -106,6 +105,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.UpdateNodeResou
import org.apache.hadoop.thirdparty.protobuf.ServiceException;
+import static org.apache.hadoop.ipc.internal.ShadedProtobufHelper.getRemoteException;
+
@Private
public class ResourceManagerAdministrationProtocolPBClientImpl implements ResourceManagerAdministrationProtocol, Closeable {
@@ -223,7 +224,7 @@ public class ResourceManagerAdministrationProtocolPBClientImpl implements Resour
return (String[]) responseProto.getGroupsList().toArray(
new String[responseProto.getGroupsCount()]);
} catch (ServiceException e) {
- throw ProtobufHelper.getRemoteException(e);
+ throw getRemoteException(e);
}
}
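Note that this client keeps its explicit try/catch rather than adopting the
ipc() wrapper, so the ServiceException import stays; only the unwrap helper
changes, from the removed ProtobufHelper.getRemoteException to a static
import of the equivalent method on ShadedProtobufHelper. The behaviour at
the call site should be unchanged: a ServiceException whose cause is an
IOException is unwrapped, anything else is wrapped in a new IOException.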
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/pom.xml
index f2c2da4..c8405a0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-csi/pom.xml
@@ -38,6 +38,7 @@
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>${hadoop.protobuf.version}</version>
+ <scope>${transient.protobuf2.scope}</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
index ab33b27..abae2ed 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
@@ -89,6 +89,7 @@
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
+ <scope>${transient.protobuf2.scope}</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
index edfa1e0..c2ed8f4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
@@ -81,6 +81,7 @@
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
+ <scope>${transient.protobuf2.scope}</scope>
</dependency>
<dependency>
<groupId>commons-io</groupId>