From 41eb7e65fc9d5c889eec79c5b00eaf2d160bd1e1 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Wed, 1 Jul 2026 18:34:53 +0800 Subject: [PATCH] [Pipe] Improve pipe control-flow lock responsiveness (#18076) * Improve pipe control flow lock responsiveness * Fix pipe meta helper test initialization --- .../pipe/AbstractOperatePipeProcedureV2.java | 24 +- .../AbstractOperatePipeProcedureV2Test.java | 168 ++++++++++++ .../impl/DataNodeInternalRPCServiceImpl.java | 53 +--- .../thrift/impl/PushMultiPipeMetaHelper.java | 85 ++++++ ...alRPCServiceImplPushMultiPipeMetaTest.java | 246 ++++++++++++++++++ 5 files changed, 528 insertions(+), 48 deletions(-) create mode 100644 iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2Test.java create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/PushMultiPipeMetaHelper.java create mode 100644 iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImplPushMultiPipeMetaTest.java diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java index 592f5291c13ee..0bfe1612f07ca 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java @@ -46,7 +46,6 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -93,6 +92,10 @@ public abstract class AbstractOperatePipeProcedureV2 // putting it here is just for convenience protected AtomicReference pipeTaskInfo; + // Only used to release global locks before retrying the same state. Do not serialize it because a + // recovered procedure is already re-scheduled by the procedure framework. + private transient boolean shouldYieldAfterExecution; + private static final String SKIP_PIPE_PROCEDURE_MESSAGE = "Try to start a RUNNING pipe or stop a STOPPED pipe, do nothing."; @@ -162,15 +165,17 @@ protected void releaseLock(ConfigNodeProcedureEnv configNodeProcedureEnv) { LOGGER.warn("ProcedureId {} release lock. No need to release pipe lock.", getProcId()); } else { LOGGER.debug("ProcedureId {} release lock. Pipe lock will be released.", getProcId()); - if (this instanceof PipeMetaSyncProcedure) { + if (isSuccess() && this instanceof PipeMetaSyncProcedure) { configNodeProcedureEnv .getConfigManager() .getPipeManager() .getPipeTaskCoordinator() .updateLastSyncedVersion(); } - PipeProcedureMetrics.getInstance() - .updateTimer(this.getOperation().getName(), this.elapsedTime()); + if (isFinished()) { + PipeProcedureMetrics.getInstance() + .updateTimer(this.getOperation().getName(), this.elapsedTime()); + } releasePipeTaskCoordinatorLock(configNodeProcedureEnv); } } @@ -196,7 +201,7 @@ private void releasePipeTaskCoordinatorLock(ConfigNodeProcedureEnv configNodePro public abstract void executeFromCalculateInfoForTask(ConfigNodeProcedureEnv env); /** - * Execute at state {@link OperatePipeTaskState#WRITE_CONFIG_NODE_CONSENSUS}.‘ + * Execute at state {@link OperatePipeTaskState#WRITE_CONFIG_NODE_CONSENSUS}. * * @throws PipeException if configNode consensus write failed */ @@ -215,6 +220,7 @@ public abstract void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env) @Override protected Flow executeFromState(ConfigNodeProcedureEnv env, OperatePipeTaskState state) throws InterruptedException { + shouldYieldAfterExecution = false; if (pipeTaskInfo == null) { LOGGER.warn( "ProcedureId {}: Pipe lock is not acquired, executeFromState's execution will be skipped.", @@ -262,8 +268,7 @@ protected Flow executeFromState(ConfigNodeProcedureEnv env, OperatePipeTaskState RETRY_THRESHOLD, e); setNextState(getCurrentState()); - // Wait 3s for next retry - TimeUnit.MILLISECONDS.sleep(3000L); + shouldYieldAfterExecution = true; } else { LOGGER.warn( "ProcedureId {}: All {} retries failed when trying to {} at state [{}], will rollback...", @@ -283,6 +288,11 @@ protected Flow executeFromState(ConfigNodeProcedureEnv env, OperatePipeTaskState return Flow.HAS_MORE_STATE; } + @Override + protected boolean isYieldAfterExecution(final ConfigNodeProcedureEnv env) { + return shouldYieldAfterExecution; + } + @Override protected boolean isRollbackSupported(OperatePipeTaskState state) { return true; diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2Test.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2Test.java new file mode 100644 index 0000000000000..be6b6374e8572 --- /dev/null +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2Test.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.procedure.impl.pipe; + +import org.apache.iotdb.confignode.persistence.pipe.PipeTaskInfo; +import org.apache.iotdb.confignode.procedure.Procedure; +import org.apache.iotdb.confignode.procedure.impl.StateMachineProcedure; +import org.apache.iotdb.confignode.procedure.state.pipe.task.OperatePipeTaskState; +import org.apache.iotdb.pipe.api.exception.PipeException; + +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicReference; + +public class AbstractOperatePipeProcedureV2Test { + + @Test + public void testSuccessfulStateDoesNotYield() throws Exception { + final TestOperatePipeProcedure procedure = new TestOperatePipeProcedure(); + + Assert.assertEquals( + StateMachineProcedure.Flow.HAS_MORE_STATE, + procedure.executeFromState(null, OperatePipeTaskState.VALIDATE_TASK)); + + Assert.assertFalse(procedure.isYieldAfterExecution(null)); + Assert.assertEquals(1, procedure.validateExecutionCount); + } + + @Test + public void testRetryStateYieldsAndResetsAfterNextExecution() throws Exception { + final TestOperatePipeProcedure procedure = new TestOperatePipeProcedure(); + procedure.failValidation = true; + + Assert.assertEquals( + StateMachineProcedure.Flow.HAS_MORE_STATE, + procedure.executeFromState(null, OperatePipeTaskState.VALIDATE_TASK)); + + Assert.assertTrue(procedure.isYieldAfterExecution(null)); + Assert.assertEquals(1, procedure.validateExecutionCount); + + procedure.failValidation = false; + Assert.assertEquals( + StateMachineProcedure.Flow.HAS_MORE_STATE, + procedure.executeFromState(null, OperatePipeTaskState.VALIDATE_TASK)); + + Assert.assertFalse(procedure.isYieldAfterExecution(null)); + Assert.assertEquals(2, procedure.validateExecutionCount); + } + + @Test + public void testRetryStateYieldsOnlyBeforeRetryThreshold() throws Exception { + final TestOperatePipeProcedure procedure = new TestOperatePipeProcedure(); + + final Procedure[] validateSubProcedures = procedure.runOnce(); + Assert.assertEquals(1, validateSubProcedures.length); + Assert.assertSame(procedure, validateSubProcedures[0]); + Assert.assertFalse(procedure.isYieldAfterExecution(null)); + + procedure.failCalculation = true; + final Procedure[] calculateSubProcedures = procedure.runOnce(); + Assert.assertEquals(1, calculateSubProcedures.length); + Assert.assertSame(procedure, calculateSubProcedures[0]); + Assert.assertTrue(procedure.isYieldAfterExecution(null)); + Assert.assertEquals(1, procedure.calculateExecutionCount); + + Assert.assertNull(procedure.runOnce()); + Assert.assertTrue(procedure.hasException()); + Assert.assertFalse(procedure.isYieldAfterExecution(null)); + Assert.assertEquals(2, procedure.calculateExecutionCount); + } + + private static class TestOperatePipeProcedure extends AbstractOperatePipeProcedureV2 { + + private int validateExecutionCount; + private int calculateExecutionCount; + private boolean failValidation; + private boolean failCalculation; + + private TestOperatePipeProcedure() { + pipeTaskInfo = new AtomicReference<>(new PipeTaskInfo()); + } + + private Procedure[] runOnce() throws InterruptedException { + return execute(null); + } + + @Override + protected PipeTaskOperation getOperation() { + return PipeTaskOperation.START_PIPE; + } + + @Override + public boolean executeFromValidateTask( + final org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv env) + throws PipeException { + validateExecutionCount++; + if (failValidation) { + throw new PipeException("retry"); + } + return true; + } + + @Override + public void executeFromCalculateInfoForTask( + final org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv env) { + calculateExecutionCount++; + if (failCalculation) { + throw new RuntimeException("retry"); + } + } + + @Override + public void executeFromWriteConfigNodeConsensus( + final org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv env) { + // Do nothing + } + + @Override + public void executeFromOperateOnDataNodes( + final org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv env) { + // Do nothing + } + + @Override + public void rollbackFromValidateTask( + final org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv env) { + // Do nothing + } + + @Override + public void rollbackFromCalculateInfoForTask( + final org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv env) { + // Do nothing + } + + @Override + public void rollbackFromWriteConfigNodeConsensus( + final org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv env) { + // Do nothing + } + + @Override + public void rollbackFromOperateOnDataNodes( + final org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv env) + throws IOException { + // Do nothing + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java index fcaef4baa302a..d36e852759608 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java @@ -1155,49 +1155,20 @@ public TPushPipeMetaResp pushSinglePipeMeta(TPushSinglePipeMetaReq req) { @Override public TPushPipeMetaResp pushMultiPipeMeta(TPushMultiPipeMetaReq req) { - boolean hasException = false; - // If there is any exception, we use the size of exceptionMessages to record the fail index - List exceptionMessages = new ArrayList<>(); - try { - if (req.isSetPipeNamesToDrop()) { - for (String pipeNameToDrop : req.getPipeNamesToDrop()) { - TPushPipeMetaRespExceptionMessage message = - PipeDataNodeAgent.task().handleDropPipe(pipeNameToDrop); - exceptionMessages.add(message); - if (message != null) { - // If there is any exception, skip the remaining pipes - hasException = true; - break; - } - } - } else if (req.isSetPipeMetas()) { - for (ByteBuffer byteBuffer : req.getPipeMetas()) { - final PipeMeta pipeMeta = PipeMeta.deserialize4TaskAgent(byteBuffer); - TPushPipeMetaRespExceptionMessage message = - PipeDataNodeAgent.task().handleSinglePipeMetaChanges(pipeMeta); - exceptionMessages.add(message); - if (message != null) { - // If there is any exception, skip the remaining pipes - hasException = true; - break; + return PushMultiPipeMetaHelper.pushMultiPipeMeta( + req, + new PushMultiPipeMetaHelper.Handler() { + @Override + public TPushPipeMetaRespExceptionMessage handleDropPipe(final String pipeName) { + return PipeDataNodeAgent.task().handleDropPipe(pipeName); } - } - } else { - throw new Exception("Invalid TPushMultiPipeMetaReq"); - } - return hasException - ? new TPushPipeMetaResp() - .setStatus(new TSStatus(TSStatusCode.PIPE_PUSH_META_ERROR.getStatusCode())) - .setExceptionMessages(exceptionMessages) - : new TPushPipeMetaResp() - .setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())); - } catch (Exception e) { - LOGGER.warn("Error occurred when pushing multi pipe meta", e); - return new TPushPipeMetaResp() - .setStatus(new TSStatus(TSStatusCode.PIPE_PUSH_META_ERROR.getStatusCode())) - .setExceptionMessages(exceptionMessages); - } + @Override + public TPushPipeMetaRespExceptionMessage handleSinglePipeMeta(final ByteBuffer pipeMeta) { + return PipeDataNodeAgent.task() + .handleSinglePipeMetaChanges(PipeMeta.deserialize4TaskAgent(pipeMeta)); + } + }); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/PushMultiPipeMetaHelper.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/PushMultiPipeMetaHelper.java new file mode 100644 index 0000000000000..73fdbf804de91 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/PushMultiPipeMetaHelper.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.protocol.thrift.impl; + +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.mpp.rpc.thrift.TPushMultiPipeMetaReq; +import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaResp; +import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaRespExceptionMessage; +import org.apache.iotdb.rpc.TSStatusCode; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +final class PushMultiPipeMetaHelper { + + private static final Logger LOGGER = LoggerFactory.getLogger(PushMultiPipeMetaHelper.class); + + private PushMultiPipeMetaHelper() { + // Utility class + } + + interface Handler { + + TPushPipeMetaRespExceptionMessage handleDropPipe(String pipeName) throws Exception; + + TPushPipeMetaRespExceptionMessage handleSinglePipeMeta(ByteBuffer pipeMeta) throws Exception; + } + + static TPushPipeMetaResp pushMultiPipeMeta( + final TPushMultiPipeMetaReq req, final Handler handler) { + final List exceptionMessages = new ArrayList<>(); + try { + if (req.isSetPipeNamesToDrop()) { + for (final String pipeNameToDrop : req.getPipeNamesToDrop()) { + final TPushPipeMetaRespExceptionMessage message = handler.handleDropPipe(pipeNameToDrop); + if (message != null) { + exceptionMessages.add(message); + } + } + } else if (req.isSetPipeMetas()) { + for (final ByteBuffer pipeMeta : req.getPipeMetas()) { + final TPushPipeMetaRespExceptionMessage message = handler.handleSinglePipeMeta(pipeMeta); + if (message != null) { + exceptionMessages.add(message); + } + } + } else { + throw new Exception("Invalid TPushMultiPipeMetaReq"); + } + + return exceptionMessages.isEmpty() + ? new TPushPipeMetaResp() + .setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())) + : new TPushPipeMetaResp() + .setStatus(new TSStatus(TSStatusCode.PIPE_PUSH_META_ERROR.getStatusCode())) + .setExceptionMessages(exceptionMessages); + } catch (final Exception e) { + LOGGER.warn("Error occurred when pushing multi pipe meta", e); + return new TPushPipeMetaResp() + .setStatus(new TSStatus(TSStatusCode.PIPE_PUSH_META_ERROR.getStatusCode())) + .setExceptionMessages(exceptionMessages); + } + } +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImplPushMultiPipeMetaTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImplPushMultiPipeMetaTest.java new file mode 100644 index 0000000000000..0ae284bce9d7a --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImplPushMultiPipeMetaTest.java @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.protocol.thrift.impl; + +import org.apache.iotdb.mpp.rpc.thrift.TPushMultiPipeMetaReq; +import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaResp; +import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaRespExceptionMessage; +import org.apache.iotdb.rpc.TSStatusCode; + +import org.junit.Assert; +import org.junit.Test; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +public class DataNodeInternalRPCServiceImplPushMultiPipeMetaTest { + + @Test + public void testPushMultiPipeMetaContinuesAfterSinglePipeFailure() { + final AtomicInteger pushedPipeCount = new AtomicInteger(0); + final TPushPipeMetaResp resp = + PushMultiPipeMetaHelper.pushMultiPipeMeta( + new TPushMultiPipeMetaReq() + .setPipeMetas( + Arrays.asList( + ByteBuffer.wrap(new byte[] {1}), + ByteBuffer.wrap(new byte[] {2}), + ByteBuffer.wrap(new byte[] {3}))), + new PushMultiPipeMetaHelper.Handler() { + @Override + public TPushPipeMetaRespExceptionMessage handleDropPipe(final String pipeName) { + Assert.fail("Unexpected drop pipe request"); + return null; + } + + @Override + public TPushPipeMetaRespExceptionMessage handleSinglePipeMeta( + final ByteBuffer pipeMeta) { + final int index = pushedPipeCount.incrementAndGet(); + return index == 2 ? newExceptionMessage("pipe-2") : null; + } + }); + + Assert.assertEquals(3, pushedPipeCount.get()); + Assert.assertEquals( + TSStatusCode.PIPE_PUSH_META_ERROR.getStatusCode(), resp.getStatus().getCode()); + Assert.assertEquals(1, resp.getExceptionMessagesSize()); + Assert.assertEquals("pipe-2", resp.getExceptionMessages().get(0).getPipeName()); + } + + @Test + public void testDropMultiPipeContinuesAfterSinglePipeFailure() { + final List droppedPipeNames = new ArrayList<>(); + final TPushPipeMetaResp resp = + PushMultiPipeMetaHelper.pushMultiPipeMeta( + new TPushMultiPipeMetaReq().setPipeNamesToDrop(Arrays.asList("pipe-1", "pipe-2")), + new PushMultiPipeMetaHelper.Handler() { + @Override + public TPushPipeMetaRespExceptionMessage handleDropPipe(final String pipeName) { + droppedPipeNames.add(pipeName); + return "pipe-1".equals(pipeName) ? newExceptionMessage(pipeName) : null; + } + + @Override + public TPushPipeMetaRespExceptionMessage handleSinglePipeMeta( + final ByteBuffer pipeMeta) { + Assert.fail("Unexpected pipe meta request"); + return null; + } + }); + + Assert.assertEquals(Arrays.asList("pipe-1", "pipe-2"), droppedPipeNames); + Assert.assertEquals( + TSStatusCode.PIPE_PUSH_META_ERROR.getStatusCode(), resp.getStatus().getCode()); + Assert.assertEquals(1, resp.getExceptionMessagesSize()); + Assert.assertEquals("pipe-1", resp.getExceptionMessages().get(0).getPipeName()); + } + + @Test + public void testPushMultiPipeMetaReturnsSuccessWithoutExceptionMessages() { + final AtomicInteger pushedPipeCount = new AtomicInteger(0); + final TPushPipeMetaResp resp = + PushMultiPipeMetaHelper.pushMultiPipeMeta( + new TPushMultiPipeMetaReq() + .setPipeMetas( + Arrays.asList( + ByteBuffer.wrap(new byte[] {1}), ByteBuffer.wrap(new byte[] {2}))), + new PushMultiPipeMetaHelper.Handler() { + @Override + public TPushPipeMetaRespExceptionMessage handleDropPipe(final String pipeName) { + Assert.fail("Unexpected drop pipe request"); + return null; + } + + @Override + public TPushPipeMetaRespExceptionMessage handleSinglePipeMeta( + final ByteBuffer pipeMeta) { + pushedPipeCount.incrementAndGet(); + return null; + } + }); + + Assert.assertEquals(2, pushedPipeCount.get()); + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), resp.getStatus().getCode()); + Assert.assertFalse(resp.isSetExceptionMessages()); + } + + @Test + public void testPushMultiPipeMetaKeepsCollectedExceptionMessagesWhenLaterPipeThrows() { + final AtomicInteger pushedPipeCount = new AtomicInteger(0); + final TPushPipeMetaResp resp = + PushMultiPipeMetaHelper.pushMultiPipeMeta( + new TPushMultiPipeMetaReq() + .setPipeMetas( + Arrays.asList( + ByteBuffer.wrap(new byte[] {1}), ByteBuffer.wrap(new byte[] {2}))), + new PushMultiPipeMetaHelper.Handler() { + @Override + public TPushPipeMetaRespExceptionMessage handleDropPipe(final String pipeName) { + Assert.fail("Unexpected drop pipe request"); + return null; + } + + @Override + public TPushPipeMetaRespExceptionMessage handleSinglePipeMeta( + final ByteBuffer pipeMeta) { + final int index = pushedPipeCount.incrementAndGet(); + if (index == 1) { + return newExceptionMessage("pipe-1"); + } + throw new RuntimeException("boom"); + } + }); + + Assert.assertEquals(2, pushedPipeCount.get()); + Assert.assertEquals( + TSStatusCode.PIPE_PUSH_META_ERROR.getStatusCode(), resp.getStatus().getCode()); + Assert.assertEquals(1, resp.getExceptionMessagesSize()); + Assert.assertEquals("pipe-1", resp.getExceptionMessages().get(0).getPipeName()); + } + + @Test + public void testDropMultiPipeMetaReturnsSuccessWithoutExceptionMessages() { + final List droppedPipeNames = new ArrayList<>(); + final TPushPipeMetaResp resp = + PushMultiPipeMetaHelper.pushMultiPipeMeta( + new TPushMultiPipeMetaReq().setPipeNamesToDrop(Arrays.asList("pipe-1", "pipe-2")), + new PushMultiPipeMetaHelper.Handler() { + @Override + public TPushPipeMetaRespExceptionMessage handleDropPipe(final String pipeName) { + droppedPipeNames.add(pipeName); + return null; + } + + @Override + public TPushPipeMetaRespExceptionMessage handleSinglePipeMeta( + final ByteBuffer pipeMeta) { + Assert.fail("Unexpected pipe meta request"); + return null; + } + }); + + Assert.assertEquals(Arrays.asList("pipe-1", "pipe-2"), droppedPipeNames); + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), resp.getStatus().getCode()); + Assert.assertFalse(resp.isSetExceptionMessages()); + } + + @Test + public void testInvalidPushMultiPipeMetaRequestReturnsErrorWithoutCallingHandler() { + final TPushPipeMetaResp resp = + PushMultiPipeMetaHelper.pushMultiPipeMeta( + new TPushMultiPipeMetaReq(), + new PushMultiPipeMetaHelper.Handler() { + @Override + public TPushPipeMetaRespExceptionMessage handleDropPipe(final String pipeName) { + Assert.fail("Unexpected drop pipe request"); + return null; + } + + @Override + public TPushPipeMetaRespExceptionMessage handleSinglePipeMeta( + final ByteBuffer pipeMeta) { + Assert.fail("Unexpected pipe meta request"); + return null; + } + }); + + Assert.assertEquals( + TSStatusCode.PIPE_PUSH_META_ERROR.getStatusCode(), resp.getStatus().getCode()); + Assert.assertEquals(0, resp.getExceptionMessagesSize()); + } + + @Test + public void testPushMultiPipeMetaStopsOnUnexpectedException() { + final AtomicInteger pushedPipeCount = new AtomicInteger(0); + final TPushPipeMetaResp resp = + PushMultiPipeMetaHelper.pushMultiPipeMeta( + new TPushMultiPipeMetaReq() + .setPipeMetas( + Arrays.asList( + ByteBuffer.wrap(new byte[] {1}), ByteBuffer.wrap(new byte[] {2}))), + new PushMultiPipeMetaHelper.Handler() { + @Override + public TPushPipeMetaRespExceptionMessage handleDropPipe(final String pipeName) { + Assert.fail("Unexpected drop pipe request"); + return null; + } + + @Override + public TPushPipeMetaRespExceptionMessage handleSinglePipeMeta( + final ByteBuffer pipeMeta) { + pushedPipeCount.incrementAndGet(); + throw new RuntimeException("boom"); + } + }); + + Assert.assertEquals(1, pushedPipeCount.get()); + Assert.assertEquals( + TSStatusCode.PIPE_PUSH_META_ERROR.getStatusCode(), resp.getStatus().getCode()); + Assert.assertEquals(0, resp.getExceptionMessagesSize()); + } + + private static TPushPipeMetaRespExceptionMessage newExceptionMessage(final String pipeName) { + return new TPushPipeMetaRespExceptionMessage(pipeName, "failed", 1L); + } +}