"""
High level node object, to access node attribute
and browse address space
"""
from datetime import datetime
from opcua import ua
from opcua.common import events
import opcua.common
def _check_results(results, reqlen=1):
assert len(results) == reqlen, results
for r in results:
r.check()
def _to_nodeid(nodeid):
if isinstance(nodeid, int):
return ua.TwoByteNodeId(nodeid)
elif isinstance(nodeid, Node):
return nodeid.nodeid
elif isinstance(nodeid, ua.NodeId):
return nodeid
elif type(nodeid) in (str, bytes):
return ua.NodeId.from_string(nodeid)
else:
raise ua.UaError("Could not resolve '{0}' to a type id".format(nodeid))
[docs]class Node(object):
"""
High level node object, to access node attribute,
browse and populate address space.
Node objects are usefull as-is but they do not expose the entire
OPC-UA protocol. Feel free to look at the code of this class and call
directly UA services methods to optimize your code
"""
def __init__(self, server, nodeid):
self.server = server
self.nodeid = None
if isinstance(nodeid, Node):
self.nodeid = nodeid.nodeid
elif isinstance(nodeid, ua.NodeId):
self.nodeid = nodeid
elif type(nodeid) in (str, bytes):
self.nodeid = ua.NodeId.from_string(nodeid)
elif isinstance(nodeid, int):
self.nodeid = ua.NodeId(nodeid, 0)
else:
raise ua.UaError("argument to node must be a NodeId object or a string defining a nodeid found {0} of type {1}".format(nodeid, type(nodeid)))
self.basenodeid = None
def __eq__(self, other):
if isinstance(other, Node) and self.nodeid == other.nodeid:
return True
return False
def __ne__(self, other):
return not self.__eq__(other)
def __str__(self):
return self.nodeid.to_string()
def __repr__(self):
return "Node({0})".format(self.nodeid)
def __hash__(self):
return self.nodeid.__hash__()
[docs] def get_browse_name(self):
"""
Get browse name of a node. A browse name is a QualifiedName object
composed of a string(name) and a namespace index.
"""
result = self.get_attribute(ua.AttributeIds.BrowseName)
return result.Value.Value
[docs] def get_display_name(self):
"""
get description attribute of node
"""
result = self.get_attribute(ua.AttributeIds.DisplayName)
return result.Value.Value
[docs] def get_data_type(self):
"""
get data type of node as NodeId
"""
result = self.get_attribute(ua.AttributeIds.DataType)
return result.Value.Value
[docs] def get_data_type_as_variant_type(self):
"""
get data type of node as VariantType
This only works if node is a variable, otherwise type
may not be convertible to VariantType
"""
result = self.get_attribute(ua.AttributeIds.DataType)
return opcua.common.ua_utils.data_type_to_variant_type(Node(self.server, result.Value.Value))
[docs] def get_access_level(self):
"""
Get the access level attribute of the node as a set of AccessLevel enum values.
"""
result = self.get_attribute(ua.AttributeIds.AccessLevel)
return ua.AccessLevel.parse_bitfield(result.Value.Value)
[docs] def get_user_access_level(self):
"""
Get the user access level attribute of the node as a set of AccessLevel enum values.
"""
result = self.get_attribute(ua.AttributeIds.UserAccessLevel)
return ua.AccessLevel.parse_bitfield(result.Value.Value)
[docs] def get_event_notifier(self):
"""
Get the event notifier attribute of the node as a set of EventNotifier enum values.
"""
result = self.get_attribute(ua.AttributeIds.EventNotifier)
return ua.EventNotifier.parse_bitfield(result.Value.Value)
[docs] def set_event_notifier(self, values):
"""
Set the event notifier attribute.
:param values: an iterable of EventNotifier enum values.
"""
event_notifier_bitfield = ua.EventNotifier.to_bitfield(values)
self.set_attribute(ua.AttributeIds.EventNotifier, ua.DataValue(ua.Variant(event_notifier_bitfield, ua.VariantType.Byte)))
[docs] def get_node_class(self):
"""
get node class attribute of node
"""
result = self.get_attribute(ua.AttributeIds.NodeClass)
return result.Value.Value
[docs] def get_description(self):
"""
get description attribute class of node
"""
result = self.get_attribute(ua.AttributeIds.Description)
return result.Value.Value
[docs] def get_value(self):
"""
Get value of a node as a python type. Only variables ( and properties) have values.
An exception will be generated for other node types.
WARNING: on server side, this function returns a ref to object in ua database. Do not modify it if it is a mutable
object unless you know what you are doing
"""
result = self.get_data_value()
return result.Value.Value
[docs] def get_data_value(self):
"""
Get value of a node as a DataValue object. Only variables (and properties) have values.
An exception will be generated for other node types.
DataValue contain a variable value as a variant as well as server and source timestamps
"""
return self.get_attribute(ua.AttributeIds.Value)
[docs] def set_array_dimensions(self, value):
"""
Set attribute ArrayDimensions of node
make sure it has the correct data type
"""
v = ua.Variant(value, ua.VariantType.UInt32)
self.set_attribute(ua.AttributeIds.ArrayDimensions, ua.DataValue(v))
[docs] def get_array_dimensions(self):
"""
Read and return ArrayDimensions attribute of node
"""
res = self.get_attribute(ua.AttributeIds.ArrayDimensions)
return res.Value.Value
[docs] def set_value_rank(self, value):
"""
Set attribute ArrayDimensions of node
"""
v = ua.Variant(value, ua.VariantType.Int32)
self.set_attribute(ua.AttributeIds.ValueRank, ua.DataValue(v))
[docs] def get_value_rank(self):
"""
Read and return ArrayDimensions attribute of node
"""
res = self.get_attribute(ua.AttributeIds.ValueRank)
return res.Value.Value
[docs] def set_value(self, value, varianttype=None):
"""
Set value of a node. Only variables(properties) have values.
An exception will be generated for other node types.
value argument is either:
* a python built-in type, converted to opc-ua
optionnaly using the variantype argument.
* a ua.Variant, varianttype is then ignored
* a ua.DataValue, you then have full control over data send to server
WARNING: On server side, ref to object is directly saved in our UA db, if this is a mutable object
and you modfy it afterward, then the object in db will be modified without any
data change event generated
"""
datavalue = None
if isinstance(value, ua.DataValue):
datavalue = value
elif isinstance(value, ua.Variant):
datavalue = ua.DataValue(value)
datavalue.SourceTimestamp = datetime.utcnow()
else:
datavalue = ua.DataValue(ua.Variant(value, varianttype))
datavalue.SourceTimestamp = datetime.utcnow()
self.set_attribute(ua.AttributeIds.Value, datavalue)
set_data_value = set_value
[docs] def set_writable(self, writable=True):
"""
Set node as writable by clients.
A node is always writable on server side.
"""
if writable:
self.set_attr_bit(ua.AttributeIds.AccessLevel, ua.AccessLevel.CurrentWrite)
self.set_attr_bit(ua.AttributeIds.UserAccessLevel, ua.AccessLevel.CurrentWrite)
else:
self.unset_attr_bit(ua.AttributeIds.AccessLevel, ua.AccessLevel.CurrentWrite)
self.unset_attr_bit(ua.AttributeIds.UserAccessLevel, ua.AccessLevel.CurrentWrite)
[docs] def set_attr_bit(self, attr, bit):
val = self.get_attribute(attr)
val.Value.Value = ua.ua_binary.set_bit(val.Value.Value, bit)
self.set_attribute(attr, val)
[docs] def unset_attr_bit(self, attr, bit):
val = self.get_attribute(attr)
val.Value.Value = ua.ua_binary.unset_bit(val.Value.Value, bit)
self.set_attribute(attr, val)
[docs] def set_read_only(self):
"""
Set a node as read-only for clients.
A node is always writable on server side.
"""
return self.set_writable(False)
[docs] def set_attribute(self, attributeid, datavalue):
"""
Set an attribute of a node
attributeid is a member of ua.AttributeIds
datavalue is a ua.DataValue object
"""
attr = ua.WriteValue()
attr.NodeId = self.nodeid
attr.AttributeId = attributeid
attr.Value = datavalue
params = ua.WriteParameters()
params.NodesToWrite = [attr]
result = self.server.write(params)
result[0].check()
[docs] def get_attribute(self, attr):
"""
Read one attribute of a node
result code from server is checked and an exception is raised in case of error
"""
rv = ua.ReadValueId()
rv.NodeId = self.nodeid
rv.AttributeId = attr
params = ua.ReadParameters()
params.NodesToRead.append(rv)
result = self.server.read(params)
result[0].StatusCode.check()
return result[0]
[docs] def get_attributes(self, attrs):
"""
Read several attributes of a node
list of DataValue is returned
"""
params = ua.ReadParameters()
for attr in attrs:
rv = ua.ReadValueId()
rv.NodeId = self.nodeid
rv.AttributeId = attr
params.NodesToRead.append(rv)
results = self.server.read(params)
return results
[docs] def get_children(self, refs=ua.ObjectIds.HierarchicalReferences, nodeclassmask=ua.NodeClass.Unspecified):
"""
Get all children of a node. By default hierarchical references and all node classes are returned.
Other reference types may be given:
References = 31
NonHierarchicalReferences = 32
HierarchicalReferences = 33
HasChild = 34
Organizes = 35
HasEventSource = 36
HasModellingRule = 37
HasEncoding = 38
HasDescription = 39
HasTypeDefinition = 40
GeneratesEvent = 41
Aggregates = 44
HasSubtype = 45
HasProperty = 46
HasComponent = 47
HasNotifier = 48
HasOrderedComponent = 49
"""
return self.get_referenced_nodes(refs, ua.BrowseDirection.Forward, nodeclassmask)
[docs] def get_properties(self):
"""
return properties of node.
properties are child nodes with a reference of type HasProperty and a NodeClass of Variable
"""
return self.get_children(refs=ua.ObjectIds.HasProperty, nodeclassmask=ua.NodeClass.Variable)
[docs] def get_variables(self):
"""
return variables of node.
properties are child nodes with a reference of type HasComponent and a NodeClass of Variable
"""
return self.get_children(refs=ua.ObjectIds.HasComponent, nodeclassmask=ua.NodeClass.Variable)
[docs] def get_methods(self):
"""
return methods of node.
properties are child nodes with a reference of type HasComponent and a NodeClass of Method
"""
return self.get_children(refs=ua.ObjectIds.HasComponent, nodeclassmask=ua.NodeClass.Method)
[docs] def get_children_descriptions(self, refs=ua.ObjectIds.HierarchicalReferences, nodeclassmask=ua.NodeClass.Unspecified, includesubtypes=True):
return self.get_references(refs, ua.BrowseDirection.Forward, nodeclassmask, includesubtypes)
[docs] def get_encoding_refs(self):
return self.get_referenced_nodes(ua.ObjectIds.HasEncoding, ua.BrowseDirection.Forward)
[docs] def get_description_refs(self):
return self.get_referenced_nodes(ua.ObjectIds.HasDescription, ua.BrowseDirection.Forward)
[docs] def get_references(self, refs=ua.ObjectIds.References, direction=ua.BrowseDirection.Both, nodeclassmask=ua.NodeClass.Unspecified, includesubtypes=True):
"""
returns references of the node based on specific filter defined with:
refs = ObjectId of the Reference
direction = Browse direction for references
nodeclassmask = filter nodes based on specific class
includesubtypes = If true subtypes of the reference (ref) are also included
"""
desc = ua.BrowseDescription()
desc.BrowseDirection = direction
desc.ReferenceTypeId = _to_nodeid(refs)
desc.IncludeSubtypes = includesubtypes
desc.NodeClassMask = nodeclassmask
desc.ResultMask = ua.BrowseResultMask.All
desc.NodeId = self.nodeid
params = ua.BrowseParameters()
params.View.Timestamp = ua.get_win_epoch()
params.NodesToBrowse.append(desc)
params.RequestedMaxReferencesPerNode = 0
results = self.server.browse(params)
references = self._browse_next(results)
return references
def _browse_next(self, results):
references = results[0].References
while results[0].ContinuationPoint:
params = ua.BrowseNextParameters()
params.ContinuationPoints = [results[0].ContinuationPoint]
params.ReleaseContinuationPoints = False
results = self.server.browse_next(params)
references.extend(results[0].References)
return references
[docs] def get_referenced_nodes(self, refs=ua.ObjectIds.References, direction=ua.BrowseDirection.Both, nodeclassmask=ua.NodeClass.Unspecified, includesubtypes=True):
"""
returns referenced nodes based on specific filter
Paramters are the same as for get_references
"""
references = self.get_references(refs, direction, nodeclassmask, includesubtypes)
nodes = []
for desc in references:
node = Node(self.server, desc.NodeId)
nodes.append(node)
return nodes
[docs] def get_type_definition(self):
"""
returns type definition of the node.
"""
references = self.get_references(refs=ua.ObjectIds.HasTypeDefinition, direction=ua.BrowseDirection.Forward)
if len(references) == 0:
return None
return references[0].NodeId
[docs] def get_path(self, max_length=20, as_string=False):
"""
Attempt to find path of node from root node and return it as a list of Nodes.
There might several possible paths to a node, this function will return one
Some nodes may be missing references, so this method may
return an empty list
Since address space may have circular references, a max length is specified
"""
path = self._get_path(max_length)
path = [Node(self.server, ref.NodeId) for ref in path]
path.append(self)
if as_string:
path = [el.get_browse_name().to_string() for el in path]
return path
def _get_path(self, max_length=20):
"""
Attempt to find path of node from root node and return it as a list of Nodes.
There might several possible paths to a node, this function will return one
Some nodes may be missing references, so this method may
return an empty list
Since address space may have circular references, a max length is specified
"""
path = []
node = self
while True:
refs = node.get_references(refs=ua.ObjectIds.HierarchicalReferences, direction=ua.BrowseDirection.Inverse)
if len(refs) > 0:
path.insert(0, refs[0])
node = Node(self.server, refs[0].NodeId)
if len(path) >= (max_length -1):
return path
else:
return path
[docs] def get_parent(self):
"""
returns parent of the node.
A Node may have several parents, the first found is returned.
This method uses reverse references, a node might be missing such a link,
thus we will not find its parent.
"""
refs = self.get_references(refs=ua.ObjectIds.HierarchicalReferences, direction=ua.BrowseDirection.Inverse)
if len(refs) > 0:
return Node(self.server, refs[0].NodeId)
else:
return None
[docs] def get_child(self, path):
"""
get a child specified by its path from this node.
A path might be:
* a string representing a qualified name.
* a qualified name
* a list of string
* a list of qualified names
"""
if type(path) not in (list, tuple):
path = [path]
rpath = self._make_relative_path(path)
bpath = ua.BrowsePath()
bpath.StartingNode = self.nodeid
bpath.RelativePath = rpath
result = self.server.translate_browsepaths_to_nodeids([bpath])
result = result[0]
result.StatusCode.check()
# FIXME: seems this method may return several nodes
return Node(self.server, result.Targets[0].TargetId)
def _make_relative_path(self, path):
rpath = ua.RelativePath()
for item in path:
el = ua.RelativePathElement()
el.ReferenceTypeId = ua.TwoByteNodeId(ua.ObjectIds.HierarchicalReferences)
el.IsInverse = False
el.IncludeSubtypes = True
if isinstance(item, ua.QualifiedName):
el.TargetName = item
else:
el.TargetName = ua.QualifiedName.from_string(item)
rpath.Elements.append(el)
return rpath
[docs] def read_raw_history(self, starttime=None, endtime=None, numvalues=0):
"""
Read raw history of a node
result code from server is checked and an exception is raised in case of error
If numvalues is > 0 and number of events in period is > numvalues
then result will be truncated
"""
details = ua.ReadRawModifiedDetails()
details.IsReadModified = False
if starttime:
details.StartTime = starttime
else:
details.StartTime = ua.get_win_epoch()
if endtime:
details.EndTime = endtime
else:
details.EndTime = ua.get_win_epoch()
details.NumValuesPerNode = numvalues
details.ReturnBounds = True
result = self.history_read(details)
result.StatusCode.check()
return result.HistoryData.DataValues
[docs] def history_read(self, details):
"""
Read raw history of a node, low-level function
result code from server is checked and an exception is raised in case of error
"""
valueid = ua.HistoryReadValueId()
valueid.NodeId = self.nodeid
valueid.IndexRange = ''
params = ua.HistoryReadParameters()
params.HistoryReadDetails = details
params.TimestampsToReturn = ua.TimestampsToReturn.Both
params.ReleaseContinuationPoints = False
params.NodesToRead.append(valueid)
result = self.server.history_read(params)[0]
return result
[docs] def read_event_history(self, starttime=None, endtime=None, numvalues=0, evtypes=ua.ObjectIds.BaseEventType):
"""
Read event history of a source node
result code from server is checked and an exception is raised in case of error
If numvalues is > 0 and number of events in period is > numvalues
then result will be truncated
"""
details = ua.ReadEventDetails()
if starttime:
details.StartTime = starttime
else:
details.StartTime = ua.get_win_epoch()
if endtime:
details.EndTime = endtime
else:
details.EndTime = ua.get_win_epoch()
details.NumValuesPerNode = numvalues
if not isinstance(evtypes, (list, tuple)):
evtypes = [evtypes]
evtypes = [Node(self.server, evtype) for evtype in evtypes]
evfilter = events.get_filter_from_event_type(evtypes)
details.Filter = evfilter
result = self.history_read_events(details)
result.StatusCode.check()
event_res = []
for res in result.HistoryData.Events:
event_res.append(events.Event.from_event_fields(evfilter.SelectClauses, res.EventFields))
return event_res
[docs] def history_read_events(self, details):
"""
Read event history of a node, low-level function
result code from server is checked and an exception is raised in case of error
"""
valueid = ua.HistoryReadValueId()
valueid.NodeId = self.nodeid
valueid.IndexRange = ''
params = ua.HistoryReadParameters()
params.HistoryReadDetails = details
params.TimestampsToReturn = ua.TimestampsToReturn.Both
params.ReleaseContinuationPoints = False
params.NodesToRead.append(valueid)
result = self.server.history_read(params)[0]
return result
[docs] def delete(self, delete_references=True, recursive=False):
"""
Delete node from address space
"""
nodes, results = opcua.common.manage_nodes.delete_nodes(self.server, [self], recursive, delete_references)
for r in results:
r.check()
return nodes
def _fill_delete_reference_item(self, rdesc, bidirectional = False):
ditem = ua.DeleteReferencesItem()
ditem.SourceNodeId = self.nodeid
ditem.TargetNodeId = rdesc.NodeId
ditem.ReferenceTypeId = rdesc.ReferenceTypeId
ditem.IsForward = rdesc.IsForward
ditem.DeleteBidirectional = bidirectional
return ditem
[docs] def delete_reference(self, target, reftype, forward=True, bidirectional=True):
"""
Delete given node's references from address space
"""
known_refs = self.get_references(reftype, includesubtypes=False)
targetid = _to_nodeid(target)
for r in known_refs:
if r.NodeId == targetid and r.IsForward == forward:
rdesc = r
break
else:
raise ua.UaStatusCodeError(ua.StatusCodes.BadNotFound)
ditem = self._fill_delete_reference_item(rdesc, bidirectional)
self.server.delete_references([ditem])[0].check()
[docs] def add_reference(self, target, reftype, forward=True, bidirectional=True):
"""
Add reference to node
"""
aitem = ua.AddReferencesItem()
aitem.SourceNodeId = self.nodeid
aitem.TargetNodeId = _to_nodeid(target)
aitem.ReferenceTypeId = _to_nodeid(reftype)
aitem.IsForward = forward
params = [aitem]
if bidirectional:
aitem2 = ua.AddReferencesItem()
aitem2.SourceNodeId = aitem.TargetNodeId
aitem2.TargetNodeId = aitem.SourceNodeId
aitem2.ReferenceTypeId = aitem.ReferenceTypeId
aitem2.IsForward = not forward
params.append(aitem2)
results = self.server.add_references(params)
for r in results:
r.check()
[docs] def set_modelling_rule(self, mandatory):
"""
Add a modelling rule reference to Node.
When creating a new object type, its variable and child nodes will not
be instanciated if they do not have modelling rule
if mandatory is None, the modelling rule is removed
"""
# remove all existing modelling rule
rules = self.get_references(ua.ObjectIds.HasModellingRule)
self.server.delete_references(list(map(self._fill_delete_reference_item, rules)))
# add new modelling rule as requested
if mandatory is not None:
rule = ua.ObjectIds.ModellingRule_Mandatory if mandatory else ua.ObjectIds.ModellingRule_Optional
self.add_reference(rule, ua.ObjectIds.HasModellingRule, True, False)
[docs] def add_folder(self, nodeid, bname):
return opcua.common.manage_nodes.create_folder(self, nodeid, bname)
[docs] def add_object(self, nodeid, bname, objecttype=None):
return opcua.common.manage_nodes.create_object(self, nodeid, bname, objecttype)
[docs] def add_variable(self, nodeid, bname, val, varianttype=None, datatype=None):
return opcua.common.manage_nodes.create_variable(self, nodeid, bname, val, varianttype, datatype)
[docs] def add_object_type(self, nodeid, bname):
return opcua.common.manage_nodes.create_object_type(self, nodeid, bname)
[docs] def add_variable_type(self, nodeid, bname, datatype):
return opcua.common.manage_nodes.create_variable_type(self, nodeid, bname, datatype)
[docs] def add_data_type(self, nodeid, bname, description=None):
return opcua.common.manage_nodes.create_data_type(self, nodeid, bname, description=description)
[docs] def add_property(self, nodeid, bname, val, varianttype=None, datatype=None):
return opcua.common.manage_nodes.create_property(self, nodeid, bname, val, varianttype, datatype)
[docs] def add_method(self, *args):
return opcua.common.manage_nodes.create_method(self, *args)
[docs] def add_reference_type(self, nodeid, bname, symmetric=True, inversename=None):
return opcua.common.manage_nodes.create_reference_type(self, nodeid, bname, symmetric, inversename)
[docs] def call_method(self, methodid, *args):
return opcua.common.methods.call_method(self, methodid, *args)
[docs] def register(self):
"""
Register node for faster read and write access (if supported by server)
Rmw: This call modifies the nodeid of the node, the original nodeid is
available as node.basenodeid
"""
nodeid = self.server.register_nodes([self.nodeid])[0]
self.basenodeid = self.nodeid
self.nodeid = nodeid
[docs] def unregister(self):
if self.basenodeid is None:
return
self.server.unregister_nodes([self.nodeid])
self.nodeid = self.basenodeid
self.basenodeid = None