1 前言

为了应对标准化和跨平台的趋势,为了更好的推广OPC,OPC基金会近些年在之前OPC成功应用的基础上推出了一个新的OPC标准-OPC UA。OPC UA接口协议包含了之前的 A&E, DA,OPC XML DA or HDA,只使用一个地址空间就能访问之前所有的对象,而且不受WINDOWS平台限制,因为它是从传输层Scoket以上来定义的,这点后面会提到,导致了灵活性和安全性比之前的OPC都提升了。
此小结主要针对模拟设备进行对接,若已有真实设备,则可以跳过模拟设备的介绍和安装过程

2 驱动目的

中台使用驱动与OPC UA 进行数据对接及自动化配置。

3 适用范围

opc ua协议的设备,运行环境为python3的sdk

4 模拟工具

模拟工具采用的是prosys-opc-ua-simulation-server,简洁轻便便于理解

4.1 模拟工具的安装

下载地址为Prosys OPC UA Simulation Server官网,填写相关信息进行注册后即可进行下载。下载完成后点击进行安装,按照提示进行操作即可安装完成。

4.2 模拟工具的使用

安装完成后在桌面或者开始菜单最近添加处双击即可打开

Service Status显示“Running”即表示opc模拟是设备服务已经打开

5 对接示例

注意:此使用实例是在python3的情况下运行的,需要根据此文档【快速入门】->【环境搭建】搭建可以运行python3版本的SDK

  • 新增驱动,在SDK的_driver目录下创建一个py文件,命名为dr_OPCUA。
  • 编写代码,将bacnet的驱动代码放入到py文件,完整代码如下:
!coding:utf8
import json
import sys
sys.path.append("..")
from opcua import Client, ua
from driver import *
import time
class OPCua():
    def __init__(self, _driver_instance):
        self.driver_instance = _driver_instance


    def datachange_notification(self, node, val, data):
        print("Python: New data change event", val)



class TemplateDriver(IOTOSDriverI):
    #1、通信初始化
    def InitComm(self,attrs):
        self.setPauseCollect(False)
        self.setCollectingOneCircle(False)
        self.online(True)
        self.apartment = OPCua(self)
        #这里将设备参数写上连接的server地址,这里硬件商或客户应知道opc ua的server地址
        self.client = Client('{}'.format(self.sysAttrs['config']['param']['opcua']), timeout=10)
        self.client.connect()

    #2、采集引擎回调,可也可以开启,也可以直接注释掉(对于主动上报,不存在遍历采集的情况)
    def Collecting(self, dataId):
        try:
            cfgtmp = self.data2attrs[dataId]['config']
            # 过滤掉非采集的点,存在采集的node,则在param中的node写上ns;s或者其他参数
            if cfgtmp["param"] == "" :
                return ()

            # 过滤采集点
            if 'disabled' in cfgtmp['param'] and cfgtmp['param']['disabled'] == True:
                return ()
            else:
                self.warn(self.name(dataId))

            #读取采集点配置参数,这里可将采集点的时间周期缩短
            node = cfgtmp['param']['node']
            print(node)
            #这里的ns是需要去opc ua 查看并填写内容的
            dataId_num = self.client.get_node('{}={};{}={}'.format(cfgtmp['param']['nodeid_name'],cfgtmp['param']['nodeid'],cfgtmp['param']['node_name'],cfgtmp['param']['node']))
            handler = self.apartment
            sub = self.client.create_subscription(500, handler)
            sub.subscribe_data_change(dataId_num)
            ret_nu = dataId_num.get_value()
            return (ret_nu,)
        except Exception as e:
            print(e)




    #3、控制
    #广播事件回调,其他操作访问
    def Event_customBroadcast(self, fromUuid, type, data):
        '''*************************************************
        TODO
        **************************************************'''
        return json.dumps({'code':0, 'msg':'', 'data':''})

    # 4、查询
    # 查询事件回调,数据点查询访问
    def Event_getData(self, dataId, condition):
        '''*************************************************
        TODO
        **************************************************'''
        return json.dumps({'code':0, 'msg':'', 'data':''})

    # 5、控制事件回调,数据点控制访问
    def Event_setData(self, dataId, value):
        '''*************************************************
        TODO
        **************************************************'''
        return json.dumps({'code':0, 'msg':'', 'data':''})

    # 6、本地事件回调,数据点操作访问
    def Event_syncPubMsg(self, point, value):
        '''*************************************************
        TODO
        **************************************************'''
        return json.dumps({'code':0, 'msg':'', 'data':''})
  • 创建模板,进入IOTOS中台,测试账号为iotos_test,密码为iotos123(也可以创建自己的账号),依次点击【系统设备】->【模板驱动】->【我的模板】,点击右上角创建模板,填写模板名称和模板根配置里面的“driver”参数,driver参数格式为python/驱动名称.类名,在param中加入一个参数“opcua”,先设置为””,后续需要使用到。

  • 依次点击【系统设备】 -> 【通信网关】,创建网关

  • 填好网关名称后点击确认

  • 创建设备实例,依次点击【系统设备】 -> 【通信网关】 -> 【设备实例】->【创建设备】

  • 填写【设备名称】、选择刚刚创建的【模板驱动】和【通信网关】。

  • 配置opcua的值,在刚才的模拟工具中找到Connection Address,点击右侧的复制键,复制完成后填入到opcua参数中,即完成设备的创建

  • 创建数据点,点击【系统设备】 -> 【通信网关】 -> 【设备实例】 -> 【数据点】,并在【设备实例】下拉列表选择刚刚创建的设备实例

  • 删除系统自带的四个数据点,然后点击右边的创建数据点,填写名称,选择数据类型,点击确认。

  • 这里我们创建了两个数据点,数据点名称任意,类型也可以为string,但是需要在高级配置中进行相关的设置。

  • 高级配置的说明。

    两个数据点一样,各自对应不同的节点

    至此,中台的数据点创建完成

  • 创建运行脚本,在SDK的_example目录下创建一个运行脚本(.bat)文件,名称随意

    创建完成后在其中填入运行内容

    查看网关的uuid如下:

  • 安装驱动运行的依赖包
    打开cmd,命令行输入
    pip install opcua -i https://pypi.tuna.tsinghua.edu.cn/simple/
    若电脑同时装有python2和python3,则输入
    pip3 install opcua -i https://pypi.tuna.tsinghua.edu.cn/simple/
    安装成功准备即可运行脚本

  • 运行脚本。双击刚才创建的bat文件或者在cmd中进入到_example目录,运行bat文件

  • 运行结果如下:

  • 中台查看运行结果

    可以看到,刚才的模拟软件中的值已经上传至中台(两个模拟数据点均为随机数,在不停的改变)

6 驱动解析

  • 导入SDK所需要的依赖包和本驱动所需要的opcua包
import json
import sys
sys.path.append("..")
from opcua import Client, ua
from driver import *
import time
  • 定义一个opcua类,用于打印相关信息
class OPCua():
    def __init__(self, _driver_instance):
        self.driver_instance = _driver_instance


    def datachange_notification(self, node, val, data):
        print("Python: New data change event", val)
  • 驱动通讯初始化,获取中台相关的参数,用于opcua的连接
class TemplateDriver(IOTOSDriverI):
    #1、通信初始化
    def InitComm(self,attrs):
        self.setPauseCollect(False)
        self.setCollectingOneCircle(False)
        self.online(True)
        self.apartment = OPCua(self)
        #这里将设备参数写上连接的server地址,这里硬件商或客户应知道opc ua的server地址
        self.client = Client('{}'.format(self.sysAttrs['config']['param']['opcua']), timeout=10)
        self.client.connect()
  • opcua连接成功后,根据中台数据点的高级配置,通过连接的opcua获取相应的nodeid的值并上传至中台
    #2、采集引擎回调,可也可以开启,也可以直接注释掉(对于主动上报,不存在遍历采集的情况)
    def Collecting(self, dataId):
        try:
            cfgtmp = self.data2attrs[dataId]['config']
            # 过滤掉非采集的点,存在采集的node,则在param中的node写上ns;s或者其他参数
            if cfgtmp["param"] == "" :
                return ()

            # 过滤采集点
            if 'disabled' in cfgtmp['param'] and cfgtmp['param']['disabled'] == True:
                return ()
            else:
                self.warn(self.name(dataId))

            #读取采集点配置参数,这里可将采集点的时间周期缩短
            node = cfgtmp['param']['node']
            print(node)
            #这里的ns是需要去opc ua 查看并填写内容的
            dataId_num = self.client.get_node('{}={};{}={}'.format(cfgtmp['param']['nodeid_name'],cfgtmp['param']['nodeid'],cfgtmp['param']['node_name'],cfgtmp['param']['node']))
            handler = self.apartment
            sub = self.client.create_subscription(500, handler)
            sub.subscribe_data_change(dataId_num)
            ret_nu = dataId_num.get_value()
            return (ret_nu,)
        except Exception as e:
            print(e)
作者:IOTOS  创建时间:2022-01-18 16:10
最后编辑:admin  更新时间:2023-11-29 09:38