Skip to content

Pipe: improve incremental memory estimation#18049

Open
Caideyipi wants to merge 4 commits into
masterfrom
pipe-memory-estimation-short-term
Open

Pipe: improve incremental memory estimation#18049
Caideyipi wants to merge 4 commits into
masterfrom
pipe-memory-estimation-short-term

Conversation

@Caideyipi

Copy link
Copy Markdown
Collaborator

Description

This PR improves the pipe creation memory admission check from a coarse per-pipe estimate to an incremental estimate based on the tasks and sink subtasks that this DataNode will actually create.

Changes include:

  • Estimate memory by local region tasks, and also check newly added single-region tasks.
  • Account for sink subtask reuse so shared sink pending queues/subtasks are not charged repeatedly.
  • Reuse the actual sink subtask key and parallelism logic from PipeSinkSubtaskManager.
  • Match task builder parameter blending/preprocessing during admission estimation.
  • Estimate batch memory from connector.batch.size-bytes/sink.batch.size-bytes, including leader-cache endpoint sharding.
  • Estimate TsFile send read buffer with pipeSinkReadFileBufferSize.
  • Skip admission checks for idempotent create no-ops, while checking before dropping old tasks during recreate.

Verification

  • mvn spotless:apply -pl iotdb-core/datanode,iotdb-core/node-commons
  • git -c core.preloadIndex=false diff --check -- iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/builder/PipeDataNodeTaskBuilder.java iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtaskManager.java
  • mvn "-Ddevelocity.off=true" "-Dcheckstyle.skip=true" "-Dspotless.check.skip=true" "-DskipTests" compile -pl iotdb-core/node-commons

Also ran datanode compile:

  • mvn "-Ddevelocity.off=true" "-Dcheckstyle.skip=true" "-Dspotless.check.skip=true" "-DskipTests" compile -pl iotdb-core/datanode
  • It reached javac but failed on existing generated-source/dependency issues such as missing IFill, Accumulator, and org.apache.commons.collections4.comparators, not in the modified pipe files.

@codecov

codecov Bot commented Jun 29, 2026

Copy link
Copy Markdown

Codecov Report

❌ Patch coverage is 44.97354% with 104 lines in your changes missing coverage. Please review.
✅ Project coverage is 41.47%. Comparing base (7f41f06) to head (f13df43).
⚠️ Report is 1 commits behind head on master.

Files with missing lines Patch % Lines
...otdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java 43.26% 80 Missing ⚠️
...e/iotdb/commons/pipe/agent/task/PipeTaskAgent.java 0.00% 11 Missing ⚠️
...gent/task/subtask/sink/PipeSinkSubtaskManager.java 67.85% 9 Missing ⚠️
...pe/agent/task/builder/PipeDataNodeTaskBuilder.java 55.55% 4 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #18049      +/-   ##
============================================
+ Coverage     41.42%   41.47%   +0.04%     
  Complexity      318      318              
============================================
  Files          5282     5286       +4     
  Lines        369246   369673     +427     
  Branches      47786    47847      +61     
============================================
+ Hits         152978   153326     +348     
- Misses       216268   216347      +79     

☔ View full report in Codecov by Harness.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

Comment on lines +1001 to +1003
String.format(
"%s Need Floating memory: %d bytes, free Floating memory: %d bytes",
PipeMessages.NOT_ENOUGH_MEMORY_FOR_PIPE,
PipeConfig.getInstance().getPipeInsertNodeQueueMemory(),
remainingMemory);
PipeMessages.NOT_ENOUGH_MEMORY_FOR_PIPE, needFloatingMemory, remainingMemory);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i18n

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. I moved the memory check details into PipeMessages with both English and Chinese i18n entries, and updated the code to use those message constants.

Caideyipi added 2 commits July 1, 2026 11:01
…ion-short-term

# Conflicts:
#	iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
@sonarqubecloud

sonarqubecloud Bot commented Jul 3, 2026

Copy link
Copy Markdown

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants