diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java index b33bb7d4d6d9..15167a57ab33 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/pipe/task/PipeTableResp.java @@ -207,6 +207,9 @@ public TShowPipeResp convertToTShowPipeResp() { runtimeMeta.getNodeId2PipeRuntimeExceptionMap().entrySet()) { final Integer nodeId = entry.getKey(); final PipeRuntimeException e = entry.getValue(); + if (e.getTimeStamp() <= runtimeMeta.getExceptionsClearTime()) { + continue; + } final String exceptionMessage = DateTimeUtils.convertLongToDate(e.getTimeStamp(), "ms") + ", " + e.getMessage(); @@ -219,6 +222,9 @@ public TShowPipeResp convertToTShowPipeResp() { runtimeMeta.getConsensusGroupId2TaskMetaMap().entrySet()) { final Integer regionId = entry.getKey(); for (final PipeRuntimeException e : entry.getValue().getExceptionMessages()) { + if (e.getTimeStamp() <= runtimeMeta.getExceptionsClearTime()) { + continue; + } final String exceptionMessage = DateTimeUtils.convertLongToDate(e.getTimeStamp(), "ms") + ", " + e.getMessage(); pipeExceptionMessage2RegionIdsMap diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java index 2a2856714935..0cfc1251afcd 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java @@ -248,24 +248,21 @@ private void parseHeartbeatAndSaveMetaChangeLocally( // Update runtime exception final PipeTaskMeta pipeTaskMetaFromCoordinator = runtimeMetaFromCoordinator.getValue(); + final PipeRuntimeMeta pipeRuntimeMeta = pipeMetaFromCoordinator.getRuntimeMeta(); pipeTaskMetaFromCoordinator.clearExceptionMessages(); for (final PipeRuntimeException exception : runtimeMetaFromAgent.getExceptionMessages()) { - - // Do not judge the exception's clear time to avoid the restart process - // being ended after the failure of some pipe + if (exception.getTimeStamp() <= pipeRuntimeMeta.getExceptionsClearTime()) { + needPushPipeMetaToDataNodes.set(true); + continue; + } pipeTaskMetaFromCoordinator.trackExceptionMessage(exception); if (exception instanceof PipeRuntimeCriticalException) { final String pipeName = pipeMetaFromCoordinator.getStaticMeta().getPipeName(); - if (!pipeMetaFromCoordinator - .getRuntimeMeta() - .getStatus() - .get() - .equals(PipeStatus.STOPPED)) { - PipeRuntimeMeta runtimeMeta = pipeMetaFromCoordinator.getRuntimeMeta(); - runtimeMeta.getStatus().set(PipeStatus.STOPPED); - runtimeMeta.setIsStoppedByRuntimeException(true); + if (!pipeRuntimeMeta.getStatus().get().equals(PipeStatus.STOPPED)) { + pipeRuntimeMeta.getStatus().set(PipeStatus.STOPPED); + pipeRuntimeMeta.setIsStoppedByRuntimeException(true); needWriteConsensusOnConfigNodes.set(true); needPushPipeMetaToDataNodes.set(false); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java index 7fd1034f3213..b30194441722 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java @@ -974,7 +974,19 @@ private boolean isStoppedByRuntimeExceptionInternal( public void clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalse(final String pipeName) { acquireWriteLock(); try { - clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalseInternal(pipeName); + clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalseInternal( + pipeName, System.currentTimeMillis()); + } finally { + releaseWriteLock(); + } + } + + public void clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalse( + final String pipeName, final long exceptionsClearTime) { + acquireWriteLock(); + try { + clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalseInternal( + pipeName, exceptionsClearTime); } finally { releaseWriteLock(); } @@ -990,20 +1002,37 @@ public void clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalse( } } + public void clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalse( + final String pipeName, final boolean isTableModel, final long exceptionsClearTime) { + acquireWriteLock(); + try { + clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalseInternal( + pipeName, isTableModel, exceptionsClearTime); + } finally { + releaseWriteLock(); + } + } + private void clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalseInternal( - final String pipeName) { + final String pipeName, final long exceptionsClearTime) { clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalseInternal( - pipeMetaKeeper.getPipeMeta(pipeName)); + pipeMetaKeeper.getPipeMeta(pipeName), exceptionsClearTime); } private void clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalseInternal( final String pipeName, final boolean isTableModel) { clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalseInternal( - pipeMetaKeeper.getPipeMeta(pipeName, isTableModel)); + pipeMetaKeeper.getPipeMeta(pipeName, isTableModel), System.currentTimeMillis()); } private void clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalseInternal( - final PipeMeta pipeMeta) { + final String pipeName, final boolean isTableModel, final long exceptionsClearTime) { + clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalseInternal( + pipeMetaKeeper.getPipeMeta(pipeName, isTableModel), exceptionsClearTime); + } + + private void clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalseInternal( + final PipeMeta pipeMeta, final long exceptionsClearTime) { if (pipeMeta == null) { return; } @@ -1013,7 +1042,7 @@ private void clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalseInternal( // To avoid unnecessary retries, we set the isStoppedByRuntimeException flag to false runtimeMeta.setIsStoppedByRuntimeException(false); - runtimeMeta.setExceptionsClearTime(System.currentTimeMillis()); + runtimeMeta.setExceptionsClearTime(exceptionsClearTime); final Map exceptionMap = runtimeMeta.getNodeId2PipeRuntimeExceptionMap(); @@ -1140,14 +1169,17 @@ public boolean autoRestart() { */ private boolean autoRestartInternal() { final AtomicBoolean needRestart = new AtomicBoolean(false); + final long exceptionsClearTime = System.currentTimeMillis(); final List pipeToRestart = new LinkedList<>(); pipeMetaKeeper .getPipeMetaList() .forEach( pipeMeta -> { - if (pipeMeta.getRuntimeMeta().getIsStoppedByRuntimeException()) { - pipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.RUNNING); + final PipeRuntimeMeta runtimeMeta = pipeMeta.getRuntimeMeta(); + if (runtimeMeta.getIsStoppedByRuntimeException()) { + runtimeMeta.setExceptionsClearTime(exceptionsClearTime); + runtimeMeta.getStatus().set(PipeStatus.RUNNING); needRestart.set(true); pipeToRestart.add(pipeMeta.getStaticMeta().getPipeName()); @@ -1181,8 +1213,11 @@ private void handleSuccessfulRestartInternal() { .getPipeMetaList() .forEach( pipeMeta -> { - if (pipeMeta.getRuntimeMeta().getStatus().get().equals(PipeStatus.RUNNING)) { - clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalseInternal(pipeMeta); + final PipeRuntimeMeta runtimeMeta = pipeMeta.getRuntimeMeta(); + if (runtimeMeta.getStatus().get().equals(PipeStatus.RUNNING) + && runtimeMeta.getIsStoppedByRuntimeException()) { + clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalseInternal( + pipeMeta, runtimeMeta.getExceptionsClearTime()); } }); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java index 20b44d9bbf06..a3d0fb01d696 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java @@ -20,8 +20,10 @@ package org.apache.iotdb.confignode.procedure.impl.pipe.task; import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus; +import org.apache.iotdb.confignode.consensus.request.write.pipe.runtime.PipeHandleMetaChangePlan; import org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusPlanV2; import org.apache.iotdb.confignode.i18n.ConfigNodeMessages; import org.apache.iotdb.confignode.i18n.ProcedureMessages; @@ -40,6 +42,8 @@ import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; import java.util.Objects; public class StartPipeProcedureV2 extends AbstractOperatePipeProcedureV2 { @@ -120,6 +124,11 @@ public void executeFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv env) thro public void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env) throws IOException { LOGGER.info(ProcedureMessages.STARTPIPEPROCEDUREV2_EXECUTEFROMOPERATEONDATANODES, pipeName); + final long exceptionsClearTime = System.currentTimeMillis(); + final boolean isStoppedByRuntimeException = + isTableModelSet + ? pipeTaskInfo.get().isStoppedByRuntimeException(pipeName, isTableModel) + : pipeTaskInfo.get().isStoppedByRuntimeException(pipeName); final PipeStaticMeta pipeStaticMeta = (isTableModelSet ? pipeTaskInfo.get().getPipeMetaByPipeName(pipeName, isTableModel) @@ -141,9 +150,38 @@ public void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env) throws IOE if (isTableModelSet) { pipeTaskInfo .get() - .clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalse(pipeName, isTableModel); + .clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalse( + pipeName, isTableModel, exceptionsClearTime); } else { - pipeTaskInfo.get().clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalse(pipeName); + pipeTaskInfo + .get() + .clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalse(pipeName, exceptionsClearTime); + } + + if (isStoppedByRuntimeException) { + writePipeMetaChangesToConfigNodeConsensus(env); + } + } + + private void writePipeMetaChangesToConfigNodeConsensus(final ConfigNodeProcedureEnv env) { + final List pipeMetaList = new ArrayList<>(); + for (final PipeMeta pipeMeta : pipeTaskInfo.get().getPipeMetaList()) { + pipeMetaList.add(pipeMeta); + } + + TSStatus response; + try { + response = + env.getConfigManager() + .getConsensusManager() + .write(new PipeHandleMetaChangePlan(pipeMetaList)); + } catch (ConsensusException e) { + LOGGER.warn(ConfigNodeMessages.FAILED_IN_THE_WRITE_API_EXECUTING_THE_CONSENSUS_LAYER_DUE, e); + response = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()); + response.setMessage(e.getMessage()); + } + if (response.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + throw new PipeException(response.getMessage()); } } diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParserTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParserTest.java index 1021d60768b5..d77992c18be9 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParserTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParserTest.java @@ -21,9 +21,11 @@ import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex; +import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeRuntimeMeta; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta; +import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTemporaryMeta; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTemporaryMetaInCoordinator; @@ -37,6 +39,7 @@ import org.apache.iotdb.confignode.persistence.pipe.PipeTaskInfo; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; @@ -44,7 +47,10 @@ import java.lang.reflect.Field; import java.util.Collections; import java.util.HashMap; +import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -60,6 +66,8 @@ public class PipeHeartbeatParserTest { + private static final int DATA_NODE_ID = 1; + private boolean originalSeparatedPipeHeartbeatEnabled; @Before @@ -128,6 +136,84 @@ public void testParseHeartbeatKeepsPendingFlagsWhenProcedureSubmissionFails() th verify(context.procedureManager, times(2)).pipeHandleMetaChange(true, false); } + @Test + public void testParseHeartbeatIgnoresExceptionsBeforeClearTime() throws Exception { + CommonDescriptor.getInstance().getConfig().setSeperatedPipeHeartbeatEnabled(false); + + final String pipeName = "staleExceptionPipe"; + final PipeTaskInfo pipeTaskInfo = new PipeTaskInfo(); + createPipe(pipeTaskInfo, pipeName, PipeStatus.RUNNING); + + final PipeMeta pipeMeta = pipeTaskInfo.getPipeMetaByPipeName(pipeName); + final PipeRuntimeMeta runtimeMeta = pipeMeta.getRuntimeMeta(); + final PipeTaskMeta coordinatorTaskMeta = + runtimeMeta.getConsensusGroupId2TaskMetaMap().get(DATA_NODE_ID); + coordinatorTaskMeta.trackExceptionMessage( + new PipeRuntimeCriticalException("stale failure", 100L)); + + pipeTaskInfo.clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalse(pipeName, 200L); + + final PipeTaskMeta agentTaskMeta = + new PipeTaskMeta(MinimumProgressIndex.INSTANCE, DATA_NODE_ID); + agentTaskMeta.trackExceptionMessage(new PipeRuntimeCriticalException("stale failure", 100L)); + final ConcurrentMap agentPipeTasks = new ConcurrentHashMap<>(); + agentPipeTasks.put(DATA_NODE_ID, agentTaskMeta); + final PipeHeartbeat heartbeat = + new PipeHeartbeat( + Collections.singletonList( + new PipeMeta(pipeMeta.getStaticMeta(), new PipeRuntimeMeta(agentPipeTasks)) + .serialize()), + Collections.singletonList(false), + Collections.singletonList(0L), + Collections.singletonList(0D), + null); + + final ParserTestContext context = createParserTestContext(1, pipeTaskInfo); + context.parser.parseHeartbeat(DATA_NODE_ID, heartbeat); + + Assert.assertFalse(coordinatorTaskMeta.hasExceptionMessages()); + Assert.assertEquals(PipeStatus.RUNNING, runtimeMeta.getStatus().get()); + verify(context.procedureManager, times(1)).pipeHandleMetaChange(false, true); + } + + @Test + public void testParseHeartbeatTracksExceptionsAfterClearTime() throws Exception { + CommonDescriptor.getInstance().getConfig().setSeperatedPipeHeartbeatEnabled(false); + + final String pipeName = "freshExceptionPipe"; + final PipeTaskInfo pipeTaskInfo = new PipeTaskInfo(); + createPipe(pipeTaskInfo, pipeName, PipeStatus.RUNNING); + + final PipeMeta pipeMeta = pipeTaskInfo.getPipeMetaByPipeName(pipeName); + final PipeRuntimeMeta runtimeMeta = pipeMeta.getRuntimeMeta(); + final PipeTaskMeta coordinatorTaskMeta = + runtimeMeta.getConsensusGroupId2TaskMetaMap().get(DATA_NODE_ID); + pipeTaskInfo.clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalse(pipeName, 200L); + + final PipeTaskMeta agentTaskMeta = + new PipeTaskMeta(MinimumProgressIndex.INSTANCE, DATA_NODE_ID); + agentTaskMeta.trackExceptionMessage(new PipeRuntimeCriticalException("fresh failure", 300L)); + final ConcurrentMap agentPipeTasks = new ConcurrentHashMap<>(); + agentPipeTasks.put(DATA_NODE_ID, agentTaskMeta); + final PipeHeartbeat heartbeat = + new PipeHeartbeat( + Collections.singletonList( + new PipeMeta(pipeMeta.getStaticMeta(), new PipeRuntimeMeta(agentPipeTasks)) + .serialize()), + Collections.singletonList(false), + Collections.singletonList(0L), + Collections.singletonList(0D), + null); + + final ParserTestContext context = createParserTestContext(1, pipeTaskInfo); + context.parser.parseHeartbeat(DATA_NODE_ID, heartbeat); + + Assert.assertTrue(coordinatorTaskMeta.hasExceptionMessages()); + Assert.assertEquals(PipeStatus.STOPPED, runtimeMeta.getStatus().get()); + Assert.assertTrue(runtimeMeta.getIsStoppedByRuntimeException()); + verify(context.procedureManager, times(1)).pipeHandleMetaChange(true, false); + } + @Test public void testParseHeartbeatRecordsPipeDegradedStatus() throws Exception { CommonDescriptor.getInstance().getConfig().setSeperatedPipeHeartbeatEnabled(false); @@ -230,6 +316,37 @@ private ParserTestContext createParserTestContext( return new ParserTestContext(new PipeHeartbeatParser(configManager), procedureManager); } + private void createPipe( + final PipeTaskInfo pipeTaskInfo, final String pipeName, final PipeStatus initialStatus) { + final Map extractorAttributes = new HashMap<>(); + extractorAttributes.put("extractor", "iotdb-source"); + final Map processorAttributes = new HashMap<>(); + processorAttributes.put("processor", "do-nothing-processor"); + final Map connectorAttributes = new HashMap<>(); + connectorAttributes.put("connector", "iotdb-thrift-sink"); + + final PipeTaskMeta pipeTaskMeta = new PipeTaskMeta(MinimumProgressIndex.INSTANCE, DATA_NODE_ID); + final ConcurrentMap pipeTasks = new ConcurrentHashMap<>(); + pipeTasks.put(DATA_NODE_ID, pipeTaskMeta); + final PipeStaticMeta pipeStaticMeta = + new PipeStaticMeta( + pipeName, + System.currentTimeMillis(), + extractorAttributes, + processorAttributes, + connectorAttributes); + final PipeRuntimeMeta pipeRuntimeMeta = new PipeRuntimeMeta(pipeTasks); + pipeTaskInfo.createPipe(new CreatePipePlanV2(pipeStaticMeta, pipeRuntimeMeta)); + + if (PipeStatus.RUNNING.equals(initialStatus)) { + pipeTaskInfo + .getPipeMetaByPipeName(pipeName) + .getRuntimeMeta() + .getStatus() + .set(PipeStatus.RUNNING); + } + } + private PipeHeartbeat createPipeHeartbeat(final PipeMeta pipeMeta, final boolean isDegraded) throws Exception { return new PipeHeartbeat( diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfoAutoRestartTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfoAutoRestartTest.java index 58d59b96ae29..f938bdbd85f4 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfoAutoRestartTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfoAutoRestartTest.java @@ -262,6 +262,34 @@ public void testEnrichLoadedPipeMetasWithRootUserForCompatibility() { rootPassword, sourceAttributes.get(PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY)); } + @Test + public void testHandleSuccessfulRestartClearsRuntimeExceptionMessages() { + final String pipeName = "restartPipe"; + createPipe(pipeName, PipeStatus.RUNNING); + + Assert.assertTrue( + pipeTaskInfo.recordDataNodePushPipeMetaExceptions(createErrorRespMap(pipeName))); + + final PipeRuntimeMeta runtimeMeta = + pipeTaskInfo.getPipeMetaByPipeName(pipeName).getRuntimeMeta(); + Assert.assertEquals(PipeStatus.STOPPED, runtimeMeta.getStatus().get()); + Assert.assertTrue(runtimeMeta.getIsStoppedByRuntimeException()); + Assert.assertFalse(runtimeMeta.getNodeId2PipeRuntimeExceptionMap().isEmpty()); + + Assert.assertTrue(pipeTaskInfo.autoRestart()); + final long exceptionsClearTime = runtimeMeta.getExceptionsClearTime(); + Assert.assertTrue( + runtimeMeta.getNodeId2PipeRuntimeExceptionMap().values().stream() + .allMatch(exception -> exception.getTimeStamp() <= exceptionsClearTime)); + + pipeTaskInfo.handleSuccessfulRestart(); + + Assert.assertEquals(PipeStatus.RUNNING, runtimeMeta.getStatus().get()); + Assert.assertFalse(runtimeMeta.getIsStoppedByRuntimeException()); + Assert.assertTrue(runtimeMeta.getNodeId2PipeRuntimeExceptionMap().isEmpty()); + Assert.assertEquals(exceptionsClearTime, runtimeMeta.getExceptionsClearTime()); + } + @Test public void testLegacyStatusAndDropPlansTargetTableOnlyPipeByName() { final String pipeName = "legacyTablePipe"; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java index b404a2ce00be..94bfc6deac2b 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java @@ -313,6 +313,8 @@ private void executeSinglePipeRuntimeMetaChanges( } } + syncRuntimeExceptionClearTime(runtimeMetaFromCoordinator, runtimeMetaInAgent); + // 2. Handle pipe runtime meta status changes final PipeStatus statusFromCoordinator = runtimeMetaFromCoordinator.getStatus().get(); final PipeStatus statusInAgent = runtimeMetaInAgent.getStatus().get(); @@ -355,6 +357,12 @@ private void executeSinglePipeRuntimeMetaChanges( } } + private void syncRuntimeExceptionClearTime( + final PipeRuntimeMeta runtimeMetaFromCoordinator, final PipeRuntimeMeta runtimeMetaInAgent) { + runtimeMetaInAgent.setExceptionsClearTime(runtimeMetaFromCoordinator.getExceptionsClearTime()); + runtimeMetaInAgent.clearExceptionMessagesBefore(runtimeMetaInAgent.getExceptionsClearTime()); + } + protected abstract void thawRate(final String pipeName, final long creationTime); protected abstract void freezeRate(final String pipeName, final long creationTime); @@ -563,6 +571,11 @@ protected boolean createPipe(final PipeMeta pipeMetaFromCoordinator) throws Ille pipeMetaKeeper.addPipeMeta(pipeMetaFromCoordinator); + pipeMetaFromCoordinator + .getRuntimeMeta() + .clearExceptionMessagesBefore( + pipeMetaFromCoordinator.getRuntimeMeta().getExceptionsClearTime()); + // If the pipe status from coordinator is RUNNING, we will start the pipe later. return needToStartPipe; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeRuntimeMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeRuntimeMeta.java index 1f28a24dd604..2aa1b65638fc 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeRuntimeMeta.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeRuntimeMeta.java @@ -128,6 +128,15 @@ public void setExceptionsClearTime(long exceptionsClearTime) { } } + public void clearExceptionMessagesBefore(final long exceptionsClearTime) { + nodeId2PipeRuntimeExceptionMap + .entrySet() + .removeIf(entry -> entry.getValue().getTimeStamp() <= exceptionsClearTime); + consensusGroupId2TaskMetaMap + .values() + .forEach(pipeTaskMeta -> pipeTaskMeta.clearExceptionMessagesBefore(exceptionsClearTime)); + } + public boolean getIsStoppedByRuntimeException() { return isStoppedByRuntimeException.get(); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTaskMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTaskMeta.java index 9584ca8cbabd..e9939d7b2c6f 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTaskMeta.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeTaskMeta.java @@ -134,6 +134,10 @@ public synchronized void clearExceptionMessages() { exceptionMessages.clear(); } + public synchronized void clearExceptionMessagesBefore(final long exceptionsClearTime) { + exceptionMessages.removeIf(exception -> exception.getTimeStamp() <= exceptionsClearTime); + } + public synchronized void serialize(final OutputStream outputStream) throws IOException { progressIndex.get().serialize(outputStream); diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/PipeMetaDeSerTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/PipeMetaDeSerTest.java index 1e29c96e0905..fa0ac2c1a37e 100644 --- a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/PipeMetaDeSerTest.java +++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/task/PipeMetaDeSerTest.java @@ -151,4 +151,30 @@ public void test() throws IOException { final PipeMeta pipeMeta1 = PipeMeta.deserialize4Coordinator(byteBuffer); Assert.assertEquals(pipeMeta, pipeMeta1); } + + @Test + public void testClearExceptionMessagesBeforeClearTime() { + final PipeTaskMeta staleTaskMeta = new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1); + staleTaskMeta.trackExceptionMessage(new PipeRuntimeCriticalException("stale", 100L)); + final PipeTaskMeta freshTaskMeta = new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1); + freshTaskMeta.trackExceptionMessage(new PipeRuntimeCriticalException("fresh", 300L)); + + final ConcurrentHashMap taskMetaMap = new ConcurrentHashMap<>(); + taskMetaMap.put(1, staleTaskMeta); + taskMetaMap.put(2, freshTaskMeta); + final PipeRuntimeMeta runtimeMeta = new PipeRuntimeMeta(taskMetaMap); + runtimeMeta + .getNodeId2PipeRuntimeExceptionMap() + .put(1, new PipeRuntimeCriticalException("stale node", 100L)); + runtimeMeta + .getNodeId2PipeRuntimeExceptionMap() + .put(2, new PipeRuntimeCriticalException("fresh node", 300L)); + + runtimeMeta.clearExceptionMessagesBefore(200L); + + Assert.assertFalse(staleTaskMeta.hasExceptionMessages()); + Assert.assertTrue(freshTaskMeta.hasExceptionMessages()); + Assert.assertFalse(runtimeMeta.getNodeId2PipeRuntimeExceptionMap().containsKey(1)); + Assert.assertTrue(runtimeMeta.getNodeId2PipeRuntimeExceptionMap().containsKey(2)); + } }