diff --git a/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodePipeMessages.java b/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodePipeMessages.java index b26283f52cbcf..1ad4a7e248190 100644 --- a/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodePipeMessages.java +++ b/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodePipeMessages.java @@ -286,8 +286,8 @@ public final class DataNodePipeMessages { + "PipeInsertNodeTabletInsertionEvent({}) overlaps with the time range: [{}, {}]. " + "Returning true to ensure data integrity."; public static final String FAILED_TO_ALLOCATE_MEMORY_FOR_PARSING_TSFILE = - "{}: failed to allocate memory for parsing TsFile {}, tablet event no. {}, retry count " - + "is {}, will keep retrying."; + "{}: failed to allocate memory for parsing TsFile {}, tablet event no. {}, " + + "will release parser memory and retry the TsFile event later."; public static final String FAILED_TO_BUILD_TABLET = "Failed to build tablet"; public static final String FAILED_TO_CHECK_NEXT = "Failed to check next"; public static final String FAILED_TO_CLOSE_TSFILEREADER = "Failed to close TsFileReader"; diff --git a/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodePipeMessages.java b/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodePipeMessages.java index 08804a755ce4f..7043d38b08e79 100644 --- a/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodePipeMessages.java +++ b/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodePipeMessages.java @@ -274,8 +274,8 @@ public final class DataNodePipeMessages { "determining the event time of PipeInsertNodeTabletInsertionEvent({}) overlaps with the " + "time range: [{}, {}]. Returning true to ensure data integrity 时发生异常"; public static final String FAILED_TO_ALLOCATE_MEMORY_FOR_PARSING_TSFILE = - "{}: failed to allocate memory for parsing TsFile {}, tablet event no. {}, retry count " - + "is {}, will keep retrying."; + "{}:为解析 TsFile {} 分配内存失败,tablet 事件编号 {}," + + "将释放解析器内存并稍后重试该 TsFile 事件。"; public static final String FAILED_TO_BUILD_TABLET = "构建 tablet 失败"; public static final String FAILED_TO_CHECK_NEXT = "check next 失败"; public static final String FAILED_TO_CLOSE_TSFILEREADER = "关闭 TsFileReader 失败"; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java index 6a9352deb7706..dda94917d42fb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java @@ -19,7 +19,6 @@ package org.apache.iotdb.db.pipe.agent.task; -import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.concurrent.IoTThreadFactory; @@ -39,7 +38,6 @@ import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeRuntimeMeta; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta; -import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTemporaryMeta; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTemporaryMetaInAgent; @@ -47,10 +45,8 @@ import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant; import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant; -import org.apache.iotdb.commons.pipe.config.constant.SystemConstant; import org.apache.iotdb.commons.pipe.resource.log.PipeLogger; import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId; -import org.apache.iotdb.commons.utils.NodeUrlUtils; import org.apache.iotdb.consensus.exception.ConsensusException; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; @@ -59,7 +55,6 @@ import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent; import org.apache.iotdb.db.pipe.agent.task.builder.PipeDataNodeBuilder; import org.apache.iotdb.db.pipe.agent.task.builder.PipeDataNodeTaskBuilder; -import org.apache.iotdb.db.pipe.agent.task.subtask.sink.PipeSinkSubtaskManager; import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics; import org.apache.iotdb.db.pipe.metric.overview.PipeTsFileToTabletsMetrics; import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; @@ -109,32 +104,12 @@ import java.util.function.Consumer; import java.util.stream.Collectors; -import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_END_TIME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_ENABLE_KEY; -import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_END_TIME_KEY; -import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_START_TIME_KEY; -import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATH_EXCLUSION_KEY; -import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATH_INCLUSION_KEY; -import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATH_KEY; -import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATTERN_EXCLUSION_KEY; -import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATTERN_INCLUSION_KEY; -import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_PATTERN_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_REALTIME_ENABLE_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_REALTIME_ENABLE_KEY; -import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_START_TIME_KEY; -import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_END_TIME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_HISTORY_ENABLE_KEY; -import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_HISTORY_END_TIME_KEY; -import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_HISTORY_START_TIME_KEY; -import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_PATH_EXCLUSION_KEY; -import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_PATH_INCLUSION_KEY; -import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_PATH_KEY; -import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_PATTERN_EXCLUSION_KEY; -import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_PATTERN_INCLUSION_KEY; -import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_PATTERN_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_REALTIME_ENABLE_KEY; -import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_START_TIME_KEY; public class PipeDataNodeTaskAgent extends PipeTaskAgent { @@ -199,9 +174,7 @@ protected void createPipeTask( || needConstructDataRegionTask || needConstructSchemaRegionTask) { calculateMemoryUsage( - pipeStaticMeta, - Collections.singletonList(new Pair<>(consensusGroupId, pipeTaskMeta)), - false); + pipeStaticMeta, Collections.singletonList(new Pair<>(consensusGroupId, pipeTaskMeta))); final PipeDataNodeTask pipeTask = new PipeDataNodeTaskBuilder(pipeStaticMeta, consensusGroupId, pipeTaskMeta).build(); @@ -415,75 +388,7 @@ private void collectPipeMetaListInternal(final TDataNodeHeartbeatResp resp) thro PipeConfig.getInstance().getPipeMetaReportMaxLogIntervalRounds(), pipeMetaKeeper.getPipeMetaCount()); - final Set dataRegionIds = - StorageEngine.getInstance().getAllDataRegionIds().stream() - .map(DataRegionId::getId) - .collect(Collectors.toSet()); - - final List pipeMetaBinaryList = new ArrayList<>(); - final List pipeCompletedList = new ArrayList<>(); - final List pipeRemainingEventCountList = new ArrayList<>(); - final List pipeRemainingTimeList = new ArrayList<>(); - final List pipeDegradedStatusList = new ArrayList<>(); - try { - for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) { - pipeMetaBinaryList.add(pipeMeta.serialize()); - - final PipeStaticMeta staticMeta = pipeMeta.getStaticMeta(); - - final Map pipeTaskMap = pipeTaskManager.getPipeTasks(staticMeta); - final boolean isAllDataRegionCompleted = - pipeTaskMap == null - || pipeTaskMap.entrySet().stream() - .filter(entry -> dataRegionIds.contains(entry.getKey())) - .allMatch(entry -> ((PipeDataNodeTask) entry.getValue()).isCompleted()); - final String sourceModeValue = - pipeMeta - .getStaticMeta() - .getSourceParameters() - .getStringOrDefault( - Arrays.asList( - PipeSourceConstant.EXTRACTOR_MODE_KEY, PipeSourceConstant.SOURCE_MODE_KEY), - PipeSourceConstant.EXTRACTOR_MODE_DEFAULT_VALUE); - final boolean includeDataAndNeedDrop = - DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair( - pipeMeta.getStaticMeta().getSourceParameters()) - .getLeft() - && (sourceModeValue.equalsIgnoreCase(PipeSourceConstant.EXTRACTOR_MODE_QUERY_VALUE) - || sourceModeValue.equalsIgnoreCase( - PipeSourceConstant.EXTRACTOR_MODE_SNAPSHOT_VALUE)); - - final boolean isCompleted = isAllDataRegionCompleted && includeDataAndNeedDrop; - final Pair remainingEventAndTime = - PipeDataNodeSinglePipeMetrics.getInstance() - .getRemainingEventAndTime(staticMeta.getPipeName(), staticMeta.getCreationTime()); - pipeCompletedList.add(isCompleted); - pipeRemainingEventCountList.add(remainingEventAndTime.getLeft()); - pipeRemainingTimeList.add(remainingEventAndTime.getRight()); - pipeDegradedStatusList.add( - PipeTemporaryMeta.encodeTsFileEpochDegradedStatus( - ((PipeTemporaryMetaInAgent) pipeMeta.getTemporaryMeta()) - .getGlobalTsFileEpochDegraded())); - - logger.ifPresent( - l -> - PipeLogger.log( - l::info, - "Reporting pipe meta: %s, isCompleted: %s, remainingEventCount: %s", - pipeMeta.coreReportMessage(), - isCompleted, - remainingEventAndTime.getLeft())); - } - logger.ifPresent( - l -> PipeLogger.log(l::info, "Reported %s pipe metas.", pipeMetaBinaryList.size())); - } catch (final IOException | IllegalPathException e) { - throw new TException(e); - } - resp.setPipeMetaList(pipeMetaBinaryList); - resp.setPipeCompletedList(pipeCompletedList); - resp.setPipeRemainingEventCountList(pipeRemainingEventCountList); - resp.setPipeRemainingTimeList(pipeRemainingTimeList); - resp.setPipeDegradedStatusList(pipeDegradedStatusList); + collectPipeMetaReport(logger, true).setTo(resp); PipeInsertionDataNodeListener.getInstance().listenToHeartbeat(true); } @@ -505,19 +410,21 @@ protected void collectPipeMetaListInternal( LOGGER.debug( DataNodePipeMessages.RECEIVED_PIPE_HEARTBEAT_REQUEST_FROM_CONFIG_NODE, req.heartbeatId); + collectPipeMetaReport(logger, false).setTo(resp); + PipeInsertionDataNodeListener.getInstance().listenToHeartbeat(true); + } + + private PipeMetaReport collectPipeMetaReport( + final Optional logger, final boolean includeQueryMode) throws TException { final Set dataRegionIds = StorageEngine.getInstance().getAllDataRegionIds().stream() .map(DataRegionId::getId) .collect(Collectors.toSet()); - final List pipeMetaBinaryList = new ArrayList<>(); - final List pipeCompletedList = new ArrayList<>(); - final List pipeRemainingEventCountList = new ArrayList<>(); - final List pipeRemainingTimeList = new ArrayList<>(); - final List pipeDegradedStatusList = new ArrayList<>(); + final PipeMetaReport report = new PipeMetaReport(); try { for (final PipeMeta pipeMeta : pipeMetaKeeper.getPipeMetaList()) { - pipeMetaBinaryList.add(pipeMeta.serialize()); + report.pipeMetaBinaryList.add(pipeMeta.serialize()); final PipeStaticMeta staticMeta = pipeMeta.getStaticMeta(); @@ -527,21 +434,15 @@ protected void collectPipeMetaListInternal( || pipeTaskMap.entrySet().stream() .filter(entry -> dataRegionIds.contains(entry.getKey())) .allMatch(entry -> ((PipeDataNodeTask) entry.getValue()).isCompleted()); - - final boolean includeDataAndNeedDrop = - DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair( - pipeMeta.getStaticMeta().getSourceParameters()) - .getLeft() - && isSnapshotMode(pipeMeta.getStaticMeta().getSourceParameters()); - - final boolean isCompleted = isAllDataRegionCompleted && includeDataAndNeedDrop; + final boolean isCompleted = + isAllDataRegionCompleted && includeDataAndNeedDrop(pipeMeta, includeQueryMode); final Pair remainingEventAndTime = PipeDataNodeSinglePipeMetrics.getInstance() .getRemainingEventAndTime(staticMeta.getPipeName(), staticMeta.getCreationTime()); - pipeCompletedList.add(isCompleted); - pipeRemainingEventCountList.add(remainingEventAndTime.getLeft()); - pipeRemainingTimeList.add(remainingEventAndTime.getRight()); - pipeDegradedStatusList.add( + report.pipeCompletedList.add(isCompleted); + report.pipeRemainingEventCountList.add(remainingEventAndTime.getLeft()); + report.pipeRemainingTimeList.add(remainingEventAndTime.getRight()); + report.pipeDegradedStatusList.add( PipeTemporaryMeta.encodeTsFileEpochDegradedStatus( ((PipeTemporaryMetaInAgent) pipeMeta.getTemporaryMeta()) .getGlobalTsFileEpochDegraded())); @@ -556,16 +457,56 @@ protected void collectPipeMetaListInternal( remainingEventAndTime.getLeft())); } logger.ifPresent( - l -> PipeLogger.log(l::info, "Reported %s pipe metas.", pipeMetaBinaryList.size())); + l -> + PipeLogger.log(l::info, "Reported %s pipe metas.", report.pipeMetaBinaryList.size())); } catch (final IOException | IllegalPathException e) { throw new TException(e); } - resp.setPipeMetaList(pipeMetaBinaryList); - resp.setPipeCompletedList(pipeCompletedList); - resp.setPipeRemainingEventCountList(pipeRemainingEventCountList); - resp.setPipeRemainingTimeList(pipeRemainingTimeList); - resp.setPipeDegradedStatusList(pipeDegradedStatusList); - PipeInsertionDataNodeListener.getInstance().listenToHeartbeat(true); + return report; + } + + private boolean includeDataAndNeedDrop(final PipeMeta pipeMeta, final boolean includeQueryMode) + throws IllegalPathException { + final PipeParameters sourceParameters = pipeMeta.getStaticMeta().getSourceParameters(); + if (!DataRegionListeningFilter.parseInsertionDeletionListeningOptionPair(sourceParameters) + .getLeft()) { + return false; + } + if (!includeQueryMode) { + return isSnapshotMode(sourceParameters); + } + + final String sourceModeValue = + sourceParameters.getStringOrDefault( + Arrays.asList( + PipeSourceConstant.EXTRACTOR_MODE_KEY, PipeSourceConstant.SOURCE_MODE_KEY), + PipeSourceConstant.EXTRACTOR_MODE_DEFAULT_VALUE); + return sourceModeValue.equalsIgnoreCase(PipeSourceConstant.EXTRACTOR_MODE_QUERY_VALUE) + || sourceModeValue.equalsIgnoreCase(PipeSourceConstant.EXTRACTOR_MODE_SNAPSHOT_VALUE); + } + + private static class PipeMetaReport { + private final List pipeMetaBinaryList = new ArrayList<>(); + private final List pipeCompletedList = new ArrayList<>(); + private final List pipeRemainingEventCountList = new ArrayList<>(); + private final List pipeRemainingTimeList = new ArrayList<>(); + private final List pipeDegradedStatusList = new ArrayList<>(); + + private void setTo(final TDataNodeHeartbeatResp resp) { + resp.setPipeMetaList(pipeMetaBinaryList); + resp.setPipeCompletedList(pipeCompletedList); + resp.setPipeRemainingEventCountList(pipeRemainingEventCountList); + resp.setPipeRemainingTimeList(pipeRemainingTimeList); + resp.setPipeDegradedStatusList(pipeDegradedStatusList); + } + + private void setTo(final TPipeHeartbeatResp resp) { + resp.setPipeMetaList(pipeMetaBinaryList); + resp.setPipeCompletedList(pipeCompletedList); + resp.setPipeRemainingEventCountList(pipeRemainingEventCountList); + resp.setPipeRemainingTimeList(pipeRemainingTimeList); + resp.setPipeDegradedStatusList(pipeDegradedStatusList); + } } ///////////////////////// Terminate Logic ///////////////////////// @@ -817,21 +758,11 @@ protected void calculateMemoryUsage(final PipeMeta pipeMetaFromCoordinator) return; } - final PipeMeta pipeMetaInAgent = pipeMetaKeeper.getPipeMeta(staticMeta.getPipeName()); - final boolean ignoreRegisteredSinkSubtasks = - Objects.nonNull(pipeMetaInAgent) - && (!pipeMetaInAgent.getStaticMeta().equals(staticMeta) - || PipeStatus.DROPPED.equals(pipeMetaInAgent.getRuntimeMeta().getStatus().get())); - calculateMemoryUsage( - staticMeta, - collectPipeTasksToBeCreated(pipeMetaFromCoordinator), - ignoreRegisteredSinkSubtasks); + calculateMemoryUsage(staticMeta, collectPipeTasksToBeCreated(pipeMetaFromCoordinator)); } private void calculateMemoryUsage( - final PipeStaticMeta staticMeta, - final List> pipeTasksToBeCreated, - final boolean ignoreRegisteredSinkSubtasks) + final PipeStaticMeta staticMeta, final List> pipeTasksToBeCreated) throws IllegalPathException { if (!PipeConfig.getInstance().isPipeEnableMemoryCheck() || !isInnerSource(staticMeta.getSourceParameters()) @@ -845,8 +776,7 @@ private void calculateMemoryUsage( } final MemoryEstimation memoryEstimation = - calculateIncrementalMemoryUsage( - staticMeta, pipeTasksToBeCreated, ignoreRegisteredSinkSubtasks); + calculateIncrementalMemoryUsage(staticMeta, pipeTasksToBeCreated); calculateInsertNodeQueueMemory( staticMeta.getSourceParameters(), memoryEstimation.dataRegionTaskCount); @@ -914,48 +844,19 @@ private List> collectPipeTasksToBeCreated( private MemoryEstimation calculateIncrementalMemoryUsage( final PipeStaticMeta staticMeta, - final List> pipeTasksToBeCreated, - final boolean ignoreRegisteredSinkSubtasks) { - long needMemory = 0; + final List> pipeTasksToBeCreated) { int dataRegionTaskCount = 0; - final Set sinkSubtasksToBeCreated = new HashSet<>(); for (final Pair regionIdAndTaskMeta : pipeTasksToBeCreated) { - final int regionId = regionIdAndTaskMeta.getLeft(); - final PipeTaskMeta pipeTaskMeta = regionIdAndTaskMeta.getRight(); - final PipeParameters sourceParameters = - PipeDataNodeTaskBuilder.blendUserAndSystemParameters( - staticMeta.getSourceParameters(), pipeTaskMeta); - final PipeParameters sinkParameters = - PipeDataNodeTaskBuilder.blendUserAndSystemParameters( - staticMeta.getSinkParameters(), pipeTaskMeta); - PipeDataNodeTaskBuilder.preprocessParameters(sourceParameters, sinkParameters); - - final boolean isDataRegionTask = isDataRegionTask(regionId); - if (isDataRegionTask) { + if (isDataRegionTask(regionIdAndTaskMeta.getLeft())) { dataRegionTaskCount++; - needMemory += calculateTsFileParserMemory(sourceParameters, sinkParameters); - } - - final String sinkSubtaskId = - PipeSinkSubtaskManager.generateAttributeSortedString(sinkParameters, regionId); - if (isDataRegionTask - && !sinkSubtasksToBeCreated.contains(sinkSubtaskId) - && (ignoreRegisteredSinkSubtasks - || !PipeSinkSubtaskManager.instance() - .hasRegisteredSubtasks(sinkParameters, regionId))) { - sinkSubtasksToBeCreated.add(sinkSubtaskId); - final int sinkSubtaskNum = - PipeSinkSubtaskManager.calculateSinkSubtaskNum(sinkParameters, regionId); - needMemory += calculateSinkBatchMemory(sinkParameters) * sinkSubtaskNum; - needMemory += - calculateSendTsFileReadBufferMemory(sourceParameters, sinkParameters) * sinkSubtaskNum; } } - if (dataRegionTaskCount > 0) { - needMemory += calculateAssignerMemory(staticMeta.getSourceParameters()); - } + // TsFile parser, sink batch, and TsFile read buffer memory are allocated dynamically + // from PipeMemoryManager only while they are active. + final long needMemory = + dataRegionTaskCount > 0 ? calculateAssignerMemory(staticMeta.getSourceParameters()) : 0; return new MemoryEstimation(needMemory, dataRegionTaskCount); } @@ -1026,212 +927,6 @@ private void calculateInsertNodeQueueMemory( } } - private long calculateTsFileParserMemory( - final PipeParameters sourceParameters, final PipeParameters sinkParameters) { - - // If the source is not history, we do not need to allocate memory - boolean isExtractorHistory = - sourceParameters.getBooleanOrDefault( - SystemConstant.RESTART_OR_NEWLY_ADDED_KEY, - SystemConstant.RESTART_OR_NEWLY_ADDED_DEFAULT_VALUE) - || sourceParameters.getBooleanOrDefault( - Arrays.asList(EXTRACTOR_HISTORY_ENABLE_KEY, SOURCE_HISTORY_ENABLE_KEY), - EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE); - - // If the source is history, and has start/end time, we need to allocate memory - boolean isTSFileParser = - isExtractorHistory - && sourceParameters.hasAnyAttributes( - EXTRACTOR_HISTORY_START_TIME_KEY, SOURCE_HISTORY_START_TIME_KEY); - - isTSFileParser = - isTSFileParser - || (isExtractorHistory - && sourceParameters.hasAnyAttributes( - EXTRACTOR_HISTORY_END_TIME_KEY, SOURCE_HISTORY_END_TIME_KEY)); - - // if the source has start/end time, we need to allocate memory - isTSFileParser = - isTSFileParser - || sourceParameters.hasAnyAttributes(SOURCE_START_TIME_KEY, EXTRACTOR_START_TIME_KEY); - - isTSFileParser = - isTSFileParser - || sourceParameters.hasAnyAttributes(SOURCE_END_TIME_KEY, EXTRACTOR_END_TIME_KEY); - - // If the source has pattern or path, we need to allocate memory - isTSFileParser = - isTSFileParser - || sourceParameters.hasAnyAttributes( - EXTRACTOR_PATTERN_KEY, - SOURCE_PATTERN_KEY, - EXTRACTOR_PATTERN_INCLUSION_KEY, - SOURCE_PATTERN_INCLUSION_KEY, - EXTRACTOR_PATTERN_EXCLUSION_KEY, - SOURCE_PATTERN_EXCLUSION_KEY); - - isTSFileParser = - isTSFileParser - || sourceParameters.hasAnyAttributes( - EXTRACTOR_PATH_KEY, - SOURCE_PATH_KEY, - EXTRACTOR_PATH_INCLUSION_KEY, - SOURCE_PATH_INCLUSION_KEY, - EXTRACTOR_PATTERN_INCLUSION_KEY, - SOURCE_PATTERN_INCLUSION_KEY, - EXTRACTOR_PATH_EXCLUSION_KEY, - SOURCE_PATH_EXCLUSION_KEY); - - // If the source is not hybrid, we do need to allocate memory - isTSFileParser = - isTSFileParser - || !PipeSinkConstant.CONNECTOR_FORMAT_HYBRID_VALUE.equals( - sinkParameters.getStringOrDefault( - Arrays.asList( - PipeSinkConstant.CONNECTOR_FORMAT_KEY, PipeSinkConstant.SINK_FORMAT_KEY), - PipeSinkConstant.CONNECTOR_FORMAT_HYBRID_VALUE)); - - if (!isTSFileParser) { - return 0; - } - - return PipeConfig.getInstance().getTsFileParserMemory(); - } - - private static long calculateSinkBatchMemory(final PipeParameters sinkParameters) { - final String format = - sinkParameters.getStringOrDefault( - Arrays.asList(PipeSinkConstant.CONNECTOR_FORMAT_KEY, PipeSinkConstant.SINK_FORMAT_KEY), - PipeSinkConstant.CONNECTOR_FORMAT_HYBRID_VALUE); - final boolean usingTsFileBatch = PipeSinkConstant.CONNECTOR_FORMAT_TS_FILE_VALUE.equals(format); - - // TsFile format always uses a batch. Other formats only use a batch when batch mode is enabled. - final boolean needUseBatch = - usingTsFileBatch - || sinkParameters.getBooleanOrDefault( - Arrays.asList( - PipeSinkConstant.CONNECTOR_IOTDB_BATCH_MODE_ENABLE_KEY, - PipeSinkConstant.SINK_IOTDB_BATCH_MODE_ENABLE_KEY), - PipeSinkConstant.CONNECTOR_IOTDB_BATCH_MODE_ENABLE_DEFAULT_VALUE); - - if (!needUseBatch) { - return 0; - } - - final long batchSizeInBytes = - sinkParameters.getLongOrDefault( - Arrays.asList( - PipeSinkConstant.CONNECTOR_IOTDB_BATCH_SIZE_KEY, - PipeSinkConstant.SINK_IOTDB_BATCH_SIZE_KEY), - usingTsFileBatch - ? PipeSinkConstant.CONNECTOR_IOTDB_TS_FILE_BATCH_SIZE_DEFAULT_VALUE - : PipeSinkConstant.CONNECTOR_IOTDB_PLAIN_BATCH_SIZE_DEFAULT_VALUE); - - return batchSizeInBytes * calculateBatchShardCount(sinkParameters, usingTsFileBatch); - } - - private static long calculateBatchShardCount( - final PipeParameters sinkParameters, final boolean usingTsFileBatch) { - if (usingTsFileBatch - || !sinkParameters.getBooleanOrDefault( - Arrays.asList( - PipeSinkConstant.CONNECTOR_LEADER_CACHE_ENABLE_KEY, - PipeSinkConstant.SINK_LEADER_CACHE_ENABLE_KEY), - PipeSinkConstant.CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE)) { - return 1; - } - - // Plain batches always allocate the default batch and may lazily allocate one batch per target - // endpoint when leader cache splits events by endpoint. - return 1L + calculateTargetEndPointCount(sinkParameters); - } - - private static int calculateTargetEndPointCount(final PipeParameters sinkParameters) { - final Set targetEndPoints = new HashSet<>(); - try { - addTargetEndPoint( - targetEndPoints, - sinkParameters, - PipeSinkConstant.CONNECTOR_IOTDB_IP_KEY, - PipeSinkConstant.CONNECTOR_IOTDB_HOST_KEY, - PipeSinkConstant.CONNECTOR_IOTDB_PORT_KEY); - addTargetEndPoint( - targetEndPoints, - sinkParameters, - PipeSinkConstant.SINK_IOTDB_IP_KEY, - PipeSinkConstant.SINK_IOTDB_HOST_KEY, - PipeSinkConstant.SINK_IOTDB_PORT_KEY); - if (sinkParameters.hasAttribute(PipeSinkConstant.CONNECTOR_IOTDB_NODE_URLS_KEY)) { - targetEndPoints.addAll( - NodeUrlUtils.parseTEndPointUrls( - Arrays.asList( - sinkParameters - .getStringByKeys(PipeSinkConstant.CONNECTOR_IOTDB_NODE_URLS_KEY) - .replace(" ", "") - .split(",")))); - } - if (sinkParameters.hasAttribute(PipeSinkConstant.SINK_IOTDB_NODE_URLS_KEY)) { - targetEndPoints.addAll( - NodeUrlUtils.parseTEndPointUrls( - Arrays.asList( - sinkParameters - .getStringByKeys(PipeSinkConstant.SINK_IOTDB_NODE_URLS_KEY) - .replace(" ", "") - .split(",")))); - } - } catch (final Exception ignored) { - return 1; - } - return Math.max(1, targetEndPoints.size()); - } - - private static void addTargetEndPoint( - final Set targetEndPoints, - final PipeParameters sinkParameters, - final String ipKey, - final String hostKey, - final String portKey) { - if (sinkParameters.hasAttribute(ipKey) && sinkParameters.hasAttribute(portKey)) { - targetEndPoints.add( - new TEndPoint( - sinkParameters.getStringByKeys(ipKey), sinkParameters.getIntByKeys(portKey))); - } - if (sinkParameters.hasAttribute(hostKey) && sinkParameters.hasAttribute(portKey)) { - targetEndPoints.add( - new TEndPoint( - sinkParameters.getStringByKeys(hostKey), sinkParameters.getIntByKeys(portKey))); - } - } - - private static long calculateSendTsFileReadBufferMemory( - final PipeParameters sourceParameters, final PipeParameters sinkParameters) { - // If the source is history enable, we need to transfer tsfile - boolean needTransferTsFile = - sourceParameters.getBooleanOrDefault( - SystemConstant.RESTART_OR_NEWLY_ADDED_KEY, - SystemConstant.RESTART_OR_NEWLY_ADDED_DEFAULT_VALUE) - || sourceParameters.getBooleanOrDefault( - Arrays.asList(EXTRACTOR_HISTORY_ENABLE_KEY, SOURCE_HISTORY_ENABLE_KEY), - EXTRACTOR_HISTORY_ENABLE_DEFAULT_VALUE); - - String format = - sinkParameters.getStringOrDefault( - Arrays.asList(PipeSinkConstant.CONNECTOR_FORMAT_KEY, PipeSinkConstant.SINK_FORMAT_KEY), - PipeSinkConstant.CONNECTOR_FORMAT_HYBRID_VALUE); - - // If the sink format is tsfile and hybrid, we need to transfer tsfile - needTransferTsFile = - needTransferTsFile - || PipeSinkConstant.CONNECTOR_FORMAT_HYBRID_VALUE.equals(format) - || PipeSinkConstant.CONNECTOR_FORMAT_TS_FILE_VALUE.equals(format); - - if (!needTransferTsFile) { - return 0; - } - - return PipeConfig.getInstance().getPipeSinkReadFileBufferSize(); - } - private long calculateAssignerMemory(final PipeParameters sourceParameters) { try { if (!PipeInsertionDataNodeListener.getInstance().isEmpty() diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java index 295f30da3f202..8fd5584f3a18b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java @@ -179,6 +179,8 @@ protected boolean executeOnce() throws Exception { event1 -> { try { pipeProcessor.process(event1, outputEventCollector); + } catch (PipeRuntimeOutOfMemoryCriticalException e) { + throw e; } catch (Exception e) { ex.set(e); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java index 8f664761aa032..98ab1e03ca3dc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java @@ -720,38 +720,37 @@ public interface TabletInsertionEventConsumer { public void consumeTabletInsertionEventsWithRetry( final TabletInsertionEventConsumer consumer, final String callerName) throws Exception { - final Iterable iterable = toTabletInsertionEvents(); - final Iterator iterator = iterable.iterator(); int tabletEventCount = 0; - while (iterator.hasNext()) { - final TabletInsertionEvent parsedEvent = iterator.next(); - tabletEventCount++; - int retryCount = 0; - while (true) { - // If failed due do insufficient memory, retry until success to avoid race among multiple - // processor threads + try { + final Iterable iterable = toTabletInsertionEvents(); + final Iterator iterator = iterable.iterator(); + while (iterator.hasNext()) { + final TabletInsertionEvent parsedEvent = iterator.next(); + tabletEventCount++; try { consumer.consume((PipeRawTabletInsertionEvent) parsedEvent); - break; } catch (final PipeRuntimeOutOfMemoryCriticalException e) { - if (retryCount++ % 100 == 0) { - LOGGER.warn( - DataNodePipeMessages.FAILED_TO_ALLOCATE_MEMORY_FOR_PARSING_TSFILE, - callerName, - getTsFile(), - tabletEventCount, - retryCount); - } else if (LOGGER.isDebugEnabled()) { - LOGGER.debug( - DataNodePipeMessages.FAILED_TO_ALLOCATE_MEMORY_FOR_PARSING_TSFILE, - callerName, - getTsFile(), - tabletEventCount, - retryCount, - e); - } + releaseParsedTabletEvent(parsedEvent); + throw e; } } + } catch (final PipeRuntimeOutOfMemoryCriticalException e) { + close(); + LOGGER.warn( + DataNodePipeMessages.FAILED_TO_ALLOCATE_MEMORY_FOR_PARSING_TSFILE, + callerName, + getTsFile(), + tabletEventCount, + e); + throw e; + } + } + + private void releaseParsedTabletEvent(final TabletInsertionEvent parsedEvent) { + if (parsedEvent instanceof PipeRawTabletInsertionEvent + && ((PipeRawTabletInsertionEvent) parsedEvent).getReferenceCount() == 0 + && !((PipeRawTabletInsertionEvent) parsedEvent).isReleased()) { + ((PipeRawTabletInsertionEvent) parsedEvent).clearReferenceCount(getClass().getName()); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java index 627731fa7edc4..da5cc6b88dba1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java @@ -24,7 +24,6 @@ import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern; import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern; -import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.i18n.DataNodePipeMessages; import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tsfile.parser.table.TsFileInsertionEventTableParser; @@ -109,9 +108,7 @@ protected TsFileInsertionEventParser( this.sourceEvent = sourceEvent; this.allocatedMemoryBlockForTablet = - PipeDataNodeResourceManager.memory() - .forceAllocateForTabletWithRetry( - IoTDBDescriptor.getInstance().getConfig().getPipeDataStructureTabletSizeInBytes()); + PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(0); LOGGER.debug( DataNodePipeMessages.TSFILE_HAS_INITIALIZED_PIPENAME_CREATION_TIME_PATTERN, @@ -180,6 +177,13 @@ protected void recordTabletMetrics(final Tablet tablet) { } } + protected void releaseTabletMemoryBlock() { + if (allocatedMemoryBlockForTablet != null + && allocatedMemoryBlockForTablet.getMemoryUsageInBytes() > 0) { + PipeDataNodeResourceManager.memory().forceResize(allocatedMemoryBlockForTablet, 0); + } + } + @Override public void close() { tabletInsertionIterable = null; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParser.java index 33652f3f3da69..ba840b11c32dc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParser.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParser.java @@ -524,6 +524,7 @@ public TabletInsertionEvent next() { final Tablet tablet = tabletIterator.next(); // Record tablet metrics recordTabletMetrics(tablet); + releaseTabletMemoryBlock(); final boolean isAligned = deviceIsAlignedMap.getOrDefault( IDeviceID.Factory.DEFAULT_FACTORY.create(tablet.getDeviceId()), false); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java index e3af5aaa0c1a0..8eeb95546b6f8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java @@ -29,7 +29,6 @@ import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern; import org.apache.iotdb.db.auth.AuthorityChecker; -import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.i18n.DataNodePipeMessages; import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; @@ -144,12 +143,9 @@ public TsFileInsertionEventScanParser( filter = Objects.nonNull(timeFilterExpression) ? timeFilterExpression.getFilter() : null; this.allocatedMemoryBlockForBatchData = - PipeDataNodeResourceManager.memory() - .forceAllocateForTabletWithRetry( - IoTDBDescriptor.getInstance().getConfig().getPipeDataStructureTabletSizeInBytes()); + PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(0); this.allocatedMemoryBlockForChunk = - PipeDataNodeResourceManager.memory() - .forceAllocateForTabletWithRetry(PipeConfig.getInstance().getPipeMaxReaderChunkSize()); + PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(0); try { currentModifications = @@ -225,6 +221,9 @@ public TabletInsertionEvent next() { throw new NoSuchElementException(); } + // Release the previous parser-owned tablet buffer before allocating the next + // tablet. + releaseTabletMemoryBlock(); // currentIsAligned is initialized when TsFileInsertionEventScanParser is // constructed. // When the getNextTablet function is called, currentIsAligned may be updated, @@ -263,6 +262,9 @@ public TabletInsertionEvent next() { sourceEvent, isLast); } finally { + // The raw event owns the generated tablet; only release the parser-side memory + // accounting. + releaseTabletMemoryBlock(); if (isLast) { recordParseEndTimeIfNecessary(); close(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java index 8ecdcc0cec5e9..5d81d6ae9021e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java @@ -22,11 +22,9 @@ import org.apache.iotdb.commons.audit.IAuditEntity; import org.apache.iotdb.commons.exception.auth.AccessDeniedException; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; -import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern; import org.apache.iotdb.commons.queryengine.plan.relational.metadata.QualifiedObjectName; import org.apache.iotdb.db.auth.AuthorityChecker; -import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.i18n.DataNodePipeMessages; import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; @@ -94,25 +92,14 @@ public TsFileInsertionEventTableParser( allocatedMemoryBlockForModifications = PipeDataNodeResourceManager.memory() .forceAllocateForTabletWithRetry(currentModifications.ramBytesUsed()); - long tableSize = - Math.min( - IoTDBDescriptor.getInstance().getConfig().getPipeDataStructureTabletSizeInBytes(), - IoTDBDescriptor.getInstance().getConfig().getTargetChunkSize()); - this.allocatedMemoryBlockForChunk = - PipeDataNodeResourceManager.memory() - .forceAllocateForTabletWithRetry( - PipeConfig.getInstance().getPipeMaxReaderChunkSize()); + PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(0); this.allocatedMemoryBlockForBatchData = - PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(tableSize); + PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(0); this.allocatedMemoryBlockForChunkMeta = - PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(tableSize); + PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(0); this.allocatedMemoryBlockForTableSchemas = - PipeDataNodeResourceManager.memory() - .forceAllocateForTabletWithRetry( - IoTDBDescriptor.getInstance() - .getConfig() - .getPipeDataStructureTabletSizeInBytes()); + PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(0); this.startTime = startTime; this.endTime = endTime; @@ -239,6 +226,7 @@ && hasTablePrivilege(entry.getKey()), final Tablet tablet = tabletIterator.next(); recordTabletMetrics(tablet); + releaseTabletMemoryBlock(); if (!PipeRawTabletInsertionEvent.isTabletEmpty(tablet)) { bufferedTablet = tablet; return true; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java index 6d27b9f7dd5b0..d7ba00eb18db0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java @@ -26,6 +26,7 @@ import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex; import org.apache.iotdb.commons.consensus.index.impl.TimeWindowStateProgressIndex; import org.apache.iotdb.commons.exception.IllegalPathException; +import org.apache.iotdb.commons.exception.pipe.PipeRuntimeOutOfMemoryCriticalException; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskProcessorRuntimeEnvironment; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; @@ -538,6 +539,8 @@ public void process( event -> { try { process(event, eventCollector); + } catch (PipeRuntimeOutOfMemoryCriticalException e) { + throw e; } catch (Exception e) { ex.set(e); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingProcessor.java index a8e0c270570bb..fcaa0feb05814 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingProcessor.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.pipe.processor.downsampling; import org.apache.iotdb.commons.consensus.DataRegionId; +import org.apache.iotdb.commons.exception.pipe.PipeRuntimeOutOfMemoryCriticalException; import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskProcessorRuntimeEnvironment; import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; @@ -156,6 +157,8 @@ public void process(TsFileInsertionEvent tsFileInsertionEvent, EventCollector ev event -> { try { process(event, eventCollector); + } catch (PipeRuntimeOutOfMemoryCriticalException e) { + throw e; } catch (Exception e) { ex.set(e); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java index c99efe5e3da3e..64844430a720b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryManager.java @@ -329,12 +329,6 @@ public synchronized void resize( } final long oldSize = block.getMemoryUsageInBytes(); - if (oldSize == 0) { - // If the memory block is not registered, we need to register it first. - // Otherwise, the memory usage will be inconsistent. - // See registerMemoryBlock for more details. - allocatedBlocks.add(block); - } if (oldSize >= targetSize) { memoryBlock.release(oldSize - targetSize); @@ -350,6 +344,8 @@ public synchronized void resize( if (targetSize == 0) { allocatedBlocks.remove(block); } + + this.notifyAll(); return; } @@ -359,6 +355,12 @@ public synchronized void resize( if (getTotalNonFloatingMemorySizeInBytes() - memoryBlock.getUsedMemoryInBytes() >= sizeInBytes) { memoryBlock.forceAllocateWithoutLimitation(sizeInBytes); + if (oldSize == 0) { + // If the memory block is not registered, we need to register it first. + // Otherwise, the memory usage will be inconsistent. + // See registerMemoryBlock for more details. + allocatedBlocks.add(block); + } if (block instanceof PipeTabletMemoryBlock) { usedMemorySizeInBytesOfTablets += sizeInBytes; } @@ -495,6 +497,9 @@ public synchronized boolean tryAllocate( if (getTotalNonFloatingMemorySizeInBytes() - memoryBlock.getUsedMemoryInBytes() >= memoryInBytesNeededToBeAllocated) { memoryBlock.forceAllocateWithoutLimitation(memoryInBytesNeededToBeAllocated); + if (block.getMemoryUsageInBytes() == 0) { + allocatedBlocks.add(block); + } if (block instanceof PipeTabletMemoryBlock) { usedMemorySizeInBytesOfTablets += memoryInBytesNeededToBeAllocated; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java index aede0e994d9a9..b850de896a1a3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventBatch.java @@ -61,8 +61,7 @@ protected PipeTabletEventBatch( // limit in buffer size this.maxBatchSizeInBytes = requestMaxBatchSizeInBytes; - this.allocatedMemoryBlock = - PipeDataNodeResourceManager.memory().forceAllocate(requestMaxBatchSizeInBytes); + this.allocatedMemoryBlock = PipeDataNodeResourceManager.memory().forceAllocate(0); if (recordMetric != null) { this.recordMetric = recordMetric; } else { @@ -97,6 +96,10 @@ public synchronized boolean onEvent(final TabletInsertionEvent event) events.add((EnrichedEvent) event); } } catch (final Exception e) { + if (events.isEmpty()) { + clearBatchData(); + resetMemoryUsage(); + } // If the event is not added to the batch, we need to decrease the reference count. ((EnrichedEvent) event) .decreaseReferenceCount(PipeTransferBatchReqBuilder.class.getName(), false); @@ -126,7 +129,27 @@ public synchronized boolean onEvent(final TabletInsertionEvent event) protected abstract boolean constructBatch(final TabletInsertionEvent event) throws WALPipeException, IOException; + protected void increaseTotalBufferSizeAndUpdateMemoryBlock(final long bufferSize) { + if (bufferSize <= 0) { + return; + } + + final long newTotalBufferSize = Math.min(totalBufferSize + bufferSize, maxBatchSizeInBytes); + PipeDataNodeResourceManager.memory().forceResize(allocatedMemoryBlock, newTotalBufferSize); + totalBufferSize = newTotalBufferSize; + } + + protected void releaseAllocatedMemoryBlock() { + PipeDataNodeResourceManager.memory().forceResize(allocatedMemoryBlock, 0); + } + + protected void clearBatchData() {} + public boolean shouldEmit() { + if (events.isEmpty()) { + return false; + } + final long diff = System.currentTimeMillis() - firstEventProcessingTime; if (totalBufferSize >= maxBatchSizeInBytes || diff >= maxDelayInMs) { recordMetric.accept(diff, totalBufferSize, events.size()); @@ -138,23 +161,26 @@ public boolean shouldEmit() { public synchronized void onSuccess() { events.clear(); - totalBufferSize = 0; - - firstEventProcessingTime = Long.MIN_VALUE; + resetMemoryUsage(); } @Override public synchronized void close() { + if (isClosed) { + return; + } isClosed = true; clearEventsReferenceCount(PipeTabletEventBatch.class.getName()); events.clear(); + clearBatchData(); + resetMemoryUsage(); allocatedMemoryBlock.close(); } /** - * Discard all events of the given pipe. This method only clears the reference count of the events - * and discard them, but do not modify other objects (such as buffers) for simplicity. + * Discard all events of the given pipe. This method only clears the reference count of the + * events. If some events remain, cached batch data is kept unchanged for simplicity. */ public synchronized void discardEventsOfPipe( final String pipeNameToDrop, final long creationTimeToDrop, final int regionId) { @@ -162,14 +188,27 @@ public synchronized void discardEventsOfPipe( } public synchronized void discardEventsOfPipe(final CommitterKey committerKey) { - events.removeIf( - event -> { - if (isEventFromPipe(event, committerKey)) { - event.clearReferenceCount(IoTDBDataRegionAsyncSink.class.getName()); - return true; - } - return false; - }); + final boolean hasDiscardedEvents = + events.removeIf( + event -> { + if (isEventFromPipe(event, committerKey)) { + event.clearReferenceCount(IoTDBDataRegionAsyncSink.class.getName()); + return true; + } + return false; + }); + if (hasDiscardedEvents && events.isEmpty()) { + clearBatchData(); + resetMemoryUsage(); + } + } + + private void resetMemoryUsage() { + totalBufferSize = 0; + + releaseAllocatedMemoryBlock(); + + firstEventProcessingTime = Long.MIN_VALUE; } private static boolean isEventFromPipe( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java index b32479e2f1a21..05b348f323791 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java @@ -74,7 +74,6 @@ public class PipeTabletEventPlainBatch extends PipeTabletEventBatch { @Override protected boolean constructBatch(final TabletInsertionEvent event) throws IOException { final long bufferSize = buildTabletInsertionBuffer(event); - totalBufferSize += bufferSize; pipe2BytesAccumulated.compute( new Pair<>( ((EnrichedEvent) event).getPipeName(), ((EnrichedEvent) event).getCreationTime()), @@ -85,8 +84,13 @@ protected boolean constructBatch(final TabletInsertionEvent event) throws IOExce @Override public synchronized void onSuccess() { + clearBatchData(); + super.onSuccess(); + } + @Override + protected void clearBatchData() { insertNodeBuffers.clear(); tabletBuffers.clear(); @@ -161,24 +165,21 @@ private long buildTabletInsertionBuffer(final TabletInsertionEvent event) throws final InsertNode insertNode = pipeInsertNodeTabletInsertionEvent.getInsertNode(); if (!(insertNode instanceof RelationalInsertTabletNode)) { buffer = insertNode.serializeToByteBuffer(); + final String databaseName = + pipeInsertNodeTabletInsertionEvent.isTableModelEvent() + ? pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName() + : pipeInsertNodeTabletInsertionEvent.getTreeModelDatabaseName(); + estimateSize = RamUsageEstimator.sizeOf(databaseName) + buffer.limit(); + increaseTotalBufferSizeAndUpdateMemoryBlock(estimateSize); insertNodeBuffers.add(buffer); - if (pipeInsertNodeTabletInsertionEvent.isTableModelEvent()) { - final String databaseName = - pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName(); - estimateSize = RamUsageEstimator.sizeOf(databaseName); - insertNodeDataBases.add(databaseName); - } else { - final String databaseName = pipeInsertNodeTabletInsertionEvent.getTreeModelDatabaseName(); - estimateSize = RamUsageEstimator.sizeOf(databaseName); - insertNodeDataBases.add(databaseName); - } - estimateSize += buffer.limit(); + insertNodeDataBases.add(databaseName); } else { - for (final Tablet tablet : - ((PipeInsertNodeTabletInsertionEvent) event).convertToTablets()) { - estimateSize += - constructTabletBatch( - tablet, pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName()); + final List tablets = pipeInsertNodeTabletInsertionEvent.convertToTablets(); + estimateSize = calculateTabletsSizeInBytes(tablets); + increaseTotalBufferSizeAndUpdateMemoryBlock(estimateSize); + for (final Tablet tablet : tablets) { + constructTabletBatchWithoutMemoryReservation( + tablet, pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName()); } } } else { @@ -198,6 +199,7 @@ private long buildTabletInsertionBuffer(final TabletInsertionEvent event) throws } final String databaseName = pipeRawTabletInsertionEvent.getTreeModelDatabaseName(); estimateSize = RamUsageEstimator.sizeOf(databaseName) + buffer.limit(); + increaseTotalBufferSizeAndUpdateMemoryBlock(estimateSize); tabletBuffers.add(buffer); tabletDataBases.add(databaseName); } @@ -207,12 +209,27 @@ private long buildTabletInsertionBuffer(final TabletInsertionEvent event) throws } private long constructTabletBatch(final Tablet tablet, final String databaseName) { + final long estimateSize = calculateTabletSizeInBytes(tablet); + increaseTotalBufferSizeAndUpdateMemoryBlock(estimateSize); + constructTabletBatchWithoutMemoryReservation(tablet, databaseName); + return estimateSize; + } + + private void constructTabletBatchWithoutMemoryReservation( + final Tablet tablet, final String databaseName) { final Pair> currentBatch = tableModelTabletMap .computeIfAbsent(databaseName, k -> new HashMap<>()) .computeIfAbsent(tablet.getTableName(), k -> new Pair<>(0, new ArrayList<>())); currentBatch.setLeft(currentBatch.getLeft() + tablet.getRowSize()); currentBatch.getRight().add(tablet); + } + + private long calculateTabletsSizeInBytes(final List tablets) { + return tablets.stream().mapToLong(PipeTabletEventPlainBatch::calculateTabletSizeInBytes).sum(); + } + + private static long calculateTabletSizeInBytes(final Tablet tablet) { return PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet) + 4; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventTsFileBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventTsFileBatch.java index 7b511e23fc6c9..053b42b2c780d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventTsFileBatch.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventTsFileBatch.java @@ -86,6 +86,7 @@ protected boolean constructBatch(final TabletInsertionEvent event) { (PipeInsertNodeTabletInsertionEvent) event; final boolean isTableModel = insertNodeTabletInsertionEvent.isTableModelEvent(); final List tablets = insertNodeTabletInsertionEvent.convertToTablets(); + increaseTotalBufferSizeAndUpdateMemoryBlock(calculateTabletsSizeInBytes(tablets)); for (int i = 0; i < tablets.size(); ++i) { final Tablet tablet = tablets.get(i); if (isTabletEmpty(tablet)) { @@ -114,6 +115,7 @@ protected boolean constructBatch(final TabletInsertionEvent event) { if (isTabletEmpty(tablet)) { return true; } + increaseTotalBufferSizeAndUpdateMemoryBlock(calculateTabletSizeInBytes(tablet)); if (rawTabletInsertionEvent.isTableModelEvent()) { // table Model bufferTableModelTablet( @@ -139,6 +141,17 @@ protected boolean constructBatch(final TabletInsertionEvent event) { return true; } + private long calculateTabletsSizeInBytes(final List tablets) { + return tablets.stream() + .filter(tablet -> !isTabletEmpty(tablet)) + .mapToLong(PipeTabletEventTsFileBatch::calculateTabletSizeInBytes) + .sum(); + } + + private static long calculateTabletSizeInBytes(final Tablet tablet) { + return PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet) * 2; + } + private void bufferTreeModelTablet( final String pipeName, final long creationTime, @@ -146,11 +159,6 @@ private void bufferTreeModelTablet( final boolean isAligned) { new PipeTreeModelTabletEventSorter(tablet).deduplicateAndSortTimestampsIfNecessary(); - // TODO: Currently, PipeTreeModelTsFileBuilderV2 still uses PipeTreeModelTsFileBuilder as a - // fallback builder, so memory table writing and storing temporary tablets require double the - // memory. - totalBufferSize += PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet) * 2; - pipeName2WeightMap.compute( new Pair<>(pipeName, creationTime), (pipe, weight) -> Objects.nonNull(weight) ? ++weight : 1); @@ -162,11 +170,6 @@ private void bufferTableModelTablet( final String pipeName, final long creationTime, final Tablet tablet, final String dataBase) { new PipeTableModelTabletEventSorter(tablet).sortAndDeduplicateByDevIdTimestamp(); - // TODO: Currently, PipeTableModelTsFileBuilderV2 still uses PipeTableModelTsFileBuilder as a - // fallback builder, so memory table writing and storing temporary tablets require double the - // memory. - totalBufferSize += PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet) * 2; - pipeName2WeightMap.compute( new Pair<>(pipeName, creationTime), (pipe, weight) -> Objects.nonNull(weight) ? ++weight : 1); @@ -209,8 +212,13 @@ public synchronized List> sealTsFiles() @Override public synchronized void onSuccess() { + clearBatchData(); + super.onSuccess(); + } + @Override + protected void clearBatchData() { pipeName2WeightMap.clear(); tableModeTsFileBuilder.onSuccess(); treeModeTsFileBuilder.onSuccess(); @@ -220,8 +228,6 @@ public synchronized void onSuccess() { public synchronized void close() { super.close(); - pipeName2WeightMap.clear(); - tableModeTsFileBuilder.close(); treeModeTsFileBuilder.close(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java index 4f2dab1bfa895..92a8c731fbe86 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java @@ -33,6 +33,8 @@ import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; import org.apache.iotdb.db.pipe.metric.overview.PipeResourceMetrics; import org.apache.iotdb.db.pipe.metric.sink.PipeDataRegionSinkMetrics; +import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; +import org.apache.iotdb.db.pipe.resource.memory.PipeTsFileMemoryBlock; import org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeTabletEventBatch; import org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeTabletEventPlainBatch; import org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeTabletEventTsFileBatch; @@ -497,10 +499,13 @@ private void transferFilePieces( final AirGapSocket socket, final boolean isMultiFile) throws PipeException, IOException { - final int readFileBufferSize = PIPE_CONFIG.getPipeSinkReadFileBufferSize(); - final byte[] readBuffer = new byte[readFileBufferSize]; - long position = 0; - try (final RandomAccessFile reader = new RandomAccessFile(file, "r")) { + final int readFileBufferSize = getReadFileBufferSize(file); + try (final PipeTsFileMemoryBlock ignored = + PipeDataNodeResourceManager.memory() + .forceAllocateForTsFileWithRetry(readFileBufferSize); + final RandomAccessFile reader = new RandomAccessFile(file, "r")) { + final byte[] readBuffer = new byte[readFileBufferSize]; + long position = 0; while (true) { mayLimitRateAndRecordIO(readFileBufferSize); final int readLength = reader.read(readBuffer); @@ -532,6 +537,11 @@ private void transferFilePieces( } } + private int getReadFileBufferSize(final File file) { + return (int) + Math.min((long) PIPE_CONFIG.getPipeSinkReadFileBufferSize(), Math.max(file.length(), 1L)); + } + private boolean sendBatch( final AirGapSocket socket, byte[] bytes, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/IoTConsensusV2SyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/IoTConsensusV2SyncSink.java index 481e340a739f3..9c3104c0ad17d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/IoTConsensusV2SyncSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/IoTConsensusV2SyncSink.java @@ -39,6 +39,8 @@ import org.apache.iotdb.db.pipe.event.common.deletion.PipeDeleteDataNodeEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; +import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; +import org.apache.iotdb.db.pipe.resource.memory.PipeTsFileMemoryBlock; import org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.builder.IoTConsensusV2SyncBatchReqBuilder; import org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.request.IoTConsensusV2DeleteNodeReq; import org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.request.IoTConsensusV2TabletInsertNodeReq; @@ -435,10 +437,13 @@ protected void transferFilePieces( final TCommitId tCommitId, final TConsensusGroupId tConsensusGroupId) throws PipeException, IOException { - final int readFileBufferSize = PipeConfig.getInstance().getPipeSinkReadFileBufferSize(); - final byte[] readBuffer = new byte[readFileBufferSize]; - long position = 0; - try (final RandomAccessFile reader = new RandomAccessFile(file, "r")) { + final int readFileBufferSize = getReadFileBufferSize(file); + try (final PipeTsFileMemoryBlock ignored = + PipeDataNodeResourceManager.memory() + .forceAllocateForTsFileWithRetry(readFileBufferSize); + final RandomAccessFile reader = new RandomAccessFile(file, "r")) { + final byte[] readBuffer = new byte[readFileBufferSize]; + long position = 0; while (true) { final int readLength = reader.read(readBuffer); if (readLength == -1) { @@ -501,6 +506,13 @@ protected void transferFilePieces( } } + private int getReadFileBufferSize(final File file) { + return (int) + Math.min( + (long) PipeConfig.getInstance().getPipeSinkReadFileBufferSize(), + Math.max(file.length(), 1L)); + } + private TEndPoint getFollowerUrl() { // In current iotConsensusV2 design, one connector corresponds to one follower, so the peers is // actually a singleton list diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/handler/IoTConsensusV2TsFileInsertionEventHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/handler/IoTConsensusV2TsFileInsertionEventHandler.java index 4e269aaa7e82c..52815e645bf8d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/handler/IoTConsensusV2TsFileInsertionEventHandler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/handler/IoTConsensusV2TsFileInsertionEventHandler.java @@ -31,6 +31,8 @@ import org.apache.iotdb.db.i18n.DataNodePipeMessages; import org.apache.iotdb.db.pipe.consensus.metric.IoTConsensusV2SinkMetrics; import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; +import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; +import org.apache.iotdb.db.pipe.resource.memory.PipeTsFileMemoryBlock; import org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.IoTConsensusV2AsyncSink; import org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.request.IoTConsensusV2TsFilePieceReq; import org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.payload.request.IoTConsensusV2TsFilePieceWithModReq; @@ -70,7 +72,8 @@ public class IoTConsensusV2TsFileInsertionEventHandler private final boolean transferMod; private final int readFileBufferSize; - private final byte[] readBuffer; + private PipeTsFileMemoryBlock memoryBlock; + private byte[] readBuffer; private long position; private RandomAccessFile reader; @@ -106,8 +109,15 @@ public IoTConsensusV2TsFileInsertionEventHandler( transferMod = event.isWithMod(); currentFile = transferMod ? modFile : tsFile; - readFileBufferSize = PipeConfig.getInstance().getPipeSinkReadFileBufferSize(); - readBuffer = new byte[readFileBufferSize]; + final long maxFileLength = + transferMod && Objects.nonNull(modFile) + ? Math.max(tsFile.length(), modFile.length()) + : tsFile.length(); + readFileBufferSize = + (int) + Math.min( + (long) PipeConfig.getInstance().getPipeSinkReadFileBufferSize(), + Math.max(maxFileLength, 1L)); position = 0; reader = @@ -128,6 +138,12 @@ public void transfer(final AsyncIoTConsensusV2ServiceClient client) this.client = client; client.setShouldReturnSelf(false); + if (readBuffer == null) { + memoryBlock = + PipeDataNodeResourceManager.memory().forceAllocateForTsFileWithRetry(readFileBufferSize); + readBuffer = new byte[readFileBufferSize]; + } + final int readLength = reader.read(readBuffer); if (readLength == -1) { if (currentFile == modFile) { @@ -246,6 +262,8 @@ public void onComplete(final TIoTConsensusV2TransferResp response) { client.returnSelf(); } + releaseReadBufferMemoryBlock(); + long duration = System.nanoTime() - createTime; metric.recordConnectorTsFileTransferTimer(duration); } @@ -330,10 +348,20 @@ public void onError(final Exception exception) { connector.addFailureEventToRetryQueue(event); metric.recordRetryCounter(); + releaseReadBufferMemoryBlock(); + if (client != null) { client.setShouldReturnSelf(true); client.returnSelf(); } } } + + private void releaseReadBufferMemoryBlock() { + if (memoryBlock != null) { + memoryBlock.close(); + memoryBlock = null; + readBuffer = null; + } + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/payload/builder/IoTConsensusV2TransferBatchReqBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/payload/builder/IoTConsensusV2TransferBatchReqBuilder.java index 677c77e0540f5..fc387084a0012 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/payload/builder/IoTConsensusV2TransferBatchReqBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/payload/builder/IoTConsensusV2TransferBatchReqBuilder.java @@ -70,6 +70,7 @@ public abstract class IoTConsensusV2TransferBatchReqBuilder implements AutoClose protected long firstEventProcessingTime = Long.MIN_VALUE; // limit in buffer size + protected final long maxBatchSizeInBytes; protected final PipeMemoryBlock allocatedMemoryBlock; protected long totalBufferSize = 0; @@ -92,37 +93,12 @@ protected IoTConsensusV2TransferBatchReqBuilder( this.consensusGroupId = consensusGroupId; this.thisDataNodeId = thisDataNodeId; - final long requestMaxBatchSizeInBytes = + maxBatchSizeInBytes = parameters.getLongOrDefault( Arrays.asList(CONNECTOR_IOTDB_BATCH_SIZE_KEY, SINK_IOTDB_BATCH_SIZE_KEY), CONNECTOR_IOTDB_PLAIN_BATCH_SIZE_DEFAULT_VALUE); - allocatedMemoryBlock = - PipeDataNodeResourceManager.memory() - .tryAllocate(requestMaxBatchSizeInBytes) - .setShrinkMethod(oldMemory -> Math.max(oldMemory / 2, 0)) - .setShrinkCallback( - (oldMemory, newMemory) -> - LOGGER.info( - DataNodePipeMessages.THE_BATCH_SIZE_LIMIT_HAS_SHRUNK_FROM, - oldMemory, - newMemory)) - .setExpandMethod( - oldMemory -> Math.min(Math.max(oldMemory, 1) * 2, requestMaxBatchSizeInBytes)) - .setExpandCallback( - (oldMemory, newMemory) -> - LOGGER.info( - DataNodePipeMessages.THE_BATCH_SIZE_LIMIT_HAS_EXPANDED_FROM, - oldMemory, - newMemory)); - - if (getMaxBatchSizeInBytes() != requestMaxBatchSizeInBytes) { - LOGGER.info( - "IoTConsensusV2TransferBatchReqBuilder: the max batch size is adjusted from {} to {} due to the " - + "memory restriction", - requestMaxBatchSizeInBytes, - getMaxBatchSizeInBytes()); - } + allocatedMemoryBlock = PipeDataNodeResourceManager.memory().forceAllocate(0); } /** @@ -137,27 +113,74 @@ public synchronized boolean onEvent(TabletInsertionEvent event) return false; } - final long requestCommitId = ((EnrichedEvent) event).getReplicateIndexForIoTV2(); + final EnrichedEvent enrichedEvent = (EnrichedEvent) event; + final long requestCommitId = enrichedEvent.getReplicateIndexForIoTV2(); // The deduplication logic here is to avoid the accumulation of the same event in a batch when // retrying. if ((events.isEmpty() || !events.get(events.size() - 1).equals(event))) { - events.add((EnrichedEvent) event); - requestCommitIds.add(requestCommitId); - final int bufferSize = buildTabletInsertionBuffer(event); - - ((EnrichedEvent) event) - .increaseReferenceCount(IoTConsensusV2TransferBatchReqBuilder.class.getName()); + if (!enrichedEvent.increaseReferenceCount( + IoTConsensusV2TransferBatchReqBuilder.class.getName())) { + LOGGER.warn(DataNodePipeMessages.CANNOT_INCREASE_REFERENCE_COUNT_FOR_EVENT_IGNORE, event); + return shouldEmit(); + } - if (firstEventProcessingTime == Long.MIN_VALUE) { - firstEventProcessingTime = System.currentTimeMillis(); + final int previousEventsSize = events.size(); + final int previousRequestCommitIdsSize = requestCommitIds.size(); + final int previousBatchReqsSize = batchReqs.size(); + try { + events.add(enrichedEvent); + requestCommitIds.add(requestCommitId); + final int bufferSize = buildTabletInsertionBuffer(event); + increaseTotalBufferSizeAndUpdateMemoryBlock(bufferSize); + + if (firstEventProcessingTime == Long.MIN_VALUE) { + firstEventProcessingTime = System.currentTimeMillis(); + } + } catch (final Exception e) { + rollbackTo(previousEventsSize, previousRequestCommitIdsSize, previousBatchReqsSize); + if (events.isEmpty()) { + resetMemoryUsage(); + } + enrichedEvent.decreaseReferenceCount( + IoTConsensusV2TransferBatchReqBuilder.class.getName(), false); + throw e; } + } + + return shouldEmit(); + } + + private boolean shouldEmit() { + return !events.isEmpty() + && (totalBufferSize >= getMaxBatchSizeInBytes() + || System.currentTimeMillis() - firstEventProcessingTime >= maxDelayInMs); + } - totalBufferSize += bufferSize; + private void increaseTotalBufferSizeAndUpdateMemoryBlock(final long bufferSize) { + if (bufferSize <= 0) { + return; } - return totalBufferSize >= getMaxBatchSizeInBytes() - || System.currentTimeMillis() - firstEventProcessingTime >= maxDelayInMs; + final long newTotalBufferSize = + Math.min(totalBufferSize + bufferSize, getMaxBatchSizeInBytes()); + PipeDataNodeResourceManager.memory().forceResize(allocatedMemoryBlock, newTotalBufferSize); + totalBufferSize = newTotalBufferSize; + } + + private void rollbackTo( + final int previousEventsSize, + final int previousRequestCommitIdsSize, + final int previousBatchReqsSize) { + events.subList(previousEventsSize, events.size()).clear(); + requestCommitIds.subList(previousRequestCommitIdsSize, requestCommitIds.size()).clear(); + batchReqs.subList(previousBatchReqsSize, batchReqs.size()).clear(); + } + + private void resetMemoryUsage() { + firstEventProcessingTime = Long.MIN_VALUE; + totalBufferSize = 0; + PipeDataNodeResourceManager.memory().forceResize(allocatedMemoryBlock, 0); } public synchronized void onSuccess() { @@ -166,9 +189,7 @@ public synchronized void onSuccess() { events.clear(); requestCommitIds.clear(); - firstEventProcessingTime = Long.MIN_VALUE; - - totalBufferSize = 0; + resetMemoryUsage(); } public IoTConsensusV2TabletBatchReq toTIoTConsensusV2BatchTransferReq() throws IOException { @@ -176,7 +197,7 @@ public IoTConsensusV2TabletBatchReq toTIoTConsensusV2BatchTransferReq() throws I } protected long getMaxBatchSizeInBytes() { - return allocatedMemoryBlock.getMemoryUsageInBytes(); + return maxBatchSizeInBytes; } public boolean isEmpty() { @@ -220,6 +241,9 @@ public synchronized void close() { ((EnrichedEvent) event).clearReferenceCount(this.getClass().getName()); } } + batchReqs.clear(); + events.clear(); + requestCommitIds.clear(); allocatedMemoryBlock.close(); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java index b937331957f03..a705eeb49fed3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java @@ -37,6 +37,8 @@ import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent; import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; +import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; +import org.apache.iotdb.db.pipe.resource.memory.PipeTsFileMemoryBlock; import org.apache.iotdb.db.pipe.sink.payload.legacy.TsFilePipeData; import org.apache.iotdb.db.storageengine.StorageEngine; import org.apache.iotdb.db.storageengine.dataregion.DataRegion; @@ -420,8 +422,12 @@ private void transportSingleFilePieceByPiece(final File file) throws IOException long position = 0; // Try small piece to rebase the file position. - final byte[] buffer = new byte[PipeConfig.getInstance().getPipeSinkReadFileBufferSize()]; - try (final RandomAccessFile randomAccessFile = new RandomAccessFile(file, "r")) { + final int readFileBufferSize = getReadFileBufferSize(file); + try (final PipeTsFileMemoryBlock ignored = + PipeDataNodeResourceManager.memory() + .forceAllocateForTsFileWithRetry(readFileBufferSize); + final RandomAccessFile randomAccessFile = new RandomAccessFile(file, "r")) { + final byte[] buffer = new byte[readFileBufferSize]; while (true) { final int dataLength = randomAccessFile.read(buffer); if (dataLength == -1) { @@ -459,6 +465,13 @@ private void transportSingleFilePieceByPiece(final File file) throws IOException } } + private int getReadFileBufferSize(final File file) { + return (int) + Math.min( + (long) PipeConfig.getInstance().getPipeSinkReadFileBufferSize(), + Math.max(file.length(), 1L)); + } + @Override public void close() throws Exception { if (client != null) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java index 2467c3ce14336..27cb81d6cda29 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/handler/PipeTransferTsFileHandler.java @@ -124,11 +124,15 @@ public PipeTransferTsFileHandler( // the memory of the TsFile event is not released, so the memory is not enough for slicing. This // will cause a deadlock. waitForResourceEnough4Slicing((long) ((1 + Math.random()) * 20 * 1000)); // 20 - 40 seconds + final long maxFileLength = + transferMod && Objects.nonNull(modFile) + ? Math.max(tsFile.length(), modFile.length()) + : tsFile.length(); readFileBufferSize = (int) Math.min( - PipeConfig.getInstance().getPipeSinkReadFileBufferSize(), - transferMod ? Math.max(tsFile.length(), modFile.length()) : tsFile.length()); + (long) PipeConfig.getInstance().getPipeSinkReadFileBufferSize(), + Math.max(maxFileLength, 1L)); position = 0; isSealSignalSent = new AtomicBoolean(false); @@ -142,21 +146,6 @@ public void transfer( final IoTDBDataNodeAsyncClientManager clientManager, final AsyncPipeDataTransferServiceClient client) throws TException, IOException { - // Delay creation of resources to avoid OOM or too many open files - if (readBuffer == null) { - memoryBlock = - PipeDataNodeResourceManager.memory() - .forceAllocateForTsFileWithRetry( - PipeConfig.getInstance().isPipeSinkReadFileBufferMemoryControlEnabled() - ? readFileBufferSize - : 0); - readBuffer = new byte[readFileBufferSize]; - } - - if (reader == null) { - reader = transferMod ? new RandomAccessFile(modFile, "r") : new RandomAccessFile(tsFile, "r"); - } - this.clientManager = clientManager; this.client = client; @@ -173,6 +162,17 @@ public void transfer( return; } + // Delay creation of resources to avoid OOM or too many open files + if (readBuffer == null) { + memoryBlock = + PipeDataNodeResourceManager.memory().forceAllocateForTsFileWithRetry(readFileBufferSize); + readBuffer = new byte[readFileBufferSize]; + } + + if (reader == null) { + reader = transferMod ? new RandomAccessFile(modFile, "r") : new RandomAccessFile(tsFile, "r"); + } + client.setShouldReturnSelf(false); client.setTimeoutDynamically(clientManager.getConnectionTimeout()); @@ -256,6 +256,7 @@ public void onComplete(final TPipeTransferResp response) { super.onComplete(response); } finally { if (sink.isClosed()) { + releaseReadBufferMemoryBlock(); returnClientIfNecessary(); } } @@ -319,6 +320,7 @@ protected boolean onCompleteInternal(final TPipeTransferResp response) { referenceCount); } + releaseReadBufferMemoryBlock(); returnClientIfNecessary(); } @@ -361,6 +363,7 @@ public void onError(final Exception exception) { try { super.onError(exception); } finally { + releaseReadBufferMemoryBlock(); returnClientIfNecessary(); } } @@ -412,6 +415,7 @@ protected void onErrorInternal(final Exception exception) { LOGGER.warn(DataNodePipeMessages.FAILED_TO_CLOSE_FILE_READER_OR_DELETE, e); } finally { try { + releaseReadBufferMemoryBlock(); returnClientIfNecessary(); } finally { if (eventsHadBeenAddedToRetryQueue.compareAndSet(false, true)) { @@ -473,10 +477,14 @@ public void clearEventsReferenceCount() { @Override public void close() { super.close(); + releaseReadBufferMemoryBlock(); + } + private void releaseReadBufferMemoryBlock() { if (memoryBlock != null) { memoryBlock.close(); memoryBlock = null; + readBuffer = null; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java index eb7d39864c433..a108743ed1cb6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java @@ -22,10 +22,12 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey; +import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.commons.pipe.sink.client.IoTDBSyncClient; import org.apache.iotdb.commons.pipe.sink.limiter.TsFileSendRateLimiter; import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferFilePieceReq; +import org.apache.iotdb.commons.pipe.sink.payload.thrift.response.PipeTransferFilePieceResp; import org.apache.iotdb.commons.utils.RetryUtils; import org.apache.iotdb.db.i18n.DataNodePipeMessages; import org.apache.iotdb.db.pipe.event.common.deletion.PipeDeleteDataNodeEvent; @@ -36,6 +38,8 @@ import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; import org.apache.iotdb.db.pipe.metric.overview.PipeResourceMetrics; import org.apache.iotdb.db.pipe.metric.sink.PipeDataRegionSinkMetrics; +import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; +import org.apache.iotdb.db.pipe.resource.memory.PipeTsFileMemoryBlock; import org.apache.iotdb.db.pipe.sink.client.IoTDBDataNodeSyncClientManager; import org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeTabletEventBatch; import org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeTabletEventPlainBatch; @@ -71,6 +75,7 @@ import java.io.File; import java.io.IOException; +import java.io.RandomAccessFile; import java.nio.file.NoSuchFileException; import java.util.Arrays; import java.util.Collections; @@ -593,6 +598,130 @@ private void doTransfer( LOGGER.info(DataNodePipeMessages.SUCCESSFULLY_TRANSFERRED_FILE, tsFile); } + @Override + protected void transferFilePieces( + final Map, Double> pipe2WeightMap, + final File file, + final Pair clientAndStatus, + final boolean isMultiFile) + throws PipeException, IOException { + final int readFileBufferSize = getReadFileBufferSize(file); + try (final PipeTsFileMemoryBlock ignored = + PipeDataNodeResourceManager.memory() + .forceAllocateForTsFileWithRetry(readFileBufferSize); + final RandomAccessFile reader = new RandomAccessFile(file, "r")) { + final byte[] readBuffer = new byte[readFileBufferSize]; + long position = 0; + int readLength; + while ((readLength = readNextFilePiece(reader, readBuffer, readFileBufferSize)) != -1) { + position = + transferFilePiece( + pipe2WeightMap, + file, + clientAndStatus, + isMultiFile, + readBuffer, + position, + readLength, + reader); + } + } + } + + private int readNextFilePiece( + final RandomAccessFile reader, final byte[] readBuffer, final int readFileBufferSize) + throws IOException { + mayLimitRateAndRecordIO(readFileBufferSize); + return reader.read(readBuffer); + } + + private long transferFilePiece( + final Map, Double> pipe2WeightMap, + final File file, + final Pair clientAndStatus, + final boolean isMultiFile, + final byte[] readBuffer, + final long position, + final int readLength, + final RandomAccessFile reader) + throws PipeException, IOException { + final byte[] payLoad = buildFilePiecePayload(readBuffer, readLength); + final PipeTransferFilePieceResp resp = + doTransferFilePiece(pipe2WeightMap, file, clientAndStatus, isMultiFile, payLoad, position); + return handleTransferFilePieceResp(file, clientAndStatus, reader, position + readLength, resp); + } + + private byte[] buildFilePiecePayload(final byte[] readBuffer, final int readLength) { + return readLength == readBuffer.length + ? readBuffer + : Arrays.copyOfRange(readBuffer, 0, readLength); + } + + private PipeTransferFilePieceResp doTransferFilePiece( + final Map, Double> pipe2WeightMap, + final File file, + final Pair clientAndStatus, + final boolean isMultiFile, + final byte[] payLoad, + final long position) + throws PipeException, IOException { + try { + final TPipeTransferReq req = + compressIfNeeded( + isMultiFile + ? getTransferMultiFilePieceReq(file.getName(), position, payLoad) + : getTransferSingleFilePieceReq(file.getName(), position, payLoad)); + pipe2WeightMap.forEach( + (namePair, weight) -> + rateLimitIfNeeded( + namePair.getLeft(), + namePair.getRight(), + clientAndStatus.getLeft().getEndPoint(), + (long) (req.getBody().length * weight))); + return PipeTransferFilePieceResp.fromTPipeTransferResp( + clientAndStatus.getLeft().pipeTransfer(req)); + } catch (final Exception e) { + clientAndStatus.setRight(false); + throw new PipeConnectionException( + String.format("Network error when transfer file %s, because %s.", file, e.getMessage()), + e); + } + } + + private long handleTransferFilePieceResp( + final File file, + final Pair clientAndStatus, + final RandomAccessFile reader, + final long nextPosition, + final PipeTransferFilePieceResp resp) + throws PipeException, IOException { + final TSStatus status = resp.getStatus(); + if (status.getCode() == TSStatusCode.PIPE_TRANSFER_FILE_OFFSET_RESET.getStatusCode()) { + reader.seek(resp.getEndWritingOffset()); + LOGGER.info(DataNodePipeMessages.REDIRECT_FILE_POSITION_TO, resp.getEndWritingOffset()); + return resp.getEndWritingOffset(); + } + + if (status.getCode() == TSStatusCode.PIPE_CONFIG_RECEIVER_HANDSHAKE_NEEDED.getStatusCode()) { + getClientManager().sendHandshakeReq(clientAndStatus); + } + + if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode() + && status.getCode() != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) { + receiverStatusHandler.handle( + resp.getStatus(), + String.format("Transfer file %s error, result status %s.", file, resp.getStatus()), + file.getName()); + } + return nextPosition; + } + + private int getReadFileBufferSize(final File file) { + return (int) + Math.min( + PipeConfig.getInstance().getPipeSinkReadFileBufferSize(), Math.max(file.length(), 1L)); + } + @Override public TPipeTransferReq compressIfNeeded(final TPipeTransferReq req) throws IOException { if (Objects.isNull(compressionTimer) && Objects.nonNull(sinkTaskId)) { diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgentTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgentTest.java index 0582a94170ddf..d3249933aa114 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgentTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgentTest.java @@ -23,19 +23,13 @@ import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeRuntimeMeta; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta; -import org.apache.iotdb.commons.pipe.config.PipeConfig; -import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant; -import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant; import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent; -import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.iotdb.pipe.api.exception.PipeException; import org.junit.Assert; import org.junit.Test; -import java.lang.reflect.Method; import java.util.HashMap; -import java.util.Map; public class PipeDataNodeTaskAgentTest { @@ -74,86 +68,4 @@ public void testCreateMemoryCheckStillRunsWhenNoPipeTasksNeedToBeCreated() throw .setPipeTotalFloatingMemoryProportion(originalPipeTotalFloatingMemoryProportion); } } - - @Test - public void testPlainBatchMemoryIncludesLeaderCacheEndpointShards() throws Exception { - final Map sinkAttributes = new HashMap<>(); - sinkAttributes.put( - PipeSinkConstant.CONNECTOR_FORMAT_KEY, PipeSinkConstant.CONNECTOR_FORMAT_TABLET_VALUE); - sinkAttributes.put(PipeSinkConstant.CONNECTOR_IOTDB_BATCH_SIZE_KEY, "1024"); - sinkAttributes.put( - PipeSinkConstant.CONNECTOR_IOTDB_NODE_URLS_KEY, "127.0.0.1:6667, 127.0.0.2:6667"); - sinkAttributes.put(PipeSinkConstant.CONNECTOR_IOTDB_IP_KEY, "127.0.0.3"); - sinkAttributes.put(PipeSinkConstant.CONNECTOR_IOTDB_PORT_KEY, "6667"); - - Assert.assertEquals( - 4 * 1024L, invokeCalculateSinkBatchMemory(new PipeParameters(sinkAttributes))); - - sinkAttributes.put( - PipeSinkConstant.CONNECTOR_LEADER_CACHE_ENABLE_KEY, Boolean.FALSE.toString()); - Assert.assertEquals(1024L, invokeCalculateSinkBatchMemory(new PipeParameters(sinkAttributes))); - } - - @Test - public void testTsFileBatchMemoryIgnoresLeaderCacheEndpointShards() throws Exception { - final Map sinkAttributes = new HashMap<>(); - sinkAttributes.put( - PipeSinkConstant.CONNECTOR_FORMAT_KEY, PipeSinkConstant.CONNECTOR_FORMAT_TS_FILE_VALUE); - sinkAttributes.put(PipeSinkConstant.CONNECTOR_IOTDB_BATCH_SIZE_KEY, "2048"); - sinkAttributes.put( - PipeSinkConstant.CONNECTOR_IOTDB_NODE_URLS_KEY, "127.0.0.1:6667,127.0.0.2:6667"); - - Assert.assertEquals(2048L, invokeCalculateSinkBatchMemory(new PipeParameters(sinkAttributes))); - } - - @Test - public void testPlainBatchMemoryReturnsZeroWhenBatchModeIsDisabled() throws Exception { - final Map sinkAttributes = new HashMap<>(); - sinkAttributes.put( - PipeSinkConstant.CONNECTOR_FORMAT_KEY, PipeSinkConstant.CONNECTOR_FORMAT_TABLET_VALUE); - sinkAttributes.put( - PipeSinkConstant.CONNECTOR_IOTDB_BATCH_MODE_ENABLE_KEY, Boolean.FALSE.toString()); - sinkAttributes.put(PipeSinkConstant.CONNECTOR_IOTDB_BATCH_SIZE_KEY, "1024"); - - Assert.assertEquals(0L, invokeCalculateSinkBatchMemory(new PipeParameters(sinkAttributes))); - } - - @Test - public void testSendTsFileReadBufferMemoryUsesSinkReadFileBufferSize() throws Exception { - final Map sourceAttributes = new HashMap<>(); - sourceAttributes.put(PipeSourceConstant.EXTRACTOR_HISTORY_ENABLE_KEY, Boolean.FALSE.toString()); - - final Map sinkAttributes = new HashMap<>(); - sinkAttributes.put( - PipeSinkConstant.CONNECTOR_FORMAT_KEY, PipeSinkConstant.CONNECTOR_FORMAT_TABLET_VALUE); - Assert.assertEquals( - 0L, - invokeCalculateSendTsFileReadBufferMemory( - new PipeParameters(sourceAttributes), new PipeParameters(sinkAttributes))); - - sinkAttributes.put( - PipeSinkConstant.CONNECTOR_FORMAT_KEY, PipeSinkConstant.CONNECTOR_FORMAT_HYBRID_VALUE); - Assert.assertEquals( - PipeConfig.getInstance().getPipeSinkReadFileBufferSize(), - invokeCalculateSendTsFileReadBufferMemory( - new PipeParameters(sourceAttributes), new PipeParameters(sinkAttributes))); - } - - private long invokeCalculateSinkBatchMemory(final PipeParameters sinkParameters) - throws Exception { - final Method method = - PipeDataNodeTaskAgent.class.getDeclaredMethod( - "calculateSinkBatchMemory", PipeParameters.class); - method.setAccessible(true); - return (long) method.invoke(null, sinkParameters); - } - - private long invokeCalculateSendTsFileReadBufferMemory( - final PipeParameters sourceParameters, final PipeParameters sinkParameters) throws Exception { - final Method method = - PipeDataNodeTaskAgent.class.getDeclaredMethod( - "calculateSendTsFileReadBufferMemory", PipeParameters.class, PipeParameters.class); - method.setAccessible(true); - return (long) method.invoke(null, sourceParameters, sinkParameters); - } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java index 91b6d7d56c532..3d81e143ac867 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.pipe.event; import org.apache.iotdb.commons.conf.CommonDescriptor; +import org.apache.iotdb.commons.exception.pipe.PipeRuntimeOutOfMemoryCriticalException; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBTreePattern; @@ -155,6 +156,87 @@ public void testScanParser() throws Exception { System.out.println(System.currentTimeMillis() - startTime); } + @Test + public void testScanParserReleasesTabletMemoryAfterRawTabletGenerated() throws Exception { + nonalignedTsFile = + TsFileGeneratorUtils.generateNonAlignedTsFile( + "nonaligned-release-tablet-memory.tsfile", 1, 1, 10, 0, 100, 10, 10); + + try (final TsFileInsertionEventScanParser parser = + new TsFileInsertionEventScanParser( + nonalignedTsFile, + new PrefixTreePattern("root"), + Long.MIN_VALUE, + Long.MAX_VALUE, + null, + null, + false)) { + final Iterator iterator = parser.toTabletInsertionEvents().iterator(); + + Assert.assertTrue(iterator.hasNext()); + final TabletInsertionEvent event = iterator.next(); + Assert.assertTrue(event instanceof PipeRawTabletInsertionEvent); + Assert.assertEquals(0, getAllocatedTabletMemory(parser).getMemoryUsageInBytes()); + + ((PipeRawTabletInsertionEvent) event).clearReferenceCount(getClass().getName()); + } + } + + @Test + public void testConsumeTabletInsertionEventsWithRetryReleasesParserOnOutOfMemory() + throws Exception { + nonalignedTsFile = + TsFileGeneratorUtils.generateNonAlignedTsFile( + "nonaligned-consume-oom.tsfile", 1, 1, 10, 0, 100, 10, 10); + resource = new TsFileResource(nonalignedTsFile); + resource.setStatusForTest(TsFileResourceStatus.NORMAL); + + // The TsFile generator only creates the file, so mark the resource non-empty explicitly. + final IDeviceID deviceID = IDeviceID.Factory.DEFAULT_FACTORY.create("root.testsg.d0"); + resource.updateStartTime(deviceID, 0); + resource.updateEndTime(deviceID, 9); + + final PipeTsFileInsertionEvent event = + new PipeTsFileInsertionEvent( + false, + "root", + resource, + null, + false, + false, + false, + null, + null, + 0, + null, + new PrefixTreePattern("root"), + null, + null, + null, + null, + true, + Long.MIN_VALUE, + Long.MAX_VALUE); + final AtomicReference parsedEventReference = + new AtomicReference<>(); + + final PipeRuntimeOutOfMemoryCriticalException exception = + Assert.assertThrows( + PipeRuntimeOutOfMemoryCriticalException.class, + () -> + event.consumeTabletInsertionEventsWithRetry( + parsedEvent -> { + parsedEventReference.set(parsedEvent); + throw new PipeRuntimeOutOfMemoryCriticalException("expected oom"); + }, + "test")); + + Assert.assertEquals("expected oom", exception.getMessage()); + Assert.assertNotNull(parsedEventReference.get()); + Assert.assertTrue(parsedEventReference.get().isReleased()); + Assert.assertNull(getEventParser(event).get()); + } + @Test public void testScanParserSplitNonAlignedSinglePageChunkByEstimatedPageMemory() throws Exception { final long originalPipeMaxReaderChunkSize = @@ -1995,6 +2077,22 @@ private PipeMemoryBlock getAllocatedBatchDataMemory(final TsFileInsertionEventSc return (PipeMemoryBlock) field.get(parser); } + private PipeMemoryBlock getAllocatedTabletMemory(final TsFileInsertionEventParser parser) + throws NoSuchFieldException, IllegalAccessException { + final Field field = + TsFileInsertionEventParser.class.getDeclaredField("allocatedMemoryBlockForTablet"); + field.setAccessible(true); + return (PipeMemoryBlock) field.get(parser); + } + + @SuppressWarnings("unchecked") + private AtomicReference getEventParser( + final PipeTsFileInsertionEvent event) throws NoSuchFieldException, IllegalAccessException { + final Field field = PipeTsFileInsertionEvent.class.getDeclaredField("eventParser"); + field.setAccessible(true); + return (AtomicReference) field.get(event); + } + private long calculatePipeMaxReaderChunkSizeForSinglePageAlignedChunk(final File tsFile) throws Exception { try (final TsFileSequenceReader reader = new TsFileSequenceReader(tsFile.getAbsolutePath())) {