Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.iotdb.commons.pipe.sink.payload.thrift.common.PipeTransferSliceReqHandler;
import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType;
import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferCompressedReq;
import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferFilePieceReq;
import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferFileSealReqV1;
import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferFileSealReqV2;
import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferSliceReq;
Expand Down Expand Up @@ -192,6 +193,7 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver {
private static final PipeConfig PIPE_CONFIG = PipeConfig.getInstance();

private PipeMemoryBlock allocatedMemoryBlock;
private final List<PipeMemoryBlock> allocatedSliceMemoryBlocks = new ArrayList<>();
private final Set<String> autoCreatedTreeDatabases = ConcurrentHashMap.newKeySet();
private final Set<String> conflictedTreeDatabases = ConcurrentHashMap.newKeySet();

Expand Down Expand Up @@ -219,7 +221,7 @@ public synchronized TPipeTransferResp receive(final TPipeTransferReq req) {
if (PipeRequestType.isValidatedRequestType(rawRequestType)) {
final PipeRequestType requestType = PipeRequestType.valueOf(rawRequestType);
if (requestType != PipeRequestType.TRANSFER_SLICE) {
sliceReqHandler.clear();
clearSliceReqHandler();
}
switch (requestType) {
case HANDSHAKE_DATANODE_V1:
Expand Down Expand Up @@ -442,8 +444,18 @@ public synchronized TPipeTransferResp receive(final TPipeTransferReq req) {
}
case TRANSFER_COMPRESSED:
{
long requestedMemorySizeInBytes = 0;
try {
return receive(PipeTransferCompressedReq.fromTPipeTransferReq(req));
requestedMemorySizeInBytes =
PipeTransferCompressedReq.getMaxDecompressedLengthInBytes(req);
try (final PipeMemoryBlock ignored =
tryAllocateReceiverMemory(requestedMemorySizeInBytes)) {
return receive(PipeTransferCompressedReq.fromTPipeTransferReq(req));
}
} catch (final PipeRuntimeOutOfMemoryCriticalException e) {
return new TPipeTransferResp(
getReceiverTemporaryUnavailableStatus(
"decompressing pipe transfer request", requestedMemorySizeInBytes, e));
} finally {
PipeDataNodeReceiverMetrics.getInstance()
.recordTransferCompressedTimer(System.nanoTime() - startTime);
Expand Down Expand Up @@ -823,22 +835,102 @@ private String getConfigReceiverId() {
}

private TPipeTransferResp handleTransferSlice(final PipeTransferSliceReq pipeTransferSliceReq) {
final boolean isInorder = sliceReqHandler.receiveSlice(pipeTransferSliceReq);
if (!isInorder) {
final long sliceBodySizeInBytes = getSliceBodySizeInBytes(pipeTransferSliceReq);
long requestedMemorySizeInBytes = sliceBodySizeInBytes;
String memoryAction = "buffering sliced pipe transfer request";
PipeMemoryBlock sliceMemoryBlock = null;
try {
sliceMemoryBlock = tryAllocateReceiverMemory(sliceBodySizeInBytes);

final boolean isInorder = sliceReqHandler.receiveSlice(pipeTransferSliceReq);
if (!isInorder) {
closeMemoryBlock(sliceMemoryBlock);
clearSliceReqHandler();
return new TPipeTransferResp(
RpcUtils.getStatus(
TSStatusCode.PIPE_TRANSFER_SLICE_OUT_OF_ORDER,
"Slice request is out of order, please check the request sequence."));
}

allocatedSliceMemoryBlocks.add(sliceMemoryBlock);
sliceMemoryBlock = null;

if (pipeTransferSliceReq.getSliceIndex() + 1 < pipeTransferSliceReq.getSliceCount()) {
return new TPipeTransferResp(
RpcUtils.getStatus(
TSStatusCode.SUCCESS_STATUS,
"Slice received, waiting for more slices to complete the request."));
}

memoryAction = "assembling sliced pipe transfer request";
requestedMemorySizeInBytes = pipeTransferSliceReq.getOriginBodySize();
try (final PipeMemoryBlock ignored = tryAllocateReceiverMemory(requestedMemorySizeInBytes)) {
final Optional<TPipeTransferReq> req = sliceReqHandler.makeReqIfComplete();
if (!req.isPresent()) {
return new TPipeTransferResp(
RpcUtils.getStatus(
TSStatusCode.SUCCESS_STATUS,
"Slice received, waiting for more slices to complete the request."));
}
clearSliceReqHandler();
return receive(req.get());
}
} catch (final PipeRuntimeOutOfMemoryCriticalException e) {
closeMemoryBlock(sliceMemoryBlock);
clearSliceReqHandler();
return new TPipeTransferResp(
RpcUtils.getStatus(
TSStatusCode.PIPE_TRANSFER_SLICE_OUT_OF_ORDER,
"Slice request is out of order, please check the request sequence."));
getReceiverTemporaryUnavailableStatus(memoryAction, requestedMemorySizeInBytes, e));
} catch (final RuntimeException e) {
closeMemoryBlock(sliceMemoryBlock);
clearSliceReqHandler();
throw e;
}
final Optional<TPipeTransferReq> req = sliceReqHandler.makeReqIfComplete();
if (!req.isPresent()) {
return new TPipeTransferResp(
RpcUtils.getStatus(
TSStatusCode.SUCCESS_STATUS,
"Slice received, waiting for more slices to complete the request."));
}

private long getSliceBodySizeInBytes(final PipeTransferSliceReq pipeTransferSliceReq) {
return pipeTransferSliceReq.getSliceBody() == null
? 0
: pipeTransferSliceReq.getSliceBody().length;
}

private void clearSliceReqHandler() {
sliceReqHandler.clear();
allocatedSliceMemoryBlocks.forEach(this::closeMemoryBlock);
allocatedSliceMemoryBlocks.clear();
}

private void closeMemoryBlock(final PipeMemoryBlock memoryBlock) {
if (Objects.nonNull(memoryBlock)) {
memoryBlock.close();
}
// sliceReqHandler will be cleared in the receive(req) method
return receive(req.get());
}

private PipeMemoryBlock tryAllocateReceiverMemory(final long requestedMemorySizeInBytes)
throws PipeRuntimeOutOfMemoryCriticalException {
return PipeDataNodeResourceManager.memory()
.forceAllocate(Math.max(requestedMemorySizeInBytes, 0));
}

@Override
protected AutoCloseable tryAllocateMemoryForFilePiece(final PipeTransferFilePieceReq req)
throws PipeRuntimeOutOfMemoryCriticalException {
return tryAllocateReceiverMemory(req.getFilePiece() == null ? 0 : req.getFilePiece().length);
}

@Override
protected TSStatus getReceiverTemporaryUnavailableStatus(
final String action,
final long requestedMemorySizeInBytes,
final PipeRuntimeOutOfMemoryCriticalException e) {
return new TSStatus(TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
.setMessage(
String.format(
"Temporarily out of memory when %s. Requested memory: %d bytes, used memory: %d bytes, free memory: %d bytes, total non-floating memory: %d bytes",
action,
requestedMemorySizeInBytes,
PipeDataNodeResourceManager.memory().getUsedMemorySizeInBytes(),
PipeDataNodeResourceManager.memory().getFreeMemorySizeInBytes(),
PipeDataNodeResourceManager.memory().getTotalNonFloatingMemorySizeInBytes()));
}

/**
Expand Down Expand Up @@ -1388,6 +1480,7 @@ private TSStatus executeStatementForTableModelWithPermissionCheck(

@Override
public synchronized void handleExit() {
clearSliceReqHandler();
if (Objects.nonNull(configReceiverId.get())) {
try {
ClusterConfigTaskExecutor.getInstance().handlePipeConfigClientExit(configReceiverId.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.iotdb.commons.audit.UserEntity;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeOutOfMemoryCriticalException;
import org.apache.iotdb.commons.i18n.PipeMessages;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant;
Expand Down Expand Up @@ -386,7 +387,7 @@ protected final TPipeTransferResp handleTransferFilePiece(
final PipeTransferFilePieceReq req,
final boolean isRequestThroughAirGap,
final boolean isSingleFile) {
try {
try (final AutoCloseable ignored = tryAllocateMemoryForFilePiece(req)) {
updateWritingFileIfNeeded(req.getFileName(), isSingleFile);

// If the request is through air gap, the sender will resend the file piece from the beginning
Expand Down Expand Up @@ -419,6 +420,18 @@ protected final TPipeTransferResp handleTransferFilePiece(
writingFileWriter.write(req.getFilePiece());
return PipeTransferFilePieceResp.toTPipeTransferResp(
RpcUtils.SUCCESS_STATUS, writingFileWriter.length());
} catch (final PipeRuntimeOutOfMemoryCriticalException e) {
final TSStatus status =
getReceiverTemporaryUnavailableStatus(
"receiving pipe file piece", getFilePieceSizeInBytes(req), e);
PipeLogger.log(
LOGGER::warn, e, PipeMessages.RECEIVER_FAILED_WRITE_FILE_PIECE, receiverId.get(), req);
try {
return PipeTransferFilePieceResp.toTPipeTransferResp(
status, PipeTransferFilePieceResp.ERROR_END_OFFSET);
} catch (Exception ex) {
return PipeTransferFilePieceResp.toTPipeTransferResp(status);
}
} catch (final Exception e) {
PipeLogger.log(
LOGGER::warn, e, PipeMessages.RECEIVER_FAILED_WRITE_FILE_PIECE, receiverId.get(), req);
Expand All @@ -435,6 +448,26 @@ protected final TPipeTransferResp handleTransferFilePiece(
}
}

protected AutoCloseable tryAllocateMemoryForFilePiece(final PipeTransferFilePieceReq req)
throws PipeRuntimeOutOfMemoryCriticalException {
return () -> {};
}

protected TSStatus getReceiverTemporaryUnavailableStatus(
final String action,
final long requestedMemorySizeInBytes,
final PipeRuntimeOutOfMemoryCriticalException e) {
return new TSStatus(TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
.setMessage(
String.format(
"Temporarily out of memory when %s. Requested memory: %d bytes. Root cause: %s",
action, requestedMemorySizeInBytes, e.getMessage()));
}

private static long getFilePieceSizeInBytes(final PipeTransferFilePieceReq req) {
return req.getFilePiece() == null ? 0 : req.getFilePiece().length;
}

protected final void updateWritingFileIfNeeded(final String fileName, final boolean isSingleFile)
throws IOException {
if (isFileExistedAndNameCorrect(fileName)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,14 @@ public class PipeTransferSliceReqHandler {

private int sliceCount = -1;
private final List<byte[]> sliceBodies = new ArrayList<>();
private long receivedBodySize = 0;

public boolean receiveSlice(final PipeTransferSliceReq req) {
if (!isValidSliceReq(req)) {
clear();
return false;
}

if (orderId == -1
|| originReqType == -1
|| originBodySize == -1
Expand All @@ -59,6 +65,7 @@ public boolean receiveSlice(final PipeTransferSliceReq req) {
originReqType = req.getOriginReqType();
originBodySize = req.getOriginBodySize();
sliceCount = req.getSliceCount();
receivedBodySize = 0;
} else {
LOGGER.warn(
PipeMessages.INVALID_STATE_SLICE,
Expand Down Expand Up @@ -105,10 +112,39 @@ public boolean receiveSlice(final PipeTransferSliceReq req) {
return false;
}

if (receivedBodySize + req.getSliceBody().length > originBodySize) {
LOGGER.warn(
"Received slice body size {} exceeds origin body size {}",
receivedBodySize + req.getSliceBody().length,
originBodySize);
clear();
return false;
}

sliceBodies.add(req.getSliceBody());
receivedBodySize += req.getSliceBody().length;

if (sliceBodies.size() == sliceCount && receivedBodySize != originBodySize) {
LOGGER.warn(
"Received slice body size {} is not equal to origin body size {}",
receivedBodySize,
originBodySize);
clear();
return false;
}

return true;
}

private boolean isValidSliceReq(final PipeTransferSliceReq req) {
return req.getOriginBodySize() >= 0
&& req.getSliceCount() > 0
&& req.getSliceIndex() >= 0
&& req.getSliceIndex() < req.getSliceCount()
&& req.getSliceBody() != null
&& req.getSliceBody().length <= req.getOriginBodySize();
}

public Optional<TPipeTransferReq> makeReqIfComplete() {
if (sliceBodies.size() != sliceCount) {
return Optional.empty();
Expand All @@ -132,5 +168,6 @@ public void clear() {
originBodySize = -1;
sliceCount = -1;
sliceBodies.clear();
receivedBodySize = 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,21 @@ public static TPipeTransferReq fromTPipeTransferReq(final TPipeTransferReq trans
return decompressedReq;
}

/** Get the largest intermediate decompressed body size without consuming the request body. */
public static int getMaxDecompressedLengthInBytes(final TPipeTransferReq transferReq) {
final ByteBuffer compressedBuffer = transferReq.body.duplicate();

int maxDecompressedLength = compressedBuffer.remaining();
final int compressorsSize = ReadWriteIOUtils.readByte(compressedBuffer);
for (int i = 0; i < compressorsSize; ++i) {
ReadWriteIOUtils.readByte(compressedBuffer);
final int decompressedLength = ReadWriteIOUtils.readInt(compressedBuffer);
checkDecompressedLength(decompressedLength);
maxDecompressedLength = Math.max(maxDecompressedLength, decompressedLength);
}
return maxDecompressedLength;
}

/** This method is used to prevent decompression bomb attacks. */
private static void checkDecompressedLength(final int decompressedLength)
throws IllegalArgumentException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,20 @@ public void testPipeTransferSliceReqFromLegacyV13Body() throws IOException {
Assert.assertEquals(2, sliceReq.getSliceCount());
}

@Test
public void testSliceReqHandlerRejectsOversizedSlices() throws IOException {
final TPipeTransferReq req = createReq(IoTDBSinkRequestVersion.VERSION_1.getVersion(), 6);
final PipeTransferSliceReq firstSlice =
PipeTransferSliceReq.toTPipeTransferReq(7, req.getType(), 0, 2, req.body.duplicate(), 0, 4);
final PipeTransferSliceReq oversizedSecondSlice =
PipeTransferSliceReq.toTPipeTransferReq(7, req.getType(), 1, 2, req.body.duplicate(), 2, 6);

final PipeTransferSliceReqHandler handler = new PipeTransferSliceReqHandler();
Assert.assertTrue(handler.receiveSlice(firstSlice));
Assert.assertFalse(handler.receiveSlice(oversizedSecondSlice));
Assert.assertFalse(handler.makeReqIfComplete().isPresent());
}

private static TPipeTransferReq createReq(final byte version, final int bodySize) {
final byte[] body = new byte[bodySize];
for (int i = 0; i < body.length; ++i) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ public void testPipeTransferCompressedReq() throws IOException {
Collections.singletonList(
PipeCompressorFactory.getCompressor(
PipeCompressor.PipeCompressionType.GZIP.getIndex())));
Assert.assertEquals(
Math.max(compressedReq.body.remaining(), originalReq.body.remaining() + 3),
PipeTransferCompressedReq.getMaxDecompressedLengthInBytes(compressedReq));
Assert.assertEquals(0, compressedReq.body.position());

final TPipeTransferReq decompressedReq =
PipeTransferCompressedReq.fromTPipeTransferReq(compressedReq);

Expand Down
Loading