OPCUA-Python实例

 更新时间:2024年02月23日 10:18:47   作者:笨笨D幸福  
这篇文章主要介绍了OPCUA-Python实例,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教

OPCUA-Python

pip install opcua

Server

from threading import Thread
import copy
import logging
from datetime import datetime
import time
from math import sin
import sys
sys.path.insert(0, "..")

try:
    from IPython import embed
except ImportError:
    import code

    def embed():
        myvars = globals()
        myvars.update(locals())
        shell = code.InteractiveConsole(myvars)
        shell.interact()


from opcua import ua, uamethod, Server


class SubHandler(object):

    """
    Subscription Handler. To receive events from server for a subscription
    """

    def datachange_notification(self, node, val, data):
        print("Python: New data change event", node, val)

    def event_notification(self, event):
        print("Python: New event", event)


# method to be exposed through server

def func(parent, variant):
    ret = False
    if variant.Value % 2 == 0:
        ret = True
    return [ua.Variant(ret, ua.VariantType.Boolean)]


# method to be exposed through server
# uses a decorator to automatically convert to and from variants

@uamethod
def multiply(parent, x, y):
    print("multiply method call with parameters: ", x, y)
    return x * y


class VarUpdater(Thread):
    def __init__(self, var):
        Thread.__init__(self)
        self._stopev = False
        self.var = var

    def stop(self):
        self._stopev = True

    def run(self):
        while not self._stopev:
            v = sin(time.time() / 10)
            self.var.set_value(v)
            time.sleep(0.1)



if __name__ == "__main__":
    # optional: setup logging
    logging.basicConfig(level=logging.WARN)
    #logger = logging.getLogger("opcua.address_space")
    # logger.setLevel(logging.DEBUG)
    #logger = logging.getLogger("opcua.internal_server")
    # logger.setLevel(logging.DEBUG)
    #logger = logging.getLogger("opcua.binary_server_asyncio")
    # logger.setLevel(logging.DEBUG)
    #logger = logging.getLogger("opcua.uaprocessor")
    # logger.setLevel(logging.DEBUG)

    # now setup our server
    server = Server()
    #server.disable_clock()
    #server.set_endpoint("opc.tcp://localhost:4840/freeopcua/server/")
    server.set_endpoint("opc.tcp://0.0.0.0:4841/freeopcua/server/")
    server.set_server_name("FreeOpcUa Example Server")
    # set all possible endpoint policies for clients to connect through
    server.set_security_policy([
        ua.SecurityPolicyType.NoSecurity,
        # ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt,
        # ua.SecurityPolicyType.Basic256Sha256_Sign
    ])

    # setup our own namespace
    uri = "http://examples.freeopcua.github.io"
    idx = server.register_namespace(uri)

    # create a new node type we can instantiate in our address space
    dev = server.nodes.base_object_type.add_object_type(0, "MyDevice")
    dev.add_variable(0, "sensor1", 1.0).set_modelling_rule(True)
    dev.add_property(0, "device_id", "0340").set_modelling_rule(True)
    ctrl = dev.add_object(0, "controller")
    ctrl.set_modelling_rule(True)
    ctrl.add_property(0, "state", "Idle").set_modelling_rule(True)

    # populating our address space

    # First a folder to organise our nodes
    myfolder = server.nodes.objects.add_folder(idx, "myEmptyFolder")
    # instanciate one instance of our device
    mydevice = server.nodes.objects.add_object(idx, "Device0001", dev)
    mydevice_var = mydevice.get_child(["0:controller", "0:state"])  # get proxy to our device state variable 
    # create directly some objects and variables
    myobj = server.nodes.objects.add_object(idx, "MyObject")
    myvar = myobj.add_variable(idx, "MyVariable", 6.7)
    mysin = myobj.add_variable(idx, "MySin", 0, ua.VariantType.Float)
    myvar.set_writable()    # Set MyVariable to be writable by clients
    mystringvar = myobj.add_variable(idx, "MyStringVariable", "Really nice string")
    mystringvar.set_writable()    # Set MyVariable to be writable by clients
    mydtvar = myobj.add_variable(idx, "MyDateTimeVar", datetime.utcnow())
    mydtvar.set_writable()    # Set MyVariable to be writable by clients
    myarrayvar = myobj.add_variable(idx, "myarrayvar", [6.7, 7.9])
    myarrayvar = myobj.add_variable(idx, "myStronglytTypedVariable", ua.Variant([], ua.VariantType.UInt32))
    myprop = myobj.add_property(idx, "myproperty", "I am a property")
    mymethod = myobj.add_method(idx, "mymethod", func, [ua.VariantType.Int64], [ua.VariantType.Boolean])
    multiply_node = myobj.add_method(idx, "multiply", multiply, [ua.VariantType.Int64, ua.VariantType.Int64], [ua.VariantType.Int64])

    # import some nodes from xml
    # server.import_xml("custom_nodes.xml")

    # creating a default event object
    # The event object automatically will have members for all events properties
    # you probably want to create a custom event type, see other examples
    myevgen = server.get_event_generator()
    myevgen.event.Severity = 300

    # starting!
    server.start()
    print("Available loggers are: ", logging.Logger.manager.loggerDict.keys())
    vup = VarUpdater(mysin)  # just  a stupide class update a variable
    vup.start()
    try:
        # enable following if you want to subscribe to nodes on server side
        #handler = SubHandler()
        #sub = server.create_subscription(500, handler)
        #handle = sub.subscribe_data_change(myvar)
        # trigger event, all subscribed clients wil receive it
        var = myarrayvar.get_value()  # return a ref to value in db server side! not a copy!
        var = copy.copy(var)  # WARNING: we need to copy before writting again otherwise no data change event will be generated
        var.append(9.3)
        myarrayvar.set_value(var)
        mydevice_var.set_value("Running")
        myevgen.trigger(message="This is BaseEvent")
        server.set_attribute_value(myvar.nodeid, ua.DataValue(9.9))  # Server side write method which is a but faster than using set_value

        embed()
    finally:
        vup.stop()
        server.stop()

这个服务程序演示了opcua服务端,几乎所有的功能,其中Event部分没有不断发送,所以仅供参考。

下图展示了server端的对象结构

客户端

from IPython import embed
from opcua import Client

class SubHandler(object):
    def event_notification(self, event):
        print("Event:", event.EventId, event.Time, event.proper_random, event.Message.Text)

def main_c():
    url = "opc.tcp://127.0.0.1:4841/freeopcua/server/"
    c = Client(url)
    try:
        c.connect()
        root = c.get_root_node()
        embed()
    except Exception as e:
        print("Client Exception:", e)
    finally:
        c.disconnect()
        
if __name__ == "__main__":
    main_c()

客户端遍历流程

  • 1.获取root下的Objects节点的所有子节点,一般类型都为Object,取一代表为A
  • 2.获取A的NodeClass(一般为Object)和TypeDefinition,其中NodeId需要与路径["0:Types","0:ObjectTypes","0:BaseObjectType"]下的类型对照(自定义类型也在内)。
  • 3.继续以A为根,遍历子节点,如果时Object继续2,如果是Variable,NodeId对照["0:Types","0:VariableTypes","0:BaseVariableType"],可以判断是变量(variable,63)还是属性(property,65)。
  • 4.对于Method使用a_root.call_method(a, arg1)调用。对于Variable使用get_value/get_data_value()获取存储的值。如果是Object继续2.
  • 5.对于Variable使用access_level获取是否有写权限,如果有set_value可以设置值。

一个比较完美的遍历客户端

from opcua import Client, ua


def brower_child(root):
    """
    递归调用遍历,格式化不好做,有深度问题
    """
    name = root.get_node_class().name
    # print(name)
    if name == "Object":
        brower_obj(root)
        for c in root.get_children():
            print("  ", end='')
            brower_child(c)
    elif name == 'Variable':
        brower_var(root)
    else:
        brower_method(root)


class CurState():
    def __init__(self, parent=None, p=None, d=0):
        self.parent = parent  # unused
        self.p = p
        self.d = d


def brower_child2(root, max_d=-1, ignore=[]):
    """
    栈+循环遍历,非常好用
    """
    stack = [CurState(None, root, 0)]
    while len(stack):
        cur = stack.pop()
        name = cur.p.get_node_class().name

        print(''.join(['  ' for i in range(cur.d)]), end="")

        if cur.p.get_browse_name().Name in ignore:
            continue

        if name == "Object":
            brower_obj(cur.p)
            if max_d > 0 and cur.d >= max_d:
                continue
            for c in cur.p.get_children():
                stack.append(CurState(cur.p, c, cur.d+1))

        elif name == 'Variable':
            brower_var(cur.p)
        else:
            brower_method(cur.p)


def brower_obj(v):
    # print(v.get_browse_name())
    rw = 'R '
    bname = v.get_browse_name()
    print("*%2d:%-30s (%-2s, %-23s)" %
          (bname.NamespaceIndex, bname.Name, rw, "Object"))


def brower_var(v):
    # print(v.get_browse_name())
    rw = 'R '
    if ua.AccessLevel.CurrentWrite in v.get_access_level():
        rw = "RW"
    bname = v.get_browse_name()
    tv = v.get_data_value().Value
    v_show = tv.Value
    if len(str(v_show)) > 1024:
        v_show = str(v_show[:56]) + "..."
    print("-%2d:%-30s (%-2s, %-23s) =>" %
          (bname.NamespaceIndex, bname.Name, rw, tv.VariantType), v_show)


def brower_method(v):
    # print(v.get_description())
    rw = 'C '
    bname = v.get_browse_name()
    # args = []
    # for a in v.get_properties():
    #     dt = a.get_data_type().NodeIdType.name
    #     args.append(dt)
    print("@%2d:%-30s (%-2s, %-23s)" %
          (bname.NamespaceIndex, bname.Name, rw, "Method"))


def main_c():
    url = "opc.tcp://127.0.0.1:4841/freeopcua/server/"
    c = Client(url)
    try:
        c.connect()
        root = c.get_root_node()
        print("\r\nBrower:")
        brower_child2(root.get_child(["0:Objects"]), -1, ["Server"])
    except Exception as e:
        print("Client Exception:", e)
    finally:
        c.disconnect()


if __name__ == "__main__":
    main_c()

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

相关文章

  • 详解model.train()和model.eval()两种模式的原理与用法

    详解model.train()和model.eval()两种模式的原理与用法

    这篇文章主要介绍了详解model.train()和model.eval()两种模式的原理与用法,相信很多没有经验的人对此束手无策,那么看完这篇文章一定会对你有所帮助
    2023-03-03
  • 用 Python 定义 Schema 并生成 Parquet 文件详情

    用 Python 定义 Schema 并生成 Parquet 文件详情

    本文将演示两个例子,一个是没有层级的两个字段,另一个是含于嵌套级别的字段,将要使用到的 Python 模块有 pandas 和 pyarrow,感兴趣是我小伙伴请和小编一起学习下面文章内容吧
    2021-09-09
  • python中使用you-get库批量在线下载bilibili视频的教程

    python中使用you-get库批量在线下载bilibili视频的教程

    这篇文章主要介绍了使用python中you-get库批量在线下载bilibili视频的方法,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-03-03
  • Django中Middleware中的函数详解

    Django中Middleware中的函数详解

    这篇文章主要介绍了Django中Middleware中的函数详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2019-07-07
  • Python实现随机游走的示例代码

    Python实现随机游走的示例代码

    随机游走是一个数学对象,称为随机或随机过程,它描述了一条路径,该路径由一些数学空间上的一系列随机步骤组成,下面我们就来学习一下Python如何实现随机游走的吧
    2023-12-12
  • Python + Chrome抓取AJAX动态数据的两种方法

    Python + Chrome抓取AJAX动态数据的两种方法

    在现代 Web 开发中,AJAX技术被广泛应用于动态加载数据,使得网页能够在不刷新的情况下更新内容,本文将详细介绍 Python + Chrome 如何抓取 AJAX 动态数据,并提供两种方法的完整实现代码,需要的朋友可以参考下
    2025-04-04
  • Pycharm导入Python包,模块的图文教程

    Pycharm导入Python包,模块的图文教程

    今天小编就为大家分享一篇Pycharm导入Python包,模块的图文教程,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2018-06-06
  • python multiprocessing 多进程并行计算的操作

    python multiprocessing 多进程并行计算的操作

    这篇文章主要介绍了python multiprocessing 多进程并行计算的操作,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2021-03-03
  • 使用Pytorch如何完成多分类问题

    使用Pytorch如何完成多分类问题

    这篇文章主要介绍了使用Pytorch如何完成多分类问题,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2023-02-02
  • Pytest Fixture参数讲解及使用

    Pytest Fixture参数讲解及使用

    这篇文章主要介绍了Pytest之Fixture参数详解及使用,本文结合实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2023-01-01

最新评论