"""
Internal server implementing the OPC UA interface.
Can be used on the server side or to implement binary/HTTPS OPC UA servers.
"""
from datetime import datetime, timedelta
from copy import copy
import os
import logging
from threading import Lock
from enum import Enum
try:
from urllib.parse import urlparse
except ImportError:
from urlparse import urlparse
from opcua import ua
from opcua.common import utils
from opcua.common.callback import (CallbackType, ServerItemCallback,
CallbackDispatcher)
from opcua.common.node import Node
from opcua.server.history import HistoryManager
from opcua.server.address_space import AddressSpace
from opcua.server.address_space import AttributeService
from opcua.server.address_space import ViewService
from opcua.server.address_space import NodeManagementService
from opcua.server.address_space import MethodService
from opcua.server.subscription_service import SubscriptionService
from opcua.server.discovery_service import LocalDiscoveryService
from opcua.server.standard_address_space import standard_address_space
from opcua.server.user_manager import UserManager
#from opcua.common import xmlimporter
class SessionState(Enum):
    """
    Life-cycle states of an InternalSession.
    """
    # session object exists but activate_session() has not succeeded yet
    Created = 0
    # activate_session() completed
    Activated = 1
    # close_session() was called
    Closed = 2
class InternalServer(object):
    """
    Transport-independent core of the server.

    Owns the address space, the services working on it (attributes, views,
    methods, node management, subscriptions, history) and an internal
    session, created with the Admin user, that server-side code uses to
    access the address space.
    """

    def __init__(self, shelffile=None, user_manager=None, session_cls=None):
        """
        :param shelffile: optional path of a shelve cache of the standard
            address space, see load_standard_address_space()
        :param user_manager: object exposed to sessions through
            InternalSession.user_manager; may be None
        :param session_cls: class used to build sessions, defaults to
            InternalSession
        """
        self.logger = logging.getLogger(__name__)
        self.server_callback_dispatcher = CallbackDispatcher()
        self.endpoints = []
        self._channel_id_counter = 5
        # for debugging we may want to disable the clock that writes too much in log
        self.disabled_clock = False
        self._local_discovery_service = None  # lazy-loading

        self.aspace = AddressSpace()
        self.attribute_service = AttributeService(self.aspace)
        self.view_service = ViewService(self.aspace)
        self.method_service = MethodService(self.aspace)
        self.node_mgt_service = NodeManagementService(self.aspace)

        # needs self.aspace and self.node_mgt_service to be in place
        self.load_standard_address_space(shelffile)

        self.loop = None  # set by start(), reset by stop()
        self.asyncio_transports = []
        self.subscription_service = SubscriptionService(self.aspace)
        self.history_manager = HistoryManager(self)
        self.user_manager = user_manager

        # create a session to use on server side
        self.session_cls = session_cls or InternalSession
        self.isession = self.session_cls(
            self, self.aspace, self.subscription_service, "Internal",
            user=UserManager.User.Admin)

        self.server_status_node = Node(self.isession, ua.NodeId(ua.ObjectIds.Server_ServerStatus))
        self.current_time_node = Node(self.isession, ua.NodeId(ua.ObjectIds.Server_ServerStatus_CurrentTime))
        self._address_space_fixes()
        self.setup_nodes()

    @property
    def thread_loop(self):
        """
        Return the loop created by start().

        :raises Exception: if no loop is available (server not started or
            already stopped)
        """
        if self.loop is None:
            raise Exception("InternalServer stopped: async threadloop is not running.")
        return self.loop

    @property
    def local_discovery_service(self):
        """
        Return the LocalDiscoveryService, creating it on first access.

        When it is created, one server description is registered for every
        endpoint present in self.endpoints at that moment.
        """
        if self._local_discovery_service is None:
            self._local_discovery_service = LocalDiscoveryService(parent=self)
            for edp in self.endpoints:
                srv_desc = LocalDiscoveryService.ServerDescription(edp.Server)
                self._local_discovery_service.add_server_description(srv_desc)
        return self._local_discovery_service

    def setup_nodes(self):
        """
        Set up some nodes as defined by spec
        """
        uries = ["http://opcfoundation.org/UA/"]
        ns_node = Node(self.isession, ua.NodeId(ua.ObjectIds.Server_NamespaceArray))
        ns_node.set_value(uries)

    def load_standard_address_space(self, shelffile=None):
        """
        Populate the address space with the standard nodes.

        If shelffile is given and exists on disk (as is, or with a ".db"
        suffix) the address space is loaded from that shelf. Otherwise it is
        filled from the generated code and, when shelffile was supplied, a
        shelf is written to it for the next start-up.
        """
        shelf_exists = (shelffile is not None) and (
            os.path.isfile(shelffile) or os.path.isfile(shelffile + ".db"))
        if shelf_exists:
            # import address space from shelf
            self.aspace.load_aspace_shelf(shelffile)
        else:
            # import address space from code generated from xml
            standard_address_space.fill_address_space(self.node_mgt_service)
            # import address space directly from xml, this has performance impact so disabled
            # importer = xmlimporter.XmlImporter(self.node_mgt_service)
            # importer.import_xml("/path/to/python-opcua/schemas/Opc.Ua.NodeSet2.xml", self)

            # if a cache file was supplied a shelve of the standard address space
            # can now be built for next start up
            if shelffile:
                self.aspace.make_aspace_shelf(shelffile)

    def _address_space_fixes(self):
        """
        Looks like the xml definition of address space has some error. This is a good place to fix them
        """
        # add the inverse Organizes references between the base types and their folders
        fixes = (
            (ua.ObjectIds.BaseObjectType, ua.ObjectIds.ObjectTypesFolder),
            (ua.ObjectIds.BaseDataType, ua.ObjectIds.DataTypesFolder),
        )
        refs = []
        for source_id, target_id in fixes:
            ref = ua.AddReferencesItem()
            ref.SourceNodeId = ua.NodeId(source_id)
            ref.ReferenceTypeId = ua.NodeId(ua.ObjectIds.Organizes)
            ref.IsForward = False
            ref.TargetNodeId = ua.NodeId(target_id)
            ref.TargetNodeClass = ua.NodeClass.Object
            refs.append(ref)
        # the status codes returned here are not inspected
        self.isession.add_references(refs)

        # give every operation limit the same value
        params = ua.WriteParameters()
        for nodeid in (ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerRead,
                       ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerHistoryReadData,
                       ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerHistoryReadEvents,
                       ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerWrite,
                       ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerHistoryUpdateData,
                       ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerHistoryUpdateEvents,
                       ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerMethodCall,
                       ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerBrowse,
                       ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerRegisterNodes,
                       ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerTranslateBrowsePathsToNodeIds,
                       ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxNodesPerNodeManagement,
                       ua.ObjectIds.Server_ServerCapabilities_OperationLimits_MaxMonitoredItemsPerCall):
            attr = ua.WriteValue()
            attr.NodeId = ua.NodeId(nodeid)
            attr.AttributeId = ua.AttributeIds.Value
            attr.Value = ua.DataValue(ua.Variant(10000, ua.VariantType.UInt32), ua.StatusCode(ua.StatusCodes.Good))
            attr.Value.ServerTimestamp = datetime.utcnow()
            params.NodesToWrite.append(attr)
        result = self.isession.write(params)
        # only the status of the first write is verified
        result[0].check()

    def load_address_space(self, path):
        """
        Load address space from path
        """
        self.aspace.load(path)

    def dump_address_space(self, path):
        """
        Dump current address space to path
        """
        self.aspace.dump(path)

    def start(self):
        """
        Start the thread loop, hand it to the subscription service, set the
        server state and start time nodes and, unless disabled_clock is set,
        start the periodic current-time update.
        """
        self.logger.info("starting internal server")
        self.loop = utils.ThreadLoop()
        self.loop.start()
        self.subscription_service.set_loop(self.loop)
        state_node = Node(self.isession, ua.NodeId(ua.ObjectIds.Server_ServerStatus_State))
        state_node.set_value(ua.uaprotocol_auto.ServerState.Running, ua.VariantType.Int32)
        start_time_node = Node(self.isession, ua.NodeId(ua.ObjectIds.Server_ServerStatus_StartTime))
        start_time_node.set_value(datetime.utcnow())
        if not self.disabled_clock:
            self._set_current_time()

    def stop(self):
        """
        Close the internal session, detach the loop from the subscription
        service, stop the history manager and shut the thread loop down.
        """
        self.logger.info("stopping internal server")
        self.isession.close_session()
        self.subscription_service.set_loop(None)
        self.history_manager.stop()
        if self.loop:
            self.loop.stop()
            # wait for ThreadLoop to finish before proceeding
            self.loop.join()
            self.loop.close()
            self.loop = None

    def is_running(self):
        """
        Return True while a loop is set, i.e. between start() and stop().
        """
        return self.loop is not None

    def _set_current_time(self):
        """
        Write the current UTC time to the CurrentTime node and to the
        ServerStatus structure, then re-schedule itself on the loop with a
        delay of 1.
        """
        # one timestamp for both writes so the two nodes agree
        now = datetime.utcnow()
        self.current_time_node.set_value(now)
        ssdata = self.server_status_node.get_value()
        ssdata.CurrentTime = now
        self.server_status_node.set_value(ssdata)
        self.loop.call_later(1, self._set_current_time)

    def get_new_channel_id(self):
        """
        Increment the channel id counter and return its new value.
        """
        self._channel_id_counter += 1
        return self._channel_id_counter

    def add_endpoint(self, endpoint):
        """
        Append endpoint to the list returned by get_endpoints().
        """
        self.endpoints.append(endpoint)

    def get_endpoints(self, params=None, sockname=None):
        """
        Return the endpoints of this server.

        :param params: not used by this implementation
        :param sockname: optional sequence whose first two items are used as
            host and port. When given, shallow copies of the endpoints are
            returned with the network location of their url replaced by
            "host:port".
        :return: a new list; the registered endpoint objects themselves are
            returned when sockname is not given
        """
        self.logger.info("get endpoint")
        if not sockname:
            return self.endpoints[:]
        # return to client the ip address it has access to
        netloc = "{0}:{1}".format(sockname[0], sockname[1])
        edps = []
        for edp in self.endpoints:
            edp_copy = copy(edp)
            url = urlparse(edp_copy.EndpointUrl)._replace(netloc=netloc)
            edp_copy.EndpointUrl = url.geturl()
            edps.append(edp_copy)
        return edps

    def create_session(self, name, user=UserManager.User.Anonymous):
        """
        Create and return a new session (instance of session_cls) bound to
        this server.
        """
        return self.session_cls(self, self.aspace, self.subscription_service, name, user=user)

    def enable_history_data_change(self, node, period=timedelta(days=7), count=0):
        """
        Set attribute Historizing of node to True and start storing data for history
        """
        node.set_attribute(ua.AttributeIds.Historizing, ua.DataValue(True))
        node.set_attr_bit(ua.AttributeIds.AccessLevel, ua.AccessLevel.HistoryRead)
        node.set_attr_bit(ua.AttributeIds.UserAccessLevel, ua.AccessLevel.HistoryRead)
        self.history_manager.historize_data_change(node, period, count)

    def disable_history_data_change(self, node):
        """
        Set attribute Historizing of node to False and stop storing data for history
        """
        node.set_attribute(ua.AttributeIds.Historizing, ua.DataValue(False))
        node.unset_attr_bit(ua.AttributeIds.AccessLevel, ua.AccessLevel.HistoryRead)
        node.unset_attr_bit(ua.AttributeIds.UserAccessLevel, ua.AccessLevel.HistoryRead)
        self.history_manager.dehistorize(node)

    def enable_history_event(self, source, period=timedelta(days=7), count=0):
        """
        Set attribute History Read of object events to True and start storing data for history

        :raises ua.UaError: if SubscribeToEvents is not in the event notifier
            of source
        """
        event_notifier = source.get_event_notifier()
        if ua.EventNotifier.SubscribeToEvents not in event_notifier:
            raise ua.UaError("Node does not generate events", event_notifier)

        if ua.EventNotifier.HistoryRead not in event_notifier:
            event_notifier.add(ua.EventNotifier.HistoryRead)
            source.set_event_notifier(event_notifier)

        self.history_manager.historize_event(source, period, count)

    def disable_history_event(self, source):
        """
        Set attribute History Read of node to False and stop storing data for history
        """
        source.unset_attr_bit(ua.AttributeIds.EventNotifier, ua.EventNotifier.HistoryRead)
        self.history_manager.dehistorize(source)

    def subscribe_server_callback(self, event, handle):
        """
        Create a subscription from event to handle
        """
        self.server_callback_dispatcher.addListener(event, handle)

    def unsubscribe_server_callback(self, event, handle):
        """
        Remove a subscription from event to handle
        """
        self.server_callback_dispatcher.removeListener(event, handle)

    def set_attribute_value(self, nodeid, datavalue, attr=ua.AttributeIds.Value):
        """
        Directly write datavalue to the attribute attr of nodeid, bypassing
        some checks and structure creation so it is a little faster.

        :param attr: attribute to write, the Value attribute by default
        """
        # forward the requested attribute instead of always writing Value
        self.aspace.set_attribute_value(nodeid, attr, datavalue)
class InternalSession(object):
    """
    One session on an InternalServer.

    Most methods forward the request to the matching service of the server.
    The session also keeps the ids of the subscriptions created through it so
    they can be deleted when the session is closed.
    """

    # class-wide counters used to number session ids and authentication tokens
    _counter = 10
    _auth_counter = 1000

    def __init__(self, internal_server, aspace, submgr, name, user=UserManager.User.Anonymous):
        """
        :param internal_server: the InternalServer owning this session
        :param aspace: address space, used by add_method_callback()
        :param submgr: subscription service used for all subscription calls
        :param name: session name, used in log messages and __str__
        :param user: user forwarded to the write and node management services
        """
        self.logger = logging.getLogger(__name__)
        self.iserver = internal_server
        self.aspace = aspace
        self.subscription_service = submgr
        self.name = name
        self.user = user
        self.nonce = None
        self.state = SessionState.Created
        self.session_id = ua.NodeId(self._counter)
        InternalSession._counter += 1
        self.authentication_token = ua.NodeId(self._auth_counter)
        InternalSession._auth_counter += 1
        self.subscriptions = []  # ids of subscriptions created by this session
        self.logger.info("Created internal session %s", self.name)
        self._lock = Lock()  # protects self.subscriptions

    @property
    def user_manager(self):
        """
        The user manager of the owning server.
        """
        return self.iserver.user_manager

    def __str__(self):
        return "InternalSession(name:{0}, user:{1}, id:{2}, auth_token:{3})".format(
            self.name, self.user, self.session_id, self.authentication_token)

    def get_endpoints(self, params=None, sockname=None):
        """
        Forward to InternalServer.get_endpoints().
        """
        return self.iserver.get_endpoints(params, sockname)

    def create_session(self, params, sockname=None):
        """
        Build the CreateSessionResult for this session.

        A new 32-byte nonce is generated and stored, and the requested
        session timeout is accepted unchanged.
        """
        self.logger.info("Create session request")

        result = ua.CreateSessionResult()
        result.SessionId = self.session_id
        result.AuthenticationToken = self.authentication_token
        result.RevisedSessionTimeout = params.RequestedSessionTimeout
        result.MaxRequestMessageSize = 65536
        self.nonce = utils.create_nonce(32)
        result.ServerNonce = self.nonce
        result.ServerEndpoints = self.get_endpoints(sockname=sockname)

        return result

    def close_session(self, delete_subs=True):
        """
        Mark the session closed and delete its subscriptions.

        NOTE: delete_subs is accepted but not consulted; the subscriptions
        of the session are always deleted.
        """
        self.logger.info("close session %s with subscriptions %s", self, self.subscriptions)
        self.state = SessionState.Closed
        # pass a copy: delete_subscriptions() mutates self.subscriptions
        self.delete_subscriptions(self.subscriptions[:])

    def activate_session(self, params):
        """
        Activate the session and return the ActivateSessionResult.

        :raises utils.ServiceError: BadSessionIdInvalid if the session is not
            in the Created state, BadUserAccessDenied if a user name token is
            supplied and the user manager rejects it
        """
        self.logger.info("activate session")
        result = ua.ActivateSessionResult()
        if self.state != SessionState.Created:
            raise utils.ServiceError(ua.StatusCodes.BadSessionIdInvalid)
        self.nonce = utils.create_nonce(32)
        result.ServerNonce = self.nonce
        for _ in params.ClientSoftwareCertificates:
            result.Results.append(ua.StatusCode())
        id_token = params.UserIdentityToken
        if isinstance(id_token, ua.UserNameIdentityToken):
            # only an explicit False (or equal value) is a rejection
            if self.user_manager.check_user_token(self, id_token) == False:  # noqa: E712
                raise utils.ServiceError(ua.StatusCodes.BadUserAccessDenied)
        # switch state only once the identity check has passed, so a rejected
        # login does not leave the session marked as activated
        self.state = SessionState.Activated
        self.logger.info("Activated internal session %s for user %s", self.name, self.user)
        return result

    def read(self, params):
        """
        Forward to the attribute service of the server.
        """
        return self.iserver.attribute_service.read(params)

    def history_read(self, params):
        """
        Forward to the history manager of the server.
        """
        return self.iserver.history_manager.read_history(params)

    def write(self, params):
        """
        Forward to the attribute service, passing the user of this session.
        """
        return self.iserver.attribute_service.write(params, self.user)

    def browse(self, params):
        """
        Forward to the view service of the server.
        """
        return self.iserver.view_service.browse(params)

    def translate_browsepaths_to_nodeids(self, params):
        """
        Forward to the view service of the server.
        """
        return self.iserver.view_service.translate_browsepaths_to_nodeids(params)

    def add_nodes(self, params):
        """
        Forward to the node management service, passing the session user.
        """
        return self.iserver.node_mgt_service.add_nodes(params, self.user)

    def delete_nodes(self, params):
        """
        Forward to the node management service, passing the session user.
        """
        return self.iserver.node_mgt_service.delete_nodes(params, self.user)

    def add_references(self, params):
        """
        Forward to the node management service, passing the session user.
        """
        return self.iserver.node_mgt_service.add_references(params, self.user)

    def delete_references(self, params):
        """
        Forward to the node management service, passing the session user.
        """
        return self.iserver.node_mgt_service.delete_references(params, self.user)

    def add_method_callback(self, methodid, callback):
        """
        Forward to the address space of this session.
        """
        return self.aspace.add_method_callback(methodid, callback)

    def call(self, params):
        """
        Forward to the method service of the server.
        """
        return self.iserver.method_service.call(params)

    def create_subscription(self, params, callback, ready_callback=None):
        """
        Create a subscription and remember its id in this session.

        NOTE: ready_callback is accepted but not used.
        """
        result = self.subscription_service.create_subscription(params, callback)
        with self._lock:
            self.subscriptions.append(result.SubscriptionId)
        return result

    def modify_subscription(self, params, callback):
        """
        Forward to the subscription service.
        """
        return self.subscription_service.modify_subscription(params, callback)

    def create_monitored_items(self, params):
        """
        Create monitored items and dispatch an ItemSubscriptionCreated
        server callback carrying the request and its result.
        """
        subscription_result = self.subscription_service.create_monitored_items(params)
        self.iserver.server_callback_dispatcher.dispatch(
            CallbackType.ItemSubscriptionCreated, ServerItemCallback(params, subscription_result))
        return subscription_result

    def modify_monitored_items(self, params):
        """
        Modify monitored items and dispatch an ItemSubscriptionModified
        server callback carrying the request and its result.
        """
        subscription_result = self.subscription_service.modify_monitored_items(params)
        self.iserver.server_callback_dispatcher.dispatch(
            CallbackType.ItemSubscriptionModified, ServerItemCallback(params, subscription_result))
        return subscription_result

    def republish(self, params):
        """
        Forward to the subscription service.
        """
        return self.subscription_service.republish(params)

    def delete_subscriptions(self, ids):
        """
        Forget the given subscription ids in this session and delete them in
        the subscription service.

        All ids are forwarded to the subscription service, including ids
        this session did not record.
        """
        # take the lock once for the whole update instead of once per id
        with self._lock:
            for i in ids:
                if i in self.subscriptions:
                    self.subscriptions.remove(i)
        return self.subscription_service.delete_subscriptions(ids)

    def delete_monitored_items(self, params):
        """
        Delete monitored items and dispatch an ItemSubscriptionDeleted
        server callback carrying the request and its result.
        """
        subscription_result = self.subscription_service.delete_monitored_items(params)
        self.iserver.server_callback_dispatcher.dispatch(
            CallbackType.ItemSubscriptionDeleted, ServerItemCallback(params, subscription_result))
        return subscription_result

    def publish(self, acks=None):
        """
        Forward the acknowledgements (an empty list when None) to the
        subscription service.
        """
        if acks is None:
            acks = []
        return self.subscription_service.publish(acks)