From 3f35a50ea2adec12d3573fd10b33d34028a3066f Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Tue, 21 Apr 2026 17:33:14 +0800 Subject: [PATCH] fix --- .../opcua/client/IoTDBOpcUaClient.java | 65 ++++++++++++++----- .../protocol/opcua/server/OpcUaNameSpace.java | 12 ++-- 2 files changed, 56 insertions(+), 21 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java index 977d672df7356..a9aa95cacdf2a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java @@ -25,6 +25,7 @@ import org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaNameSpace; import org.apache.iotdb.pipe.api.exception.PipeException; +import org.apache.tsfile.common.constant.TsFileConstant; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.write.record.Tablet; import org.apache.tsfile.write.schema.IMeasurementSchema; @@ -56,9 +57,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; + import java.nio.file.Paths; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import java.util.Objects; import java.util.concurrent.ExecutionException; @@ -130,7 +132,6 @@ private void transferTabletRowForClientServerModel( StatusCode currentQuality = sink.getDefaultQuality(); Object value = null; long timestamp = 0; - NodeId nodeId = null; NodeId opcDataType = null; for (int i = 0; i < measurementSchemas.size(); ++i) { @@ -153,17 +154,43 @@ private void transferTabletRowForClientServerModel( "When the 'with-quality' mode is enabled, the measurement must be either \"value-name\" or \"quality-name\""); continue; } - nodeId = new NodeId(NAME_SPACE_INDEX, String.join("/", segments)); final long utcTimestamp = timestampToUtc(timestamps.get(timestamps.size() > 1 ? i : 0)); - value = values.get(i); - timestamp = utcTimestamp; - opcDataType = convertToOpcDataType(type); + if (Objects.isNull(sink.getValueName())) { + writeValue( + values.get(i), + utcTimestamp, + convertToOpcDataType(type), + currentQuality, + segments, + name); + } else { + value = values.get(i); + timestamp = utcTimestamp; + opcDataType = convertToOpcDataType(type); + } } if (Objects.isNull(value)) { return; } + writeValue(value, timestamp, opcDataType, currentQuality, segments, null); + } + + private void writeValue( + final Object value, + final long timestamp, + final NodeId opcDataType, + final StatusCode currentQuality, + final String[] segments, + final @Nullable String name) + throws Exception { + final NodeId nodeId = + new NodeId( + NAME_SPACE_INDEX, + Objects.nonNull(name) + ? String.join("/", segments) + "/" + name + : String.join("/", segments)); final Variant variant = new Variant(value); final DataValue dataValue = new DataValue(variant, currentQuality, new DateTime(timestamp), new DateTime()); @@ -171,36 +198,41 @@ private void transferTabletRowForClientServerModel( if (writeStatus.getValue() == StatusCodes.Bad_NodeIdUnknown) { final AddNodesResponse addStatus = - client.addNodes(getNodesToAdd(segments, opcDataType, variant)).get(); + client.addNodes(getNodesToAdd(segments, name, opcDataType, variant)).get(); for (final AddNodesResult result : addStatus.getResults()) { if (!result.getStatusCode().equals(StatusCode.GOOD) && !(result.getStatusCode().getValue() == StatusCodes.Bad_NodeIdExists)) { throw new PipeException( "Failed to create nodes after transfer data value, creation status: " + addStatus - + getErrorString(segments, opcDataType, value, writeStatus)); + + getErrorString(segments, name, opcDataType, value, writeStatus)); } } writeStatus = client.writeValue(nodeId, dataValue).get(); if (writeStatus.getValue() != StatusCode.GOOD.getValue()) { throw new PipeException( "Failed to transfer dataValue after successfully created nodes" - + getErrorString(segments, opcDataType, value, writeStatus)); + + getErrorString(segments, name, opcDataType, value, writeStatus)); } } else if (writeStatus.getValue() != StatusCode.GOOD.getValue()) { throw new PipeException( "Failed to transfer dataValue" - + getErrorString(segments, opcDataType, value, writeStatus)); + + getErrorString(segments, name, opcDataType, value, writeStatus)); } } private static String getErrorString( final String[] segments, + final @Nullable String name, final NodeId dataType, final Object value, final StatusCode writeStatus) { - return ", segments: " - + Arrays.toString(segments) + return ", measurement: " + + (Objects.nonNull(name) + ? String.join(TsFileConstant.PATH_SEPARATOR, segments) + + TsFileConstant.PATH_SEPARATOR + + name + : String.join(TsFileConstant.PATH_SEPARATOR, segments)) + ", dataType: " + dataType + ", value: " @@ -210,7 +242,10 @@ private static String getErrorString( } public List getNodesToAdd( - final String[] segments, final NodeId opcDataType, final Variant initialValue) { + final String[] segments, + final @Nullable String name, + final NodeId opcDataType, + final Variant initialValue) { final List addNodesItems = new ArrayList<>(); final StringBuilder sb = new StringBuilder(segments[0]); ExpandedNodeId curNodeId = new NodeId(NAME_SPACE_INDEX, segments[0]).expanded(); @@ -226,7 +261,7 @@ public List getNodesToAdd( Identifiers.FolderType.expanded())); // segments.length >= 3 - for (int i = 1; i < segments.length - 1; ++i) { + for (int i = 1; i < (Objects.nonNull(name) ? segments.length : segments.length - 1); ++i) { sb.append("/").append(segments[i]); final ExpandedNodeId nextId = new NodeId(NAME_SPACE_INDEX, sb.toString()).expanded(); addNodesItems.add( @@ -242,7 +277,7 @@ public List getNodesToAdd( curNodeId = nextId; } - final String measurementName = segments[segments.length - 1]; + final String measurementName = Objects.nonNull(name) ? name : segments[segments.length - 1]; sb.append("/").append(measurementName); addNodesItems.add( new AddNodesItem( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java index d1182e59b52c6..ea3c0539fd51d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java @@ -294,14 +294,14 @@ private void transferTabletRowForClientServerModel( } final UaVariableNode measurementNode; final long utcTimestamp = timestampToUtc(timestamps.get(timestamps.size() > 1 ? i : 0)); - final DataValue dataValue = - new DataValue( - new Variant(values.get(i)), - currentQuality, - new DateTime(utcTimestamp), - new DateTime()); if (Objects.isNull(sink.getValueName())) { + final DataValue dataValue = + new DataValue( + new Variant(values.get(i)), + currentQuality, + new DateTime(utcTimestamp), + new DateTime()); measurementNode = addNode(name, currentFolder, folderNode, dataValue, type); if (Objects.isNull(measurementNode.getValue()) || Objects.isNull(measurementNode.getValue().getSourceTime())