diff --git a/integration-test/pom.xml b/integration-test/pom.xml
index 150419970a0c8..f566c8e5995b7 100644
--- a/integration-test/pom.xml
+++ b/integration-test/pom.xml
@@ -183,7 +183,6 @@
org.bouncycastle
bcprov-jdk18on
- test
junit
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/AbstractPipeSingleIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/AbstractPipeSingleIT.java
index 3bb3ecce21e55..3ade13c7209de 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/AbstractPipeSingleIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/AbstractPipeSingleIT.java
@@ -31,7 +31,7 @@ abstract class AbstractPipeSingleIT {
@Before
public void setUp() {
- MultiEnvFactory.createEnv(2);
+ MultiEnvFactory.createEnv(1);
env = MultiEnvFactory.getEnv(0);
env.getConfig()
.getCommonConfig()
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
index ee9e5400aad4c..a190f7c5182b1 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
@@ -20,68 +20,151 @@
package org.apache.iotdb.pipe.it.single;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.confignode.rpc.thrift.TAlterPipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.db.it.utils.TestUtils;
+import org.apache.iotdb.db.pipe.sink.protocol.opcua.client.ClientRunner;
+import org.apache.iotdb.db.pipe.sink.protocol.opcua.client.IoTDBOpcUaClient;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.MultiClusterIT1;
+import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.pipe.it.dual.tablemodel.TableModelUtils;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.tsfile.common.conf.TSFileConfig;
+import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
+import org.eclipse.milo.opcua.sdk.client.api.identity.AnonymousProvider;
+import org.eclipse.milo.opcua.sdk.client.api.identity.IdentityProvider;
+import org.eclipse.milo.opcua.sdk.client.api.identity.UsernameProvider;
+import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
+import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
+import org.eclipse.milo.opcua.stack.core.types.builtin.DateTime;
+import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
+import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode;
+import org.eclipse.milo.opcua.stack.core.types.builtin.Variant;
+import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
+import java.io.File;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.Objects;
+import java.util.UUID;
+
+import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_DIR_DEFAULT_VALUE;
+import static org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaNameSpace.timestampToUtc;
@RunWith(IoTDBTestRunner.class)
@Category({MultiClusterIT1.class})
public class IoTDBPipeOPCUAIT extends AbstractPipeSingleIT {
@Test
- public void testOPCUASink() throws Exception {
+ public void testOPCUAServerSink() throws Exception {
try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) env.getLeaderConfigNodeConnection()) {
TestUtils.executeNonQuery(env, "insert into root.db.d1(time, s1) values (1, 1)", null);
- final Map connectorAttributes = new HashMap<>();
- connectorAttributes.put("sink", "opc-ua-sink");
- connectorAttributes.put("opcua.model", "client-server");
+ final Map sinkAttributes = new HashMap<>();
+
+ sinkAttributes.put("sink", "opc-ua-sink");
+ sinkAttributes.put("opcua.model", "client-server");
+ sinkAttributes.put("security-policy", "None");
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client
.createPipe(
- new TCreatePipeReq("testPipe", connectorAttributes)
- .setExtractorAttributes(Collections.emptyMap())
+ new TCreatePipeReq("testPipe", sinkAttributes)
+ .setExtractorAttributes(Collections.singletonMap("user", "root"))
.setProcessorAttributes(Collections.emptyMap()))
.getCode());
+
+ final OpcUaClient opcUaClient =
+ getOpcUaClient("opc.tcp://127.0.0.1:12686/iotdb", SecurityPolicy.None, "root", "root");
+ DataValue value =
+ opcUaClient.readValue(0, TimestampsToReturn.Both, new NodeId(2, "root/db/d1/s1")).get();
+ Assert.assertEquals(new Variant(1.0), value.getValue());
+ Assert.assertEquals(new DateTime(timestampToUtc(1)), value.getSourceTime());
+
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
+ client
+ .alterPipe(
+ new TAlterPipeReq()
+ .setPipeName("testPipe")
+ .setIsReplaceAllConnectorAttributes(false)
+ .setConnectorAttributes(Collections.singletonMap("with-quality", "true"))
+ .setProcessorAttributes(Collections.emptyMap())
+ .setExtractorAttributes(Collections.emptyMap()))
+ .getCode());
+
+ TestUtils.executeNonQuery(
+ env,
+ "insert into root.db.opc(time, value, quality, other) values (1, 1, false, 1)",
+ null);
+ value = opcUaClient.readValue(0, TimestampsToReturn.Both, new NodeId(2, "root/db/opc")).get();
+ Assert.assertEquals(new Variant(1.0), value.getValue());
+ Assert.assertEquals(StatusCode.BAD, value.getStatusCode());
+ Assert.assertEquals(new DateTime(timestampToUtc(1)), value.getSourceTime());
+
+ TestUtils.executeNonQuery(
+ env, "insert into root.db.opc(time, quality) values (2, true)", null);
+ TestUtils.executeNonQuery(env, "insert into root.db.opc(time, value) values (2, 2)", null);
+
+ final long startTime = System.currentTimeMillis();
+ while (true) {
+ try {
+ value =
+ opcUaClient.readValue(0, TimestampsToReturn.Both, new NodeId(2, "root/db/opc")).get();
+ Assert.assertEquals(new DateTime(timestampToUtc(2)), value.getSourceTime());
+ Assert.assertEquals(new Variant(2.0), value.getValue());
+ Assert.assertEquals(StatusCode.UNCERTAIN, value.getStatusCode());
+ break;
+ } catch (final Throwable t) {
+ if (System.currentTimeMillis() - startTime > 10_000L) {
+ throw t;
+ }
+ }
+ }
+
+ opcUaClient.disconnect().get();
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.dropPipe("testPipe").getCode());
// Test reconstruction
- connectorAttributes.put("password123456", "test");
+ sinkAttributes.put("password", "test");
+ sinkAttributes.put("security-policy", "basic256sha256");
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client
.createPipe(
- new TCreatePipeReq("testPipe", connectorAttributes)
+ new TCreatePipeReq("testPipe", sinkAttributes)
.setExtractorAttributes(Collections.emptyMap())
.setProcessorAttributes(Collections.emptyMap()))
.getCode());
+ // Banned none, only allows basic256sha256
+ Assert.assertThrows(
+ PipeException.class,
+ () ->
+ getOpcUaClient(
+ "opc.tcp://127.0.0.1:12686/iotdb", SecurityPolicy.None, "root", "root"));
+
// Test conflict
- connectorAttributes.put("password123456", "conflict");
- Assert.assertEquals(
- TSStatusCode.PIPE_ERROR.getStatusCode(),
- client
- .createPipe(
- new TCreatePipeReq("testPipe", connectorAttributes)
- .setExtractorAttributes(Collections.emptyMap())
- .setProcessorAttributes(Collections.emptyMap()))
- .getCode());
+ sinkAttributes.put("password", "conflict");
+ try {
+ TestUtils.executeNonQuery(
+ env, "create pipe test1 ('sink'='opc-ua-sink', 'password'='conflict')", null);
+ Assert.fail();
+ } catch (final Exception e) {
+ Assert.assertEquals(
+ "org.apache.iotdb.jdbc.IoTDBSQLException: 1107: The existing server with tcp port 12686 and https port 8443's password test conflicts to the new password conflict, reject reusing.",
+ e.getMessage());
+ }
}
}
@@ -93,42 +176,68 @@ public void testOPCUASinkInTableModel() throws Exception {
TableModelUtils.createDataBaseAndTable(env, "test", "test");
TableModelUtils.insertData("test", "test", 0, 10, env);
- final Map connectorAttributes = new HashMap<>();
- connectorAttributes.put("sink", "opc-ua-sink");
- connectorAttributes.put("opcua.model", "client-server");
+ final Map sourceAttributes = new HashMap<>();
+ final Map sinkAttributes = new HashMap<>();
+ sourceAttributes.put("capture.table", "true");
+ sourceAttributes.put("user", "root");
+
+ sinkAttributes.put("sink", "opc-ua-sink");
+ sinkAttributes.put("opcua.model", "client-server");
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client
.createPipe(
- new TCreatePipeReq("testPipe", connectorAttributes)
- .setExtractorAttributes(Collections.singletonMap("capture.table", "true"))
+ new TCreatePipeReq("testPipe", sinkAttributes)
+ .setExtractorAttributes(sourceAttributes)
.setProcessorAttributes(Collections.emptyMap()))
.getCode());
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.dropPipe("testPipe").getCode());
// Test reconstruction
- connectorAttributes.put("password123456", "test");
+ sinkAttributes.put("password123456", "test");
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client
.createPipe(
- new TCreatePipeReq("testPipe", connectorAttributes)
+ new TCreatePipeReq("testPipe", sinkAttributes)
.setExtractorAttributes(Collections.emptyMap())
.setProcessorAttributes(Collections.emptyMap()))
.getCode());
// Test conflict
- connectorAttributes.put("password123456", "conflict");
+ sinkAttributes.put("password123456", "conflict");
Assert.assertEquals(
TSStatusCode.PIPE_ERROR.getStatusCode(),
client
.createPipe(
- new TCreatePipeReq("testPipe", connectorAttributes)
+ new TCreatePipeReq("testPipe", sinkAttributes)
.setExtractorAttributes(Collections.emptyMap())
.setProcessorAttributes(Collections.emptyMap()))
.getCode());
}
}
+
+ private static OpcUaClient getOpcUaClient(
+ final String nodeUrl,
+ final SecurityPolicy policy,
+ final String userName,
+ final String password) {
+ final IoTDBOpcUaClient client;
+
+ final IdentityProvider provider =
+ Objects.nonNull(userName)
+ ? new UsernameProvider(userName, password)
+ : new AnonymousProvider();
+
+ final String securityDir =
+ CONNECTOR_OPC_UA_SECURITY_DIR_DEFAULT_VALUE
+ + File.separatorChar
+ + UUID.nameUUIDFromBytes(nodeUrl.getBytes(TSFileConfig.STRING_CHARSET));
+
+ client = new IoTDBOpcUaClient(nodeUrl, policy, provider, false);
+ new ClientRunner(client, securityDir, password).run();
+ return client.getClient();
+ }
}
diff --git a/iotdb-core/datanode/pom.xml b/iotdb-core/datanode/pom.xml
index 284eb6796727e..85c96595381e9 100644
--- a/iotdb-core/datanode/pom.xml
+++ b/iotdb-core/datanode/pom.xml
@@ -191,6 +191,18 @@
org.eclipse.milo
stack-server
+
+ org.eclipse.milo
+ stack-client
+
+
+ org.eclipse.milo
+ sdk-client
+
+
+ org.bouncycastle
+ bcprov-jdk18on
+
org.eclipse.jetty
jetty-http
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
index 7404a8b03bcbf..071db688dd8b6 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
@@ -24,6 +24,10 @@
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.sink.protocol.opcua.client.ClientRunner;
+import org.apache.iotdb.db.pipe.sink.protocol.opcua.client.IoTDBOpcUaClient;
+import org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaNameSpace;
+import org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaServerBuilder;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
import org.apache.iotdb.pipe.api.PipeConnector;
@@ -36,9 +40,15 @@
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.tsfile.common.conf.TSFileConfig;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.write.record.Tablet;
+import org.eclipse.milo.opcua.sdk.client.api.identity.AnonymousProvider;
+import org.eclipse.milo.opcua.sdk.client.api.identity.IdentityProvider;
+import org.eclipse.milo.opcua.sdk.client.api.identity.UsernameProvider;
import org.eclipse.milo.opcua.sdk.server.OpcUaServer;
+import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
+import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,28 +58,46 @@
import java.util.Arrays;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_PASSWORD_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_USERNAME_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_USER_DEFAULT_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_USER_KEY;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_DEFAULT_QUALITY_BAD_VALUE;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_DEFAULT_QUALITY_GOOD_VALUE;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_DEFAULT_QUALITY_KEY;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_DEFAULT_QUALITY_UNCERTAIN_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_ENABLE_ANONYMOUS_ACCESS_DEFAULT_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_ENABLE_ANONYMOUS_ACCESS_KEY;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_HISTORIZING_DEFAULT_VALUE;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_HISTORIZING_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_HTTPS_BIND_PORT_DEFAULT_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_HTTPS_BIND_PORT_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_MODEL_CLIENT_SERVER_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_MODEL_DEFAULT_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_MODEL_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_MODEL_PUB_SUB_VALUE;
-import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_PLACEHOLDER_DEFAULT_VALUE;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_NODE_URL_KEY;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_PLACEHOLDER_4_NULL_TAG_DEFAULT_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_PLACEHOLDER_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_QUALITY_NAME_DEFAULT_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_QUALITY_NAME_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_DIR_DEFAULT_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_DIR_KEY;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_POLICY_AES128_SHA256_RSAOAEP_VALUE;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_POLICY_AES256_SHA256_RSAPSS_VALUE;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_POLICY_BASIC_128_RSA_15_VALUE;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_POLICY_BASIC_256_SHA_256_VALUE;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_POLICY_BASIC_256_VALUE;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_POLICY_KEY;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_POLICY_NONE_VALUE;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_POLICY_SERVER_DEFAULT_VALUES;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_TCP_BIND_PORT_DEFAULT_VALUE;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_TCP_BIND_PORT_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_VALUE_NAME_DEFAULT_VALUE;
@@ -79,12 +107,16 @@
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_PASSWORD_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_USERNAME_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_USER_KEY;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_DEFAULT_QUALITY_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_ENABLE_ANONYMOUS_ACCESS_KEY;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_HISTORIZING_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_HTTPS_BIND_PORT_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_MODEL_KEY;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_NODE_URL_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_PLACEHOLDER_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_QUALITY_NAME_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_SECURITY_DIR_KEY;
+import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_SECURITY_POLICY_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_TCP_BIND_PORT_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_VALUE_NAME_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_WITH_QUALITY_KEY;
@@ -107,12 +139,18 @@ public class OpcUaSink implements PipeConnector {
SERVER_KEY_TO_REFERENCE_COUNT_AND_NAME_SPACE_MAP = new ConcurrentHashMap<>();
private String serverKey;
- boolean isClientServerModel;
- String unQualifiedDatabaseName;
- String placeHolder;
- @Nullable String valueName;
- @Nullable String qualityName;
- private OpcUaNameSpace nameSpace;
+ private boolean isClientServerModel;
+ private String databaseName;
+ private String placeHolder4NullTag;
+ private @Nullable String valueName;
+ private @Nullable String qualityName;
+ private StatusCode defaultQuality;
+
+ // Inner server
+ private @Nullable OpcUaNameSpace nameSpace;
+
+ // Outer server
+ private @Nullable IoTDBOpcUaClient client;
@Override
public void validate(final PipeParameterValidator validator) throws Exception {
@@ -131,12 +169,89 @@ public void validate(final PipeParameterValidator validator) throws Exception {
Arrays.asList(CONNECTOR_IOTDB_USER_KEY, SINK_IOTDB_USER_KEY),
Arrays.asList(CONNECTOR_IOTDB_USERNAME_KEY, SINK_IOTDB_USERNAME_KEY),
false);
+
+ final PipeParameters parameters = validator.getParameters();
+ if (validator
+ .getParameters()
+ .hasAnyAttributes(CONNECTOR_OPC_UA_NODE_URL_KEY, SINK_OPC_UA_NODE_URL_KEY)
+ || parameters.getBooleanOrDefault(
+ Arrays.asList(CONNECTOR_OPC_UA_WITH_QUALITY_KEY, SINK_OPC_UA_WITH_QUALITY_KEY),
+ CONNECTOR_OPC_UA_WITH_QUALITY_DEFAULT_VALUE)) {
+ validator.validate(
+ CONNECTOR_OPC_UA_MODEL_CLIENT_SERVER_VALUE::equals,
+ String.format(
+ "When the OPC UA sink points to an outer server or sets 'with-quality' to true, the %s or %s must be %s.",
+ CONNECTOR_OPC_UA_MODEL_KEY,
+ SINK_OPC_UA_MODEL_KEY,
+ CONNECTOR_OPC_UA_MODEL_CLIENT_SERVER_VALUE),
+ parameters.getStringOrDefault(
+ Arrays.asList(CONNECTOR_OPC_UA_MODEL_KEY, SINK_OPC_UA_MODEL_KEY),
+ CONNECTOR_OPC_UA_MODEL_DEFAULT_VALUE));
+ }
}
@Override
public void customize(
final PipeParameters parameters, final PipeConnectorRuntimeConfiguration configuration)
throws Exception {
+ final boolean withQuality =
+ parameters.getBooleanOrDefault(
+ Arrays.asList(CONNECTOR_OPC_UA_WITH_QUALITY_KEY, SINK_OPC_UA_WITH_QUALITY_KEY),
+ CONNECTOR_OPC_UA_WITH_QUALITY_DEFAULT_VALUE);
+ valueName =
+ withQuality
+ ? parameters.getStringOrDefault(
+ Arrays.asList(CONNECTOR_OPC_UA_VALUE_NAME_KEY, SINK_OPC_UA_VALUE_NAME_KEY),
+ CONNECTOR_OPC_UA_VALUE_NAME_DEFAULT_VALUE)
+ : null;
+ qualityName =
+ withQuality
+ ? parameters.getStringOrDefault(
+ Arrays.asList(CONNECTOR_OPC_UA_QUALITY_NAME_KEY, SINK_OPC_UA_QUALITY_NAME_KEY),
+ CONNECTOR_OPC_UA_QUALITY_NAME_DEFAULT_VALUE)
+ : null;
+ defaultQuality =
+ getQuality(
+ withQuality
+ ? parameters.getStringOrDefault(
+ Arrays.asList(
+ CONNECTOR_OPC_UA_DEFAULT_QUALITY_KEY, SINK_OPC_UA_DEFAULT_QUALITY_KEY),
+ CONNECTOR_OPC_UA_DEFAULT_QUALITY_UNCERTAIN_VALUE)
+ : CONNECTOR_OPC_UA_DEFAULT_QUALITY_GOOD_VALUE);
+ isClientServerModel =
+ parameters
+ .getStringOrDefault(
+ Arrays.asList(CONNECTOR_OPC_UA_MODEL_KEY, SINK_OPC_UA_MODEL_KEY),
+ CONNECTOR_OPC_UA_MODEL_DEFAULT_VALUE)
+ .equals(CONNECTOR_OPC_UA_MODEL_CLIENT_SERVER_VALUE);
+ placeHolder4NullTag =
+ parameters.getStringOrDefault(
+ Arrays.asList(CONNECTOR_OPC_UA_PLACEHOLDER_KEY, SINK_OPC_UA_PLACEHOLDER_KEY),
+ CONNECTOR_OPC_UA_PLACEHOLDER_4_NULL_TAG_DEFAULT_VALUE);
+ final DataRegion region =
+ StorageEngine.getInstance()
+ .getDataRegion(new DataRegionId(configuration.getRuntimeEnvironment().getRegionId()));
+ databaseName = Objects.nonNull(region) ? region.getDatabaseName() : "root.__temp_db";
+
+ if (withQuality && PathUtils.isTableModelDatabase(databaseName)) {
+ throw new PipeException(
+ "When the OPC UA sink sets 'with-quality' to true, the table model data is not supported.");
+ }
+
+ final String nodeUrl =
+ parameters.getStringByKeys(CONNECTOR_OPC_UA_NODE_URL_KEY, SINK_OPC_UA_NODE_URL_KEY);
+ if (Objects.isNull(nodeUrl)) {
+ customizeServer(parameters);
+ } else {
+ if (PathUtils.isTableModelDatabase(databaseName)) {
+ throw new PipeException(
+ "When the OPC UA sink points to an outer server, the table model data is not supported.");
+ }
+ customizeClient(nodeUrl, parameters);
+ }
+ }
+
+ private void customizeServer(final PipeParameters parameters) {
final int tcpBindPort =
parameters.getIntOrDefault(
Arrays.asList(CONNECTOR_OPC_UA_TCP_BIND_PORT_KEY, SINK_OPC_UA_TCP_BIND_PORT_KEY),
@@ -173,40 +288,21 @@ public void customize(
CONNECTOR_OPC_UA_ENABLE_ANONYMOUS_ACCESS_KEY,
SINK_OPC_UA_ENABLE_ANONYMOUS_ACCESS_KEY),
CONNECTOR_OPC_UA_ENABLE_ANONYMOUS_ACCESS_DEFAULT_VALUE);
- placeHolder =
- parameters.getStringOrDefault(
- Arrays.asList(CONNECTOR_OPC_UA_PLACEHOLDER_KEY, SINK_OPC_UA_PLACEHOLDER_KEY),
- CONNECTOR_OPC_UA_PLACEHOLDER_DEFAULT_VALUE);
- final boolean withQuality =
- parameters.getBooleanOrDefault(
- Arrays.asList(CONNECTOR_OPC_UA_WITH_QUALITY_KEY, SINK_OPC_UA_WITH_QUALITY_KEY),
- CONNECTOR_OPC_UA_WITH_QUALITY_DEFAULT_VALUE);
- valueName =
- withQuality
- ? parameters.getStringOrDefault(
- Arrays.asList(CONNECTOR_OPC_UA_VALUE_NAME_KEY, SINK_OPC_UA_VALUE_NAME_KEY),
- CONNECTOR_OPC_UA_VALUE_NAME_DEFAULT_VALUE)
- : null;
- qualityName =
- withQuality
- ? parameters.getStringOrDefault(
- Arrays.asList(CONNECTOR_OPC_UA_QUALITY_NAME_KEY, SINK_OPC_UA_QUALITY_NAME_KEY),
- CONNECTOR_OPC_UA_QUALITY_NAME_DEFAULT_VALUE)
- : null;
- isClientServerModel =
- parameters
- .getStringOrDefault(
- Arrays.asList(CONNECTOR_OPC_UA_MODEL_KEY, SINK_OPC_UA_MODEL_KEY),
- CONNECTOR_OPC_UA_MODEL_DEFAULT_VALUE)
- .equals(CONNECTOR_OPC_UA_MODEL_CLIENT_SERVER_VALUE);
-
- final DataRegion region =
- StorageEngine.getInstance()
- .getDataRegion(new DataRegionId(configuration.getRuntimeEnvironment().getRegionId()));
- unQualifiedDatabaseName =
- Objects.nonNull(region)
- ? PathUtils.unQualifyDatabaseName(region.getDatabaseName())
- : "__temp_db";
+ final Set securityPolicies =
+ (parameters.hasAnyAttributes(
+ CONNECTOR_OPC_UA_SECURITY_POLICY_KEY, SINK_OPC_UA_SECURITY_POLICY_KEY)
+ ? Arrays.stream(
+ parameters
+ .getStringByKeys(
+ CONNECTOR_OPC_UA_SECURITY_POLICY_KEY, SINK_OPC_UA_SECURITY_POLICY_KEY)
+ .replace(" ", "")
+ .split(","))
+ : CONNECTOR_OPC_UA_SECURITY_POLICY_SERVER_DEFAULT_VALUES.stream())
+ .map(this::getSecurityPolicy)
+ .collect(Collectors.toSet());
+ if (securityPolicies.isEmpty()) {
+ throw new PipeException("The security policy cannot be empty.");
+ }
synchronized (SERVER_KEY_TO_REFERENCE_COUNT_AND_NAME_SPACE_MAP) {
serverKey = httpsBindPort + ":" + tcpBindPort;
@@ -225,7 +321,8 @@ public void customize(
.setUser(user)
.setPassword(password)
.setSecurityDir(securityDir)
- .setEnableAnonymousAccess(enableAnonymousAccess);
+ .setEnableAnonymousAccess(enableAnonymousAccess)
+ .setSecurityPolicies(securityPolicies);
final OpcUaServer newServer = builder.build();
nameSpace = new OpcUaNameSpace(newServer, builder);
nameSpace.startup();
@@ -234,7 +331,12 @@ public void customize(
} else {
oldValue
.getRight()
- .checkEquals(user, password, securityDir, enableAnonymousAccess);
+ .checkEquals(
+ user,
+ password,
+ securityDir,
+ enableAnonymousAccess,
+ securityPolicies);
return oldValue;
}
} catch (final PipeException e) {
@@ -248,6 +350,80 @@ public void customize(
}
}
+ private void customizeClient(final String nodeUrl, final PipeParameters parameters) {
+ final SecurityPolicy policy =
+ getSecurityPolicy(
+ parameters
+ .getStringOrDefault(
+ Arrays.asList(
+ CONNECTOR_OPC_UA_SECURITY_POLICY_KEY, SINK_OPC_UA_SECURITY_POLICY_KEY),
+ CONNECTOR_OPC_UA_SECURITY_POLICY_BASIC_256_SHA_256_VALUE)
+ .toUpperCase());
+
+ final IdentityProvider provider;
+ final String userName =
+ parameters.getStringByKeys(CONNECTOR_IOTDB_USER_KEY, SINK_IOTDB_USER_KEY);
+ final String password =
+ parameters.getStringOrDefault(
+ Arrays.asList(CONNECTOR_IOTDB_PASSWORD_KEY, SINK_IOTDB_PASSWORD_KEY),
+ CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE);
+ provider =
+ Objects.nonNull(userName)
+ ? new UsernameProvider(userName, password)
+ : new AnonymousProvider();
+
+ final String securityDir =
+ IoTDBConfig.addDataHomeDir(
+ parameters.getStringOrDefault(
+ Arrays.asList(CONNECTOR_OPC_UA_SECURITY_DIR_KEY, SINK_OPC_UA_SECURITY_DIR_KEY),
+ CONNECTOR_OPC_UA_SECURITY_DIR_DEFAULT_VALUE
+ + File.separatorChar
+ + UUID.nameUUIDFromBytes(nodeUrl.getBytes(TSFileConfig.STRING_CHARSET))));
+
+ client =
+ new IoTDBOpcUaClient(
+ nodeUrl,
+ policy,
+ provider,
+ parameters.getBooleanOrDefault(
+ Arrays.asList(CONNECTOR_OPC_UA_HISTORIZING_KEY, SINK_OPC_UA_HISTORIZING_KEY),
+ CONNECTOR_OPC_UA_HISTORIZING_DEFAULT_VALUE));
+ new ClientRunner(client, securityDir, password).run();
+ }
+
+ private SecurityPolicy getSecurityPolicy(final String securityPolicy) {
+ switch (securityPolicy.toUpperCase()) {
+ case CONNECTOR_OPC_UA_SECURITY_POLICY_NONE_VALUE:
+ return SecurityPolicy.None;
+ case CONNECTOR_OPC_UA_SECURITY_POLICY_BASIC_128_RSA_15_VALUE:
+ return SecurityPolicy.Basic128Rsa15;
+ case CONNECTOR_OPC_UA_SECURITY_POLICY_BASIC_256_VALUE:
+ return SecurityPolicy.Basic256;
+ case CONNECTOR_OPC_UA_SECURITY_POLICY_BASIC_256_SHA_256_VALUE:
+ return SecurityPolicy.Basic256Sha256;
+ case CONNECTOR_OPC_UA_SECURITY_POLICY_AES128_SHA256_RSAOAEP_VALUE:
+ return SecurityPolicy.Aes128_Sha256_RsaOaep;
+ case CONNECTOR_OPC_UA_SECURITY_POLICY_AES256_SHA256_RSAPSS_VALUE:
+ return SecurityPolicy.Aes256_Sha256_RsaPss;
+ default:
+ throw new PipeException(
+ "The security policy can only be 'None', 'Basic128Rsa15', 'Basic256', 'Basic256Sha256', 'Aes128_Sha256_RsaOaep' or 'Aes256_Sha256_RsaPss'.");
+ }
+ }
+
+ private StatusCode getQuality(final String quality) {
+ switch (quality.toUpperCase()) {
+ case CONNECTOR_OPC_UA_DEFAULT_QUALITY_GOOD_VALUE:
+ return StatusCode.GOOD;
+ case CONNECTOR_OPC_UA_DEFAULT_QUALITY_BAD_VALUE:
+ return StatusCode.BAD;
+ case CONNECTOR_OPC_UA_DEFAULT_QUALITY_UNCERTAIN_VALUE:
+ return StatusCode.UNCERTAIN;
+ default:
+ throw new PipeException("The default quality can only be 'GOOD', 'BAD' or 'UNCERTAIN'.");
+ }
+ }
+
@Override
public void handshake() throws Exception {
// Server side, do nothing
@@ -268,7 +444,16 @@ public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws Exc
transferByTablet(
tabletInsertionEvent,
LOGGER,
- (tablet, isTableModel) -> nameSpace.transfer(tablet, isTableModel, this));
+ (tablet, isTableModel) -> {
+ if (Objects.nonNull(nameSpace)) {
+ nameSpace.transfer(tablet, isTableModel, this);
+ } else if (Objects.nonNull(client)) {
+ client.transfer(tablet, this);
+ } else {
+ throw new PipeException(
+ "No OPC client or server is specified when transferring tablet");
+ }
+ });
}
public static void transferByTablet(
@@ -336,6 +521,10 @@ public interface ThrowingBiConsumer {
@Override
public void close() throws Exception {
+ if (Objects.nonNull(client)) {
+ client.disconnect();
+ }
+
if (serverKey == null) {
return;
}
@@ -356,4 +545,32 @@ public void close() throws Exception {
}
}
}
+
+ /////////////////////////////// Getter ///////////////////////////////
+
+ public boolean isClientServerModel() {
+ return isClientServerModel;
+ }
+
+ public String getDatabaseName() {
+ return databaseName;
+ }
+
+ public String getPlaceHolder4NullTag() {
+ return placeHolder4NullTag;
+ }
+
+ @Nullable
+ public String getValueName() {
+ return valueName;
+ }
+
+ @Nullable
+ public String getQualityName() {
+ return qualityName;
+ }
+
+ public StatusCode getDefaultQuality() {
+ return defaultQuality;
+ }
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/ClientRunner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/ClientRunner.java
new file mode 100644
index 0000000000000..725ecbbae98a6
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/ClientRunner.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.sink.protocol.opcua.client;
+
+import org.apache.iotdb.pipe.api.exception.PipeException;
+
+import org.bouncycastle.jce.provider.BouncyCastleProvider;
+import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
+import org.eclipse.milo.opcua.stack.client.security.DefaultClientCertificateValidator;
+import org.eclipse.milo.opcua.stack.core.security.DefaultTrustListManager;
+import org.eclipse.milo.opcua.stack.core.types.builtin.LocalizedText;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.security.Security;
+
+import static org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned.uint;
+
+public class ClientRunner {
+
+ private static final Logger logger = LoggerFactory.getLogger(ClientRunner.class);
+
+ static {
+ // Required for SecurityPolicy.Aes256_Sha256_RsaPss
+ Security.addProvider(new BouncyCastleProvider());
+ }
+
+ private final IoTDBOpcUaClient configurableUaClient;
+ private final Path securityDir;
+ private final String password;
+
+ public ClientRunner(
+ final IoTDBOpcUaClient configurableUaClient,
+ final String securityDir,
+ final String password) {
+ this.configurableUaClient = configurableUaClient;
+ this.securityDir = Paths.get(securityDir);
+ this.password = password;
+ }
+
+ private OpcUaClient createClient() throws Exception {
+ Files.createDirectories(securityDir);
+ if (!Files.exists(securityDir)) {
+ throw new Exception("unable to create security dir: " + securityDir);
+ }
+
+ final File pkiDir = securityDir.resolve("pki").toFile();
+
+ logger.info("security dir: {}", securityDir.toAbsolutePath());
+ logger.info("security pki dir: {}", pkiDir.getAbsolutePath());
+
+ final IoTDBKeyStoreLoaderClient loader =
+ new IoTDBKeyStoreLoaderClient().load(securityDir, password.toCharArray());
+
+ final DefaultTrustListManager trustListManager = new DefaultTrustListManager(pkiDir);
+
+ final DefaultClientCertificateValidator certificateValidator =
+ new DefaultClientCertificateValidator(trustListManager);
+
+ return OpcUaClient.create(
+ configurableUaClient.getNodeUrl(),
+ endpoints -> endpoints.stream().filter(configurableUaClient.endpointFilter()).findFirst(),
+ configBuilder ->
+ configBuilder
+ .setApplicationName(LocalizedText.english("Apache IoTDB OPC UA client"))
+ .setApplicationUri("urn:apache:iotdb:opc-ua-client")
+ .setKeyPair(loader.getClientKeyPair())
+ .setCertificate(loader.getClientCertificate())
+ .setCertificateChain(loader.getClientCertificateChain())
+ .setCertificateValidator(certificateValidator)
+ .setIdentityProvider(configurableUaClient.getIdentityProvider())
+ .setRequestTimeout(uint(5000))
+ .build());
+ }
+
+ public void run() {
+ try {
+ final OpcUaClient client = createClient();
+
+ try {
+ configurableUaClient.run(client);
+ } catch (final Exception e) {
+ throw new PipeException(
+ "Error running opc client: " + e.getClass().getSimpleName() + ": " + e.getMessage());
+ }
+ } catch (final Exception e) {
+ throw new PipeException(
+ "Error getting opc client: " + e.getClass().getSimpleName() + ": " + e.getMessage());
+ }
+ }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBKeyStoreLoaderClient.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBKeyStoreLoaderClient.java
new file mode 100644
index 0000000000000..bfaf378822c31
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBKeyStoreLoaderClient.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.sink.protocol.opcua.client;
+
+import org.eclipse.milo.opcua.sdk.server.util.HostnameUtil;
+import org.eclipse.milo.opcua.stack.core.util.SelfSignedCertificateBuilder;
+import org.eclipse.milo.opcua.stack.core.util.SelfSignedCertificateGenerator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.security.Key;
+import java.security.KeyPair;
+import java.security.KeyStore;
+import java.security.PrivateKey;
+import java.security.PublicKey;
+import java.security.cert.X509Certificate;
+import java.util.Arrays;
+import java.util.regex.Pattern;
+
+class IoTDBKeyStoreLoaderClient {
+
+ private static final Logger logger = LoggerFactory.getLogger(ClientRunner.class);
+
+ private static final Pattern IP_ADDR_PATTERN =
+ Pattern.compile("^(([01]?\\d\\d?|2[0-4]\\d|25[0-5])\\.){3}([01]?\\d\\d?|2[0-4]\\d|25[0-5])$");
+
+ private static final String CLIENT_ALIAS = "client-ai";
+
+ private X509Certificate[] clientCertificateChain;
+ private X509Certificate clientCertificate;
+ private KeyPair clientKeyPair;
+
+ IoTDBKeyStoreLoaderClient load(final Path baseDir, final char[] password) throws Exception {
+ final KeyStore keyStore = KeyStore.getInstance("PKCS12");
+
+ final Path serverKeyStore = baseDir.resolve("iotdb-client.pfx");
+
+ logger.info("Loading KeyStore at {}.", serverKeyStore);
+
+ if (!Files.exists(serverKeyStore)) {
+ keyStore.load(null, password);
+
+ final KeyPair keyPair = SelfSignedCertificateGenerator.generateRsaKeyPair(2048);
+
+ final SelfSignedCertificateBuilder builder =
+ new SelfSignedCertificateBuilder(keyPair)
+ .setCommonName("Apache IoTDB OPC UA client")
+ .setOrganization("Apache")
+ .setOrganizationalUnit("dev")
+ .setLocalityName("Beijing")
+ .setStateName("China")
+ .setCountryCode("CN")
+ .setApplicationUri("urn:apache:iotdb:opc-ua-client")
+ .addDnsName("localhost")
+ .addIpAddress("127.0.0.1");
+
+ // Get as many hostnames and IP addresses as we can listed in the certificate.
+ for (String hostname : HostnameUtil.getHostnames("0.0.0.0")) {
+ if (IP_ADDR_PATTERN.matcher(hostname).matches()) {
+ builder.addIpAddress(hostname);
+ } else {
+ builder.addDnsName(hostname);
+ }
+ }
+
+ final X509Certificate certificate = builder.build();
+
+ keyStore.setKeyEntry(
+ CLIENT_ALIAS, keyPair.getPrivate(), password, new X509Certificate[] {certificate});
+ try (OutputStream out = Files.newOutputStream(serverKeyStore)) {
+ keyStore.store(out, password);
+ }
+ } else {
+ try (InputStream in = Files.newInputStream(serverKeyStore)) {
+ keyStore.load(in, password);
+ }
+ }
+
+ final Key clientPrivateKey = keyStore.getKey(CLIENT_ALIAS, password);
+ if (clientPrivateKey instanceof PrivateKey) {
+ clientCertificate = (X509Certificate) keyStore.getCertificate(CLIENT_ALIAS);
+
+ clientCertificateChain =
+ Arrays.stream(keyStore.getCertificateChain(CLIENT_ALIAS))
+ .map(X509Certificate.class::cast)
+ .toArray(X509Certificate[]::new);
+
+ final PublicKey serverPublicKey = clientCertificate.getPublicKey();
+ clientKeyPair = new KeyPair(serverPublicKey, (PrivateKey) clientPrivateKey);
+ }
+
+ return this;
+ }
+
+ X509Certificate getClientCertificate() {
+ return clientCertificate;
+ }
+
+ public X509Certificate[] getClientCertificateChain() {
+ return clientCertificateChain;
+ }
+
+ KeyPair getClientKeyPair() {
+ return clientKeyPair;
+ }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java
new file mode 100644
index 0000000000000..bf96d9881807e
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.sink.protocol.opcua.client;
+
+import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
+import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.db.pipe.sink.protocol.opcua.OpcUaSink;
+import org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaNameSpace;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
+import org.eclipse.milo.opcua.sdk.client.api.identity.IdentityProvider;
+import org.eclipse.milo.opcua.sdk.core.AccessLevel;
+import org.eclipse.milo.opcua.sdk.core.ValueRanks;
+import org.eclipse.milo.opcua.stack.core.Identifiers;
+import org.eclipse.milo.opcua.stack.core.StatusCodes;
+import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
+import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
+import org.eclipse.milo.opcua.stack.core.types.builtin.DateTime;
+import org.eclipse.milo.opcua.stack.core.types.builtin.ExpandedNodeId;
+import org.eclipse.milo.opcua.stack.core.types.builtin.ExtensionObject;
+import org.eclipse.milo.opcua.stack.core.types.builtin.LocalizedText;
+import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
+import org.eclipse.milo.opcua.stack.core.types.builtin.QualifiedName;
+import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode;
+import org.eclipse.milo.opcua.stack.core.types.builtin.Variant;
+import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned;
+import org.eclipse.milo.opcua.stack.core.types.enumerated.NodeClass;
+import org.eclipse.milo.opcua.stack.core.types.structured.AddNodesItem;
+import org.eclipse.milo.opcua.stack.core.types.structured.AddNodesResponse;
+import org.eclipse.milo.opcua.stack.core.types.structured.AddNodesResult;
+import org.eclipse.milo.opcua.stack.core.types.structured.EndpointDescription;
+import org.eclipse.milo.opcua.stack.core.types.structured.ObjectAttributes;
+import org.eclipse.milo.opcua.stack.core.types.structured.VariableAttributes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.function.Predicate;
+
+import static org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaNameSpace.convertToOpcDataType;
+import static org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaNameSpace.timestampToUtc;
+
+public class IoTDBOpcUaClient {
+ private static final Logger LOGGER = LoggerFactory.getLogger(OpcUaNameSpace.class);
+
+ // Customized nodes
+ private static final int NAME_SPACE_INDEX = 2;
+
+ // Useless for a server only accept client writing
+ private static final double SAMPLING_INTERVAL_PLACEHOLDER = 500;
+ private final String nodeUrl;
+
+ private final SecurityPolicy securityPolicy;
+ private final IdentityProvider identityProvider;
+ private OpcUaClient client;
+ private final boolean historizing;
+
+ public IoTDBOpcUaClient(
+ final String nodeUrl,
+ final SecurityPolicy securityPolicy,
+ final IdentityProvider identityProvider,
+ final boolean historizing) {
+ this.nodeUrl = nodeUrl;
+ this.securityPolicy = securityPolicy;
+ this.identityProvider = identityProvider;
+ this.historizing = historizing;
+ }
+
+ public void run(final OpcUaClient client) throws Exception {
+ // synchronous connect
+ this.client = client;
+ client.connect().get();
+ }
+
+ // Only support tree model & client-server
+ public void transfer(final Tablet tablet, final OpcUaSink sink) throws Exception {
+ OpcUaNameSpace.transferTabletForClientServerModel(
+ tablet, false, sink, this::transferTabletRowForClientServerModel);
+ }
+
+ private void transferTabletRowForClientServerModel(
+ final String[] segments,
+ final List measurementSchemas,
+ final List timestamps,
+ final List
+
+ org.eclipse.milo
+ stack-client
+ ${milo.version}
+
+
+ org.eclipse.milo
+ sdk-client
+ ${milo.version}
+
io.airlift