diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java index 5f414f2cd69fd..8e0c8cbb450ba 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java @@ -22,6 +22,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.audit.AuditLogOperation; import org.apache.iotdb.commons.audit.IAuditEntity; +import org.apache.iotdb.commons.audit.UserEntity; import org.apache.iotdb.commons.auth.entity.PrivilegeType; import org.apache.iotdb.commons.auth.entity.PrivilegeUnion; import org.apache.iotdb.commons.conf.CommonDescriptor; @@ -95,7 +96,6 @@ import org.apache.iotdb.confignode.manager.pipe.metric.receiver.PipeConfigNodeReceiverMetrics; import org.apache.iotdb.confignode.manager.pipe.receiver.visitor.PipeConfigPhysicalPlanExceptionVisitor; import org.apache.iotdb.confignode.manager.pipe.receiver.visitor.PipeConfigPhysicalPlanTSStatusVisitor; -import org.apache.iotdb.confignode.manager.pipe.sink.payload.PipeTransferConfigNodeHandshakeV1Req; import org.apache.iotdb.confignode.manager.pipe.sink.payload.PipeTransferConfigNodeHandshakeV2Req; import org.apache.iotdb.confignode.manager.pipe.sink.payload.PipeTransferConfigPlanReq; import org.apache.iotdb.confignode.manager.pipe.sink.payload.PipeTransferConfigSnapshotPieceReq; @@ -185,13 +185,15 @@ public TPipeTransferResp receive(final TPipeTransferReq req) { .setMessage( "The receiver ConfigNode has set up a new receiver and the sender must re-send its handshake request.")); } + final TPipeTransferResp authResp = checkPipeTransferAuthenticated(type); + if (Objects.nonNull(authResp)) { + return authResp; + } final TPipeTransferResp resp; final long startTime = System.nanoTime(); switch (type) { case HANDSHAKE_CONFIGNODE_V1: - resp = - handleTransferHandshakeV1( - PipeTransferConfigNodeHandshakeV1Req.fromTPipeTransferReq(req)); + resp = new TPipeTransferResp(getUnsupportedHandshakeV1Status()); PipeConfigNodeReceiverMetrics.getInstance() .recordHandshakeConfigNodeV1Timer(System.nanoTime() - startTime); return resp; @@ -199,7 +201,10 @@ public TPipeTransferResp receive(final TPipeTransferReq req) { resp = handleTransferHandshakeV2( PipeTransferConfigNodeHandshakeV2Req.fromTPipeTransferReq(req)); - userEntity.setAuditLogOperation(AuditLogOperation.DDL); + if (resp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode() + && Objects.nonNull(userEntity)) { + userEntity.setAuditLogOperation(AuditLogOperation.DDL); + } PipeConfigNodeReceiverMetrics.getInstance() .recordHandshakeConfigNodeV2Timer(System.nanoTime() - startTime); return resp; @@ -262,6 +267,37 @@ private boolean needHandshake(final PipeRequestType type) { && type != PipeRequestType.HANDSHAKE_CONFIGNODE_V2; } + private TPipeTransferResp checkPipeTransferAuthenticated(final PipeRequestType type) { + if (!requiresAuthentication(type)) { + return null; + } + + final IClientSession clientSession = SESSION_MANAGER.getCurrSession(); + if (hasPipeHandshakeCredential || (clientSession != null && clientSession.isLogin())) { + if (!hasPipeHandshakeCredential && clientSession != null) { + username = clientSession.getUsername(); + userEntity = + new UserEntity(clientSession.getUserId(), username, clientSession.getClientAddress()) + .setAuditLogOperation(AuditLogOperation.DDL); + } + return null; + } + + return new TPipeTransferResp(getNotLoggedInStatus()); + } + + private static boolean requiresAuthentication(final PipeRequestType type) { + switch (type) { + case TRANSFER_CONFIG_PLAN: + case TRANSFER_CONFIG_SNAPSHOT_PIECE: + case TRANSFER_CONFIG_SNAPSHOT_SEAL: + case TRANSFER_COMPRESSED: + return true; + default: + return false; + } + } + private TPipeTransferResp handleTransferConfigPlan(final PipeTransferConfigPlanReq req) throws IOException { return new TPipeTransferResp( @@ -1221,11 +1257,19 @@ protected String getClusterId() { // 2. The detection period (300s) is too long for configPlans. @Override protected boolean shouldLogin() { - return true; + final IClientSession clientSession = SESSION_MANAGER.getCurrSession(); + return hasPipeHandshakeCredential || clientSession == null || !clientSession.isLogin(); } @Override protected TSStatus login() { + final IClientSession session = SESSION_MANAGER.getCurrSession(); + if (!hasPipeHandshakeCredential) { + return session != null && session.isLogin() + ? RpcUtils.SUCCESS_STATUS + : getNotLoggedInStatus(); + } + return configManager.login(username, password, false).getStatus(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java index 8747c12d076b6..f99a5bb1f5839 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.pipe.agent.task.builder; +import org.apache.iotdb.commons.audit.UserEntity; import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex; import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin; @@ -47,6 +48,7 @@ import java.util.Arrays; import java.util.HashMap; import java.util.Map; +import java.util.Objects; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_FORMAT_HYBRID_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_FORMAT_KEY; @@ -131,6 +133,8 @@ public PipeDataNodeTask build() { sinkStage.getPipeSinkPendingQueue(), PROCESSOR_EXECUTOR, pipeTaskMeta, + getSourceUserEntity(sourceParameters), + getSourcePassword(sourceParameters), pipeStaticMeta .getSinkParameters() .getStringOrDefault( @@ -143,6 +147,38 @@ public PipeDataNodeTask build() { pipeStaticMeta.getPipeName(), regionId, sourceStage, processorStage, sinkStage); } + private UserEntity getSourceUserEntity(final PipeParameters sourceParameters) { + final String username = + sourceParameters.getStringByKeys( + PipeSourceConstant.EXTRACTOR_IOTDB_USER_KEY, + PipeSourceConstant.SOURCE_IOTDB_USER_KEY, + PipeSourceConstant.EXTRACTOR_IOTDB_USERNAME_KEY, + PipeSourceConstant.SOURCE_IOTDB_USERNAME_KEY); + if (Objects.isNull(username)) { + return null; + } + + final String userId = + sourceParameters.getStringOrDefault( + Arrays.asList( + PipeSourceConstant.EXTRACTOR_IOTDB_USER_ID, + PipeSourceConstant.SOURCE_IOTDB_USER_ID), + "-1"); + final String cliHostname = + sourceParameters.getStringOrDefault( + Arrays.asList( + PipeSourceConstant.EXTRACTOR_IOTDB_CLI_HOSTNAME, + PipeSourceConstant.SOURCE_IOTDB_CLI_HOSTNAME), + ""); + return new UserEntity(Long.parseLong(userId), username, cliHostname); + } + + private String getSourcePassword(final PipeParameters sourceParameters) { + return sourceParameters.getStringByKeys( + PipeSourceConstant.EXTRACTOR_IOTDB_PASSWORD_KEY, + PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY); + } + private void generateSystemParameters() { if (!(pipeTaskMeta.getProgressIndex() instanceof MinimumProgressIndex) || pipeTaskMeta.isNewlyAdded()) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskProcessorStage.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskProcessorStage.java index 2373495c8ebd8..85a3ff4cd3c60 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskProcessorStage.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/stage/PipeTaskProcessorStage.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.pipe.agent.task.stage; +import org.apache.iotdb.commons.audit.UserEntity; import org.apache.iotdb.commons.consensus.DataRegionId; import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin; import org.apache.iotdb.commons.pipe.agent.task.connection.EventSupplier; @@ -70,12 +71,14 @@ public PipeTaskProcessorStage( final UnboundedBlockingPendingQueue pipeSinkOutputPendingQueue, final PipeProcessorSubtaskExecutor executor, final PipeTaskMeta pipeTaskMeta, + final UserEntity sourceUserEntity, + final String sourcePassword, final boolean forceTabletFormat, final boolean skipParsing) { final PipeProcessorRuntimeConfiguration runtimeConfiguration = new PipeTaskRuntimeConfiguration( new PipeTaskProcessorRuntimeEnvironment( - pipeName, creationTime, regionId, pipeTaskMeta)); + pipeName, creationTime, regionId, pipeTaskMeta, sourceUserEntity, sourcePassword)); final PipeProcessor pipeProcessor = StorageEngine.getInstance().getAllDataRegionIds().contains(new DataRegionId(regionId)) || PipeRuntimeMeta.isSourceExternal(regionId) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/receiver/TwoStageAggregateReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/receiver/TwoStageAggregateReceiver.java index 4ab24850cf9e0..ac2dfcc24e905 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/receiver/TwoStageAggregateReceiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/receiver/TwoStageAggregateReceiver.java @@ -26,6 +26,8 @@ import org.apache.iotdb.db.pipe.processor.twostage.exchange.payload.CombineRequest; import org.apache.iotdb.db.pipe.processor.twostage.exchange.payload.FetchCombineResultRequest; import org.apache.iotdb.db.pipe.processor.twostage.exchange.payload.RequestType; +import org.apache.iotdb.db.protocol.session.IClientSession; +import org.apache.iotdb.db.protocol.session.SessionManager; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq; @@ -37,6 +39,7 @@ public class TwoStageAggregateReceiver implements IoTDBReceiver { private static final Logger LOGGER = LoggerFactory.getLogger(TwoStageAggregateReceiver.class); + private static final SessionManager SESSION_MANAGER = SessionManager.getInstance(); @Override public IoTDBSinkRequestVersion getVersion() { @@ -46,6 +49,14 @@ public IoTDBSinkRequestVersion getVersion() { @Override public TPipeTransferResp receive(TPipeTransferReq req) { try { + final IClientSession clientSession = SESSION_MANAGER.getCurrSession(); + if (!SESSION_MANAGER.checkLogin(clientSession)) { + return new TPipeTransferResp( + RpcUtils.getStatus( + TSStatusCode.NOT_LOGIN, + "Log in failed. Either you are not authorized or the session has timed out.")); + } + final short rawRequestType = req.getType(); if (RequestType.isValidatedRequestType(rawRequestType)) { switch (RequestType.valueOf(rawRequestType)) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/sender/TwoStageAggregateSender.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/sender/TwoStageAggregateSender.java index e221b4a19c779..f9624086d83fe 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/sender/TwoStageAggregateSender.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/exchange/sender/TwoStageAggregateSender.java @@ -20,8 +20,10 @@ package org.apache.iotdb.db.pipe.processor.twostage.exchange.sender; import org.apache.iotdb.common.rpc.thrift.TEndPoint; +import org.apache.iotdb.commons.audit.UserEntity; import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.client.property.ThriftClientProperty; +import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.sink.client.IoTDBSyncClient; import org.apache.iotdb.confignode.rpc.thrift.TDataNodeInfo; @@ -32,14 +34,18 @@ import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager; import org.apache.iotdb.db.protocol.client.ConfigNodeInfo; import org.apache.iotdb.pipe.api.exception.PipeException; +import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq; import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp; +import org.apache.iotdb.service.rpc.thrift.TSOpenSessionReq; +import org.apache.iotdb.service.rpc.thrift.TSOpenSessionResp; +import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion; import org.apache.thrift.TException; -import org.apache.thrift.transport.TTransportException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.time.ZoneId; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -55,8 +61,12 @@ public class TwoStageAggregateSender implements AutoCloseable { private static final PipeConfig PIPE_CONFIG = PipeConfig.getInstance(); + private static final String USE_ENCRYPTED_PASSWORD_KEY = "use_encrypted_password"; + private final String pipeName; private final long creationTime; + private final UserEntity sourceUserEntity; + private final String sourcePassword; private static final AtomicLong DATANODE_ID_2_END_POINTS_LAST_UPDATE_TIME = new AtomicLong(0); private static final AtomicReference> DATANODE_ID_2_END_POINTS = @@ -67,8 +77,15 @@ public class TwoStageAggregateSender implements AutoCloseable { new ConcurrentHashMap<>(); public TwoStageAggregateSender(String pipeName, long creationTime) { + this(pipeName, creationTime, null, null); + } + + public TwoStageAggregateSender( + String pipeName, long creationTime, UserEntity sourceUserEntity, String sourcePassword) { this.pipeName = pipeName; this.creationTime = creationTime; + this.sourceUserEntity = sourceUserEntity; + this.sourcePassword = sourcePassword; } public synchronized TPipeTransferResp request(long watermark, TPipeTransferReq req) @@ -178,7 +195,7 @@ private void tryConstructClients(boolean endPointsChanged) { try { endPointIoTDBSyncClientMap.put(endPoint, constructIoTDBSyncClient(endPoint)); - } catch (TTransportException e) { + } catch (TException e) { LOGGER.warn(DataNodePipeMessages.FAILED_TO_CONSTRUCT_IOTDBSYNCCLIENT, e); } } @@ -194,8 +211,7 @@ private void tryConstructClients(boolean endPointsChanged) { } } - private IoTDBSyncClient reconstructIoTDBSyncClient(TEndPoint endPoint) - throws TTransportException { + private IoTDBSyncClient reconstructIoTDBSyncClient(TEndPoint endPoint) throws TException { final IoTDBSyncClient oldClient = endPointIoTDBSyncClientMap.remove(endPoint); if (oldClient != null) { try { @@ -209,17 +225,46 @@ private IoTDBSyncClient reconstructIoTDBSyncClient(TEndPoint endPoint) return newClient; } - private IoTDBSyncClient constructIoTDBSyncClient(TEndPoint endPoint) throws TTransportException { - return new IoTDBSyncClient( - new ThriftClientProperty.Builder() - .setConnectionTimeoutMs(PIPE_CONFIG.getPipeSinkHandshakeTimeoutMs()) - .setRpcThriftCompressionEnabled(PIPE_CONFIG.isPipeSinkRPCThriftCompressionEnabled()) - .build(), - endPoint.getIp(), - endPoint.getPort(), - false, - null, - null); + private IoTDBSyncClient constructIoTDBSyncClient(TEndPoint endPoint) throws TException { + final IoTDBSyncClient client = + new IoTDBSyncClient( + new ThriftClientProperty.Builder() + .setConnectionTimeoutMs(PIPE_CONFIG.getPipeSinkHandshakeTimeoutMs()) + .setRpcThriftCompressionEnabled(PIPE_CONFIG.isPipeSinkRPCThriftCompressionEnabled()) + .build(), + endPoint.getIp(), + endPoint.getPort(), + false, + null, + null); + openSession(client); + return client; + } + + private void openSession(final IoTDBSyncClient client) throws TException { + if (Objects.isNull(sourceUserEntity) || Objects.isNull(sourcePassword)) { + throw new PipeException( + String.format( + "Missing source credentials for two-stage aggregate pipe %s-%s.", + pipeName, creationTime)); + } + + final TSOpenSessionReq openSessionReq = new TSOpenSessionReq(); + openSessionReq.setUsername(sourceUserEntity.getUsername()); + openSessionReq.setPassword(sourcePassword); + openSessionReq.setZoneId(ZoneId.systemDefault().toString()); + openSessionReq.setClient_protocol(TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3); + openSessionReq.putToConfiguration("version", IoTDBConstant.ClientVersion.V_1_0.toString()); + openSessionReq.putToConfiguration(USE_ENCRYPTED_PASSWORD_KEY, Boolean.TRUE.toString()); + + final TSOpenSessionResp openSessionResp = client.openSession(openSessionReq); + if (openSessionResp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + throw new PipeException( + String.format( + "Failed to login for two-stage aggregate pipe %s-%s, status: %s.", + pipeName, creationTime, openSessionResp.getStatus())); + } + client.setTimeout(PIPE_CONFIG.getPipeSinkTransferTimeoutMs()); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/plugin/TwoStageCountProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/plugin/TwoStageCountProcessor.java index e9f40b69fa628..fc5e3e09e26e5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/plugin/TwoStageCountProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/twostage/plugin/TwoStageCountProcessor.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.pipe.processor.twostage.plugin; +import org.apache.iotdb.commons.audit.UserEntity; import org.apache.iotdb.commons.consensus.DataRegionId; import org.apache.iotdb.commons.consensus.index.ProgressIndex; import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex; @@ -102,6 +103,9 @@ public class TwoStageCountProcessor implements PipeProcessor { private final Queue /* ([timestamp, local count], progress index) */> localCommitQueue = new ConcurrentLinkedQueue<>(); + private UserEntity sourceUserEntity; + private String sourcePassword; + private TwoStageAggregateSender twoStageAggregateSender; private final Queue /* (timestamp, global count) */> globalCountQueue = new ConcurrentLinkedQueue<>(); @@ -139,6 +143,8 @@ public void customize(PipeParameters parameters, PipeProcessorRuntimeConfigurati creationTime = runtimeEnvironment.getCreationTime(); regionId = runtimeEnvironment.getRegionId(); pipeTaskMeta = runtimeEnvironment.getPipeTaskMeta(); + sourceUserEntity = runtimeEnvironment.getSourceUserEntity(); + sourcePassword = runtimeEnvironment.getSourcePassword(); dataBaseName = StorageEngine.getInstance() .getDataRegion(new DataRegionId(runtimeEnvironment.getRegionId())) @@ -176,7 +182,8 @@ public void customize(PipeParameters parameters, PipeProcessorRuntimeConfigurati PipeCombineHandlerManager.getInstance() .register( pipeName, creationTime, (combineId) -> new CountOperator(combineId, globalCountQueue)); - twoStageAggregateSender = new TwoStageAggregateSender(pipeName, creationTime); + twoStageAggregateSender = + new TwoStageAggregateSender(pipeName, creationTime, sourceUserEntity, sourcePassword); } static PartialPath parseOutputSeries(final PipeParameters parameters) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java index 978d924c05651..3de8ae72425dd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java @@ -66,7 +66,6 @@ import org.apache.iotdb.db.pipe.receiver.visitor.PipeTreeStatementToBatchVisitor; import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock; -import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferDataNodeHandshakeV1Req; import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferDataNodeHandshakeV2Req; import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferPlanNodeReq; import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferSchemaSnapshotPieceReq; @@ -221,20 +220,15 @@ public synchronized TPipeTransferResp receive(final TPipeTransferReq req) { if (requestType != PipeRequestType.TRANSFER_SLICE) { sliceReqHandler.clear(); } + final TPipeTransferResp authResp = checkPipeTransferAuthenticated(requestType); + if (Objects.nonNull(authResp)) { + return authResp; + } switch (requestType) { case HANDSHAKE_DATANODE_V1: { try { - if (PipeConfig.getInstance().isPipeEnableMemoryCheck() - && PipeDataNodeResourceManager.memory().getFreeMemorySizeInBytes() - < PipeConfig.getInstance().getPipeMinimumReceiverMemory()) { - return new TPipeTransferResp( - RpcUtils.getStatus( - TSStatusCode.PIPE_HANDSHAKE_ERROR.getStatusCode(), - "The receiver memory is not enough to handle the handshake request from datanode.")); - } - return handleTransferHandshakeV1( - PipeTransferDataNodeHandshakeV1Req.fromTPipeTransferReq(req)); + return new TPipeTransferResp(getUnsupportedHandshakeV1Status()); } finally { PipeDataNodeReceiverMetrics.getInstance() .recordHandshakeDatanodeV1Timer(System.nanoTime() - startTime); @@ -528,6 +522,46 @@ protected String getClusterId() { return IoTDBDescriptor.getInstance().getConfig().getClusterId(); } + private TPipeTransferResp checkPipeTransferAuthenticated(final PipeRequestType requestType) { + if (!requiresAuthentication(requestType)) { + return null; + } + + final IClientSession clientSession = SESSION_MANAGER.getCurrSession(); + if (hasPipeHandshakeCredential || (clientSession != null && clientSession.isLogin())) { + if (!hasPipeHandshakeCredential && clientSession != null) { + username = clientSession.getUsername(); + userEntity = AuthorityChecker.createIAuditEntity(username, clientSession); + } + return null; + } + + return new TPipeTransferResp(getNotLoggedInStatus()); + } + + private static boolean requiresAuthentication(final PipeRequestType requestType) { + switch (requestType) { + case TRANSFER_TABLET_INSERT_NODE: + case TRANSFER_TABLET_INSERT_NODE_V2: + case TRANSFER_TABLET_RAW: + case TRANSFER_TABLET_RAW_V2: + case TRANSFER_TABLET_BINARY: + case TRANSFER_TABLET_BINARY_V2: + case TRANSFER_TABLET_BATCH: + case TRANSFER_TABLET_BATCH_V2: + case TRANSFER_TS_FILE_PIECE: + case TRANSFER_TS_FILE_SEAL: + case TRANSFER_TS_FILE_PIECE_WITH_MOD: + case TRANSFER_TS_FILE_SEAL_WITH_MOD: + case TRANSFER_PLAN_NODE: + case TRANSFER_SCHEMA_SNAPSHOT_PIECE: + case TRANSFER_SCHEMA_SNAPSHOT_SEAL: + return true; + default: + return false; + } + } + @Override protected boolean shouldLogin() { // The idle time is updated per request @@ -1105,6 +1139,12 @@ private boolean shouldUseTableModelVisitorForLoadStatement( protected TSStatus login() { final IClientSession session = SESSION_MANAGER.getCurrSession(); + if (!hasPipeHandshakeCredential) { + return session != null && session.isLogin() + ? RpcUtils.SUCCESS_STATUS + : getNotLoggedInStatus(); + } + if (session != null && !session.isLogin()) { final BasicOpenSessionResp openSessionResp = SESSION_MANAGER.login( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java index 50312a4cea667..95d5af029feb3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java @@ -147,6 +147,8 @@ import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.rpc.stmt.PreparedParameterSerde; import org.apache.iotdb.rpc.stmt.PreparedParameterSerde.DeserializedParam; +import org.apache.iotdb.rpc.subscription.payload.response.PipeSubscribeResponseType; +import org.apache.iotdb.rpc.subscription.payload.response.PipeSubscribeResponseVersion; import org.apache.iotdb.service.rpc.thrift.ServerProperties; import org.apache.iotdb.service.rpc.thrift.TCreateTimeseriesUsingSchemaTemplateReq; import org.apache.iotdb.service.rpc.thrift.TPipeSubscribeReq; @@ -3472,17 +3474,51 @@ public TPipeTransferResp pipeTransfer(final TPipeTransferReq req) { @Override public TPipeSubscribeResp pipeSubscribe(final TPipeSubscribeReq req) { - return SubscriptionAgent.receiver().handle(req); + try { + final IClientSession clientSession = SESSION_MANAGER.getCurrSession(); + if (!SESSION_MANAGER.checkLogin(clientSession)) { + return getNotLoggedInPipeSubscribeResp(); + } + + return SubscriptionAgent.receiver().handle(req); + } finally { + SESSION_MANAGER.updateIdleTime(); + } } @Override public TSBackupConfigurationResp getBackupConfiguration() { - return new TSBackupConfigurationResp(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS)); + try { + final IClientSession clientSession = SESSION_MANAGER.getCurrSession(); + if (!SESSION_MANAGER.checkLogin(clientSession)) { + return new TSBackupConfigurationResp(getNotLoggedInStatus()); + } + + return new TSBackupConfigurationResp(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS)); + } finally { + SESSION_MANAGER.updateIdleTime(); + } } @Override public TSConnectionInfoResp fetchAllConnectionsInfo() { - return SESSION_MANAGER.getAllConnectionInfo(); + try { + final IClientSession clientSession = SESSION_MANAGER.getCurrSession(); + if (!SESSION_MANAGER.checkLogin(clientSession)) { + return new TSConnectionInfoResp(Collections.emptyList()); + } + + return SESSION_MANAGER.getAllConnectionInfo(); + } finally { + SESSION_MANAGER.updateIdleTime(); + } + } + + private TPipeSubscribeResp getNotLoggedInPipeSubscribeResp() { + return new TPipeSubscribeResp( + getNotLoggedInStatus(), + PipeSubscribeResponseVersion.VERSION_1.getVersion(), + PipeSubscribeResponseType.ACK.getType()); } @Override diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeReceiverTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeReceiverTest.java index 5b38d7bfdfdd2..a6d4011505fa5 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeReceiverTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeReceiverTest.java @@ -20,35 +20,77 @@ package org.apache.iotdb.db.pipe.sink; import org.apache.iotdb.commons.conf.CommonDescriptor; +import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion; +import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType; +import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferCompressedReq; +import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferSliceReq; import org.apache.iotdb.db.pipe.receiver.protocol.thrift.IoTDBDataNodeReceiver; import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferDataNodeHandshakeV1Req; -import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletRawReq; +import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq; +import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp; -import org.apache.tsfile.enums.TSDataType; -import org.apache.tsfile.write.record.Tablet; -import org.apache.tsfile.write.schema.MeasurementSchema; import org.junit.Assert; import org.junit.Test; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Collections; public class PipeReceiverTest { @Test - public void testIoTDBThriftReceiverV1() { + public void testUnauthenticatedPipeTransferRejected() { + final IoTDBDataNodeReceiver receiver = new IoTDBDataNodeReceiver(); + + final TPipeTransferResp resp = receiver.receive(buildEmptyRawTabletTransferReq()); + + Assert.assertEquals(TSStatusCode.NOT_LOGIN.getStatusCode(), resp.getStatus().getCode()); + } + + @Test + public void testUnauthenticatedWrappedPipeTransferRejected() throws IOException { + final IoTDBDataNodeReceiver receiver = new IoTDBDataNodeReceiver(); + final TPipeTransferReq rawReq = buildEmptyRawTabletTransferReq(); + + final TPipeTransferResp compressedResp = + receiver.receive( + PipeTransferCompressedReq.toTPipeTransferReq(rawReq, Collections.emptyList())); + Assert.assertEquals( + TSStatusCode.NOT_LOGIN.getStatusCode(), compressedResp.getStatus().getCode()); + + final TPipeTransferReq sliceReq = + PipeTransferSliceReq.toTPipeTransferReq( + 0, + PipeRequestType.TRANSFER_TABLET_RAW.getType(), + 0, + 1, + rawReq.body.duplicate(), + 0, + rawReq.body.limit()); + final TPipeTransferResp sliceResp = receiver.receive(sliceReq); + Assert.assertEquals(TSStatusCode.NOT_LOGIN.getStatusCode(), sliceResp.getStatus().getCode()); + } + + @Test + public void testIoTDBThriftReceiverV1HandshakeRejected() { IoTDBDataNodeReceiver receiver = new IoTDBDataNodeReceiver(); try { - receiver.receive( - PipeTransferDataNodeHandshakeV1Req.toTPipeTransferReq( - CommonDescriptor.getInstance().getConfig().getTimestampPrecision())); - receiver.receive( - PipeTransferTabletRawReq.toTPipeTransferReq( - new Tablet( - "root.sg.d", - Collections.singletonList(new MeasurementSchema("s", TSDataType.INT32))), - true)); + final TPipeTransferResp handshakeResp = + receiver.receive( + PipeTransferDataNodeHandshakeV1Req.toTPipeTransferReq( + CommonDescriptor.getInstance().getConfig().getTimestampPrecision())); + Assert.assertEquals( + TSStatusCode.PIPE_HANDSHAKE_ERROR.getStatusCode(), handshakeResp.getStatus().getCode()); } catch (IOException e) { Assert.fail(); } } + + private TPipeTransferReq buildEmptyRawTabletTransferReq() { + final TPipeTransferReq req = new TPipeTransferReq(); + req.setVersion(IoTDBSinkRequestVersion.VERSION_1.getVersion()); + req.setType(PipeRequestType.TRANSFER_TABLET_RAW.getType()); + req.setBody(ByteBuffer.allocate(0)); + return req; + } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/plugin/env/PipeTaskProcessorRuntimeEnvironment.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/plugin/env/PipeTaskProcessorRuntimeEnvironment.java index 6e5cf88793853..11111b0d9f576 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/plugin/env/PipeTaskProcessorRuntimeEnvironment.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/plugin/env/PipeTaskProcessorRuntimeEnvironment.java @@ -19,19 +19,42 @@ package org.apache.iotdb.commons.pipe.config.plugin.env; +import org.apache.iotdb.commons.audit.UserEntity; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; public class PipeTaskProcessorRuntimeEnvironment extends PipeTaskRuntimeEnvironment { private final PipeTaskMeta pipeTaskMeta; + private final UserEntity sourceUserEntity; + private final String sourcePassword; public PipeTaskProcessorRuntimeEnvironment( String pipeName, long creationTime, int regionId, PipeTaskMeta pipeTaskMeta) { + this(pipeName, creationTime, regionId, pipeTaskMeta, null, null); + } + + public PipeTaskProcessorRuntimeEnvironment( + String pipeName, + long creationTime, + int regionId, + PipeTaskMeta pipeTaskMeta, + UserEntity sourceUserEntity, + String sourcePassword) { super(pipeName, creationTime, regionId); this.pipeTaskMeta = pipeTaskMeta; + this.sourceUserEntity = sourceUserEntity; + this.sourcePassword = sourcePassword; } public PipeTaskMeta getPipeTaskMeta() { return pipeTaskMeta; } + + public UserEntity getSourceUserEntity() { + return sourceUserEntity; + } + + public String getSourcePassword() { + return sourcePassword; + } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java index 8304161c9d0db..47871d05081cd 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java @@ -82,6 +82,7 @@ public abstract class IoTDBFileReceiver implements IoTDBReceiver { protected String username = CONNECTOR_IOTDB_USER_DEFAULT_VALUE; protected String password = CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE; protected IAuditEntity userEntity; + protected boolean hasPipeHandshakeCredential = false; protected long lastSuccessfulLoginTime = Long.MIN_VALUE; @@ -106,6 +107,8 @@ public IoTDBSinkRequestVersion getVersion() { } protected TPipeTransferResp handleTransferHandshakeV1(final PipeTransferHandshakeV1Req req) { + hasPipeHandshakeCredential = false; + if (!CommonDescriptor.getInstance() .getConfig() .getTimestampPrecision() @@ -226,6 +229,8 @@ protected TPipeTransferResp handleTransferHandshakeV1(final PipeTransferHandshak protected TPipeTransferResp handleTransferHandshakeV2(final PipeTransferHandshakeV2Req req) throws IOException { + hasPipeHandshakeCredential = false; + // Reject to handshake if the receiver can not take clusterId from config node. final String clusterIdFromConfigNode = getClusterId(); if (clusterIdFromConfigNode == null) { @@ -280,30 +285,41 @@ protected TPipeTransferResp handleTransferHandshakeV2(final PipeTransferHandshak if (userIdString != null) { userId = Long.parseLong(userIdString); } - final String usernameString = - req.getParams().get(PipeTransferHandshakeConstant.HANDSHAKE_KEY_USERNAME); - if (usernameString != null) { - username = usernameString; - } final String cliHostnameString = req.getParams().get(PipeTransferHandshakeConstant.HANDSHAKE_KEY_CLI_HOSTNAME); if (cliHostnameString != null) { cliHostname = cliHostnameString; } - userEntity = new UserEntity(userId, username, cliHostname); - + final String usernameString = + req.getParams().get(PipeTransferHandshakeConstant.HANDSHAKE_KEY_USERNAME); final String passwordString = req.getParams().get(PipeTransferHandshakeConstant.HANDSHAKE_KEY_PASSWORD); - if (passwordString != null) { - password = passwordString; + if (usernameString == null || passwordString == null) { + return new TPipeTransferResp( + RpcUtils.getStatus( + TSStatusCode.NOT_LOGIN, "Pipe handshake missing username or password.")); + } + + username = usernameString; + password = passwordString; + userEntity = new UserEntity(userId, username, cliHostname); + hasPipeHandshakeCredential = true; + + final TSStatus status; + try { + status = login(); + } catch (final Exception e) { + hasPipeHandshakeCredential = false; + throw e; } - final TSStatus status = loginIfNecessary(); if (status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + hasPipeHandshakeCredential = false; PipeLogger.log( LOGGER::warn, PipeMessages.RECEIVER_HANDSHAKE_FAILED_LOGIN, receiverId.get(), status); return new TPipeTransferResp(status); } else { + lastSuccessfulLoginTime = System.currentTimeMillis(); LOGGER.info(PipeMessages.RECEIVER_USER_LOGIN_SUCCESS, receiverId.get(), username); } @@ -343,13 +359,17 @@ protected TPipeTransferResp handleTransferHandshakeV2(final PipeTransferHandshak // Handle the handshake request as a v1 request. // Here we construct a fake "dataNode" request to valid from v1 validation logic, though // it may not require the actual type of the v1 request. - return handleTransferHandshakeV1( - new PipeTransferHandshakeV1Req() { - @Override - protected PipeRequestType getPlanType() { - return PipeRequestType.HANDSHAKE_DATANODE_V1; - } - }.convertToTPipeTransferReq(timestampPrecision)); + final TPipeTransferResp handshakeResp = + handleTransferHandshakeV1( + new PipeTransferHandshakeV1Req() { + @Override + protected PipeRequestType getPlanType() { + return PipeRequestType.HANDSHAKE_DATANODE_V1; + } + }.convertToTPipeTransferReq(timestampPrecision)); + hasPipeHandshakeCredential = + handshakeResp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode(); + return handshakeResp; } protected abstract String getClusterId(); @@ -380,6 +400,18 @@ protected TSStatus loginIfNecessary() { return StatusUtils.OK; } + protected TSStatus getNotLoggedInStatus() { + return RpcUtils.getStatus( + TSStatusCode.NOT_LOGIN, + "Log in failed. Either you are not authorized or the session has timed out."); + } + + protected TSStatus getUnsupportedHandshakeV1Status() { + return RpcUtils.getStatus( + TSStatusCode.PIPE_HANDSHAKE_ERROR, + "Pipe handshake V1 is no longer supported. Please use handshake V2 with username and password."); + } + protected abstract TSStatus login(); protected final TPipeTransferResp handleTransferFilePiece( diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiverTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiverTest.java index 5a1ed3e653aec..12dbec2b10cbe 100644 --- a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiverTest.java +++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiverTest.java @@ -22,10 +22,12 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.exception.IllegalPathException; +import org.apache.iotdb.commons.pipe.sink.payload.thrift.common.PipeTransferHandshakeConstant; import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType; import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferFileSealReqV1; import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferFileSealReqV2; import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferHandshakeV1Req; +import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferHandshakeV2Req; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq; import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp; @@ -41,7 +43,9 @@ import java.nio.file.Path; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; public class IoTDBFileReceiverTest { @@ -115,6 +119,64 @@ public void testHandshakeResetsWritingFileState() throws Exception { } } + @Test + public void testHandshakeV1ClearsPipeCredential() throws Exception { + final Path baseDir = Files.createTempDirectory("iotdb-file-receiver-test"); + final DummyFileReceiver receiver = new DummyFileReceiver(baseDir.toFile()); + try { + receiver.setHasPipeHandshakeCredential(true); + + receiver.handshake(); + + Assert.assertFalse(receiver.hasPipeHandshakeCredential()); + } finally { + receiver.handleExit(); + } + } + + @Test + public void testHandshakeV2RequiresCredentials() throws Exception { + final Path baseDir = Files.createTempDirectory("iotdb-file-receiver-test"); + final DummyFileReceiver receiver = new DummyFileReceiver(baseDir.toFile()); + try { + final TPipeTransferResp response = receiver.handshakeV2(buildHandshakeV2Params(false)); + + Assert.assertEquals(TSStatusCode.NOT_LOGIN.getStatusCode(), response.getStatus().getCode()); + Assert.assertEquals(0, receiver.getLoginCallCount()); + } finally { + receiver.handleExit(); + } + } + + @Test + public void testHandshakeV2AuthenticatesImmediately() throws Exception { + final Path baseDir = Files.createTempDirectory("iotdb-file-receiver-test"); + final DummyFileReceiver receiver = new DummyFileReceiver(baseDir.toFile()); + try { + final TPipeTransferResp response = receiver.handshakeV2(buildHandshakeV2Params(true)); + + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), response.getStatus().getCode()); + Assert.assertEquals(1, receiver.getLoginCallCount()); + Assert.assertTrue(receiver.hasPipeHandshakeCredential()); + } finally { + receiver.handleExit(); + } + } + + private Map buildHandshakeV2Params(final boolean includeCredentials) { + final Map params = new HashMap<>(); + params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_CLUSTER_ID, "sender-cluster"); + params.put( + PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION, + CommonDescriptor.getInstance().getConfig().getTimestampPrecision()); + if (includeCredentials) { + params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_USERNAME, "root"); + params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_PASSWORD, "root"); + } + return params; + } + @Test public void testSealFileV1FailureDeletesTransferredFile() throws Exception { final Path baseDir = Files.createTempDirectory("iotdb-file-receiver-test"); @@ -142,6 +204,7 @@ private static class DummyFileReceiver extends IoTDBFileReceiver { private final File receiverFileBaseDir; private TSStatus loadFileV1Status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); + private int loginCallCount = 0; DummyFileReceiver(final File baseDir) { receiverFileBaseDir = baseDir; @@ -158,6 +221,10 @@ void handshake() throws IOException { CommonDescriptor.getInstance().getConfig().getTimestampPrecision())); } + TPipeTransferResp handshakeV2(final Map params) throws IOException { + return handleTransferHandshakeV2(DummyHandshakeV2Req.toTPipeTransferReq(params)); + } + void writeToCurrentWritingFile(final byte[] bytes) throws Exception { getCurrentWritingFileWriter().write(bytes); } @@ -166,6 +233,18 @@ void setLoadFileV1Status(final TSStatus status) { loadFileV1Status = status; } + void setHasPipeHandshakeCredential(final boolean hasPipeHandshakeCredential) { + this.hasPipeHandshakeCredential = hasPipeHandshakeCredential; + } + + boolean hasPipeHandshakeCredential() { + return hasPipeHandshakeCredential; + } + + int getLoginCallCount() { + return loginCallCount; + } + TPipeTransferResp sealFileV1(final String fileName, final long fileLength) throws IOException { return handleTransferFileSealV1(DummyFileSealReqV1.toTPipeTransferReq(fileName, fileLength)); } @@ -225,7 +304,8 @@ protected String getClusterId() { @Override protected TSStatus login() { - return new TSStatus(200); + loginCallCount++; + return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); } @Override @@ -266,6 +346,19 @@ protected PipeRequestType getPlanType() { } } + private static class DummyHandshakeV2Req extends PipeTransferHandshakeV2Req { + + static DummyHandshakeV2Req toTPipeTransferReq(final Map params) + throws IOException { + return (DummyHandshakeV2Req) new DummyHandshakeV2Req().convertToTPipeTransferReq(params); + } + + @Override + protected PipeRequestType getPlanType() { + return PipeRequestType.HANDSHAKE_DATANODE_V2; + } + } + private static class DummyFileSealReqV1 extends PipeTransferFileSealReqV1 { static DummyFileSealReqV1 toTPipeTransferReq(final String fileName, final long fileLength)