From 742ca01d6cdbdf04d5da74f5a15ddf37d6d98d10 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Wed, 1 Jul 2026 18:32:51 +0800 Subject: [PATCH] Pipe: add ConfigNode memory control for log reducer (#18069) * Pipe: add ConfigNode memory control for log reducer * Address ConfigNode memory config review (cherry picked from commit 9554e5d8ef09763c32d7af408c491e6b1b617c0a) --- .../confignode/conf/ConfigNodeDescriptor.java | 9 ++ .../conf/ConfigNodeMemoryConfig.java | 82 +++++++++++++++++++ .../runtime/PipeConfigNodeRuntimeAgent.java | 10 +++ .../PipeConfigNodeResourceManager.java | 16 ++++ .../conf/ConfigNodeMemoryConfigTest.java | 42 ++++++++++ .../apache/iotdb/db/conf/IoTDBDescriptor.java | 2 +- .../runtime/PipeDataNodeRuntimeAgent.java | 22 ++++- .../thrift/IoTDBDataNodeReceiver.java | 2 +- .../conf/iotdb-system.properties.template | 5 ++ .../log/PipePeriodicalLogReducer.java | 71 ++++++++-------- .../log/PipePeriodicalLogReducerTest.java | 65 +++++++++++++++ 11 files changed, 290 insertions(+), 36 deletions(-) create mode 100644 iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeMemoryConfig.java create mode 100644 iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/conf/ConfigNodeMemoryConfigTest.java rename iotdb-core/{datanode/src/main/java/org/apache/iotdb/db => node-commons/src/main/java/org/apache/iotdb/commons}/pipe/resource/log/PipePeriodicalLogReducer.java (52%) create mode 100644 iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/resource/log/PipePeriodicalLogReducerTest.java diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java index 83cf1b612d01d..d91f6a2251c2f 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java @@ -56,6 +56,8 @@ public class ConfigNodeDescriptor { private final ConfigNodeConfig conf = new ConfigNodeConfig(); + private final ConfigNodeMemoryConfig memoryConfig = new ConfigNodeMemoryConfig(); + static { URL systemConfigUrl = getPropsUrl(CommonConfig.SYSTEM_CONFIG_NAME); URL configNodeUrl = getPropsUrl(CommonConfig.OLD_CONFIG_NODE_CONFIG_NAME); @@ -79,6 +81,10 @@ public ConfigNodeConfig getConf() { return conf; } + public ConfigNodeMemoryConfig getMemoryConfig() { + return memoryConfig; + } + /** * Get props url location. * @@ -143,10 +149,13 @@ private void loadProps() { LOGGER.warn( "Couldn't load the configuration {} from any of the known sources.", CommonConfig.SYSTEM_CONFIG_NAME); + memoryConfig.init(trimProperties); } } private void loadProperties(TrimProperties properties) throws BadNodeUrlException, IOException { + memoryConfig.init(properties); + conf.setClusterName( properties.getProperty(IoTDBConstant.CLUSTER_NAME, conf.getClusterName()).trim()); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeMemoryConfig.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeMemoryConfig.java new file mode 100644 index 0000000000000..8791549c371da --- /dev/null +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeMemoryConfig.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.conf; + +import org.apache.iotdb.commons.conf.TrimProperties; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ConfigNodeMemoryConfig { + public static final String PIPE_MEMORY_MANAGER_NAME = "Pipe"; + + private static final Logger LOGGER = LoggerFactory.getLogger(ConfigNodeMemoryConfig.class); + + private long maxMemorySizeInBytes; + + private long pipeMemorySizeInBytes; + + private long freeMemorySizeInBytes; + + public void init(final TrimProperties properties) { + final String memoryAllocateProportion = + properties.getProperty("confignode_memory_proportion", null); + + maxMemorySizeInBytes = Runtime.getRuntime().maxMemory(); + pipeMemorySizeInBytes = maxMemorySizeInBytes / 10; + freeMemorySizeInBytes = maxMemorySizeInBytes - pipeMemorySizeInBytes; + + if (memoryAllocateProportion != null) { + final String[] proportions = memoryAllocateProportion.split(":"); + if (proportions.length >= 2) { + int proportionSum = 0; + for (final String proportion : proportions) { + proportionSum += Integer.parseInt(proportion.trim()); + } + + if (proportionSum != 0) { + pipeMemorySizeInBytes = + maxMemorySizeInBytes * Integer.parseInt(proportions[0].trim()) / proportionSum; + freeMemorySizeInBytes = maxMemorySizeInBytes - pipeMemorySizeInBytes; + } + } else { + LOGGER.warn( + "The parameter confignode_memory_proportion should be in the form of Pipe:Free, " + + "but got {}. Use default value 1:9.", + memoryAllocateProportion); + } + } + + LOGGER.info("initial ConfigNode allocateMemoryForPipe = {}", pipeMemorySizeInBytes); + LOGGER.info("initial ConfigNode freeMemory = {}", freeMemorySizeInBytes); + } + + public long getMaxMemorySizeInBytes() { + return maxMemorySizeInBytes; + } + + public long getPipeMemorySizeInBytes() { + return pipeMemorySizeInBytes; + } + + public long getFreeMemorySizeInBytes() { + return freeMemorySizeInBytes; + } +} diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/runtime/PipeConfigNodeRuntimeAgent.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/runtime/PipeConfigNodeRuntimeAgent.java index e8cc16a1edf7d..10c710a64adca 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/runtime/PipeConfigNodeRuntimeAgent.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/runtime/PipeConfigNodeRuntimeAgent.java @@ -26,10 +26,13 @@ import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; +import org.apache.iotdb.commons.pipe.resource.log.PipeLogger; +import org.apache.iotdb.commons.pipe.resource.log.PipePeriodicalLogReducer; import org.apache.iotdb.commons.service.IService; import org.apache.iotdb.commons.service.ServiceType; import org.apache.iotdb.confignode.manager.pipe.agent.PipeConfigNodeAgent; import org.apache.iotdb.confignode.manager.pipe.resource.PipeConfigNodeCopiedFileDirStartupCleaner; +import org.apache.iotdb.confignode.manager.pipe.resource.PipeConfigNodeResourceManager; import org.apache.iotdb.confignode.manager.pipe.source.ConfigRegionListeningQueue; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; @@ -55,6 +58,7 @@ public class PipeConfigNodeRuntimeAgent implements IService { @Override public synchronized void start() { PipeConfig.getInstance().printAllConfigs(); + initPipePeriodicalLogReducer(); // PipeTasks will not be started here and will be started by "HandleLeaderChange" // procedure when the consensus layer notify leader ready @@ -91,6 +95,12 @@ public synchronized void stop() { LOGGER.info("PipeRuntimeConfigNodeAgent stopped"); } + private void initPipePeriodicalLogReducer() { + PipePeriodicalLogReducer.setMemoryResizeFunction( + PipeConfigNodeResourceManager::resizeLogReducerMemory); + PipeLogger.setLogger(PipePeriodicalLogReducer::log); + } + public boolean isShutdown() { return isShutdown.get(); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/resource/PipeConfigNodeResourceManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/resource/PipeConfigNodeResourceManager.java index 33b68f63821aa..635e6b7b1f8c1 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/resource/PipeConfigNodeResourceManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/resource/PipeConfigNodeResourceManager.java @@ -22,12 +22,16 @@ import org.apache.iotdb.commons.pipe.resource.log.PipeLogManager; import org.apache.iotdb.commons.pipe.resource.ref.PipePhantomReferenceManager; import org.apache.iotdb.commons.pipe.resource.snapshot.PipeSnapshotResourceManager; +import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; import org.apache.iotdb.confignode.manager.pipe.resource.ref.PipeConfigNodePhantomReferenceManager; import org.apache.iotdb.confignode.manager.pipe.resource.snapshot.PipeConfigNodeSnapshotResourceManager; +import java.util.concurrent.atomic.AtomicLong; + public class PipeConfigNodeResourceManager { private final PipeSnapshotResourceManager pipeSnapshotResourceManager; + private final AtomicLong pipeLogReducerMemoryUsageInBytes = new AtomicLong(0); private final PipeLogManager pipeLogManager; private final PipePhantomReferenceManager pipePhantomReferenceManager; @@ -36,6 +40,10 @@ public static PipeSnapshotResourceManager snapshot() { .pipeSnapshotResourceManager; } + public static long resizeLogReducerMemory(final long targetSizeInBytes) { + return PipeResourceManagerHolder.INSTANCE.resizePipeLogReducerMemory(targetSizeInBytes); + } + public static PipeLogManager log() { return PipeConfigNodeResourceManager.PipeResourceManagerHolder.INSTANCE.pipeLogManager; } @@ -46,6 +54,14 @@ public static PipePhantomReferenceManager ref() { ///////////////////////////// SINGLETON ///////////////////////////// + private long resizePipeLogReducerMemory(final long targetSizeInBytes) { + final long pipeMemorySizeInBytes = + ConfigNodeDescriptor.getInstance().getMemoryConfig().getPipeMemorySizeInBytes(); + final long resizedSizeInBytes = Math.min(Math.max(0, targetSizeInBytes), pipeMemorySizeInBytes); + pipeLogReducerMemoryUsageInBytes.set(resizedSizeInBytes); + return pipeLogReducerMemoryUsageInBytes.get(); + } + private PipeConfigNodeResourceManager() { pipeSnapshotResourceManager = new PipeConfigNodeSnapshotResourceManager(); pipeLogManager = new PipeLogManager(); diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/conf/ConfigNodeMemoryConfigTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/conf/ConfigNodeMemoryConfigTest.java new file mode 100644 index 0000000000000..ff615131a030d --- /dev/null +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/conf/ConfigNodeMemoryConfigTest.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.conf; + +import org.apache.iotdb.commons.conf.TrimProperties; + +import org.junit.Assert; +import org.junit.Test; + +public class ConfigNodeMemoryConfigTest { + + @Test + public void testConfigNodeMemoryProportion() { + final TrimProperties properties = new TrimProperties(); + properties.setProperty("confignode_memory_proportion", "1:3"); + + final ConfigNodeMemoryConfig memoryConfig = new ConfigNodeMemoryConfig(); + memoryConfig.init(properties); + + Assert.assertEquals( + Runtime.getRuntime().maxMemory() / 4, memoryConfig.getPipeMemorySizeInBytes()); + Assert.assertEquals( + Runtime.getRuntime().maxMemory() * 3 / 4, memoryConfig.getFreeMemorySizeInBytes()); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index a838203a94381..792433829c6d3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -26,6 +26,7 @@ import org.apache.iotdb.commons.conf.TrimProperties; import org.apache.iotdb.commons.exception.BadNodeUrlException; import org.apache.iotdb.commons.pipe.config.PipeDescriptor; +import org.apache.iotdb.commons.pipe.resource.log.PipePeriodicalLogReducer; import org.apache.iotdb.commons.schema.SchemaConstant; import org.apache.iotdb.commons.service.metric.MetricService; import org.apache.iotdb.commons.utils.NodeUrlUtils; @@ -34,7 +35,6 @@ import org.apache.iotdb.confignode.rpc.thrift.TRatisConfig; import org.apache.iotdb.db.consensus.DataRegionConsensusImpl; import org.apache.iotdb.db.exception.query.QueryProcessException; -import org.apache.iotdb.db.pipe.resource.log.PipePeriodicalLogReducer; import org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.LastCacheLoadStrategy; import org.apache.iotdb.db.service.metrics.IoTDBInternalLocalReporter; import org.apache.iotdb.db.storageengine.StorageEngine; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java index d47a8c4dbbd8a..f88e98d7165de 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java @@ -32,13 +32,15 @@ import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBPipePattern; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.commons.pipe.resource.log.PipeLogger; +import org.apache.iotdb.commons.pipe.resource.log.PipePeriodicalLogReducer; import org.apache.iotdb.commons.service.IService; import org.apache.iotdb.commons.service.ServiceType; import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent; import org.apache.iotdb.db.pipe.resource.PipeDataNodeHardlinkOrCopiedFileDirStartupCleaner; -import org.apache.iotdb.db.pipe.resource.log.PipePeriodicalLogReducer; +import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; +import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock; import org.apache.iotdb.db.pipe.source.schemaregion.SchemaRegionListeningQueue; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode; import org.apache.iotdb.db.service.ResourcesInformationHolder; @@ -70,6 +72,8 @@ public class PipeDataNodeRuntimeAgent implements IService { private final PipePeriodicalPhantomReferenceCleaner pipePeriodicalPhantomReferenceCleaner = new PipePeriodicalPhantomReferenceCleaner(); + private PipeMemoryBlock pipeLogReducerMemoryBlock; + //////////////////////////// System Service Interface //////////////////////////// public synchronized void preparePipeResources( @@ -85,6 +89,22 @@ public synchronized void preparePipeResources( IoTDBPipePattern.setDevicePathGetter(CompactionPathUtils::getPath); IoTDBPipePattern.setMeasurementPathGetter(CompactionPathUtils::getPath); + initPipePeriodicalLogReducer(); + } + + private void initPipePeriodicalLogReducer() { + if (pipeLogReducerMemoryBlock == null) { + pipeLogReducerMemoryBlock = + PipeDataNodeResourceManager.memory() + .tryAllocate(PipeConfig.getInstance().getPipeLoggerCacheMaxSizeInBytes()); + } + + PipePeriodicalLogReducer.setMemoryResizeFunction( + targetSizeInBytes -> { + PipeDataNodeResourceManager.memory() + .resize(pipeLogReducerMemoryBlock, Math.max(0, targetSizeInBytes), false); + return pipeLogReducerMemoryBlock.getMemoryUsageInBytes(); + }); PipeLogger.setLogger(PipePeriodicalLogReducer::log); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java index 23f537fa2aea8..3cf0b434fea4c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java @@ -31,6 +31,7 @@ import org.apache.iotdb.commons.pipe.receiver.IoTDBFileReceiver; import org.apache.iotdb.commons.pipe.receiver.PipeReceiverStatusHandler; import org.apache.iotdb.commons.pipe.resource.log.PipeLogger; +import org.apache.iotdb.commons.pipe.resource.log.PipePeriodicalLogReducer; import org.apache.iotdb.commons.pipe.sink.payload.airgap.AirGapPseudoTPipeTransferRequest; import org.apache.iotdb.commons.pipe.sink.payload.thrift.common.PipeTransferSliceReqHandler; import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType; @@ -52,7 +53,6 @@ import org.apache.iotdb.db.pipe.receiver.visitor.PipeStatementTSStatusVisitor; import org.apache.iotdb.db.pipe.receiver.visitor.PipeStatementToBatchVisitor; import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; -import org.apache.iotdb.db.pipe.resource.log.PipePeriodicalLogReducer; import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock; import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferDataNodeHandshakeV1Req; import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferDataNodeHandshakeV2Req; diff --git a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template index 9b5b69b304d45..b977ce9de8c1e 100644 --- a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template +++ b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template @@ -730,6 +730,11 @@ disk_space_warning_threshold=0.05 # effectiveMode: restart datanode_memory_proportion=3:3:1:1:1:1 +# ConfigNode Memory Allocation Ratio: Pipe and Free Memory. +# The parameter form is a:b, where a and b are integers. Currently, only PipePeriodicalLogReducer is connected to Pipe memory on ConfigNode. +# effectiveMode: restart +confignode_memory_proportion=1:9 + # Schema Memory Allocation Ratio: SchemaRegion, SchemaCache, and PartitionCache. # The parameter form is a:b:c, where a, b and c are integers. for example: 1:1:1 , 6:2:1 # effectiveMode: restart diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/log/PipePeriodicalLogReducer.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/log/PipePeriodicalLogReducer.java similarity index 52% rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/log/PipePeriodicalLogReducer.java rename to iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/log/PipePeriodicalLogReducer.java index 3f5a013320d68..fa3f29ccbe347 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/log/PipePeriodicalLogReducer.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/resource/log/PipePeriodicalLogReducer.java @@ -17,11 +17,9 @@ * under the License. */ -package org.apache.iotdb.db.pipe.resource.log; +package org.apache.iotdb.commons.pipe.resource.log; import org.apache.iotdb.commons.pipe.config.PipeConfig; -import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; -import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock; import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; @@ -31,43 +29,57 @@ import java.util.concurrent.TimeUnit; import java.util.function.Consumer; +import java.util.function.LongUnaryOperator; public class PipePeriodicalLogReducer { private static final Logger LOGGER = LoggerFactory.getLogger(PipePeriodicalLogReducer.class); - private static final PipeMemoryBlock block; - protected static final Cache loggerCache; - static { - // Never close because it's static - block = - PipeDataNodeResourceManager.memory() - .tryAllocate(PipeConfig.getInstance().getPipeLoggerCacheMaxSizeInBytes()); - loggerCache = - Caffeine.newBuilder() - .expireAfterWrite( - PipeConfig.getInstance().getPipePeriodicalLogMinIntervalSeconds(), TimeUnit.SECONDS) - .weigher( - (k, v) -> - Math.toIntExact( - RamUsageEstimator.sizeOf((String) k) - + RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY)) - .maximumWeight(block.getMemoryUsageInBytes()) - .build(); + private static final LongUnaryOperator DEFAULT_MEMORY_RESIZE_FUNCTION = + sizeInBytes -> sizeInBytes; + + private static volatile LongUnaryOperator memoryResizeFunction = DEFAULT_MEMORY_RESIZE_FUNCTION; + + protected static final Cache LOGGER_CACHE = + Caffeine.newBuilder() + .expireAfterWrite( + PipeConfig.getInstance().getPipePeriodicalLogMinIntervalSeconds(), TimeUnit.SECONDS) + .weigher(PipePeriodicalLogReducer::estimateSize) + .maximumWeight(PipeConfig.getInstance().getPipeLoggerCacheMaxSizeInBytes()) + .build(); + + private static int estimateSize(final String key, final String value) { + return Math.toIntExact( + RamUsageEstimator.sizeOf(key) + RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY); } public static boolean log( final Consumer loggerFunction, final String rawMessage, final Object... formatter) { final String loggerMessage = String.format(rawMessage, formatter); - if (!loggerCache.asMap().containsKey(loggerMessage)) { - loggerCache.put(loggerMessage, loggerMessage); + if (!LOGGER_CACHE.asMap().containsKey(loggerMessage)) { + LOGGER_CACHE.put(loggerMessage, loggerMessage); loggerFunction.accept(loggerMessage); return true; } return false; } - public static void update() { - loggerCache + public static synchronized void setMemoryResizeFunction( + final LongUnaryOperator memoryResizeFunction) { + PipePeriodicalLogReducer.memoryResizeFunction = + memoryResizeFunction == null ? DEFAULT_MEMORY_RESIZE_FUNCTION : memoryResizeFunction; + update(); + } + + public static synchronized void update() { + final long maxWeight = + memoryResizeFunction.applyAsLong( + PipeConfig.getInstance().getPipeLoggerCacheMaxSizeInBytes()); + LOGGER.info("PipePeriodicalLogReducer is allocated to {} bytes.", maxWeight); + update(maxWeight); + } + + public static synchronized void update(final long maxWeight) { + LOGGER_CACHE .policy() .expireAfterWrite() .ifPresent( @@ -75,14 +87,7 @@ public static void update() { time.setExpiresAfter( PipeConfig.getInstance().getPipePeriodicalLogMinIntervalSeconds(), TimeUnit.SECONDS)); - PipeDataNodeResourceManager.memory() - .resize(block, PipeConfig.getInstance().getPipeLoggerCacheMaxSizeInBytes(), false); - LOGGER.info( - "PipePeriodicalLogReducer is allocated to {} bytes.", block.getMemoryUsageInBytes()); - loggerCache - .policy() - .eviction() - .ifPresent(eviction -> eviction.setMaximum(block.getMemoryUsageInBytes())); + LOGGER_CACHE.policy().eviction().ifPresent(eviction -> eviction.setMaximum(maxWeight)); } private PipePeriodicalLogReducer() { diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/resource/log/PipePeriodicalLogReducerTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/resource/log/PipePeriodicalLogReducerTest.java new file mode 100644 index 0000000000000..d390cbb9fddb7 --- /dev/null +++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/resource/log/PipePeriodicalLogReducerTest.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.pipe.resource.log; + +import org.apache.iotdb.commons.pipe.config.PipeConfig; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; + +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +public class PipePeriodicalLogReducerTest { + + @After + public void tearDown() { + PipePeriodicalLogReducer.setMemoryResizeFunction(null); + } + + @Test + public void testLogReducesDuplicateMessages() { + final AtomicInteger logCount = new AtomicInteger(0); + final String message = "PipePeriodicalLogReducerTest-" + System.nanoTime(); + + Assert.assertTrue(PipePeriodicalLogReducer.log(log -> logCount.incrementAndGet(), message)); + Assert.assertFalse(PipePeriodicalLogReducer.log(log -> logCount.incrementAndGet(), message)); + Assert.assertEquals(1, logCount.get()); + } + + @Test + public void testUpdateUsesMemoryResizeFunction() { + final AtomicLong requestedSizeInBytes = new AtomicLong(-1); + final long allocatedSizeInBytes = 1024; + + PipePeriodicalLogReducer.setMemoryResizeFunction( + sizeInBytes -> { + requestedSizeInBytes.set(sizeInBytes); + return allocatedSizeInBytes; + }); + + Assert.assertEquals( + PipeConfig.getInstance().getPipeLoggerCacheMaxSizeInBytes(), requestedSizeInBytes.get()); + Assert.assertEquals( + allocatedSizeInBytes, + PipePeriodicalLogReducer.LOGGER_CACHE.policy().eviction().get().getMaximum()); + } +}