diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeWriteBackSinkIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeWriteBackSinkIT.java new file mode 100644 index 0000000000000..dda55ad4871d4 --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeWriteBackSinkIT.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.pipe.it.manual; + +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient; +import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq; +import org.apache.iotdb.db.it.utils.TestUtils; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.MultiClusterIT2ManualCreateSchema; +import org.apache.iotdb.rpc.TSStatusCode; + +import org.junit.Assert; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; + +@RunWith(IoTDBTestRunner.class) +@Category({MultiClusterIT2ManualCreateSchema.class}) +public class IoTDBPipeWriteBackSinkIT extends AbstractPipeDualManualIT { + + @Test + public void testWriteBackSinkWithTargetDatabaseForTreeModel() throws Exception { + TestUtils.executeNonQueries( + senderEnv, + Arrays.asList( + "create database root.source", + "create timeseries root.source.d1.s1 with datatype=INT32,encoding=PLAIN", + "create database root.target.db", + "create timeseries root.target.db.d1.s1 with datatype=INT32,encoding=PLAIN"), + null); + + try (final SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { + final Map sourceAttributes = new HashMap<>(); + final Map processorAttributes = new HashMap<>(); + final Map sinkAttributes = new HashMap<>(); + + sourceAttributes.put("extractor.inclusion", "data.insert"); + sourceAttributes.put("extractor.forwarding-pipe-requests", "false"); + sourceAttributes.put("extractor.path", "root.source.**"); + sourceAttributes.put("user", "root"); + + sinkAttributes.put("sink", "write-back-sink"); + sinkAttributes.put("sink.database", "root.target.db"); + sinkAttributes.put("user", "root"); + + final TSStatus status = + client.createPipe( + new TCreatePipeReq("testPipe", sinkAttributes) + .setExtractorAttributes(sourceAttributes) + .setProcessorAttributes(processorAttributes)); + Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); + + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("testPipe").getCode()); + } + + TestUtils.executeNonQueries( + senderEnv, + Arrays.asList( + "insert into root.source.d1(time, s1) values (1, 1)", + "insert into root.source.d1(time, s1) values (2, 2)", + "flush"), + null); + + TestUtils.assertDataEventuallyOnEnv( + senderEnv, + "select * from root.target.db.**", + "Time,root.target.db.d1.s1,", + Collections.unmodifiableSet(new HashSet<>(Arrays.asList("1,1,", "2,2,")))); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java index 269d0059c6695..f5a7ad2ff31a3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java @@ -20,8 +20,13 @@ package org.apache.iotdb.db.pipe.sink.protocol.writeback; import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.conf.IoTDBConstant; +import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.exception.pipe.PipeRuntimeSinkNonReportTimeConfigurableException; +import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.commons.utils.PathUtils; import org.apache.iotdb.db.auth.AuthorityChecker; +import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent; @@ -36,7 +41,11 @@ import org.apache.iotdb.db.queryengine.plan.analyze.schema.ClusterSchemaFetcher; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement; +import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement; +import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsOfOneDeviceStatement; +import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement; import org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement; +import org.apache.iotdb.db.schemaengine.schemaregion.utils.MetaFormatUtils; import org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException; import org.apache.iotdb.pipe.api.PipeConnector; import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration; @@ -52,21 +61,93 @@ import org.slf4j.LoggerFactory; import java.time.ZoneId; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Locale; +import java.util.Objects; + +import static org.apache.iotdb.commons.conf.IoTDBConstant.MAX_DATABASE_NAME_LENGTH; public class WriteBackSink implements PipeConnector { private static final Logger LOGGER = LoggerFactory.getLogger(WriteBackSink.class); + private static final String CONNECTOR_IOTDB_DATABASE_KEY = "connector.database"; + private static final String SINK_IOTDB_DATABASE_KEY = "sink.database"; + + private String targetTreeModelDatabaseName; @Override public void validate(final PipeParameterValidator validator) throws Exception { - // Do nothing + validator.validateSynonymAttributes( + Collections.singletonList(CONNECTOR_IOTDB_DATABASE_KEY), + Collections.singletonList(SINK_IOTDB_DATABASE_KEY), + false); + + final String targetDatabase = + validator + .getParameters() + .getStringByKeys(CONNECTOR_IOTDB_DATABASE_KEY, SINK_IOTDB_DATABASE_KEY); + if (Objects.nonNull(targetDatabase)) { + validateTargetDatabase(targetDatabase); + } } @Override public void customize( final PipeParameters parameters, final PipeConnectorRuntimeConfiguration configuration) throws Exception { - // Do nothing + final String targetDatabase = + parameters.getStringByKeys(CONNECTOR_IOTDB_DATABASE_KEY, SINK_IOTDB_DATABASE_KEY); + if (Objects.nonNull(targetDatabase)) { + targetTreeModelDatabaseName = validateTargetDatabase(targetDatabase); + } + } + + private static String validateTargetDatabase(final String targetDatabase) { + final String trimmedTargetDatabase = targetDatabase.trim(); + if (trimmedTargetDatabase.startsWith(IoTDBConstant.PATH_ROOT + ".")) { + return validateAndNormalizeTreeModelDatabaseName(trimmedTargetDatabase); + } + + try { + PathUtils.checkAndReturnSingleMeasurement(trimmedTargetDatabase); + return validateAndNormalizeTreeModelDatabaseName( + IoTDBConstant.PATH_ROOT + + IoTDBConstant.PATH_SEPARATOR + + trimmedTargetDatabase.toLowerCase(Locale.ENGLISH)); + } catch (final Exception e) { + throw new PipeException( + String.format("The target database %s is invalid.", targetDatabase), e); + } + } + + private static String validateAndNormalizeTreeModelDatabaseName(final String databaseName) { + try { + final PartialPath databasePath = new PartialPath(databaseName); + final String[] nodes = databasePath.getNodes(); + if (nodes.length <= 1 || !IoTDBConstant.PATH_ROOT.equals(nodes[0])) { + throw new IllegalPathException( + databaseName, "the database name in tree model must start with 'root.'."); + } + + final String normalizedDatabaseName = databasePath.getFullPath(); + MetaFormatUtils.checkDatabase(normalizedDatabaseName); + + if (normalizedDatabaseName.length() > MAX_DATABASE_NAME_LENGTH) { + throw new IllegalPathException( + normalizedDatabaseName, + "the length of database name shall not exceed " + MAX_DATABASE_NAME_LENGTH); + } + return normalizedDatabaseName; + } catch (final Exception e) { + throw new PipeException( + String.format( + "The tree model database %s is invalid. The database name should match %s " + + "and the length should not exceed %s.", + databaseName, IoTDBConfig.STORAGE_GROUP_PATTERN, MAX_DATABASE_NAME_LENGTH), + e); + } } @Override @@ -129,6 +210,7 @@ private void doTransfer( final InsertNode insertNode = pipeInsertNodeTabletInsertionEvent.getInsertNode(); final InsertBaseStatement statement = PipeTransferTabletInsertNodeReq.toTPipeTransferRawReq(insertNode).constructStatement(); + rewriteTreeModelDatabaseNameIfNecessary(statement, null); status = statement.isEmpty() ? RpcUtils.SUCCESS_STATUS : executeStatement(statement); if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { @@ -160,6 +242,8 @@ private void doTransfer(final PipeRawTabletInsertionEvent pipeRawTabletInsertion pipeRawTabletInsertionEvent.convertToTablet(), pipeRawTabletInsertionEvent.isAligned()) .constructStatement(); + rewriteTreeModelDatabaseNameIfNecessary( + statement, pipeRawTabletInsertionEvent.getTreeModelDatabaseName()); final TSStatus status = statement.isEmpty() ? RpcUtils.SUCCESS_STATUS : executeStatement(statement); @@ -181,6 +265,114 @@ private static void throwWriteBackExceptionIfNecessary( throw new PipeException(exceptionMessage); } + private InsertBaseStatement rewriteTreeModelDatabaseNameIfNecessary( + final InsertBaseStatement statement, final String sourceTreeModelDatabaseName) { + if (Objects.isNull(targetTreeModelDatabaseName)) { + return statement; + } + + rewriteTreeModelDatabaseName(statement, sourceTreeModelDatabaseName); + return statement; + } + + private void rewriteTreeModelDatabaseName( + final InsertBaseStatement statement, final String sourceTreeModelDatabaseName) { + if (statement instanceof InsertRowsStatement) { + ((InsertRowsStatement) statement) + .getInsertRowStatementList() + .forEach( + rowStatement -> + rewriteTreeModelDatabaseName(rowStatement, sourceTreeModelDatabaseName)); + return; + } + + if (statement instanceof InsertRowsOfOneDeviceStatement) { + final InsertRowsOfOneDeviceStatement insertRowsOfOneDeviceStatement = + (InsertRowsOfOneDeviceStatement) statement; + insertRowsOfOneDeviceStatement + .getInsertRowStatementList() + .forEach( + rowStatement -> + rewriteTreeModelDatabaseName(rowStatement, sourceTreeModelDatabaseName)); + insertRowsOfOneDeviceStatement.setDevicePath( + rewriteTreeModelDevicePath( + insertRowsOfOneDeviceStatement.getDevicePath(), sourceTreeModelDatabaseName)); + return; + } + + if (statement instanceof InsertMultiTabletsStatement) { + ((InsertMultiTabletsStatement) statement) + .getInsertTabletStatementList() + .forEach( + tabletStatement -> + rewriteTreeModelDatabaseName(tabletStatement, sourceTreeModelDatabaseName)); + return; + } + + statement.setDevicePath( + rewriteTreeModelDevicePath(statement.getDevicePath(), sourceTreeModelDatabaseName)); + } + + private PartialPath rewriteTreeModelDevicePath( + final PartialPath devicePath, final String sourceTreeModelDatabaseName) { + if (Objects.isNull(devicePath)) { + return devicePath; + } + + final String normalizedSourceTreeModelDatabaseName = + Objects.nonNull(sourceTreeModelDatabaseName) + ? validateAndNormalizeTreeModelDatabaseName(sourceTreeModelDatabaseName) + : inferTreeModelDatabaseName(devicePath); + if (Objects.isNull(normalizedSourceTreeModelDatabaseName)) { + return devicePath; + } + + try { + final String[] sourceDatabaseNodes = + new PartialPath(normalizedSourceTreeModelDatabaseName).getNodes(); + final String[] targetDatabaseNodes = new PartialPath(targetTreeModelDatabaseName).getNodes(); + final String[] deviceNodes = devicePath.getNodes(); + if (!startsWith(deviceNodes, sourceDatabaseNodes)) { + return devicePath; + } + + final ArrayList rebasedNodes = + new ArrayList<>( + targetDatabaseNodes.length + deviceNodes.length - sourceDatabaseNodes.length); + rebasedNodes.addAll(Arrays.asList(targetDatabaseNodes)); + rebasedNodes.addAll( + Arrays.asList(deviceNodes).subList(sourceDatabaseNodes.length, deviceNodes.length)); + return new PartialPath(rebasedNodes.toArray(new String[0])); + } catch (final Exception e) { + throw new PipeException( + String.format( + "Failed to rewrite tree model database from %s to %s for device path %s.", + normalizedSourceTreeModelDatabaseName, targetTreeModelDatabaseName, devicePath), + e); + } + } + + private String inferTreeModelDatabaseName(final PartialPath devicePath) { + final String[] deviceNodes = devicePath.getNodes(); + if (deviceNodes.length < 2 || !IoTDBConstant.PATH_ROOT.equals(deviceNodes[0])) { + return null; + } + + return IoTDBConstant.PATH_ROOT + IoTDBConstant.PATH_SEPARATOR + deviceNodes[1]; + } + + private static boolean startsWith(final String[] nodes, final String[] prefixNodes) { + if (nodes.length < prefixNodes.length) { + return false; + } + for (int i = 0; i < prefixNodes.length; ++i) { + if (!Objects.equals(nodes[i], prefixNodes[i])) { + return false; + } + } + return true; + } + private TSStatus executeStatement(final InsertBaseStatement statement) { return Coordinator.getInstance() .executeForTreeModel(