diff --git a/integration-test/pom.xml b/integration-test/pom.xml index 150419970a0c8..f566c8e5995b7 100644 --- a/integration-test/pom.xml +++ b/integration-test/pom.xml @@ -183,7 +183,6 @@ org.bouncycastle bcprov-jdk18on - test junit diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/AbstractPipeSingleIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/AbstractPipeSingleIT.java index 3bb3ecce21e55..3ade13c7209de 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/AbstractPipeSingleIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/AbstractPipeSingleIT.java @@ -31,7 +31,7 @@ abstract class AbstractPipeSingleIT { @Before public void setUp() { - MultiEnvFactory.createEnv(2); + MultiEnvFactory.createEnv(1); env = MultiEnvFactory.getEnv(0); env.getConfig() .getCommonConfig() diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java index ee9e5400aad4c..a190f7c5182b1 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java @@ -20,68 +20,151 @@ package org.apache.iotdb.pipe.it.single; import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient; +import org.apache.iotdb.confignode.rpc.thrift.TAlterPipeReq; import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq; import org.apache.iotdb.db.it.utils.TestUtils; +import org.apache.iotdb.db.pipe.sink.protocol.opcua.client.ClientRunner; +import org.apache.iotdb.db.pipe.sink.protocol.opcua.client.IoTDBOpcUaClient; import org.apache.iotdb.it.framework.IoTDBTestRunner; import org.apache.iotdb.itbase.category.MultiClusterIT1; +import org.apache.iotdb.pipe.api.exception.PipeException; import org.apache.iotdb.pipe.it.dual.tablemodel.TableModelUtils; import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.tsfile.common.conf.TSFileConfig; +import org.eclipse.milo.opcua.sdk.client.OpcUaClient; +import org.eclipse.milo.opcua.sdk.client.api.identity.AnonymousProvider; +import org.eclipse.milo.opcua.sdk.client.api.identity.IdentityProvider; +import org.eclipse.milo.opcua.sdk.client.api.identity.UsernameProvider; +import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy; +import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue; +import org.eclipse.milo.opcua.stack.core.types.builtin.DateTime; +import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId; +import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode; +import org.eclipse.milo.opcua.stack.core.types.builtin.Variant; +import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn; import org.junit.Assert; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; +import java.io.File; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.Objects; +import java.util.UUID; + +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_DIR_DEFAULT_VALUE; +import static org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaNameSpace.timestampToUtc; @RunWith(IoTDBTestRunner.class) @Category({MultiClusterIT1.class}) public class IoTDBPipeOPCUAIT extends AbstractPipeSingleIT { @Test - public void testOPCUASink() throws Exception { + public void testOPCUAServerSink() throws Exception { try (final SyncConfigNodeIServiceClient client = (SyncConfigNodeIServiceClient) env.getLeaderConfigNodeConnection()) { TestUtils.executeNonQuery(env, "insert into root.db.d1(time, s1) values (1, 1)", null); - final Map connectorAttributes = new HashMap<>(); - connectorAttributes.put("sink", "opc-ua-sink"); - connectorAttributes.put("opcua.model", "client-server"); + final Map sinkAttributes = new HashMap<>(); + + sinkAttributes.put("sink", "opc-ua-sink"); + sinkAttributes.put("opcua.model", "client-server"); + sinkAttributes.put("security-policy", "None"); Assert.assertEquals( TSStatusCode.SUCCESS_STATUS.getStatusCode(), client .createPipe( - new TCreatePipeReq("testPipe", connectorAttributes) - .setExtractorAttributes(Collections.emptyMap()) + new TCreatePipeReq("testPipe", sinkAttributes) + .setExtractorAttributes(Collections.singletonMap("user", "root")) .setProcessorAttributes(Collections.emptyMap())) .getCode()); + + final OpcUaClient opcUaClient = + getOpcUaClient("opc.tcp://127.0.0.1:12686/iotdb", SecurityPolicy.None, "root", "root"); + DataValue value = + opcUaClient.readValue(0, TimestampsToReturn.Both, new NodeId(2, "root/db/d1/s1")).get(); + Assert.assertEquals(new Variant(1.0), value.getValue()); + Assert.assertEquals(new DateTime(timestampToUtc(1)), value.getSourceTime()); + + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), + client + .alterPipe( + new TAlterPipeReq() + .setPipeName("testPipe") + .setIsReplaceAllConnectorAttributes(false) + .setConnectorAttributes(Collections.singletonMap("with-quality", "true")) + .setProcessorAttributes(Collections.emptyMap()) + .setExtractorAttributes(Collections.emptyMap())) + .getCode()); + + TestUtils.executeNonQuery( + env, + "insert into root.db.opc(time, value, quality, other) values (1, 1, false, 1)", + null); + value = opcUaClient.readValue(0, TimestampsToReturn.Both, new NodeId(2, "root/db/opc")).get(); + Assert.assertEquals(new Variant(1.0), value.getValue()); + Assert.assertEquals(StatusCode.BAD, value.getStatusCode()); + Assert.assertEquals(new DateTime(timestampToUtc(1)), value.getSourceTime()); + + TestUtils.executeNonQuery( + env, "insert into root.db.opc(time, quality) values (2, true)", null); + TestUtils.executeNonQuery(env, "insert into root.db.opc(time, value) values (2, 2)", null); + + final long startTime = System.currentTimeMillis(); + while (true) { + try { + value = + opcUaClient.readValue(0, TimestampsToReturn.Both, new NodeId(2, "root/db/opc")).get(); + Assert.assertEquals(new DateTime(timestampToUtc(2)), value.getSourceTime()); + Assert.assertEquals(new Variant(2.0), value.getValue()); + Assert.assertEquals(StatusCode.UNCERTAIN, value.getStatusCode()); + break; + } catch (final Throwable t) { + if (System.currentTimeMillis() - startTime > 10_000L) { + throw t; + } + } + } + + opcUaClient.disconnect().get(); Assert.assertEquals( TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.dropPipe("testPipe").getCode()); // Test reconstruction - connectorAttributes.put("password123456", "test"); + sinkAttributes.put("password", "test"); + sinkAttributes.put("security-policy", "basic256sha256"); Assert.assertEquals( TSStatusCode.SUCCESS_STATUS.getStatusCode(), client .createPipe( - new TCreatePipeReq("testPipe", connectorAttributes) + new TCreatePipeReq("testPipe", sinkAttributes) .setExtractorAttributes(Collections.emptyMap()) .setProcessorAttributes(Collections.emptyMap())) .getCode()); + // Banned none, only allows basic256sha256 + Assert.assertThrows( + PipeException.class, + () -> + getOpcUaClient( + "opc.tcp://127.0.0.1:12686/iotdb", SecurityPolicy.None, "root", "root")); + // Test conflict - connectorAttributes.put("password123456", "conflict"); - Assert.assertEquals( - TSStatusCode.PIPE_ERROR.getStatusCode(), - client - .createPipe( - new TCreatePipeReq("testPipe", connectorAttributes) - .setExtractorAttributes(Collections.emptyMap()) - .setProcessorAttributes(Collections.emptyMap())) - .getCode()); + sinkAttributes.put("password", "conflict"); + try { + TestUtils.executeNonQuery( + env, "create pipe test1 ('sink'='opc-ua-sink', 'password'='conflict')", null); + Assert.fail(); + } catch (final Exception e) { + Assert.assertEquals( + "org.apache.iotdb.jdbc.IoTDBSQLException: 1107: The existing server with tcp port 12686 and https port 8443's password test conflicts to the new password conflict, reject reusing.", + e.getMessage()); + } } } @@ -93,42 +176,68 @@ public void testOPCUASinkInTableModel() throws Exception { TableModelUtils.createDataBaseAndTable(env, "test", "test"); TableModelUtils.insertData("test", "test", 0, 10, env); - final Map connectorAttributes = new HashMap<>(); - connectorAttributes.put("sink", "opc-ua-sink"); - connectorAttributes.put("opcua.model", "client-server"); + final Map sourceAttributes = new HashMap<>(); + final Map sinkAttributes = new HashMap<>(); + sourceAttributes.put("capture.table", "true"); + sourceAttributes.put("user", "root"); + + sinkAttributes.put("sink", "opc-ua-sink"); + sinkAttributes.put("opcua.model", "client-server"); Assert.assertEquals( TSStatusCode.SUCCESS_STATUS.getStatusCode(), client .createPipe( - new TCreatePipeReq("testPipe", connectorAttributes) - .setExtractorAttributes(Collections.singletonMap("capture.table", "true")) + new TCreatePipeReq("testPipe", sinkAttributes) + .setExtractorAttributes(sourceAttributes) .setProcessorAttributes(Collections.emptyMap())) .getCode()); Assert.assertEquals( TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.dropPipe("testPipe").getCode()); // Test reconstruction - connectorAttributes.put("password123456", "test"); + sinkAttributes.put("password123456", "test"); Assert.assertEquals( TSStatusCode.SUCCESS_STATUS.getStatusCode(), client .createPipe( - new TCreatePipeReq("testPipe", connectorAttributes) + new TCreatePipeReq("testPipe", sinkAttributes) .setExtractorAttributes(Collections.emptyMap()) .setProcessorAttributes(Collections.emptyMap())) .getCode()); // Test conflict - connectorAttributes.put("password123456", "conflict"); + sinkAttributes.put("password123456", "conflict"); Assert.assertEquals( TSStatusCode.PIPE_ERROR.getStatusCode(), client .createPipe( - new TCreatePipeReq("testPipe", connectorAttributes) + new TCreatePipeReq("testPipe", sinkAttributes) .setExtractorAttributes(Collections.emptyMap()) .setProcessorAttributes(Collections.emptyMap())) .getCode()); } } + + private static OpcUaClient getOpcUaClient( + final String nodeUrl, + final SecurityPolicy policy, + final String userName, + final String password) { + final IoTDBOpcUaClient client; + + final IdentityProvider provider = + Objects.nonNull(userName) + ? new UsernameProvider(userName, password) + : new AnonymousProvider(); + + final String securityDir = + CONNECTOR_OPC_UA_SECURITY_DIR_DEFAULT_VALUE + + File.separatorChar + + UUID.nameUUIDFromBytes(nodeUrl.getBytes(TSFileConfig.STRING_CHARSET)); + + client = new IoTDBOpcUaClient(nodeUrl, policy, provider, false); + new ClientRunner(client, securityDir, password).run(); + return client.getClient(); + } } diff --git a/iotdb-core/datanode/pom.xml b/iotdb-core/datanode/pom.xml index 284eb6796727e..85c96595381e9 100644 --- a/iotdb-core/datanode/pom.xml +++ b/iotdb-core/datanode/pom.xml @@ -191,6 +191,18 @@ org.eclipse.milo stack-server + + org.eclipse.milo + stack-client + + + org.eclipse.milo + sdk-client + + + org.bouncycastle + bcprov-jdk18on + org.eclipse.jetty jetty-http diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java index 7404a8b03bcbf..071db688dd8b6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java @@ -24,6 +24,10 @@ import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; +import org.apache.iotdb.db.pipe.sink.protocol.opcua.client.ClientRunner; +import org.apache.iotdb.db.pipe.sink.protocol.opcua.client.IoTDBOpcUaClient; +import org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaNameSpace; +import org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaServerBuilder; import org.apache.iotdb.db.storageengine.StorageEngine; import org.apache.iotdb.db.storageengine.dataregion.DataRegion; import org.apache.iotdb.pipe.api.PipeConnector; @@ -36,9 +40,15 @@ import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; import org.apache.iotdb.pipe.api.exception.PipeException; +import org.apache.tsfile.common.conf.TSFileConfig; import org.apache.tsfile.utils.Pair; import org.apache.tsfile.write.record.Tablet; +import org.eclipse.milo.opcua.sdk.client.api.identity.AnonymousProvider; +import org.eclipse.milo.opcua.sdk.client.api.identity.IdentityProvider; +import org.eclipse.milo.opcua.sdk.client.api.identity.UsernameProvider; import org.eclipse.milo.opcua.sdk.server.OpcUaServer; +import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy; +import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,28 +58,46 @@ import java.util.Arrays; import java.util.Map; import java.util.Objects; +import java.util.Set; +import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_PASSWORD_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_USERNAME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_USER_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_IOTDB_USER_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_DEFAULT_QUALITY_BAD_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_DEFAULT_QUALITY_GOOD_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_DEFAULT_QUALITY_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_DEFAULT_QUALITY_UNCERTAIN_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_ENABLE_ANONYMOUS_ACCESS_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_ENABLE_ANONYMOUS_ACCESS_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_HISTORIZING_DEFAULT_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_HISTORIZING_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_HTTPS_BIND_PORT_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_HTTPS_BIND_PORT_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_MODEL_CLIENT_SERVER_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_MODEL_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_MODEL_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_MODEL_PUB_SUB_VALUE; -import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_PLACEHOLDER_DEFAULT_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_NODE_URL_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_PLACEHOLDER_4_NULL_TAG_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_PLACEHOLDER_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_QUALITY_NAME_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_QUALITY_NAME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_DIR_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_DIR_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_POLICY_AES128_SHA256_RSAOAEP_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_POLICY_AES256_SHA256_RSAPSS_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_POLICY_BASIC_128_RSA_15_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_POLICY_BASIC_256_SHA_256_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_POLICY_BASIC_256_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_POLICY_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_POLICY_NONE_VALUE; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_POLICY_SERVER_DEFAULT_VALUES; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_TCP_BIND_PORT_DEFAULT_VALUE; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_TCP_BIND_PORT_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_VALUE_NAME_DEFAULT_VALUE; @@ -79,12 +107,16 @@ import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_PASSWORD_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_USERNAME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_USER_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_DEFAULT_QUALITY_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_ENABLE_ANONYMOUS_ACCESS_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_HISTORIZING_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_HTTPS_BIND_PORT_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_MODEL_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_NODE_URL_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_PLACEHOLDER_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_QUALITY_NAME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_SECURITY_DIR_KEY; +import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_SECURITY_POLICY_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_TCP_BIND_PORT_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_VALUE_NAME_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_WITH_QUALITY_KEY; @@ -107,12 +139,18 @@ public class OpcUaSink implements PipeConnector { SERVER_KEY_TO_REFERENCE_COUNT_AND_NAME_SPACE_MAP = new ConcurrentHashMap<>(); private String serverKey; - boolean isClientServerModel; - String unQualifiedDatabaseName; - String placeHolder; - @Nullable String valueName; - @Nullable String qualityName; - private OpcUaNameSpace nameSpace; + private boolean isClientServerModel; + private String databaseName; + private String placeHolder4NullTag; + private @Nullable String valueName; + private @Nullable String qualityName; + private StatusCode defaultQuality; + + // Inner server + private @Nullable OpcUaNameSpace nameSpace; + + // Outer server + private @Nullable IoTDBOpcUaClient client; @Override public void validate(final PipeParameterValidator validator) throws Exception { @@ -131,12 +169,89 @@ public void validate(final PipeParameterValidator validator) throws Exception { Arrays.asList(CONNECTOR_IOTDB_USER_KEY, SINK_IOTDB_USER_KEY), Arrays.asList(CONNECTOR_IOTDB_USERNAME_KEY, SINK_IOTDB_USERNAME_KEY), false); + + final PipeParameters parameters = validator.getParameters(); + if (validator + .getParameters() + .hasAnyAttributes(CONNECTOR_OPC_UA_NODE_URL_KEY, SINK_OPC_UA_NODE_URL_KEY) + || parameters.getBooleanOrDefault( + Arrays.asList(CONNECTOR_OPC_UA_WITH_QUALITY_KEY, SINK_OPC_UA_WITH_QUALITY_KEY), + CONNECTOR_OPC_UA_WITH_QUALITY_DEFAULT_VALUE)) { + validator.validate( + CONNECTOR_OPC_UA_MODEL_CLIENT_SERVER_VALUE::equals, + String.format( + "When the OPC UA sink points to an outer server or sets 'with-quality' to true, the %s or %s must be %s.", + CONNECTOR_OPC_UA_MODEL_KEY, + SINK_OPC_UA_MODEL_KEY, + CONNECTOR_OPC_UA_MODEL_CLIENT_SERVER_VALUE), + parameters.getStringOrDefault( + Arrays.asList(CONNECTOR_OPC_UA_MODEL_KEY, SINK_OPC_UA_MODEL_KEY), + CONNECTOR_OPC_UA_MODEL_DEFAULT_VALUE)); + } } @Override public void customize( final PipeParameters parameters, final PipeConnectorRuntimeConfiguration configuration) throws Exception { + final boolean withQuality = + parameters.getBooleanOrDefault( + Arrays.asList(CONNECTOR_OPC_UA_WITH_QUALITY_KEY, SINK_OPC_UA_WITH_QUALITY_KEY), + CONNECTOR_OPC_UA_WITH_QUALITY_DEFAULT_VALUE); + valueName = + withQuality + ? parameters.getStringOrDefault( + Arrays.asList(CONNECTOR_OPC_UA_VALUE_NAME_KEY, SINK_OPC_UA_VALUE_NAME_KEY), + CONNECTOR_OPC_UA_VALUE_NAME_DEFAULT_VALUE) + : null; + qualityName = + withQuality + ? parameters.getStringOrDefault( + Arrays.asList(CONNECTOR_OPC_UA_QUALITY_NAME_KEY, SINK_OPC_UA_QUALITY_NAME_KEY), + CONNECTOR_OPC_UA_QUALITY_NAME_DEFAULT_VALUE) + : null; + defaultQuality = + getQuality( + withQuality + ? parameters.getStringOrDefault( + Arrays.asList( + CONNECTOR_OPC_UA_DEFAULT_QUALITY_KEY, SINK_OPC_UA_DEFAULT_QUALITY_KEY), + CONNECTOR_OPC_UA_DEFAULT_QUALITY_UNCERTAIN_VALUE) + : CONNECTOR_OPC_UA_DEFAULT_QUALITY_GOOD_VALUE); + isClientServerModel = + parameters + .getStringOrDefault( + Arrays.asList(CONNECTOR_OPC_UA_MODEL_KEY, SINK_OPC_UA_MODEL_KEY), + CONNECTOR_OPC_UA_MODEL_DEFAULT_VALUE) + .equals(CONNECTOR_OPC_UA_MODEL_CLIENT_SERVER_VALUE); + placeHolder4NullTag = + parameters.getStringOrDefault( + Arrays.asList(CONNECTOR_OPC_UA_PLACEHOLDER_KEY, SINK_OPC_UA_PLACEHOLDER_KEY), + CONNECTOR_OPC_UA_PLACEHOLDER_4_NULL_TAG_DEFAULT_VALUE); + final DataRegion region = + StorageEngine.getInstance() + .getDataRegion(new DataRegionId(configuration.getRuntimeEnvironment().getRegionId())); + databaseName = Objects.nonNull(region) ? region.getDatabaseName() : "root.__temp_db"; + + if (withQuality && PathUtils.isTableModelDatabase(databaseName)) { + throw new PipeException( + "When the OPC UA sink sets 'with-quality' to true, the table model data is not supported."); + } + + final String nodeUrl = + parameters.getStringByKeys(CONNECTOR_OPC_UA_NODE_URL_KEY, SINK_OPC_UA_NODE_URL_KEY); + if (Objects.isNull(nodeUrl)) { + customizeServer(parameters); + } else { + if (PathUtils.isTableModelDatabase(databaseName)) { + throw new PipeException( + "When the OPC UA sink points to an outer server, the table model data is not supported."); + } + customizeClient(nodeUrl, parameters); + } + } + + private void customizeServer(final PipeParameters parameters) { final int tcpBindPort = parameters.getIntOrDefault( Arrays.asList(CONNECTOR_OPC_UA_TCP_BIND_PORT_KEY, SINK_OPC_UA_TCP_BIND_PORT_KEY), @@ -173,40 +288,21 @@ public void customize( CONNECTOR_OPC_UA_ENABLE_ANONYMOUS_ACCESS_KEY, SINK_OPC_UA_ENABLE_ANONYMOUS_ACCESS_KEY), CONNECTOR_OPC_UA_ENABLE_ANONYMOUS_ACCESS_DEFAULT_VALUE); - placeHolder = - parameters.getStringOrDefault( - Arrays.asList(CONNECTOR_OPC_UA_PLACEHOLDER_KEY, SINK_OPC_UA_PLACEHOLDER_KEY), - CONNECTOR_OPC_UA_PLACEHOLDER_DEFAULT_VALUE); - final boolean withQuality = - parameters.getBooleanOrDefault( - Arrays.asList(CONNECTOR_OPC_UA_WITH_QUALITY_KEY, SINK_OPC_UA_WITH_QUALITY_KEY), - CONNECTOR_OPC_UA_WITH_QUALITY_DEFAULT_VALUE); - valueName = - withQuality - ? parameters.getStringOrDefault( - Arrays.asList(CONNECTOR_OPC_UA_VALUE_NAME_KEY, SINK_OPC_UA_VALUE_NAME_KEY), - CONNECTOR_OPC_UA_VALUE_NAME_DEFAULT_VALUE) - : null; - qualityName = - withQuality - ? parameters.getStringOrDefault( - Arrays.asList(CONNECTOR_OPC_UA_QUALITY_NAME_KEY, SINK_OPC_UA_QUALITY_NAME_KEY), - CONNECTOR_OPC_UA_QUALITY_NAME_DEFAULT_VALUE) - : null; - isClientServerModel = - parameters - .getStringOrDefault( - Arrays.asList(CONNECTOR_OPC_UA_MODEL_KEY, SINK_OPC_UA_MODEL_KEY), - CONNECTOR_OPC_UA_MODEL_DEFAULT_VALUE) - .equals(CONNECTOR_OPC_UA_MODEL_CLIENT_SERVER_VALUE); - - final DataRegion region = - StorageEngine.getInstance() - .getDataRegion(new DataRegionId(configuration.getRuntimeEnvironment().getRegionId())); - unQualifiedDatabaseName = - Objects.nonNull(region) - ? PathUtils.unQualifyDatabaseName(region.getDatabaseName()) - : "__temp_db"; + final Set securityPolicies = + (parameters.hasAnyAttributes( + CONNECTOR_OPC_UA_SECURITY_POLICY_KEY, SINK_OPC_UA_SECURITY_POLICY_KEY) + ? Arrays.stream( + parameters + .getStringByKeys( + CONNECTOR_OPC_UA_SECURITY_POLICY_KEY, SINK_OPC_UA_SECURITY_POLICY_KEY) + .replace(" ", "") + .split(",")) + : CONNECTOR_OPC_UA_SECURITY_POLICY_SERVER_DEFAULT_VALUES.stream()) + .map(this::getSecurityPolicy) + .collect(Collectors.toSet()); + if (securityPolicies.isEmpty()) { + throw new PipeException("The security policy cannot be empty."); + } synchronized (SERVER_KEY_TO_REFERENCE_COUNT_AND_NAME_SPACE_MAP) { serverKey = httpsBindPort + ":" + tcpBindPort; @@ -225,7 +321,8 @@ public void customize( .setUser(user) .setPassword(password) .setSecurityDir(securityDir) - .setEnableAnonymousAccess(enableAnonymousAccess); + .setEnableAnonymousAccess(enableAnonymousAccess) + .setSecurityPolicies(securityPolicies); final OpcUaServer newServer = builder.build(); nameSpace = new OpcUaNameSpace(newServer, builder); nameSpace.startup(); @@ -234,7 +331,12 @@ public void customize( } else { oldValue .getRight() - .checkEquals(user, password, securityDir, enableAnonymousAccess); + .checkEquals( + user, + password, + securityDir, + enableAnonymousAccess, + securityPolicies); return oldValue; } } catch (final PipeException e) { @@ -248,6 +350,80 @@ public void customize( } } + private void customizeClient(final String nodeUrl, final PipeParameters parameters) { + final SecurityPolicy policy = + getSecurityPolicy( + parameters + .getStringOrDefault( + Arrays.asList( + CONNECTOR_OPC_UA_SECURITY_POLICY_KEY, SINK_OPC_UA_SECURITY_POLICY_KEY), + CONNECTOR_OPC_UA_SECURITY_POLICY_BASIC_256_SHA_256_VALUE) + .toUpperCase()); + + final IdentityProvider provider; + final String userName = + parameters.getStringByKeys(CONNECTOR_IOTDB_USER_KEY, SINK_IOTDB_USER_KEY); + final String password = + parameters.getStringOrDefault( + Arrays.asList(CONNECTOR_IOTDB_PASSWORD_KEY, SINK_IOTDB_PASSWORD_KEY), + CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE); + provider = + Objects.nonNull(userName) + ? new UsernameProvider(userName, password) + : new AnonymousProvider(); + + final String securityDir = + IoTDBConfig.addDataHomeDir( + parameters.getStringOrDefault( + Arrays.asList(CONNECTOR_OPC_UA_SECURITY_DIR_KEY, SINK_OPC_UA_SECURITY_DIR_KEY), + CONNECTOR_OPC_UA_SECURITY_DIR_DEFAULT_VALUE + + File.separatorChar + + UUID.nameUUIDFromBytes(nodeUrl.getBytes(TSFileConfig.STRING_CHARSET)))); + + client = + new IoTDBOpcUaClient( + nodeUrl, + policy, + provider, + parameters.getBooleanOrDefault( + Arrays.asList(CONNECTOR_OPC_UA_HISTORIZING_KEY, SINK_OPC_UA_HISTORIZING_KEY), + CONNECTOR_OPC_UA_HISTORIZING_DEFAULT_VALUE)); + new ClientRunner(client, securityDir, password).run(); + } + + private SecurityPolicy getSecurityPolicy(final String securityPolicy) { + switch (securityPolicy.toUpperCase()) { + case CONNECTOR_OPC_UA_SECURITY_POLICY_NONE_VALUE: + return SecurityPolicy.None; + case CONNECTOR_OPC_UA_SECURITY_POLICY_BASIC_128_RSA_15_VALUE: + return SecurityPolicy.Basic128Rsa15; + case CONNECTOR_OPC_UA_SECURITY_POLICY_BASIC_256_VALUE: + return SecurityPolicy.Basic256; + case CONNECTOR_OPC_UA_SECURITY_POLICY_BASIC_256_SHA_256_VALUE: + return SecurityPolicy.Basic256Sha256; + case CONNECTOR_OPC_UA_SECURITY_POLICY_AES128_SHA256_RSAOAEP_VALUE: + return SecurityPolicy.Aes128_Sha256_RsaOaep; + case CONNECTOR_OPC_UA_SECURITY_POLICY_AES256_SHA256_RSAPSS_VALUE: + return SecurityPolicy.Aes256_Sha256_RsaPss; + default: + throw new PipeException( + "The security policy can only be 'None', 'Basic128Rsa15', 'Basic256', 'Basic256Sha256', 'Aes128_Sha256_RsaOaep' or 'Aes256_Sha256_RsaPss'."); + } + } + + private StatusCode getQuality(final String quality) { + switch (quality.toUpperCase()) { + case CONNECTOR_OPC_UA_DEFAULT_QUALITY_GOOD_VALUE: + return StatusCode.GOOD; + case CONNECTOR_OPC_UA_DEFAULT_QUALITY_BAD_VALUE: + return StatusCode.BAD; + case CONNECTOR_OPC_UA_DEFAULT_QUALITY_UNCERTAIN_VALUE: + return StatusCode.UNCERTAIN; + default: + throw new PipeException("The default quality can only be 'GOOD', 'BAD' or 'UNCERTAIN'."); + } + } + @Override public void handshake() throws Exception { // Server side, do nothing @@ -268,7 +444,16 @@ public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws Exc transferByTablet( tabletInsertionEvent, LOGGER, - (tablet, isTableModel) -> nameSpace.transfer(tablet, isTableModel, this)); + (tablet, isTableModel) -> { + if (Objects.nonNull(nameSpace)) { + nameSpace.transfer(tablet, isTableModel, this); + } else if (Objects.nonNull(client)) { + client.transfer(tablet, this); + } else { + throw new PipeException( + "No OPC client or server is specified when transferring tablet"); + } + }); } public static void transferByTablet( @@ -336,6 +521,10 @@ public interface ThrowingBiConsumer { @Override public void close() throws Exception { + if (Objects.nonNull(client)) { + client.disconnect(); + } + if (serverKey == null) { return; } @@ -356,4 +545,32 @@ public void close() throws Exception { } } } + + /////////////////////////////// Getter /////////////////////////////// + + public boolean isClientServerModel() { + return isClientServerModel; + } + + public String getDatabaseName() { + return databaseName; + } + + public String getPlaceHolder4NullTag() { + return placeHolder4NullTag; + } + + @Nullable + public String getValueName() { + return valueName; + } + + @Nullable + public String getQualityName() { + return qualityName; + } + + public StatusCode getDefaultQuality() { + return defaultQuality; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/ClientRunner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/ClientRunner.java new file mode 100644 index 0000000000000..725ecbbae98a6 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/ClientRunner.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.sink.protocol.opcua.client; + +import org.apache.iotdb.pipe.api.exception.PipeException; + +import org.bouncycastle.jce.provider.BouncyCastleProvider; +import org.eclipse.milo.opcua.sdk.client.OpcUaClient; +import org.eclipse.milo.opcua.stack.client.security.DefaultClientCertificateValidator; +import org.eclipse.milo.opcua.stack.core.security.DefaultTrustListManager; +import org.eclipse.milo.opcua.stack.core.types.builtin.LocalizedText; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.security.Security; + +import static org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned.uint; + +public class ClientRunner { + + private static final Logger logger = LoggerFactory.getLogger(ClientRunner.class); + + static { + // Required for SecurityPolicy.Aes256_Sha256_RsaPss + Security.addProvider(new BouncyCastleProvider()); + } + + private final IoTDBOpcUaClient configurableUaClient; + private final Path securityDir; + private final String password; + + public ClientRunner( + final IoTDBOpcUaClient configurableUaClient, + final String securityDir, + final String password) { + this.configurableUaClient = configurableUaClient; + this.securityDir = Paths.get(securityDir); + this.password = password; + } + + private OpcUaClient createClient() throws Exception { + Files.createDirectories(securityDir); + if (!Files.exists(securityDir)) { + throw new Exception("unable to create security dir: " + securityDir); + } + + final File pkiDir = securityDir.resolve("pki").toFile(); + + logger.info("security dir: {}", securityDir.toAbsolutePath()); + logger.info("security pki dir: {}", pkiDir.getAbsolutePath()); + + final IoTDBKeyStoreLoaderClient loader = + new IoTDBKeyStoreLoaderClient().load(securityDir, password.toCharArray()); + + final DefaultTrustListManager trustListManager = new DefaultTrustListManager(pkiDir); + + final DefaultClientCertificateValidator certificateValidator = + new DefaultClientCertificateValidator(trustListManager); + + return OpcUaClient.create( + configurableUaClient.getNodeUrl(), + endpoints -> endpoints.stream().filter(configurableUaClient.endpointFilter()).findFirst(), + configBuilder -> + configBuilder + .setApplicationName(LocalizedText.english("Apache IoTDB OPC UA client")) + .setApplicationUri("urn:apache:iotdb:opc-ua-client") + .setKeyPair(loader.getClientKeyPair()) + .setCertificate(loader.getClientCertificate()) + .setCertificateChain(loader.getClientCertificateChain()) + .setCertificateValidator(certificateValidator) + .setIdentityProvider(configurableUaClient.getIdentityProvider()) + .setRequestTimeout(uint(5000)) + .build()); + } + + public void run() { + try { + final OpcUaClient client = createClient(); + + try { + configurableUaClient.run(client); + } catch (final Exception e) { + throw new PipeException( + "Error running opc client: " + e.getClass().getSimpleName() + ": " + e.getMessage()); + } + } catch (final Exception e) { + throw new PipeException( + "Error getting opc client: " + e.getClass().getSimpleName() + ": " + e.getMessage()); + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBKeyStoreLoaderClient.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBKeyStoreLoaderClient.java new file mode 100644 index 0000000000000..bfaf378822c31 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBKeyStoreLoaderClient.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.sink.protocol.opcua.client; + +import org.eclipse.milo.opcua.sdk.server.util.HostnameUtil; +import org.eclipse.milo.opcua.stack.core.util.SelfSignedCertificateBuilder; +import org.eclipse.milo.opcua.stack.core.util.SelfSignedCertificateGenerator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.security.Key; +import java.security.KeyPair; +import java.security.KeyStore; +import java.security.PrivateKey; +import java.security.PublicKey; +import java.security.cert.X509Certificate; +import java.util.Arrays; +import java.util.regex.Pattern; + +class IoTDBKeyStoreLoaderClient { + + private static final Logger logger = LoggerFactory.getLogger(ClientRunner.class); + + private static final Pattern IP_ADDR_PATTERN = + Pattern.compile("^(([01]?\\d\\d?|2[0-4]\\d|25[0-5])\\.){3}([01]?\\d\\d?|2[0-4]\\d|25[0-5])$"); + + private static final String CLIENT_ALIAS = "client-ai"; + + private X509Certificate[] clientCertificateChain; + private X509Certificate clientCertificate; + private KeyPair clientKeyPair; + + IoTDBKeyStoreLoaderClient load(final Path baseDir, final char[] password) throws Exception { + final KeyStore keyStore = KeyStore.getInstance("PKCS12"); + + final Path serverKeyStore = baseDir.resolve("iotdb-client.pfx"); + + logger.info("Loading KeyStore at {}.", serverKeyStore); + + if (!Files.exists(serverKeyStore)) { + keyStore.load(null, password); + + final KeyPair keyPair = SelfSignedCertificateGenerator.generateRsaKeyPair(2048); + + final SelfSignedCertificateBuilder builder = + new SelfSignedCertificateBuilder(keyPair) + .setCommonName("Apache IoTDB OPC UA client") + .setOrganization("Apache") + .setOrganizationalUnit("dev") + .setLocalityName("Beijing") + .setStateName("China") + .setCountryCode("CN") + .setApplicationUri("urn:apache:iotdb:opc-ua-client") + .addDnsName("localhost") + .addIpAddress("127.0.0.1"); + + // Get as many hostnames and IP addresses as we can listed in the certificate. + for (String hostname : HostnameUtil.getHostnames("0.0.0.0")) { + if (IP_ADDR_PATTERN.matcher(hostname).matches()) { + builder.addIpAddress(hostname); + } else { + builder.addDnsName(hostname); + } + } + + final X509Certificate certificate = builder.build(); + + keyStore.setKeyEntry( + CLIENT_ALIAS, keyPair.getPrivate(), password, new X509Certificate[] {certificate}); + try (OutputStream out = Files.newOutputStream(serverKeyStore)) { + keyStore.store(out, password); + } + } else { + try (InputStream in = Files.newInputStream(serverKeyStore)) { + keyStore.load(in, password); + } + } + + final Key clientPrivateKey = keyStore.getKey(CLIENT_ALIAS, password); + if (clientPrivateKey instanceof PrivateKey) { + clientCertificate = (X509Certificate) keyStore.getCertificate(CLIENT_ALIAS); + + clientCertificateChain = + Arrays.stream(keyStore.getCertificateChain(CLIENT_ALIAS)) + .map(X509Certificate.class::cast) + .toArray(X509Certificate[]::new); + + final PublicKey serverPublicKey = clientCertificate.getPublicKey(); + clientKeyPair = new KeyPair(serverPublicKey, (PrivateKey) clientPrivateKey); + } + + return this; + } + + X509Certificate getClientCertificate() { + return clientCertificate; + } + + public X509Certificate[] getClientCertificateChain() { + return clientCertificateChain; + } + + KeyPair getClientKeyPair() { + return clientKeyPair; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java new file mode 100644 index 0000000000000..bf96d9881807e --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java @@ -0,0 +1,301 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.sink.protocol.opcua.client; + +import org.apache.iotdb.commons.pipe.resource.log.PipeLogger; +import org.apache.iotdb.commons.utils.TestOnly; +import org.apache.iotdb.db.pipe.sink.protocol.opcua.OpcUaSink; +import org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaNameSpace; +import org.apache.iotdb.pipe.api.exception.PipeException; + +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.write.record.Tablet; +import org.apache.tsfile.write.schema.IMeasurementSchema; +import org.eclipse.milo.opcua.sdk.client.OpcUaClient; +import org.eclipse.milo.opcua.sdk.client.api.identity.IdentityProvider; +import org.eclipse.milo.opcua.sdk.core.AccessLevel; +import org.eclipse.milo.opcua.sdk.core.ValueRanks; +import org.eclipse.milo.opcua.stack.core.Identifiers; +import org.eclipse.milo.opcua.stack.core.StatusCodes; +import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy; +import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue; +import org.eclipse.milo.opcua.stack.core.types.builtin.DateTime; +import org.eclipse.milo.opcua.stack.core.types.builtin.ExpandedNodeId; +import org.eclipse.milo.opcua.stack.core.types.builtin.ExtensionObject; +import org.eclipse.milo.opcua.stack.core.types.builtin.LocalizedText; +import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId; +import org.eclipse.milo.opcua.stack.core.types.builtin.QualifiedName; +import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode; +import org.eclipse.milo.opcua.stack.core.types.builtin.Variant; +import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned; +import org.eclipse.milo.opcua.stack.core.types.enumerated.NodeClass; +import org.eclipse.milo.opcua.stack.core.types.structured.AddNodesItem; +import org.eclipse.milo.opcua.stack.core.types.structured.AddNodesResponse; +import org.eclipse.milo.opcua.stack.core.types.structured.AddNodesResult; +import org.eclipse.milo.opcua.stack.core.types.structured.EndpointDescription; +import org.eclipse.milo.opcua.stack.core.types.structured.ObjectAttributes; +import org.eclipse.milo.opcua.stack.core.types.structured.VariableAttributes; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.function.Predicate; + +import static org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaNameSpace.convertToOpcDataType; +import static org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaNameSpace.timestampToUtc; + +public class IoTDBOpcUaClient { + private static final Logger LOGGER = LoggerFactory.getLogger(OpcUaNameSpace.class); + + // Customized nodes + private static final int NAME_SPACE_INDEX = 2; + + // Useless for a server only accept client writing + private static final double SAMPLING_INTERVAL_PLACEHOLDER = 500; + private final String nodeUrl; + + private final SecurityPolicy securityPolicy; + private final IdentityProvider identityProvider; + private OpcUaClient client; + private final boolean historizing; + + public IoTDBOpcUaClient( + final String nodeUrl, + final SecurityPolicy securityPolicy, + final IdentityProvider identityProvider, + final boolean historizing) { + this.nodeUrl = nodeUrl; + this.securityPolicy = securityPolicy; + this.identityProvider = identityProvider; + this.historizing = historizing; + } + + public void run(final OpcUaClient client) throws Exception { + // synchronous connect + this.client = client; + client.connect().get(); + } + + // Only support tree model & client-server + public void transfer(final Tablet tablet, final OpcUaSink sink) throws Exception { + OpcUaNameSpace.transferTabletForClientServerModel( + tablet, false, sink, this::transferTabletRowForClientServerModel); + } + + private void transferTabletRowForClientServerModel( + final String[] segments, + final List measurementSchemas, + final List timestamps, + final List values, + final OpcUaSink sink) + throws Exception { + StatusCode currentQuality = sink.getDefaultQuality(); + Object value = null; + long timestamp = 0; + NodeId nodeId = null; + NodeId opcDataType = null; + + for (int i = 0; i < measurementSchemas.size(); ++i) { + if (Objects.isNull(values.get(i))) { + continue; + } + final String name = measurementSchemas.get(i).getMeasurementName(); + final TSDataType type = measurementSchemas.get(i).getType(); + if (Objects.nonNull(sink.getQualityName()) && sink.getQualityName().equals(name)) { + if (!type.equals(TSDataType.BOOLEAN)) { + throw new UnsupportedOperationException( + "The quality value only supports boolean type, while true == GOOD and false == BAD."); + } + currentQuality = values.get(i) == Boolean.TRUE ? StatusCode.GOOD : StatusCode.BAD; + continue; + } + if (Objects.nonNull(sink.getValueName()) && !sink.getValueName().equals(name)) { + PipeLogger.log( + LOGGER::warn, + "When the 'with-quality' mode is enabled, the measurement must be either \"value-name\" or \"quality-name\""); + continue; + } + nodeId = new NodeId(NAME_SPACE_INDEX, String.join("/", segments)); + + final long utcTimestamp = timestampToUtc(timestamps.get(timestamps.size() > 1 ? i : 0)); + value = values.get(i); + timestamp = utcTimestamp; + opcDataType = convertToOpcDataType(type); + } + if (Objects.isNull(value)) { + return; + } + + final Variant variant = new Variant(value); + final DataValue dataValue = + new DataValue(variant, currentQuality, new DateTime(timestamp), new DateTime()); + StatusCode writeStatus = client.writeValue(nodeId, dataValue).get(); + + if (writeStatus.getValue() == StatusCodes.Bad_NodeIdUnknown) { + final AddNodesResponse addStatus = + client.addNodes(getNodesToAdd(segments, opcDataType, variant)).get(); + for (final AddNodesResult result : addStatus.getResults()) { + if (!result.getStatusCode().equals(StatusCode.GOOD) + && !(result.getStatusCode().getValue() == StatusCodes.Bad_NodeIdExists)) { + throw new PipeException( + "Failed to create nodes after transfer data value, creation status: " + + addStatus + + getErrorString(segments, opcDataType, value, writeStatus)); + } + } + writeStatus = client.writeValue(nodeId, dataValue).get(); + if (writeStatus.getValue() != StatusCode.GOOD.getValue()) { + throw new PipeException( + "Failed to transfer dataValue after successfully created nodes" + + getErrorString(segments, opcDataType, value, writeStatus)); + } + } else if (writeStatus.getValue() != StatusCode.GOOD.getValue()) { + throw new PipeException( + "Failed to transfer dataValue" + + getErrorString(segments, opcDataType, value, writeStatus)); + } + } + + private static String getErrorString( + final String[] segments, + final NodeId dataType, + final Object value, + final StatusCode writeStatus) { + return ", segments: " + + Arrays.toString(segments) + + ", dataType: " + + dataType + + ", value: " + + value + + ", error: " + + writeStatus; + } + + public List getNodesToAdd( + final String[] segments, final NodeId opcDataType, final Variant initialValue) { + final List addNodesItems = new ArrayList<>(); + final StringBuilder sb = new StringBuilder(segments[0]); + ExpandedNodeId curNodeId = new NodeId(NAME_SPACE_INDEX, segments[0]).expanded(); + addNodesItems.add( + new AddNodesItem( + Identifiers.ObjectsFolder.expanded(), + Identifiers.Organizes, + curNodeId, + new QualifiedName(NAME_SPACE_INDEX, segments[0]), + NodeClass.Object, + ExtensionObject.encode( + client.getStaticSerializationContext(), createFolderAttributes(segments[0])), + Identifiers.FolderType.expanded())); + + // segments.length >= 3 + for (int i = 1; i < segments.length - 1; ++i) { + sb.append("/").append(segments[i]); + final ExpandedNodeId nextId = new NodeId(NAME_SPACE_INDEX, sb.toString()).expanded(); + addNodesItems.add( + new AddNodesItem( + curNodeId, + Identifiers.Organizes, + nextId, + new QualifiedName(NAME_SPACE_INDEX, segments[i]), + NodeClass.Object, + ExtensionObject.encode( + client.getStaticSerializationContext(), createFolderAttributes(segments[i])), + Identifiers.FolderType.expanded())); + curNodeId = nextId; + } + + final String measurementName = segments[segments.length - 1]; + sb.append("/").append(measurementName); + addNodesItems.add( + new AddNodesItem( + curNodeId, + Identifiers.Organizes, + new NodeId(NAME_SPACE_INDEX, sb.toString()).expanded(), + new QualifiedName(NAME_SPACE_INDEX, measurementName), + NodeClass.Variable, + ExtensionObject.encode( + client.getStaticSerializationContext(), + createMeasurementAttributes(measurementName, opcDataType, initialValue)), + Identifiers.BaseDataVariableType.expanded())); + + return addNodesItems; + } + + public void disconnect() throws Exception { + client.disconnect().get(); + } + + /////////////////////////////// Getter /////////////////////////////// + + String getNodeUrl() { + return nodeUrl; + } + + Predicate endpointFilter() { + return e -> getSecurityPolicy().getUri().equals(e.getSecurityPolicyUri()); + } + + SecurityPolicy getSecurityPolicy() { + return securityPolicy; + } + + IdentityProvider getIdentityProvider() { + return identityProvider; + } + + @TestOnly + public OpcUaClient getClient() { + return client; + } + + /////////////////////////////// Attribute creator /////////////////////////////// + + private VariableAttributes createMeasurementAttributes( + final String name, final NodeId objectType, final Variant initialValue) { + return new VariableAttributes( + Unsigned.uint(0xFFFF), // specifiedAttributes + LocalizedText.english(name), + LocalizedText.english(name), + Unsigned.uint(0), // writeMask + Unsigned.uint(0), // userWriteMask + initialValue, + objectType, + ValueRanks.Scalar, + null, // arrayDimensions + AccessLevel.toValue(AccessLevel.READ_WRITE), + AccessLevel.toValue(AccessLevel.READ_WRITE), + SAMPLING_INTERVAL_PLACEHOLDER, + historizing); + } + + private static ObjectAttributes createFolderAttributes(final String name) { + return new ObjectAttributes( + Unsigned.uint(0xFFFF), // specifiedAttributes + LocalizedText.english(name), + LocalizedText.english(name), + Unsigned.uint(0), // writeMask + Unsigned.uint(0), // userWriteMask + null // notifier + ); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaKeyStoreLoader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaKeyStoreLoader.java similarity index 98% rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaKeyStoreLoader.java rename to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaKeyStoreLoader.java index b17f27532d7ae..56b231fb4608a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaKeyStoreLoader.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaKeyStoreLoader.java @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.iotdb.db.pipe.sink.protocol.opcua; +package org.apache.iotdb.db.pipe.sink.protocol.opcua.server; import org.apache.iotdb.commons.utils.FileUtils; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaNameSpace.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java similarity index 87% rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaNameSpace.java rename to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java index 6850fba8f20dc..56a9068018557 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaNameSpace.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java @@ -17,10 +17,12 @@ * under the License. */ -package org.apache.iotdb.db.pipe.sink.protocol.opcua; +package org.apache.iotdb.db.pipe.sink.protocol.opcua.server; import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException; import org.apache.iotdb.commons.exception.pipe.PipeRuntimeNonCriticalException; +import org.apache.iotdb.commons.pipe.resource.log.PipeLogger; +import org.apache.iotdb.db.pipe.sink.protocol.opcua.OpcUaSink; import org.apache.iotdb.db.pipe.sink.util.sorter.PipeTableModelTabletEventSorter; import org.apache.iotdb.db.pipe.sink.util.sorter.PipeTreeModelTabletEventSorter; import org.apache.iotdb.db.utils.DateTimeUtils; @@ -48,12 +50,15 @@ import org.eclipse.milo.opcua.sdk.server.util.SubscriptionModel; import org.eclipse.milo.opcua.stack.core.Identifiers; import org.eclipse.milo.opcua.stack.core.UaException; +import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy; import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue; import org.eclipse.milo.opcua.stack.core.types.builtin.DateTime; import org.eclipse.milo.opcua.stack.core.types.builtin.LocalizedText; import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId; import org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode; import org.eclipse.milo.opcua.stack.core.types.builtin.Variant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.nio.file.Paths; import java.sql.Date; @@ -64,15 +69,17 @@ import java.util.Collections; import java.util.List; import java.util.Objects; +import java.util.Set; import java.util.UUID; import java.util.stream.Collectors; public class OpcUaNameSpace extends ManagedNamespaceWithLifecycle { + private static final Logger LOGGER = LoggerFactory.getLogger(OpcUaNameSpace.class); public static final String NAMESPACE_URI = "urn:apache:iotdb:opc-server"; private final SubscriptionModel subscriptionModel; private final OpcUaServerBuilder builder; - OpcUaNameSpace(final OpcUaServer server, final OpcUaServerBuilder builder) { + public OpcUaNameSpace(final OpcUaServer server, final OpcUaServerBuilder builder) { super(server, NAMESPACE_URI); this.builder = builder; @@ -94,17 +101,22 @@ public void shutdown() { }); } - void transfer(final Tablet tablet, final boolean isTableModel, final OpcUaSink sink) - throws UaException { - if (sink.isClientServerModel) { - transferTabletForClientServerModel(tablet, isTableModel, sink); + public void transfer(final Tablet tablet, final boolean isTableModel, final OpcUaSink sink) + throws Exception { + if (sink.isClientServerModel()) { + transferTabletForClientServerModel( + tablet, isTableModel, sink, this::transferTabletRowForClientServerModel); } else { transferTabletForPubSubModel(tablet, isTableModel, sink); } } - private void transferTabletForClientServerModel( - final Tablet tablet, final boolean isTableModel, final OpcUaSink sink) { + public static void transferTabletForClientServerModel( + final Tablet tablet, + final boolean isTableModel, + final OpcUaSink sink, + final TabletRowConsumer consumer) + throws Exception { final List schemas = tablet.getSchemas(); final List newSchemas = new ArrayList<>(); if (!isTableModel) { @@ -125,8 +137,7 @@ private void transferTabletForClientServerModel( } } - transferTabletRowForClientServerModel( - tablet.getDeviceId().split("\\."), newSchemas, timestamps, values, sink); + consumer.accept(tablet.getDeviceId().split("\\."), newSchemas, timestamps, values, sink); } else { new PipeTableModelTabletEventSorter(tablet).sortByTimestampIfNecessary(); @@ -141,15 +152,15 @@ private void transferTabletForClientServerModel( for (int i = 0; i < tablet.getRowSize(); ++i) { final Object[] segments = tablet.getDeviceID(i).getSegments(); final String[] folderSegments = new String[segments.length + 1]; - folderSegments[0] = sink.unQualifiedDatabaseName; + folderSegments[0] = sink.getDatabaseName(); for (int j = 0; j < segments.length; ++j) { folderSegments[j + 1] = - Objects.isNull(segments[j]) ? sink.placeHolder : (String) segments[j]; + Objects.isNull(segments[j]) ? sink.getPlaceHolder4NullTag() : (String) segments[j]; } final int finalI = i; - transferTabletRowForClientServerModel( + consumer.accept( folderSegments, newSchemas, Collections.singletonList(tablet.getTimestamp(i)), @@ -166,6 +177,17 @@ private void transferTabletForClientServerModel( } } + @FunctionalInterface + public interface TabletRowConsumer { + void accept( + final String[] segments, + final List measurementSchemas, + final List timestamps, + final List values, + final OpcUaSink sink) + throws Exception; + } + private void transferTabletRowForClientServerModel( final String[] segments, final List measurementSchemas, @@ -179,7 +201,7 @@ private void transferTabletRowForClientServerModel( UaNode folderNode = null; NodeId folderNodeId; for (int i = 0; - i < (Objects.isNull(sink.valueName) ? segments.length : segments.length - 1); + i < (Objects.isNull(sink.getValueName()) ? segments.length : segments.length - 1); ++i) { final String segment = segments[i]; final UaNode nextFolderNode; @@ -227,8 +249,7 @@ private void transferTabletRowForClientServerModel( final String currentFolder = currentStr.toString(); - StatusCode currentQuality = - Objects.isNull(sink.valueName) ? StatusCode.GOOD : StatusCode.UNCERTAIN; + StatusCode currentQuality = sink.getDefaultQuality(); UaVariableNode valueNode = null; Object value = null; long timestamp = 0; @@ -239,7 +260,7 @@ private void transferTabletRowForClientServerModel( } final String name = measurementSchemas.get(i).getMeasurementName(); final TSDataType type = measurementSchemas.get(i).getType(); - if (Objects.nonNull(sink.qualityName) && sink.qualityName.equals(name)) { + if (Objects.nonNull(sink.getQualityName()) && sink.getQualityName().equals(name)) { if (!type.equals(TSDataType.BOOLEAN)) { throw new UnsupportedOperationException( "The quality value only supports boolean type, while true == GOOD and false == BAD."); @@ -247,11 +268,14 @@ private void transferTabletRowForClientServerModel( currentQuality = values.get(i) == Boolean.TRUE ? StatusCode.GOOD : StatusCode.BAD; continue; } - if (Objects.nonNull(sink.valueName) && !sink.valueName.equals(name)) { - throw new UnsupportedOperationException( + if (Objects.nonNull(sink.getValueName()) && !sink.getValueName().equals(name)) { + PipeLogger.log( + LOGGER::warn, "When the 'with-quality' mode is enabled, the measurement must be either \"value-name\" or \"quality-name\""); + continue; } - final String nodeName = Objects.isNull(sink.valueName) ? name : segments[segments.length - 1]; + final String nodeName = + Objects.isNull(sink.getValueName()) ? name : segments[segments.length - 1]; final NodeId nodeId = newNodeId(currentFolder + nodeName); final UaVariableNode measurementNode; if (!getNodeManager().containsNode(nodeId)) { @@ -288,7 +312,7 @@ private void transferTabletRowForClientServerModel( } final long utcTimestamp = timestampToUtc(timestamps.get(timestamps.size() > 1 ? i : 0)); - if (Objects.isNull(sink.valueName)) { + if (Objects.isNull(sink.getValueName())) { if (Objects.isNull(measurementNode.getValue()) || Objects.requireNonNull(measurementNode.getValue().getSourceTime()).getUtcTime() < utcTimestamp) { @@ -342,7 +366,7 @@ private static Object getTabletObjectValue4Opc( } } - private static long timestampToUtc(final long timeStamp) { + public static long timestampToUtc(final long timeStamp) { return TimestampPrecisionUtils.currPrecision.toNanos(timeStamp) / 100L + 116444736000000000L; } @@ -365,11 +389,11 @@ private void transferTabletForPubSubModel( if (isTableModel) { sourceNameList = new ArrayList<>(tablet.getRowSize()); for (int i = 0; i < tablet.getRowSize(); ++i) { - final StringBuilder idBuilder = new StringBuilder(sink.unQualifiedDatabaseName); + final StringBuilder idBuilder = new StringBuilder(sink.getDatabaseName()); for (final Object segment : tablet.getDeviceID(i).getSegments()) { idBuilder .append(TsFileConstant.PATH_SEPARATOR) - .append(Objects.isNull(segment) ? sink.placeHolder : segment); + .append(Objects.isNull(segment) ? sink.getPlaceHolder4NullTag() : segment); } sourceNameList.add(idBuilder.toString()); } @@ -473,7 +497,7 @@ private void transferTabletForPubSubModel( eventNode.delete(); } - private NodeId convertToOpcDataType(final TSDataType type) { + public static NodeId convertToOpcDataType(final TSDataType type) { switch (type) { case BOOLEAN: return Identifiers.Boolean; @@ -493,6 +517,7 @@ private NodeId convertToOpcDataType(final TSDataType type) { case STRING: return Identifiers.String; case VECTOR: + case OBJECT: case UNKNOWN: default: throw new PipeRuntimeNonCriticalException("Unsupported data type: " + type); @@ -521,11 +546,13 @@ public void onMonitoringModeChanged(final List monitoredItems) { /////////////////////////////// Conflict detection /////////////////////////////// - void checkEquals( + public void checkEquals( final String user, final String password, final String securityDir, - final boolean enableAnonymousAccess) { - builder.checkEquals(user, password, Paths.get(securityDir), enableAnonymousAccess); + final boolean enableAnonymousAccess, + final Set securityPolicies) { + builder.checkEquals( + user, password, Paths.get(securityDir), enableAnonymousAccess, securityPolicies); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaServerBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaServerBuilder.java similarity index 84% rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaServerBuilder.java rename to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaServerBuilder.java index bc2df4839e2bc..2bc19b7167da9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaServerBuilder.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaServerBuilder.java @@ -17,9 +17,8 @@ * under the License. */ -package org.apache.iotdb.db.pipe.sink.protocol.opcua; +package org.apache.iotdb.db.pipe.sink.protocol.opcua.server; -import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant; import org.apache.iotdb.pipe.api.exception.PipeException; import org.eclipse.milo.opcua.sdk.server.OpcUaServer; @@ -58,6 +57,7 @@ import java.nio.file.Paths; import java.security.KeyPair; import java.security.cert.X509Certificate; +import java.util.HashSet; import java.util.LinkedHashSet; import java.util.List; import java.util.Objects; @@ -84,48 +84,45 @@ public class OpcUaServerBuilder implements Closeable { private String password; private Path securityDir; private boolean enableAnonymousAccess; + private Set securityPolicies; private DefaultTrustListManager trustListManager; - OpcUaServerBuilder() { - tcpBindPort = PipeSinkConstant.CONNECTOR_OPC_UA_TCP_BIND_PORT_DEFAULT_VALUE; - httpsBindPort = PipeSinkConstant.CONNECTOR_OPC_UA_HTTPS_BIND_PORT_DEFAULT_VALUE; - user = PipeSinkConstant.CONNECTOR_IOTDB_USER_DEFAULT_VALUE; - password = PipeSinkConstant.CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE; - securityDir = Paths.get(PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_DIR_DEFAULT_VALUE); - enableAnonymousAccess = PipeSinkConstant.CONNECTOR_OPC_UA_ENABLE_ANONYMOUS_ACCESS_DEFAULT_VALUE; - } - - OpcUaServerBuilder setTcpBindPort(final int tcpBindPort) { + public OpcUaServerBuilder setTcpBindPort(final int tcpBindPort) { this.tcpBindPort = tcpBindPort; return this; } - OpcUaServerBuilder setHttpsBindPort(final int httpsBindPort) { + public OpcUaServerBuilder setHttpsBindPort(final int httpsBindPort) { this.httpsBindPort = httpsBindPort; return this; } - OpcUaServerBuilder setUser(final String user) { + public OpcUaServerBuilder setUser(final String user) { this.user = user; return this; } - OpcUaServerBuilder setPassword(final String password) { + public OpcUaServerBuilder setPassword(final String password) { this.password = password; return this; } - OpcUaServerBuilder setSecurityDir(final String securityDir) { + public OpcUaServerBuilder setSecurityDir(final String securityDir) { this.securityDir = Paths.get(securityDir); return this; } - OpcUaServerBuilder setEnableAnonymousAccess(final boolean enableAnonymousAccess) { + public OpcUaServerBuilder setEnableAnonymousAccess(final boolean enableAnonymousAccess) { this.enableAnonymousAccess = enableAnonymousAccess; return this; } - OpcUaServer build() throws Exception { + public OpcUaServerBuilder setSecurityPolicies(final Set securityPolicies) { + this.securityPolicies = securityPolicies; + return this; + } + + public OpcUaServer build() throws Exception { Files.createDirectories(securityDir); if (!Files.exists(securityDir)) { throw new PipeException("Unable to create security dir: " + securityDir); @@ -247,30 +244,36 @@ private Set createEndpointConfigurations( USER_TOKEN_POLICY_USERNAME, USER_TOKEN_POLICY_X509); - final EndpointConfiguration.Builder noSecurityBuilder = - builder - .copy() - .setSecurityPolicy(SecurityPolicy.None) - .setSecurityMode(MessageSecurityMode.None); - - endpointConfigurations.add(buildTcpEndpoint(noSecurityBuilder, tcpBindPort)); - endpointConfigurations.add(buildHttpsEndpoint(noSecurityBuilder, httpsBindPort)); - - endpointConfigurations.add( - buildTcpEndpoint( - builder - .copy() - .setSecurityPolicy(SecurityPolicy.Basic256Sha256) - .setSecurityMode(MessageSecurityMode.SignAndEncrypt), - tcpBindPort)); - - endpointConfigurations.add( - buildHttpsEndpoint( - builder - .copy() - .setSecurityPolicy(SecurityPolicy.Basic256Sha256) - .setSecurityMode(MessageSecurityMode.Sign), - httpsBindPort)); + final Set securityPolicySet = new HashSet<>(securityPolicies); + if (securityPolicySet.contains(SecurityPolicy.None)) { + final EndpointConfiguration.Builder noSecurityBuilder = + builder + .copy() + .setSecurityPolicy(SecurityPolicy.None) + .setSecurityMode(MessageSecurityMode.None); + + endpointConfigurations.add(buildTcpEndpoint(noSecurityBuilder, tcpBindPort)); + endpointConfigurations.add(buildHttpsEndpoint(noSecurityBuilder, httpsBindPort)); + securityPolicySet.remove(SecurityPolicy.None); + } + + for (final SecurityPolicy securityPolicy : securityPolicySet) { + endpointConfigurations.add( + buildTcpEndpoint( + builder + .copy() + .setSecurityPolicy(securityPolicy) + .setSecurityMode(MessageSecurityMode.SignAndEncrypt), + tcpBindPort)); + + endpointConfigurations.add( + buildHttpsEndpoint( + builder + .copy() + .setSecurityPolicy(securityPolicy) + .setSecurityMode(MessageSecurityMode.Sign), + httpsBindPort)); + } final EndpointConfiguration.Builder discoveryBuilder = builder @@ -309,7 +312,8 @@ void checkEquals( final String user, final String password, final Path securityDir, - final boolean enableAnonymousAccess) { + final boolean enableAnonymousAccess, + final Set securityPolicies) { checkEquals("user", this.user, user); checkEquals("password", this.password, password); checkEquals( @@ -317,6 +321,7 @@ void checkEquals( FileSystems.getDefault().getPath(this.securityDir.toAbsolutePath().toString()), FileSystems.getDefault().getPath(securityDir.toAbsolutePath().toString())); checkEquals("enableAnonymousAccess option", this.enableAnonymousAccess, enableAnonymousAccess); + checkEquals("securityPolicies", this.securityPolicies, securityPolicies); } private void checkEquals(final String attrName, final Object thisAttr, final Object thatAttr) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/CreatePipe.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/CreatePipe.java index 1543e339fc86a..269978e87bde0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/CreatePipe.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/CreatePipe.java @@ -115,9 +115,9 @@ public long ramBytesUsed() { long size = INSTANCE_SIZE; size += AstMemoryEstimationHelper.getEstimatedSizeOfNodeLocation(getLocationInternal()); size += RamUsageEstimator.sizeOf(pipeName); - size += RamUsageEstimator.sizeOfMap(extractorAttributes); + size += RamUsageEstimator.sizeOfMap(sourceAttributes); size += RamUsageEstimator.sizeOfMap(processorAttributes); - size += RamUsageEstimator.sizeOfMap(connectorAttributes); + size += RamUsageEstimator.sizeOfMap(sinkAttributes); return size; } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java index 26ff72cde2405..ec2122b917574 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java @@ -172,7 +172,8 @@ public void testOpcUaSink() { false, "root.db", "db", "root.db", tablet, false, "pipe", 0L, null, null, false); event.increaseReferenceCount(""); normalOPC.transfer(event); - Assert.assertThrows(UnsupportedOperationException.class, () -> qualityOPC.transfer(event)); + // Shall not throw + qualityOPC.transfer(event); event.decreaseReferenceCount("", false); qualityOPC.transfer( diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java index ecdc01237e911..b27bb59224da4 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSinkConstant.java @@ -28,6 +28,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashSet; +import java.util.List; import java.util.Set; import static org.apache.iotdb.commons.conf.IoTDBConstant.MB; @@ -180,7 +181,7 @@ public class PipeSinkConstant { public static final String CONNECTOR_OPC_UA_PLACEHOLDER_KEY = "connector.opcua.placeholder"; public static final String SINK_OPC_UA_PLACEHOLDER_KEY = "sink.opcua.placeholder"; - public static final String CONNECTOR_OPC_UA_PLACEHOLDER_DEFAULT_VALUE = "null"; + public static final String CONNECTOR_OPC_UA_PLACEHOLDER_4_NULL_TAG_DEFAULT_VALUE = "null"; public static final String CONNECTOR_OPC_UA_WITH_QUALITY_KEY = "connector.opcua.with-quality"; public static final String SINK_OPC_UA_WITH_QUALITY_KEY = "sink.opcua.with-quality"; @@ -194,6 +195,40 @@ public class PipeSinkConstant { public static final String SINK_OPC_UA_QUALITY_NAME_KEY = "sink.opcua.quality-name"; public static final String CONNECTOR_OPC_UA_QUALITY_NAME_DEFAULT_VALUE = "quality"; + public static final String CONNECTOR_OPC_UA_DEFAULT_QUALITY_KEY = + "connector.opcua.default-quality"; + public static final String SINK_OPC_UA_DEFAULT_QUALITY_KEY = "sink.opcua.default-quality"; + public static final String CONNECTOR_OPC_UA_DEFAULT_QUALITY_GOOD_VALUE = "GOOD"; + public static final String CONNECTOR_OPC_UA_DEFAULT_QUALITY_BAD_VALUE = "BAD"; + public static final String CONNECTOR_OPC_UA_DEFAULT_QUALITY_UNCERTAIN_VALUE = "UNCERTAIN"; + + public static final String CONNECTOR_OPC_UA_NODE_URL_KEY = "connector.opcua.node-url"; + public static final String SINK_OPC_UA_NODE_URL_KEY = "sink.opcua.node-url"; + + public static final String CONNECTOR_OPC_UA_SECURITY_POLICY_KEY = + "connector.opcua.security-policy"; + public static final String SINK_OPC_UA_SECURITY_POLICY_KEY = "sink.opcua.security-policy"; + public static final String CONNECTOR_OPC_UA_SECURITY_POLICY_NONE_VALUE = "NONE"; + public static final String CONNECTOR_OPC_UA_SECURITY_POLICY_BASIC_128_RSA_15_VALUE = + "BASIC128RSA15"; + public static final String CONNECTOR_OPC_UA_SECURITY_POLICY_BASIC_256_VALUE = "BASIC256"; + public static final String CONNECTOR_OPC_UA_SECURITY_POLICY_BASIC_256_SHA_256_VALUE = + "BASIC256SHA256"; + public static final String CONNECTOR_OPC_UA_SECURITY_POLICY_AES128_SHA256_RSAOAEP_VALUE = + "AES128_SHA256_RSAOAEP"; + public static final String CONNECTOR_OPC_UA_SECURITY_POLICY_AES256_SHA256_RSAPSS_VALUE = + "AES256_SHA256_RSAPSS"; + + public static final List CONNECTOR_OPC_UA_SECURITY_POLICY_SERVER_DEFAULT_VALUES = + Arrays.asList( + CONNECTOR_OPC_UA_SECURITY_POLICY_BASIC_256_SHA_256_VALUE, + CONNECTOR_OPC_UA_SECURITY_POLICY_AES128_SHA256_RSAOAEP_VALUE, + CONNECTOR_OPC_UA_SECURITY_POLICY_AES256_SHA256_RSAPSS_VALUE); + + public static final String CONNECTOR_OPC_UA_HISTORIZING_KEY = "connector.opcua.historizing"; + public static final String SINK_OPC_UA_HISTORIZING_KEY = "sink.opcua.historizing"; + public static final boolean CONNECTOR_OPC_UA_HISTORIZING_DEFAULT_VALUE = false; + public static final String CONNECTOR_LEADER_CACHE_ENABLE_KEY = "connector.leader-cache.enable"; public static final String SINK_LEADER_CACHE_ENABLE_KEY = "sink.leader-cache.enable"; public static final boolean CONNECTOR_LEADER_CACHE_ENABLE_DEFAULT_VALUE = true; diff --git a/pom.xml b/pom.xml index b39fbb78af842..07651de14c16d 100644 --- a/pom.xml +++ b/pom.xml @@ -409,6 +409,16 @@ stack-server ${milo.version} + + org.eclipse.milo + stack-client + ${milo.version} + + + org.eclipse.milo + sdk-client + ${milo.version} + io.airlift