Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,9 @@ public TShowPipeResp convertToTShowPipeResp() {
runtimeMeta.getNodeId2PipeRuntimeExceptionMap().entrySet()) {
final Integer nodeId = entry.getKey();
final PipeRuntimeException e = entry.getValue();
if (e.getTimeStamp() <= runtimeMeta.getExceptionsClearTime()) {
continue;
}
final String exceptionMessage =
DateTimeUtils.convertLongToDate(e.getTimeStamp(), "ms") + ", " + e.getMessage();

Expand All @@ -219,6 +222,9 @@ public TShowPipeResp convertToTShowPipeResp() {
runtimeMeta.getConsensusGroupId2TaskMetaMap().entrySet()) {
final Integer regionId = entry.getKey();
for (final PipeRuntimeException e : entry.getValue().getExceptionMessages()) {
if (e.getTimeStamp() <= runtimeMeta.getExceptionsClearTime()) {
continue;
}
final String exceptionMessage =
DateTimeUtils.convertLongToDate(e.getTimeStamp(), "ms") + ", " + e.getMessage();
pipeExceptionMessage2RegionIdsMap
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@
return expectedNodeCount;
}

private void parseHeartbeatAndSaveMetaChangeLocally(

Check warning on line 140 in iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatParser.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

A "Brain Method" was detected. Refactor it to reduce at least one of the following metrics: LOC from 157 to 64, Complexity from 15 to 14, Nesting Level from 8 to 2, Number of Variables from 23 to 6.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ8S-h0gSu40kaeeT2VR&open=AZ8S-h0gSu40kaeeT2VR&pullRequest=18061
final AtomicReference<PipeTaskInfo> pipeTaskInfo,
final int nodeId,
final PipeHeartbeat pipeHeartbeat) {
Expand Down Expand Up @@ -248,24 +248,21 @@

// Update runtime exception
final PipeTaskMeta pipeTaskMetaFromCoordinator = runtimeMetaFromCoordinator.getValue();
final PipeRuntimeMeta pipeRuntimeMeta = pipeMetaFromCoordinator.getRuntimeMeta();
pipeTaskMetaFromCoordinator.clearExceptionMessages();
for (final PipeRuntimeException exception : runtimeMetaFromAgent.getExceptionMessages()) {

// Do not judge the exception's clear time to avoid the restart process
// being ended after the failure of some pipe
if (exception.getTimeStamp() <= pipeRuntimeMeta.getExceptionsClearTime()) {
needPushPipeMetaToDataNodes.set(true);
continue;
}

pipeTaskMetaFromCoordinator.trackExceptionMessage(exception);

if (exception instanceof PipeRuntimeCriticalException) {
final String pipeName = pipeMetaFromCoordinator.getStaticMeta().getPipeName();
if (!pipeMetaFromCoordinator
.getRuntimeMeta()
.getStatus()
.get()
.equals(PipeStatus.STOPPED)) {
PipeRuntimeMeta runtimeMeta = pipeMetaFromCoordinator.getRuntimeMeta();
runtimeMeta.getStatus().set(PipeStatus.STOPPED);
runtimeMeta.setIsStoppedByRuntimeException(true);
if (!pipeRuntimeMeta.getStatus().get().equals(PipeStatus.STOPPED)) {
pipeRuntimeMeta.getStatus().set(PipeStatus.STOPPED);
pipeRuntimeMeta.setIsStoppedByRuntimeException(true);

needWriteConsensusOnConfigNodes.set(true);
needPushPipeMetaToDataNodes.set(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -974,7 +974,19 @@ private boolean isStoppedByRuntimeExceptionInternal(
public void clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalse(final String pipeName) {
acquireWriteLock();
try {
clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalseInternal(pipeName);
clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalseInternal(
pipeName, System.currentTimeMillis());
} finally {
releaseWriteLock();
}
}

public void clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalse(
final String pipeName, final long exceptionsClearTime) {
acquireWriteLock();
try {
clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalseInternal(
pipeName, exceptionsClearTime);
} finally {
releaseWriteLock();
}
Expand All @@ -990,20 +1002,37 @@ public void clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalse(
}
}

/**
 * Clears the runtime-exception state of the given pipe using a caller-supplied clear time.
 *
 * <p>Takes the write lock, delegates to the internal overload that accepts {@code pipeName},
 * {@code isTableModel} and {@code exceptionsClearTime}, and always releases the lock in
 * {@code finally}, even if the delegate throws.
 *
 * @param pipeName name of the pipe whose exception state is cleared
 * @param isTableModel forwarded unchanged to the internal overload, which uses it together with
 *     {@code pipeName} to look up the pipe meta
 * @param exceptionsClearTime timestamp forwarded to the internal overload as the new
 *     exceptions-clear time; unlike the overloads without this parameter, it is not sampled here
 */
public void clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalse(
final String pipeName, final boolean isTableModel, final long exceptionsClearTime) {
acquireWriteLock();
try {
clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalseInternal(
pipeName, isTableModel, exceptionsClearTime);
} finally {
// Release unconditionally so a failure in the delegate cannot leave the write lock held.
releaseWriteLock();
}
}

private void clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalseInternal(
final String pipeName) {
final String pipeName, final long exceptionsClearTime) {
clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalseInternal(
pipeMetaKeeper.getPipeMeta(pipeName));
pipeMetaKeeper.getPipeMeta(pipeName), exceptionsClearTime);
}

private void clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalseInternal(
final String pipeName, final boolean isTableModel) {
clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalseInternal(
pipeMetaKeeper.getPipeMeta(pipeName, isTableModel));
pipeMetaKeeper.getPipeMeta(pipeName, isTableModel), System.currentTimeMillis());
}

private void clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalseInternal(
final PipeMeta pipeMeta) {
final String pipeName, final boolean isTableModel, final long exceptionsClearTime) {
clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalseInternal(
pipeMetaKeeper.getPipeMeta(pipeName, isTableModel), exceptionsClearTime);
}

private void clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalseInternal(
final PipeMeta pipeMeta, final long exceptionsClearTime) {
if (pipeMeta == null) {
return;
}
Expand All @@ -1013,7 +1042,7 @@ private void clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalseInternal(
// To avoid unnecessary retries, we set the isStoppedByRuntimeException flag to false
runtimeMeta.setIsStoppedByRuntimeException(false);

runtimeMeta.setExceptionsClearTime(System.currentTimeMillis());
runtimeMeta.setExceptionsClearTime(exceptionsClearTime);

final Map<Integer, PipeRuntimeException> exceptionMap =
runtimeMeta.getNodeId2PipeRuntimeExceptionMap();
Expand Down Expand Up @@ -1140,14 +1169,17 @@ public boolean autoRestart() {
*/
private boolean autoRestartInternal() {
final AtomicBoolean needRestart = new AtomicBoolean(false);
final long exceptionsClearTime = System.currentTimeMillis();
final List<String> pipeToRestart = new LinkedList<>();

pipeMetaKeeper
.getPipeMetaList()
.forEach(
pipeMeta -> {
if (pipeMeta.getRuntimeMeta().getIsStoppedByRuntimeException()) {
pipeMeta.getRuntimeMeta().getStatus().set(PipeStatus.RUNNING);
final PipeRuntimeMeta runtimeMeta = pipeMeta.getRuntimeMeta();
if (runtimeMeta.getIsStoppedByRuntimeException()) {
runtimeMeta.setExceptionsClearTime(exceptionsClearTime);
runtimeMeta.getStatus().set(PipeStatus.RUNNING);

needRestart.set(true);
pipeToRestart.add(pipeMeta.getStaticMeta().getPipeName());
Expand Down Expand Up @@ -1181,8 +1213,11 @@ private void handleSuccessfulRestartInternal() {
.getPipeMetaList()
.forEach(
pipeMeta -> {
if (pipeMeta.getRuntimeMeta().getStatus().get().equals(PipeStatus.RUNNING)) {
clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalseInternal(pipeMeta);
final PipeRuntimeMeta runtimeMeta = pipeMeta.getRuntimeMeta();
if (runtimeMeta.getStatus().get().equals(PipeStatus.RUNNING)
&& runtimeMeta.getIsStoppedByRuntimeException()) {
clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalseInternal(
pipeMeta, runtimeMeta.getExceptionsClearTime());
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
package org.apache.iotdb.confignode.procedure.impl.pipe.task;

import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus;
import org.apache.iotdb.confignode.consensus.request.write.pipe.runtime.PipeHandleMetaChangePlan;
import org.apache.iotdb.confignode.consensus.request.write.pipe.task.SetPipeStatusPlanV2;
import org.apache.iotdb.confignode.i18n.ConfigNodeMessages;
import org.apache.iotdb.confignode.i18n.ProcedureMessages;
Expand All @@ -40,6 +42,8 @@
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

public class StartPipeProcedureV2 extends AbstractOperatePipeProcedureV2 {
Expand Down Expand Up @@ -120,6 +124,11 @@ public void executeFromWriteConfigNodeConsensus(ConfigNodeProcedureEnv env) thro
public void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env) throws IOException {
LOGGER.info(ProcedureMessages.STARTPIPEPROCEDUREV2_EXECUTEFROMOPERATEONDATANODES, pipeName);

final long exceptionsClearTime = System.currentTimeMillis();
final boolean isStoppedByRuntimeException =
isTableModelSet
? pipeTaskInfo.get().isStoppedByRuntimeException(pipeName, isTableModel)
: pipeTaskInfo.get().isStoppedByRuntimeException(pipeName);
final PipeStaticMeta pipeStaticMeta =
(isTableModelSet
? pipeTaskInfo.get().getPipeMetaByPipeName(pipeName, isTableModel)
Expand All @@ -141,9 +150,38 @@ public void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env) throws IOE
if (isTableModelSet) {
pipeTaskInfo
.get()
.clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalse(pipeName, isTableModel);
.clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalse(
pipeName, isTableModel, exceptionsClearTime);
} else {
pipeTaskInfo.get().clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalse(pipeName);
pipeTaskInfo
.get()
.clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalse(pipeName, exceptionsClearTime);
}

if (isStoppedByRuntimeException) {
writePipeMetaChangesToConfigNodeConsensus(env);
}
}

/**
 * Submits the current pipe metadata to the config node consensus layer in a single plan.
 *
 * <p>Every {@link PipeMeta} returned by {@code pipeTaskInfo.get().getPipeMetaList()} is copied
 * into a list and written as one {@link PipeHandleMetaChangePlan}. If the write throws a
 * {@link ConsensusException}, the failure is logged and converted into an
 * {@code EXECUTE_STATEMENT_ERROR} status carrying the exception message.
 *
 * @param env procedure environment used to reach the consensus manager
 * @throws PipeException if the resulting status code is not {@code SUCCESS_STATUS}
 */
private void writePipeMetaChangesToConfigNodeConsensus(final ConfigNodeProcedureEnv env) {
  // Copy into a concrete list: the plan is built from a List, not from the iterable itself.
  final List<PipeMeta> metaSnapshot = new ArrayList<>();
  pipeTaskInfo.get().getPipeMetaList().forEach(metaSnapshot::add);

  TSStatus writeStatus;
  try {
    final PipeHandleMetaChangePlan plan = new PipeHandleMetaChangePlan(metaSnapshot);
    writeStatus = env.getConfigManager().getConsensusManager().write(plan);
  } catch (ConsensusException e) {
    LOGGER.warn(ConfigNodeMessages.FAILED_IN_THE_WRITE_API_EXECUTING_THE_CONSENSUS_LAYER_DUE, e);
    // Fold the consensus failure into a status so both failure paths share the check below.
    writeStatus = new TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
    writeStatus.setMessage(e.getMessage());
  }

  if (writeStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
    return;
  }
  throw new PipeException(writeStatus.getMessage());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@

import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeRuntimeMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTemporaryMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTemporaryMetaInCoordinator;
Expand All @@ -37,14 +39,18 @@
import org.apache.iotdb.confignode.persistence.pipe.PipeTaskInfo;

import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

import java.lang.reflect.Field;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
Expand All @@ -60,6 +66,8 @@

public class PipeHeartbeatParserTest {

private static final int DATA_NODE_ID = 1;

private boolean originalSeparatedPipeHeartbeatEnabled;

@Before
Expand Down Expand Up @@ -128,6 +136,84 @@ public void testParseHeartbeatKeepsPendingFlagsWhenProcedureSubmissionFails() th
verify(context.procedureManager, times(2)).pipeHandleMetaChange(true, false);
}

/**
 * Checks that a heartbeat-reported exception whose timestamp is not later than the pipe's
 * exceptions-clear time is not tracked by the coordinator and does not stop the pipe.
 *
 * <p>Scenario: an exception stamped 100 is tracked, exceptions are cleared with clear time 200,
 * and the agent then reports the same exception (still stamped 100) in a heartbeat.
 */
@Test
public void testParseHeartbeatIgnoresExceptionsBeforeClearTime() throws Exception {
// Turn the separated-pipe-heartbeat setting off for this test.
CommonDescriptor.getInstance().getConfig().setSeperatedPipeHeartbeatEnabled(false);

final String pipeName = "staleExceptionPipe";
final PipeTaskInfo pipeTaskInfo = new PipeTaskInfo();
createPipe(pipeTaskInfo, pipeName, PipeStatus.RUNNING);

final PipeMeta pipeMeta = pipeTaskInfo.getPipeMetaByPipeName(pipeName);
final PipeRuntimeMeta runtimeMeta = pipeMeta.getRuntimeMeta();
// DATA_NODE_ID doubles as the consensus-group key under which createPipe registered the task.
final PipeTaskMeta coordinatorTaskMeta =
runtimeMeta.getConsensusGroupId2TaskMetaMap().get(DATA_NODE_ID);
// Seed the coordinator side with the exception (100L is presumably its timestamp — confirm
// against the PipeRuntimeCriticalException constructor).
coordinatorTaskMeta.trackExceptionMessage(
new PipeRuntimeCriticalException("stale failure", 100L));

// Clear with an explicit clear time of 200, i.e. later than the exception above.
pipeTaskInfo.clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalse(pipeName, 200L);

// Agent-side view: the same task still carries the exception stamped 100.
final PipeTaskMeta agentTaskMeta =
new PipeTaskMeta(MinimumProgressIndex.INSTANCE, DATA_NODE_ID);
agentTaskMeta.trackExceptionMessage(new PipeRuntimeCriticalException("stale failure", 100L));
final ConcurrentMap<Integer, PipeTaskMeta> agentPipeTasks = new ConcurrentHashMap<>();
agentPipeTasks.put(DATA_NODE_ID, agentTaskMeta);
// Heartbeat carrying one serialized pipe meta built from the coordinator's static meta and the
// agent's runtime meta; the remaining arguments are single-element placeholder lists and null.
final PipeHeartbeat heartbeat =
new PipeHeartbeat(
Collections.singletonList(
new PipeMeta(pipeMeta.getStaticMeta(), new PipeRuntimeMeta(agentPipeTasks))
.serialize()),
Collections.singletonList(false),
Collections.singletonList(0L),
Collections.singletonList(0D),
null);

final ParserTestContext context = createParserTestContext(1, pipeTaskInfo);
context.parser.parseHeartbeat(DATA_NODE_ID, heartbeat);

// The stale exception must not reappear on the coordinator and the pipe must keep running.
Assert.assertFalse(coordinatorTaskMeta.hasExceptionMessages());
Assert.assertEquals(PipeStatus.RUNNING, runtimeMeta.getStatus().get());
// Exactly one pipeHandleMetaChange(false, true) call is expected. NOTE(review): the arguments
// are presumably (needWriteConsensusOnConfigNodes, needPushPipeMetaToDataNodes) — confirm.
verify(context.procedureManager, times(1)).pipeHandleMetaChange(false, true);
}

/**
 * Checks that a heartbeat-reported critical exception whose timestamp is later than the pipe's
 * exceptions-clear time is tracked by the coordinator and stops the pipe.
 *
 * <p>Scenario: exceptions are cleared with clear time 200, then the agent reports a critical
 * exception stamped 300 in a heartbeat.
 */
@Test
public void testParseHeartbeatTracksExceptionsAfterClearTime() throws Exception {
// Turn the separated-pipe-heartbeat setting off for this test.
CommonDescriptor.getInstance().getConfig().setSeperatedPipeHeartbeatEnabled(false);

final String pipeName = "freshExceptionPipe";
final PipeTaskInfo pipeTaskInfo = new PipeTaskInfo();
createPipe(pipeTaskInfo, pipeName, PipeStatus.RUNNING);

final PipeMeta pipeMeta = pipeTaskInfo.getPipeMetaByPipeName(pipeName);
final PipeRuntimeMeta runtimeMeta = pipeMeta.getRuntimeMeta();
// DATA_NODE_ID doubles as the consensus-group key under which createPipe registered the task.
final PipeTaskMeta coordinatorTaskMeta =
runtimeMeta.getConsensusGroupId2TaskMetaMap().get(DATA_NODE_ID);
// Set the clear time to 200 before any exception is reported.
pipeTaskInfo.clearExceptionsAndSetIsStoppedByRuntimeExceptionToFalse(pipeName, 200L);

// Agent-side view: the task carries a critical exception (300L is presumably its timestamp,
// i.e. later than the clear time — confirm against the exception constructor).
final PipeTaskMeta agentTaskMeta =
new PipeTaskMeta(MinimumProgressIndex.INSTANCE, DATA_NODE_ID);
agentTaskMeta.trackExceptionMessage(new PipeRuntimeCriticalException("fresh failure", 300L));
final ConcurrentMap<Integer, PipeTaskMeta> agentPipeTasks = new ConcurrentHashMap<>();
agentPipeTasks.put(DATA_NODE_ID, agentTaskMeta);
// Heartbeat carrying one serialized pipe meta built from the coordinator's static meta and the
// agent's runtime meta; the remaining arguments are single-element placeholder lists and null.
final PipeHeartbeat heartbeat =
new PipeHeartbeat(
Collections.singletonList(
new PipeMeta(pipeMeta.getStaticMeta(), new PipeRuntimeMeta(agentPipeTasks))
.serialize()),
Collections.singletonList(false),
Collections.singletonList(0L),
Collections.singletonList(0D),
null);

final ParserTestContext context = createParserTestContext(1, pipeTaskInfo);
context.parser.parseHeartbeat(DATA_NODE_ID, heartbeat);

// The fresh critical exception must be recorded and must stop the pipe, with the
// stopped-by-runtime-exception flag set.
Assert.assertTrue(coordinatorTaskMeta.hasExceptionMessages());
Assert.assertEquals(PipeStatus.STOPPED, runtimeMeta.getStatus().get());
Assert.assertTrue(runtimeMeta.getIsStoppedByRuntimeException());
// Exactly one pipeHandleMetaChange(true, false) call is expected. NOTE(review): the arguments
// are presumably (needWriteConsensusOnConfigNodes, needPushPipeMetaToDataNodes) — confirm.
verify(context.procedureManager, times(1)).pipeHandleMetaChange(true, false);
}

@Test
public void testParseHeartbeatRecordsPipeDegradedStatus() throws Exception {
CommonDescriptor.getInstance().getConfig().setSeperatedPipeHeartbeatEnabled(false);
Expand Down Expand Up @@ -230,6 +316,37 @@ private ParserTestContext createParserTestContext(
return new ParserTestContext(new PipeHeartbeatParser(configManager), procedureManager);
}

/**
 * Registers a single-task pipe in {@code pipeTaskInfo} for use by the tests in this class.
 *
 * <p>The pipe gets one extractor, one processor and one connector attribute, plus a single
 * {@link PipeTaskMeta} stored under {@code DATA_NODE_ID}. After creation the status is set to
 * {@code RUNNING} only when that is the requested initial status; for any other value the
 * status is left as {@code createPipe} produced it.
 *
 * @param pipeTaskInfo registry the pipe is created in
 * @param pipeName name of the pipe to create
 * @param initialStatus requested status; only {@link PipeStatus#RUNNING} has an effect
 */
private void createPipe(
    final PipeTaskInfo pipeTaskInfo, final String pipeName, final PipeStatus initialStatus) {
  final Map<String, String> sourceAttrs = new HashMap<>();
  sourceAttrs.put("extractor", "iotdb-source");

  final Map<String, String> processAttrs = new HashMap<>();
  processAttrs.put("processor", "do-nothing-processor");

  final Map<String, String> sinkAttrs = new HashMap<>();
  sinkAttrs.put("connector", "iotdb-thrift-sink");

  // One task, keyed by the same id that is passed to the PipeTaskMeta constructor.
  final ConcurrentMap<Integer, PipeTaskMeta> taskMetaById = new ConcurrentHashMap<>();
  taskMetaById.put(DATA_NODE_ID, new PipeTaskMeta(MinimumProgressIndex.INSTANCE, DATA_NODE_ID));

  final PipeStaticMeta staticMeta =
      new PipeStaticMeta(
          pipeName, System.currentTimeMillis(), sourceAttrs, processAttrs, sinkAttrs);
  pipeTaskInfo.createPipe(new CreatePipePlanV2(staticMeta, new PipeRuntimeMeta(taskMetaById)));

  if (initialStatus != PipeStatus.RUNNING) {
    return;
  }
  final PipeRuntimeMeta createdRuntimeMeta =
      pipeTaskInfo.getPipeMetaByPipeName(pipeName).getRuntimeMeta();
  createdRuntimeMeta.getStatus().set(PipeStatus.RUNNING);
}

private PipeHeartbeat createPipeHeartbeat(final PipeMeta pipeMeta, final boolean isDegraded)
throws Exception {
return new PipeHeartbeat(
Expand Down
Loading
Loading