Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@
this.isAligned = isAligned;
this.sourceEvent = sourceEvent;
this.needToReport = needToReport;
inheritSourceEventReportSkippingIfNecessary();

// Allocate empty memory block, will be resized later.
this.allocatedMemoryBlock =
Expand Down Expand Up @@ -399,6 +400,18 @@
});
}
this.needToReport = true;
inheritSourceEventReportSkippingIfNecessary();
}

private void inheritSourceEventReportSkippingIfNecessary() {
if (needToReport && shouldSkipReportOnCommitBecauseOfSourceEvent()) {
skipReportOnCommit();
}
}

private boolean shouldSkipReportOnCommitBecauseOfSourceEvent() {
return sourceEvent instanceof PipeTsFileInsertionEvent

Check warning on line 413 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Replace this instanceof check and cast with 'instanceof PipeTsFileInsertionEvent pipetsfileinsertionevent'

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ8gv7UHXgCgWw53Sruy&open=AZ8gv7UHXgCgWw53Sruy&pullRequest=18088
&& !((PipeTsFileInsertionEvent) sourceEvent).shouldReportGeneratedEventsOnCommit();
}

// This getter is reserved for user-defined plugins
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@
protected volatile ProgressIndex overridingProgressIndex;
private Set<String> tableNames;
private String tsFileDedupScopeID;
// False when generated tablet events should wait for an external progress report.
private volatile boolean shouldReportGeneratedEventsOnCommit = true;

// This is set to check the tsFile paths by privilege
private Map<IDeviceID, String[]> treeSchemaMap;
Expand Down Expand Up @@ -462,6 +464,23 @@
return resource.getMaxProgressIndex();
}

public PipeTsFileInsertionEvent skipReportOnCommitAndGeneratedEvents() {
return setShouldReportGeneratedEventsOnCommit(false);
}

public boolean shouldReportGeneratedEventsOnCommit() {
return shouldReportGeneratedEventsOnCommit;
}

private PipeTsFileInsertionEvent setShouldReportGeneratedEventsOnCommit(
final boolean shouldReportGeneratedEventsOnCommit) {
this.shouldReportGeneratedEventsOnCommit = shouldReportGeneratedEventsOnCommit;
if (!shouldReportGeneratedEventsOnCommit) {
skipReportOnCommit();
}
return this;
}

public void eliminateProgressIndex() {
if (Objects.isNull(overridingProgressIndex)
&& Objects.nonNull(resource)
Expand Down Expand Up @@ -517,7 +536,8 @@
startTime,
endTime,
isTsFileSealed)
.bindTsFileDedupScopeID(tsFileDedupScopeID);
.bindTsFileDedupScopeID(tsFileDedupScopeID)
.setShouldReportGeneratedEventsOnCommit(shouldReportGeneratedEventsOnCommit);
}

@Override
Expand All @@ -526,7 +546,7 @@
}

@Override
public void throwIfNoPrivilege() {

Check warning on line 549 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

A "Brain Method" was detected. Refactor it to reduce at least one of the following metrics: LOC from 100 to 64, Complexity from 21 to 14, Nesting Level from 5 to 2, Number of Variables from 13 to 6.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ8gv7ZSXgCgWw53Sruz&open=AZ8gv7ZSXgCgWw53Sruz&pullRequest=18088
try {
if (AuthorityChecker.SUPER_USER.equals(userName)) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_END_TIME_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_LOOSE_RANGE_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_START_TIME_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_TSFILE_ORDER_BY_FLUSH_TIME_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_SNAPSHOT_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_STREAMING_DEFAULT_VALUE;
Expand Down Expand Up @@ -107,6 +108,7 @@
import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_HISTORY_END_TIME_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_HISTORY_LOOSE_RANGE_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_HISTORY_START_TIME_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_HISTORY_TSFILE_ORDER_BY_FLUSH_TIME_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_MODE_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_MODE_SNAPSHOT_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_MODE_STREAMING_KEY;
Expand Down Expand Up @@ -186,6 +188,16 @@ public void validate(final PipeParameterValidator validator) throws Exception {
SOURCE_HISTORY_ENABLE_KEY, true, Boolean.TRUE.toString(), Boolean.FALSE.toString())
.validateAttributeValueRange(
SOURCE_REALTIME_ENABLE_KEY, true, Boolean.TRUE.toString(), Boolean.FALSE.toString())
.validateAttributeValueRange(
EXTRACTOR_HISTORY_TSFILE_ORDER_BY_FLUSH_TIME_KEY,
true,
Boolean.TRUE.toString(),
Boolean.FALSE.toString())
.validateAttributeValueRange(
SOURCE_HISTORY_TSFILE_ORDER_BY_FLUSH_TIME_KEY,
true,
Boolean.TRUE.toString(),
Boolean.FALSE.toString())
.validate(
args -> (boolean) args[0] || (boolean) args[1],
"Should not set both history.enable and realtime.enable to false.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.consensus.index.ProgressIndexType;
import org.apache.iotdb.commons.consensus.index.impl.HybridProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.RecoverProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.StateProgressIndex;
import org.apache.iotdb.commons.consensus.index.impl.TimeWindowStateProgressIndex;
Expand Down Expand Up @@ -98,6 +99,8 @@
import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_LOOSE_RANGE_PATH_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_LOOSE_RANGE_TIME_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_START_TIME_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_TSFILE_ORDER_BY_FLUSH_TIME_DEFAULT_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_HISTORY_TSFILE_ORDER_BY_FLUSH_TIME_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_STRICT_DEFAULT_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODE_STRICT_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_MODS_DEFAULT_VALUE;
Expand All @@ -110,6 +113,7 @@
import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_HISTORY_END_TIME_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_HISTORY_LOOSE_RANGE_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_HISTORY_START_TIME_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_HISTORY_TSFILE_ORDER_BY_FLUSH_TIME_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_MODE_STRICT_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_MODS_ENABLE_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.SOURCE_MODS_KEY;
Expand Down Expand Up @@ -149,6 +153,8 @@

private boolean sloppyTimeRange; // true to disable time range filter after extraction
private boolean sloppyPattern; // true to disable pattern filter after extraction
private boolean shouldOrderHistoricalTsFileByFlushTime =
EXTRACTOR_HISTORY_TSFILE_ORDER_BY_FLUSH_TIME_DEFAULT_VALUE;

private Pair<Boolean, Boolean> listeningOptionPair;
private boolean shouldExtractInsertion;
Expand All @@ -170,6 +176,8 @@
new HashMap<>();
private final Map<PersistentResource, Long> pendingResource2ReplicateIndexForIoTV2 =
new HashMap<>();
private ProgressIndex maxHistoricalProgressIndex = MinimumProgressIndex.INSTANCE;
private boolean shouldReportMaxHistoricalProgressIndex = false;
private int extractedHistoricalTsFileCount = 0;
private int extractedHistoricalDeletionCount = 0;

Expand All @@ -185,6 +193,13 @@
throw new PipeParameterNotValidException(e.getMessage());
}

shouldOrderHistoricalTsFileByFlushTime =
parameters.getBooleanOrDefault(
Arrays.asList(
EXTRACTOR_HISTORY_TSFILE_ORDER_BY_FLUSH_TIME_KEY,
SOURCE_HISTORY_TSFILE_ORDER_BY_FLUSH_TIME_KEY),
EXTRACTOR_HISTORY_TSFILE_ORDER_BY_FLUSH_TIME_DEFAULT_VALUE);

if (parameters.hasAnyAttributes(EXTRACTOR_MODE_STRICT_KEY, SOURCE_MODE_STRICT_KEY)) {
final boolean isStrictMode =
parameters.getBooleanOrDefault(
Expand Down Expand Up @@ -311,6 +326,12 @@
throws IllegalPathException {
shouldExtractInsertion = listeningOptionPair.getLeft();
shouldExtractDeletion = listeningOptionPair.getRight();
shouldOrderHistoricalTsFileByFlushTime =
parameters.getBooleanOrDefault(
Arrays.asList(
EXTRACTOR_HISTORY_TSFILE_ORDER_BY_FLUSH_TIME_KEY,
SOURCE_HISTORY_TSFILE_ORDER_BY_FLUSH_TIME_KEY),
EXTRACTOR_HISTORY_TSFILE_ORDER_BY_FLUSH_TIME_DEFAULT_VALUE);

final PipeRuntimeEnvironment environment = configuration.getRuntimeEnvironment();

Expand Down Expand Up @@ -496,6 +517,8 @@
hasBeenStarted = true;
extractedHistoricalTsFileCount = 0;
extractedHistoricalDeletionCount = 0;
maxHistoricalProgressIndex = MinimumProgressIndex.INSTANCE;
shouldReportMaxHistoricalProgressIndex = false;

final DataRegion dataRegion =
StorageEngine.getInstance().getDataRegion(new DataRegionId(dataRegionId));
Expand All @@ -519,15 +542,15 @@
.ifPresent(manager -> extractDeletions(manager, originalResourceList));
}

if (shouldUseHistoricalTsFileFlushTimeOrder()) {
prepareResourcesForHistoricalTsFileFlushTimeOrder(originalResourceList);
}

// Sort tsFileResource and deletionResource
long startTime = System.currentTimeMillis();
LOGGER.info(
DataNodePipeMessages.PIPE_START_TO_SORT_ALL_EXTRACTED_RESOURCES, pipeName, dataRegionId);
originalResourceList.sort(
(o1, o2) ->
startIndex instanceof TimeWindowStateProgressIndex
? Long.compare(o1.getFileStartTime(), o2.getFileStartTime())
: o1.getProgressIndex().topologicalCompareTo(o2.getProgressIndex()));
sortExtractedResources(originalResourceList);
pendingQueue = new ArrayDeque<>(originalResourceList);
PipeTerminateEvent.initializeHistoricalTransferSummary(
pipeName,
Expand All @@ -546,6 +569,88 @@
}
}

private boolean shouldUseHistoricalTsFileFlushTimeOrder() {
// Deletion resources only carry progressIndex. Keep the old progressIndex order when deletions
// are extracted together with TsFiles so insertion/deletion ordering semantics are unchanged.
return shouldOrderHistoricalTsFileByFlushTime
&& shouldExtractInsertion
&& !shouldExtractDeletion;
}

private void prepareResourcesForHistoricalTsFileFlushTimeOrder(
final List<PersistentResource> resourceList) {
// Flush-time order is intentionally not compatible with progressIndex order, so report progress
// only after all selected historical TsFiles are supplied. This prefers possible retransmission
// over losing overwrite semantics.
resourceList.removeIf(
resource ->
resource instanceof TsFileResource
&& !filteredTsFileResources2TableNames.containsKey(resource));
updateMaxHistoricalProgressIndex(resourceList);
shouldReportMaxHistoricalProgressIndex = !resourceList.isEmpty();
}

private void updateMaxHistoricalProgressIndex(final List<PersistentResource> resourceList) {
for (final PersistentResource resource : resourceList) {
final ProgressIndex progressIndex = resource.getProgressIndex();
if (Objects.nonNull(progressIndex)) {
maxHistoricalProgressIndex =
maxHistoricalProgressIndex.updateToMinimumEqualOrIsAfterProgressIndex(progressIndex);
}
}
}

private void sortExtractedResources(final List<PersistentResource> resourceList) {
if (shouldUseHistoricalTsFileFlushTimeOrder()) {
// Send TsFiles in source-side file creation order. For duplicated points, older files are
// loaded first on the receiver and newer files are loaded later to preserve overwrite
// semantics.
resourceList.sort(
(o1, o2) ->
o1 instanceof TsFileResource && o2 instanceof TsFileResource

Check warning on line 610 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Replace this instanceof check and cast with 'instanceof TsFileResource tsfileresource'

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ8gv7ZwXgCgWw53Sru0&open=AZ8gv7ZwXgCgWw53Sru0&pullRequest=18088
? compareTsFileResourcesByFlushTime((TsFileResource) o1, (TsFileResource) o2)
: comparePersistentResourcesByProgressIndex(o1, o2));
return;
}

resourceList.sort(
(o1, o2) ->
startIndex instanceof TimeWindowStateProgressIndex
? Long.compare(o1.getFileStartTime(), o2.getFileStartTime())
: comparePersistentResourcesByProgressIndex(o1, o2));
}

private int comparePersistentResourcesByProgressIndex(
final PersistentResource resource1, final PersistentResource resource2) {
return resource1.getProgressIndex().topologicalCompareTo(resource2.getProgressIndex());
}

private int compareTsFileResourcesByFlushTime(
final TsFileResource resource1, final TsFileResource resource2) {
if (resource1.isSeq() != resource2.isSeq()) {
return resource1.isSeq() ? -1 : 1;
}

int result = Long.compare(resource1.getTsFileID().timestamp, resource2.getTsFileID().timestamp);
if (result != 0) {
return result;
}

result = Long.compare(resource1.getVersion(), resource2.getVersion());
if (result != 0) {
return result;
}

result =
Long.compare(
resource1.getTsFileID().compactionVersion, resource2.getTsFileID().compactionVersion);
if (result != 0) {
return result;
}

return resource1.getTsFilePath().compareTo(resource2.getTsFilePath());
}

private void flushTsFilesForExtraction(DataRegion dataRegion) {
LOGGER.info(DataNodePipeMessages.PIPE_START_TO_FLUSH_DATA_REGION, pipeName, dataRegionId);

Expand All @@ -554,7 +659,8 @@
// Since a large number of consensus pipes are not created at the same time, resulting in no
// serious waiting for locks. Therefore, the flush operation is always performed for the
// consensus pipe, and the lastFlushed timestamp is not updated here.
if (pipeName.startsWith(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)) {
if (pipeName.startsWith(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)
|| shouldUseHistoricalTsFileFlushTimeOrder()) {
dataRegion.syncCloseAllWorkingTsFileProcessors();
} else {
dataRegion.asyncCloseAllWorkingTsFileProcessors();
Expand Down Expand Up @@ -872,7 +978,7 @@
}

@Override
public synchronized Event supply() {

Check failure on line 981 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this method to reduce its Cognitive Complexity from 18 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ8gv7ZwXgCgWw53Sru1&open=AZ8gv7ZwXgCgWw53Sru1&pullRequest=18088
if (!hasBeenStarted && StorageEngine.getInstance().isReadyForNonReadWriteFunctions()) {
start();
}
Expand All @@ -881,27 +987,36 @@
return null;
}

final PersistentResource resource = pendingQueue.peek();
if (resource == null) {
return supplyTerminateEvent();
}
while (true) {
final PersistentResource resource = pendingQueue.peek();
if (resource == null) {
if (shouldReportMaxHistoricalProgressIndex) {
shouldReportMaxHistoricalProgressIndex = false;
return supplyProgressReportEvent(maxHistoricalProgressIndex);
}
return supplyTerminateEvent();
}

if (resource instanceof TsFileResource) {
final TsFileResource tsFileResource = (TsFileResource) resource;
if (consumeSkippedHistoricalTsFileEventIfNecessary(tsFileResource)) {
clearReplicateIndexForResource(tsFileResource);
if (resource instanceof TsFileResource) {
final TsFileResource tsFileResource = (TsFileResource) resource;
if (consumeSkippedHistoricalTsFileEventIfNecessary(tsFileResource)) {
clearReplicateIndexForResource(tsFileResource);
pendingQueue.poll();
if (shouldUseHistoricalTsFileFlushTimeOrder()) {
continue;
}
return supplyProgressReportEvent(tsFileResource.getMaxProgressIndex());
}

final Event event = supplyTsFileEvent(tsFileResource);
pendingQueue.poll();
return supplyProgressReportEvent(tsFileResource.getMaxProgressIndex());
return event;
}

final Event event = supplyTsFileEvent(tsFileResource);
final Event event = supplyDeletionEvent((DeletionResource) resource);
pendingQueue.poll();
return event;
}

final Event event = supplyDeletionEvent((DeletionResource) resource);
pendingQueue.poll();
return event;
}

private Event supplyTerminateEvent() {
Expand Down Expand Up @@ -979,10 +1094,12 @@
return isReferenceCountIncreased ? progressReportEvent : null;
}

protected Event supplyTsFileEvent(final TsFileResource resource) {

Check failure on line 1097 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this method to reduce its Cognitive Complexity from 16 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ8gv7ZwXgCgWw53Sru2&open=AZ8gv7ZwXgCgWw53Sru2&pullRequest=18088
if (!filteredTsFileResources2TableNames.containsKey(resource)) {
clearReplicateIndexForResource(resource);
return supplyProgressReportEvent(resource.getMaxProgressIndex());
return shouldUseHistoricalTsFileFlushTimeOrder()
? null
: supplyProgressReportEvent(resource.getMaxProgressIndex());
}

boolean shouldUnpinResource = false;
Expand Down Expand Up @@ -1010,6 +1127,10 @@
historicalDataExtractionStartTime,
historicalDataExtractionEndTime);

if (shouldUseHistoricalTsFileFlushTimeOrder()) {
event.skipReportOnCommitAndGeneratedEvents();
}

// if using IoTV2, assign a replicateIndex for this event
if (shouldAssignReplicateIndexForIoTV2(event)) {
event.setReplicateIndexForIoTV2(assignReplicateIndexForResource(resource));
Expand Down Expand Up @@ -1081,7 +1202,6 @@
cliHostname,
skipIfNoPrivileges,
false);

// if using IoTV2, assign a replicateIndex for this historical deletion event
if (shouldAssignReplicateIndexForIoTV2(event)) {
event.setReplicateIndexForIoTV2(assignReplicateIndexForResource(deletionResource));
Expand Down
Loading
Loading