diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java index 5bdef252a2f4a..dc2ab1d381fd8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java @@ -112,6 +112,7 @@ private PipeRawTabletInsertionEvent( this.isAligned = isAligned; this.sourceEvent = sourceEvent; this.needToReport = needToReport; + inheritSourceEventReportSkippingIfNecessary(); // Allocate empty memory block, will be resized later. this.allocatedMemoryBlock = @@ -399,6 +400,18 @@ public void markAsNeedToReport() { }); } this.needToReport = true; + inheritSourceEventReportSkippingIfNecessary(); + } + + private void inheritSourceEventReportSkippingIfNecessary() { + if (needToReport && shouldSkipReportOnCommitBecauseOfSourceEvent()) { + skipReportOnCommit(); + } + } + + private boolean shouldSkipReportOnCommitBecauseOfSourceEvent() { + return sourceEvent instanceof PipeTsFileInsertionEvent + && !((PipeTsFileInsertionEvent) sourceEvent).shouldReportGeneratedEventsOnCommit(); } // This getter is reserved for user-defined plugins diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java index 8f664761aa032..b4e1df26dbecd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java @@ -105,6 +105,8 @@ public class PipeTsFileInsertionEvent extends PipeInsertionEvent protected volatile ProgressIndex overridingProgressIndex; private Set tableNames; private String tsFileDedupScopeID; + // False when generated tablet events should wait for an external progress report. + private volatile boolean shouldReportGeneratedEventsOnCommit = true; // This is set to check the tsFile paths by privilege private Map treeSchemaMap; @@ -462,6 +464,23 @@ public ProgressIndex forceGetProgressIndex() { return resource.getMaxProgressIndex(); } + public PipeTsFileInsertionEvent skipReportOnCommitAndGeneratedEvents() { + return setShouldReportGeneratedEventsOnCommit(false); + } + + public boolean shouldReportGeneratedEventsOnCommit() { + return shouldReportGeneratedEventsOnCommit; + } + + private PipeTsFileInsertionEvent setShouldReportGeneratedEventsOnCommit( + final boolean shouldReportGeneratedEventsOnCommit) { + this.shouldReportGeneratedEventsOnCommit = shouldReportGeneratedEventsOnCommit; + if (!shouldReportGeneratedEventsOnCommit) { + skipReportOnCommit(); + } + return this; + } + public void eliminateProgressIndex() { if (Objects.isNull(overridingProgressIndex) && Objects.nonNull(resource) @@ -517,7 +536,8 @@ public PipeTsFileInsertionEvent shallowCopySelfAndBindPipeTaskMetaForProgressRep startTime, endTime, isTsFileSealed) - .bindTsFileDedupScopeID(tsFileDedupScopeID); + .bindTsFileDedupScopeID(tsFileDedupScopeID) + .setShouldReportGeneratedEventsOnCommit(shouldReportGeneratedEventsOnCommit); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java index c5dc7662bbc72..66944f776d0f8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/IoTDBDataRegionSource.java @@ -75,6 +75,7 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_END_TIME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_LOOSE_RANGE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_START_TIME_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_TSFILE_ORDER_BY_FLUSH_TIME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_SNAPSHOT_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_STREAMING_DEFAULT_VALUE; @@ -107,6 +108,7 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_HISTORY_END_TIME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_HISTORY_LOOSE_RANGE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_HISTORY_START_TIME_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_HISTORY_TSFILE_ORDER_BY_FLUSH_TIME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_MODE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_MODE_SNAPSHOT_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_MODE_STREAMING_KEY; @@ -186,6 +188,16 @@ public void validate(final PipeParameterValidator validator) throws Exception { SOURCE_HISTORY_ENABLE_KEY, true, Boolean.TRUE.toString(), Boolean.FALSE.toString()) .validateAttributeValueRange( SOURCE_REALTIME_ENABLE_KEY, true, Boolean.TRUE.toString(), Boolean.FALSE.toString()) + .validateAttributeValueRange( + EXTRACTOR_HISTORY_TSFILE_ORDER_BY_FLUSH_TIME_KEY, + true, + Boolean.TRUE.toString(), + Boolean.FALSE.toString()) + .validateAttributeValueRange( + SOURCE_HISTORY_TSFILE_ORDER_BY_FLUSH_TIME_KEY, + true, + Boolean.TRUE.toString(), + Boolean.FALSE.toString()) .validate( args -> (boolean) args[0] || (boolean) args[1], "Should not set both history.enable and realtime.enable to false.", diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java index 36b84e1e12712..b1faf4efa7f6e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java @@ -23,6 +23,7 @@ import org.apache.iotdb.commons.consensus.index.ProgressIndex; import org.apache.iotdb.commons.consensus.index.ProgressIndexType; import org.apache.iotdb.commons.consensus.index.impl.HybridProgressIndex; +import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex; import org.apache.iotdb.commons.consensus.index.impl.RecoverProgressIndex; import org.apache.iotdb.commons.consensus.index.impl.StateProgressIndex; import org.apache.iotdb.commons.consensus.index.impl.TimeWindowStateProgressIndex; @@ -98,6 +99,8 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_LOOSE_RANGE_PATH_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_LOOSE_RANGE_TIME_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_START_TIME_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_TSFILE_ORDER_BY_FLUSH_TIME_DEFAULT_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_TSFILE_ORDER_BY_FLUSH_TIME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_STRICT_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_STRICT_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODS_DEFAULT_VALUE; @@ -110,6 +113,7 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_HISTORY_END_TIME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_HISTORY_LOOSE_RANGE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_HISTORY_START_TIME_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_HISTORY_TSFILE_ORDER_BY_FLUSH_TIME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_MODE_STRICT_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_MODS_ENABLE_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_MODS_KEY; @@ -149,6 +153,8 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource private boolean sloppyTimeRange; // true to disable time range filter after extraction private boolean sloppyPattern; // true to disable pattern filter after extraction + private boolean shouldOrderHistoricalTsFileByFlushTime = + EXTRACTOR_HISTORY_TSFILE_ORDER_BY_FLUSH_TIME_DEFAULT_VALUE; private Pair listeningOptionPair; private boolean shouldExtractInsertion; @@ -170,6 +176,8 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource new HashMap<>(); private final Map pendingResource2ReplicateIndexForIoTV2 = new HashMap<>(); + private ProgressIndex maxHistoricalProgressIndex = MinimumProgressIndex.INSTANCE; + private boolean shouldReportMaxHistoricalProgressIndex = false; private int extractedHistoricalTsFileCount = 0; private int extractedHistoricalDeletionCount = 0; @@ -185,6 +193,13 @@ public void validate(final PipeParameterValidator validator) { throw new PipeParameterNotValidException(e.getMessage()); } + shouldOrderHistoricalTsFileByFlushTime = + parameters.getBooleanOrDefault( + Arrays.asList( + EXTRACTOR_HISTORY_TSFILE_ORDER_BY_FLUSH_TIME_KEY, + SOURCE_HISTORY_TSFILE_ORDER_BY_FLUSH_TIME_KEY), + EXTRACTOR_HISTORY_TSFILE_ORDER_BY_FLUSH_TIME_DEFAULT_VALUE); + if (parameters.hasAnyAttributes(EXTRACTOR_MODE_STRICT_KEY, SOURCE_MODE_STRICT_KEY)) { final boolean isStrictMode = parameters.getBooleanOrDefault( @@ -311,6 +326,12 @@ public void customize( throws IllegalPathException { shouldExtractInsertion = listeningOptionPair.getLeft(); shouldExtractDeletion = listeningOptionPair.getRight(); + shouldOrderHistoricalTsFileByFlushTime = + parameters.getBooleanOrDefault( + Arrays.asList( + EXTRACTOR_HISTORY_TSFILE_ORDER_BY_FLUSH_TIME_KEY, + SOURCE_HISTORY_TSFILE_ORDER_BY_FLUSH_TIME_KEY), + EXTRACTOR_HISTORY_TSFILE_ORDER_BY_FLUSH_TIME_DEFAULT_VALUE); final PipeRuntimeEnvironment environment = configuration.getRuntimeEnvironment(); @@ -496,6 +517,8 @@ public synchronized void start() { hasBeenStarted = true; extractedHistoricalTsFileCount = 0; extractedHistoricalDeletionCount = 0; + maxHistoricalProgressIndex = MinimumProgressIndex.INSTANCE; + shouldReportMaxHistoricalProgressIndex = false; final DataRegion dataRegion = StorageEngine.getInstance().getDataRegion(new DataRegionId(dataRegionId)); @@ -519,15 +542,15 @@ public synchronized void start() { .ifPresent(manager -> extractDeletions(manager, originalResourceList)); } + if (shouldUseHistoricalTsFileFlushTimeOrder()) { + prepareResourcesForHistoricalTsFileFlushTimeOrder(originalResourceList); + } + // Sort tsFileResource and deletionResource long startTime = System.currentTimeMillis(); LOGGER.info( DataNodePipeMessages.PIPE_START_TO_SORT_ALL_EXTRACTED_RESOURCES, pipeName, dataRegionId); - originalResourceList.sort( - (o1, o2) -> - startIndex instanceof TimeWindowStateProgressIndex - ? Long.compare(o1.getFileStartTime(), o2.getFileStartTime()) - : o1.getProgressIndex().topologicalCompareTo(o2.getProgressIndex())); + sortExtractedResources(originalResourceList); pendingQueue = new ArrayDeque<>(originalResourceList); PipeTerminateEvent.initializeHistoricalTransferSummary( pipeName, @@ -546,6 +569,88 @@ public synchronized void start() { } } + private boolean shouldUseHistoricalTsFileFlushTimeOrder() { + // Deletion resources only carry progressIndex. Keep the old progressIndex order when deletions + // are extracted together with TsFiles so insertion/deletion ordering semantics are unchanged. + return shouldOrderHistoricalTsFileByFlushTime + && shouldExtractInsertion + && !shouldExtractDeletion; + } + + private void prepareResourcesForHistoricalTsFileFlushTimeOrder( + final List resourceList) { + // Flush-time order is intentionally not compatible with progressIndex order, so report progress + // only after all selected historical TsFiles are supplied. This prefers possible retransmission + // over losing overwrite semantics. + resourceList.removeIf( + resource -> + resource instanceof TsFileResource + && !filteredTsFileResources2TableNames.containsKey(resource)); + updateMaxHistoricalProgressIndex(resourceList); + shouldReportMaxHistoricalProgressIndex = !resourceList.isEmpty(); + } + + private void updateMaxHistoricalProgressIndex(final List resourceList) { + for (final PersistentResource resource : resourceList) { + final ProgressIndex progressIndex = resource.getProgressIndex(); + if (Objects.nonNull(progressIndex)) { + maxHistoricalProgressIndex = + maxHistoricalProgressIndex.updateToMinimumEqualOrIsAfterProgressIndex(progressIndex); + } + } + } + + private void sortExtractedResources(final List resourceList) { + if (shouldUseHistoricalTsFileFlushTimeOrder()) { + // Send TsFiles in source-side file creation order. For duplicated points, older files are + // loaded first on the receiver and newer files are loaded later to preserve overwrite + // semantics. + resourceList.sort( + (o1, o2) -> + o1 instanceof TsFileResource && o2 instanceof TsFileResource + ? compareTsFileResourcesByFlushTime((TsFileResource) o1, (TsFileResource) o2) + : comparePersistentResourcesByProgressIndex(o1, o2)); + return; + } + + resourceList.sort( + (o1, o2) -> + startIndex instanceof TimeWindowStateProgressIndex + ? Long.compare(o1.getFileStartTime(), o2.getFileStartTime()) + : comparePersistentResourcesByProgressIndex(o1, o2)); + } + + private int comparePersistentResourcesByProgressIndex( + final PersistentResource resource1, final PersistentResource resource2) { + return resource1.getProgressIndex().topologicalCompareTo(resource2.getProgressIndex()); + } + + private int compareTsFileResourcesByFlushTime( + final TsFileResource resource1, final TsFileResource resource2) { + if (resource1.isSeq() != resource2.isSeq()) { + return resource1.isSeq() ? -1 : 1; + } + + int result = Long.compare(resource1.getTsFileID().timestamp, resource2.getTsFileID().timestamp); + if (result != 0) { + return result; + } + + result = Long.compare(resource1.getVersion(), resource2.getVersion()); + if (result != 0) { + return result; + } + + result = + Long.compare( + resource1.getTsFileID().compactionVersion, resource2.getTsFileID().compactionVersion); + if (result != 0) { + return result; + } + + return resource1.getTsFilePath().compareTo(resource2.getTsFilePath()); + } + private void flushTsFilesForExtraction(DataRegion dataRegion) { LOGGER.info(DataNodePipeMessages.PIPE_START_TO_FLUSH_DATA_REGION, pipeName, dataRegionId); @@ -554,7 +659,8 @@ private void flushTsFilesForExtraction(DataRegion dataRegion) { // Since a large number of consensus pipes are not created at the same time, resulting in no // serious waiting for locks. Therefore, the flush operation is always performed for the // consensus pipe, and the lastFlushed timestamp is not updated here. - if (pipeName.startsWith(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)) { + if (pipeName.startsWith(PipeStaticMeta.CONSENSUS_PIPE_PREFIX) + || shouldUseHistoricalTsFileFlushTimeOrder()) { dataRegion.syncCloseAllWorkingTsFileProcessors(); } else { dataRegion.asyncCloseAllWorkingTsFileProcessors(); @@ -881,27 +987,36 @@ public synchronized Event supply() { return null; } - final PersistentResource resource = pendingQueue.peek(); - if (resource == null) { - return supplyTerminateEvent(); - } + while (true) { + final PersistentResource resource = pendingQueue.peek(); + if (resource == null) { + if (shouldReportMaxHistoricalProgressIndex) { + shouldReportMaxHistoricalProgressIndex = false; + return supplyProgressReportEvent(maxHistoricalProgressIndex); + } + return supplyTerminateEvent(); + } - if (resource instanceof TsFileResource) { - final TsFileResource tsFileResource = (TsFileResource) resource; - if (consumeSkippedHistoricalTsFileEventIfNecessary(tsFileResource)) { - clearReplicateIndexForResource(tsFileResource); + if (resource instanceof TsFileResource) { + final TsFileResource tsFileResource = (TsFileResource) resource; + if (consumeSkippedHistoricalTsFileEventIfNecessary(tsFileResource)) { + clearReplicateIndexForResource(tsFileResource); + pendingQueue.poll(); + if (shouldUseHistoricalTsFileFlushTimeOrder()) { + continue; + } + return supplyProgressReportEvent(tsFileResource.getMaxProgressIndex()); + } + + final Event event = supplyTsFileEvent(tsFileResource); pendingQueue.poll(); - return supplyProgressReportEvent(tsFileResource.getMaxProgressIndex()); + return event; } - final Event event = supplyTsFileEvent(tsFileResource); + final Event event = supplyDeletionEvent((DeletionResource) resource); pendingQueue.poll(); return event; } - - final Event event = supplyDeletionEvent((DeletionResource) resource); - pendingQueue.poll(); - return event; } private Event supplyTerminateEvent() { @@ -982,7 +1097,9 @@ protected Event supplyProgressReportEvent(final ProgressIndex progressIndex) { protected Event supplyTsFileEvent(final TsFileResource resource) { if (!filteredTsFileResources2TableNames.containsKey(resource)) { clearReplicateIndexForResource(resource); - return supplyProgressReportEvent(resource.getMaxProgressIndex()); + return shouldUseHistoricalTsFileFlushTimeOrder() + ? null + : supplyProgressReportEvent(resource.getMaxProgressIndex()); } boolean shouldUnpinResource = false; @@ -1010,6 +1127,10 @@ protected Event supplyTsFileEvent(final TsFileResource resource) { historicalDataExtractionStartTime, historicalDataExtractionEndTime); + if (shouldUseHistoricalTsFileFlushTimeOrder()) { + event.skipReportOnCommitAndGeneratedEvents(); + } + // if using IoTV2, assign a replicateIndex for this event if (shouldAssignReplicateIndexForIoTV2(event)) { event.setReplicateIndexForIoTV2(assignReplicateIndexForResource(resource)); @@ -1081,7 +1202,6 @@ private Event supplyDeletionEvent(final DeletionResource deletionResource) { cliHostname, skipIfNoPrivileges, false); - // if using IoTV2, assign a replicateIndex for this historical deletion event if (shouldAssignReplicateIndexForIoTV2(event)) { event.setReplicateIndexForIoTV2(assignReplicateIndexForResource(deletionResource)); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java index cfe35ef041de9..f5c3f1fa05c8c 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java @@ -37,6 +37,7 @@ import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tablet.parser.TabletInsertionEventTablePatternParser; import org.apache.iotdb.db.pipe.event.common.tablet.parser.TabletInsertionEventTreePatternParser; +import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertRowNode; @@ -53,6 +54,7 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; import java.time.LocalDate; import java.util.ArrayList; @@ -262,6 +264,63 @@ private void createTablet() { new Tablet(deviceId, Arrays.asList(schemas), times, values, null, times.length); } + @Test + public void markAsNeedToReportShouldInheritSourceTsFileGeneratedReportSkipping() + throws Exception { + final PipeTsFileInsertionEvent sourceEvent = Mockito.mock(PipeTsFileInsertionEvent.class); + Mockito.when(sourceEvent.shouldReportGeneratedEventsOnCommit()).thenReturn(true); + final PipeRawTabletInsertionEvent tabletEvent = + new PipeRawTabletInsertionEvent( + false, + null, + null, + null, + tabletForInsertTabletNode, + false, + null, + 0, + null, + sourceEvent, + false); + + tabletEvent.markAsNeedToReport(); + Assert.assertTrue(tabletEvent.isShouldReportOnCommit()); + + Mockito.when(sourceEvent.shouldReportGeneratedEventsOnCommit()).thenReturn(false); + final PipeRawTabletInsertionEvent skippedTabletEvent = + new PipeRawTabletInsertionEvent( + false, + null, + null, + null, + tabletForInsertTabletNode, + false, + null, + 0, + null, + sourceEvent, + false); + + skippedTabletEvent.markAsNeedToReport(); + Assert.assertFalse(skippedTabletEvent.isShouldReportOnCommit()); + + final PipeRawTabletInsertionEvent constructorSkippedTabletEvent = + new PipeRawTabletInsertionEvent( + false, + null, + null, + null, + tabletForInsertTabletNode, + false, + null, + 0, + null, + sourceEvent, + true); + + Assert.assertFalse(constructorSkippedTabletEvent.isShouldReportOnCommit()); + } + @Test public void convertToTabletForTest() throws Exception { TabletInsertionEventTreePatternParser container1 = diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSourceTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSourceTest.java index edffbb5b31c1f..53cf442c5239f 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSourceTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSourceTest.java @@ -181,6 +181,152 @@ public void testSupplyRetriesSameTsFileAfterEventCreationFailure() throws Except } } + @Test + public void testHistoricalTsFileFlushTimeOrderDefaultsToTrue() throws Exception { + final PipeHistoricalDataRegionTsFileAndDeletionSource source = + new PipeHistoricalDataRegionTsFileAndDeletionSource(); + + source.validate(new PipeParameterValidator(new PipeParameters(new HashMap<>()))); + + Assert.assertTrue((Boolean) getPrivateField(source, "shouldOrderHistoricalTsFileByFlushTime")); + } + + @Test + public void testHistoricalTsFileFlushTimeOrderSortsOlderFlushTimeFirst() throws Exception { + final PipeHistoricalDataRegionTsFileAndDeletionSource source = + new PipeHistoricalDataRegionTsFileAndDeletionSource(); + final File tempDir = Files.createTempDirectory("pipeHistoricalTsFileOrder").toFile(); + + try { + final TsFileResource seqOlderFlushTimeNewerVersion = + createTsFileResource(tempDir, "50-3-0-0.tsfile"); + seqOlderFlushTimeNewerVersion.setSeq(true); + final TsFileResource seqSameFlushTimeOlderVersion = + createTsFileResource(tempDir, "100-1-0-0.tsfile"); + seqSameFlushTimeOlderVersion.setSeq(true); + final TsFileResource seqSameFlushTimeNewerVersion = + createTsFileResource(tempDir, "100-2-0-0.tsfile"); + seqSameFlushTimeNewerVersion.setSeq(true); + final TsFileResource unseqOldestFlushTime = createTsFileResource(tempDir, "1-1-0-0.tsfile"); + unseqOldestFlushTime.setSeq(false); + + setPrivateField(source, "shouldOrderHistoricalTsFileByFlushTime", true); + setPrivateField(source, "shouldExtractInsertion", true); + setPrivateField(source, "shouldExtractDeletion", false); + setPrivateField(source, "startIndex", MinimumProgressIndex.INSTANCE); + + final List resources = + new ArrayList<>( + Arrays.asList( + unseqOldestFlushTime, + seqSameFlushTimeNewerVersion, + seqSameFlushTimeOlderVersion, + seqOlderFlushTimeNewerVersion)); + sortExtractedResources(source, resources); + + Assert.assertEquals( + Arrays.asList( + seqOlderFlushTimeNewerVersion, + seqSameFlushTimeOlderVersion, + seqSameFlushTimeNewerVersion, + unseqOldestFlushTime), + resources); + } finally { + FileUtils.deleteFileOrDirectory(tempDir); + } + } + + @Test + public void testHistoricalTsFileFlushTimeOrderCanBeDisabled() throws Exception { + final PipeHistoricalDataRegionTsFileAndDeletionSource source = + new PipeHistoricalDataRegionTsFileAndDeletionSource(); + final PipeParameters parameters = + new PipeParameters( + new HashMap() { + { + put( + PipeSourceConstant.SOURCE_HISTORY_TSFILE_ORDER_BY_FLUSH_TIME_KEY, + Boolean.FALSE.toString()); + } + }); + final File tempDir = Files.createTempDirectory("pipeHistoricalTsFileProgressOrder").toFile(); + + try { + source.validate(new PipeParameterValidator(parameters)); + final TsFileResource earlierProgressIndex = createTsFileResource(tempDir, "300-1-0-0.tsfile"); + earlierProgressIndex.updateProgressIndex(new SimpleProgressIndex(0, 1)); + final TsFileResource laterProgressIndex = createTsFileResource(tempDir, "100-1-0-0.tsfile"); + laterProgressIndex.updateProgressIndex(new SimpleProgressIndex(0, 2)); + + setPrivateField(source, "shouldExtractInsertion", true); + setPrivateField(source, "shouldExtractDeletion", false); + setPrivateField(source, "startIndex", MinimumProgressIndex.INSTANCE); + + final List resources = + new ArrayList<>(Arrays.asList(laterProgressIndex, earlierProgressIndex)); + sortExtractedResources(source, resources); + + Assert.assertFalse( + (Boolean) getPrivateField(source, "shouldOrderHistoricalTsFileByFlushTime")); + Assert.assertEquals(Arrays.asList(earlierProgressIndex, laterProgressIndex), resources); + } finally { + FileUtils.deleteFileOrDirectory(tempDir); + } + } + + @Test + public void testFlushTimeOrderReportsProgressAfterAllHistoricalResources() throws Exception { + final PipeHistoricalDataRegionTsFileAndDeletionSource source = + new PipeHistoricalDataRegionTsFileAndDeletionSource(); + final ProgressIndex expectedProgressIndex = new SimpleProgressIndex(0, 10); + + setPrivateField(source, "hasBeenStarted", true); + setPrivateField(source, "pipeName", "pipe"); + setPrivateField(source, "creationTime", 1L); + setPrivateField(source, "dataRegionId", 1); + setPrivateField(source, "pipeTaskMeta", new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1)); + setPrivateField(source, "pendingQueue", new ArrayDeque()); + setPrivateField(source, "maxHistoricalProgressIndex", expectedProgressIndex); + setPrivateField(source, "shouldReportMaxHistoricalProgressIndex", true); + + final Event event = source.supply(); + + Assert.assertTrue(event instanceof ProgressReportEvent); + Assert.assertEquals(expectedProgressIndex, ((ProgressReportEvent) event).getProgressIndex()); + Assert.assertFalse((Boolean) getPrivateField(source, "shouldReportMaxHistoricalProgressIndex")); + } + + @Test + @SuppressWarnings("unchecked") + public void testFlushTimeOrderProgressOnlyCoversSelectedResources() throws Exception { + final PipeHistoricalDataRegionTsFileAndDeletionSource source = + new PipeHistoricalDataRegionTsFileAndDeletionSource(); + final File tempDir = Files.createTempDirectory("pipeHistoricalTsFileSelectedProgress").toFile(); + + try { + final TsFileResource selectedResource = createTsFileResource(tempDir, "100-1-0-0.tsfile"); + selectedResource.updateProgressIndex(new SimpleProgressIndex(0, 1)); + final TsFileResource filteredResource = createTsFileResource(tempDir, "200-1-0-0.tsfile"); + filteredResource.updateProgressIndex(new SimpleProgressIndex(0, 100)); + + ((Map>) + getPrivateField(source, "filteredTsFileResources2TableNames")) + .put(selectedResource, Set.of()); + + final List resources = + new ArrayList<>(Arrays.asList(filteredResource, selectedResource)); + prepareResourcesForHistoricalTsFileFlushTimeOrder(source, resources); + + Assert.assertEquals(Arrays.asList(selectedResource), resources); + Assert.assertEquals( + new SimpleProgressIndex(0, 1), getPrivateField(source, "maxHistoricalProgressIndex")); + Assert.assertTrue( + (Boolean) getPrivateField(source, "shouldReportMaxHistoricalProgressIndex")); + } finally { + FileUtils.deleteFileOrDirectory(tempDir); + } + } + @Test public void testReplicateIndexShouldBeStableBeforeResourceConsumed() throws Exception { final TestablePipeHistoricalDataRegionTsFileAndDeletionSource source = @@ -302,6 +448,28 @@ private static ProgressIndex hybridProgressIndex( return result; } + private static void sortExtractedResources( + final PipeHistoricalDataRegionTsFileAndDeletionSource source, + final List resources) + throws ReflectiveOperationException { + final Method method = + PipeHistoricalDataRegionTsFileAndDeletionSource.class.getDeclaredMethod( + "sortExtractedResources", List.class); + method.setAccessible(true); + method.invoke(source, resources); + } + + private static void prepareResourcesForHistoricalTsFileFlushTimeOrder( + final PipeHistoricalDataRegionTsFileAndDeletionSource source, + final List resources) + throws ReflectiveOperationException { + final Method method = + PipeHistoricalDataRegionTsFileAndDeletionSource.class.getDeclaredMethod( + "prepareResourcesForHistoricalTsFileFlushTimeOrder", List.class); + method.setAccessible(true); + method.invoke(source, resources); + } + private static void setPrivateField( final PipeHistoricalDataRegionTsFileAndDeletionSource source, final String fieldName, diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSourceConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSourceConstant.java index e755ef3c6019e..a44a34d14fee6 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSourceConstant.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSourceConstant.java @@ -101,6 +101,11 @@ public class PipeSourceConstant { public static final String EXTRACTOR_HISTORY_LOOSE_RANGE_PATH_VALUE = "path"; public static final String EXTRACTOR_HISTORY_LOOSE_RANGE_ALL_VALUE = "all"; public static final String EXTRACTOR_HISTORY_LOOSE_RANGE_DEFAULT_VALUE = ""; + public static final String EXTRACTOR_HISTORY_TSFILE_ORDER_BY_FLUSH_TIME_KEY = + "extractor.history.tsfile.order-by-flush-time"; + public static final String SOURCE_HISTORY_TSFILE_ORDER_BY_FLUSH_TIME_KEY = + "source.history.tsfile.order-by-flush-time"; + public static final boolean EXTRACTOR_HISTORY_TSFILE_ORDER_BY_FLUSH_TIME_DEFAULT_VALUE = true; public static final String EXTRACTOR_MODS_ENABLE_KEY = "extractor.mods.enable"; public static final String SOURCE_MODS_ENABLE_KEY = "source.mods.enable"; public static final boolean EXTRACTOR_MODS_ENABLE_DEFAULT_VALUE = false;