1 前言

BACnet是用于智能建筑的通信协议,是国标标准化组织(ISO)、美国国家标准协会(ANSI)及美国采暖、制冷与空调工程师学会(ASHRAE)定义的通信协议。BACnet针对智能建筑及控制系统的应用所设计的通信,可用在暖通空调系统(HVAC,包括暖气、通风、空气调节),也可以用在照明控制、门禁系统、火警侦测系统及其相关的设备。优点在于能降低维护系统所需成本并且安装比一般工业通信协议更为简易,而且提供有五种业界常用的标准协议,此可防止设备供应商及系统业者的垄断,也因此未来系统扩展性与兼容性大为增加。

此小结主要针对模拟设备进行对接,若已有真实设备,则可以跳过模拟设备的介绍和安装过程

2 驱动目的

将BACnet协议设备的数据拿到并上传至云上

3 适用范围

BACnet协议设备,运行环境为python3的sdk

4 模拟工具

模拟工具采用的是Yabe设备模拟器,具备服务端和客服端两种,下面主要介绍服务端

4.1 Yabe的安装

下载地址为IOT工具,找到SetupYabe_v1.1.9.zip工具,解压后安装步骤安装即可。

4.2 Yabe的使用

安装成功后,点击安装好后的Yabe下的太阳的图标

运行后即可看到

此时表示bacnet的模拟设备已经打开(不需要关闭,一直挂在下面就行)

5 对接示例

注意:此使用实例是在python3的情况下运行的,需要根据环境搭建搭建可以运行python3版本的SDK

  • 新增驱动,在SDK的_driver目录下创建一个py文件,命名为dr_bacnet。

  • 编写代码,将bacnet的驱动代码放入到py文件,完整代码如下:

#coding=utf-8
import sys
sys.path.append("..")
import BAC0
from driver import *

class Bacnet(IOTOSDriverI):
    def InitComm(self,attrs):
        self.setPauseCollect(False)
        self.setCollectingOneCircle=True
        self.online(True)

        #建立连接并且在通路里搜索bacnet设备的ip和设备id
        try:
            self.bacnet=BAC0.connect()
            self.bacnet.whois()

            # 搜索局域网内的bacnet协议设备并且打印出来
            for each in self.bacnet.discoveredDevices:
                deviceName = (self.bacnet.read('%s device %s objectName' % (each[0], each[1])))
                self.deviceAddr = each[0]
                self.debug('Found device : %s at address %s' % (deviceName, self.deviceAddr))
                # 打印设备地址为deviceAddr 的objectList property 前十个
                read_pro = self.deviceAddr + ' device 3 objectList'
                self.debug(self.bacnet.read(read_pro)[:10])

        except Exception as e:
            self.bacnet.disconnect()

    def Collecting(self,dataId):
        try:
            cfgtmp = self.data2attrs[dataId]['config']
            #过滤掉非采集点
            if cfgtmp["param"] == "":
                return ()

            # 过滤采集点
            if 'disabled' in cfgtmp and cfgtmp['disabled'] == True:
                return ()
            else:
                self.debug(self.name(dataId))

            #获取用于数据下发的点
            if  'private' in cfgtmp['param'] and cfgtmp['param']['private']=='write' and 'num' in cfgtmp['param']:
                if "memoryvalue" not in self.data2attrs[dataId]:
                    return ('请下发',)
                else:
                    return (self.data2attrs[dataId]["memoryvalue"],)

            #上传数据点
            if 'objectName' in cfgtmp['param'] and 'num' in cfgtmp['param']:
                data_val = self.deviceAddr + ' ' + str(cfgtmp['param']['objectName'])+' '+str(cfgtmp['param']['num'])+' '+'objectName description presentValue units'
                #self.debug(data_val)
                #读取bacnet设备中属性的值
                data = self.bacnet.readMultiple(data_val)
                return (str(data[2]),)

        except Exception as e:
            # 连接会一定时间后断开,需要再次开启
            self.bacnet.disconnect()
            self.bacnet = BAC0.connect()
            self.bacnet.whois()

        return ()

    def Event_setData(self, dataId, value):
        #更改bacnet里面属性的值,一般只能是analoValue属性
        if 'private' in self.data2attrs[dataId]['config']['param']:
            if self.data2attrs[dataId]['config']['param']['private']== 'write':
                data_wri=self.deviceAddr+' '+'analogValue'+' '+ str(self.data2attrs[dataId]['config']['param']['num']) +' presentValue ' + str(value)
                #self.debug(data_wri)
                self.bacnet.write(data_wri)
                self.setValue(self.name(dataId), value)

        return json.dumps({'code': 0, 'msg': '', 'data': ''})
  • 创建模板,进入IOTOS中台,测试账号为iotos_test,密码为iotos123(也可以创建自己的账号),依次点击【系统设备】->【模板驱动】->【我的模板】,点击右上角创建模板,填写模板名称和模板根配置里面的“driver”参数,driver参数格式为python/驱动名称.类名。

  • 依次点击【系统设备】 -> 【通信网关】,创建网关

  • 填好网关名称后点击确认

  • 创建设备实例,依次点击【系统设备】 -> 【通信网关】 -> 【设备实例】->【创建设备】

  • 填写【设备名称】、选择刚刚创建的【模板驱动】和【通信网关】。

  • 创建数据点,点击【系统设备】 -> 【通信网关】 -> 【设备实例】 -> 【数据点】,并在【设备实例】下拉列表选择刚刚创建的设备实例

  • 删除系统自带的四个数据点,然后点击右边的创建数据点,填写名称

  • 并在高级配置中配置数据点的相关标志,objectName类型的数据在私有属性“objectName”中填入其名称,并在“num”属性中加入序号;可以用于修改的数据点则在私有属性“private”中填入“write”,并用”num”表示需要修改的objectName类型数据点的序号,例如:

  • 以下是我创建的数据点,仅供参考:


    各数据点的高级配置依次是:




    也可以配置其他的objectName和num参数,这个可以根据设备本身存储数据点的位置来看。后续会详细讲解
    至此,中台的数据点创建完成

  • 创建运行脚本,在SDK的_example目录下创建一个运行脚本(.bat)文件,名称随意

    创建完成后在其中填入运行内容

  • 安装运行bacnet驱动所需要的安装包
    打开cmd,命令行输入
    pip install BAC0 -i https://pypi.tuna.tsinghua.edu.cn/simple/
    若电脑同时装有python2和python3,则输入
    pip3 install BAC0 -i https://pypi.tuna.tsinghua.edu.cn/simple/
    安装成功准备即可运行脚本

  • 双击刚才创建的bat文件或者在cmd中进入到_example目录,运行bat文件

  • 运行结果如下:

  • 中台查看运行结果

    可以看到,刚才的模拟软件中的值已经上传至中台

  • 下发操作,点击下发点右侧的下发按钮,填入想要下发的值,点击下发

    下发成功后,可以看到模拟软件中的值发生了变化,中台上拿到的值也发生了变化

    至此以将模拟设备中的数据拿出并上传至了中台

  • 拿取其他数据,可以根据日志查看设备具备哪些objectName,然后再配置到数据点的高级配置,重新启动运行脚本即可

5 驱动解析

  • 导入sdk相关的依赖包和运行本驱动的BAC0包
#coding=utf-8
import sys
sys.path.append("..")
import BAC0
from driver import *
  • 进行驱动的初始化,在初始化中建立bacnet连接并且在路由里面搜索bacnet设备的ip和设备的id,用于后续拿到设备的数据
class Bacnet(IOTOSDriverI):
    def InitComm(self,attrs):
        self.setPauseCollect(False)
        self.setCollectingOneCircle=True
        self.online(True)

        #建立连接并且在通路里搜索bacnet设备的ip和设备id
        try:
            self.bacnet=BAC0.connect()
            self.bacnet.whois()

            # 搜索局域网内的bacnet协议设备并且打印出来
            for each in self.bacnet.discoveredDevices:
                deviceName = (self.bacnet.read('%s device %s objectName' % (each[0], each[1])))
                self.deviceAddr = each[0]
                self.debug('Found device : %s at address %s' % (deviceName, self.deviceAddr))
                # 打印设备地址为deviceAddr 的objectList property 前十个
                read_pro = self.deviceAddr + ' device 3 objectList'
                self.debug(self.bacnet.read(read_pro)[:10])

        except Exception as e:
            self.bacnet.disconnect()
  • 进行数据的采集,首先先利用数据点里面的参数过滤非采集点和采集点,获取用于数据下发的点和数据上传的点,并通过bacnet协议获取设备的值进行上报
    def Collecting(self,dataId):
        try:
            cfgtmp = self.data2attrs[dataId]['config']
            #过滤掉非采集点
            if cfgtmp["param"] == "":
                return ()

            # 过滤采集点
            if 'disabled' in cfgtmp and cfgtmp['disabled'] == True:
                return ()
            else:
                self.debug(self.name(dataId))

            #获取用于数据下发的点
            if  'private' in cfgtmp['param'] and cfgtmp['param']['private']=='write' and 'num' in cfgtmp['param']:
                if "memoryvalue" not in self.data2attrs[dataId]:
                    return ('请下发',)
                else:
                    return (self.data2attrs[dataId]["memoryvalue"],)

            #上传数据点
            if 'objectName' in cfgtmp['param'] and 'num' in cfgtmp['param']:
                data_val = self.deviceAddr + ' ' + str(cfgtmp['param']['objectName'])+' '+str(cfgtmp['param']['num'])+' '+'objectName description presentValue units'
                #self.debug(data_val)
                #读取bacnet设备中属性的值
                data = self.bacnet.readMultiple(data_val)
                return (str(data[2]),)

        except Exception as e:
            # 连接会一定时间后断开,需要再次开启
            self.bacnet.disconnect()
            self.bacnet = BAC0.connect()
            self.bacnet.whois()

        return ()
  • 数据下发。利用私有属性‘private’判断需要修改的数据点的属性,对数据点进行下发时就会直接对设备的数据进行修改
    def Event_setData(self, dataId, value):
        #更改bacnet里面属性的值,一般只能是analoValue属性
        if 'private' in self.data2attrs[dataId]['config']['param']:
            if self.data2attrs[dataId]['config']['param']['private']== 'write':
                data_wri=self.deviceAddr+' '+'analogValue'+' '+ str(self.data2attrs[dataId]['config']['param']['num']) +' presentValue ' + str(value)
                #self.debug(data_wri)
                self.bacnet.write(data_wri)
                self.setValue(self.name(dataId), value)

        return json.dumps({'code': 0, 'msg': '', 'data': ''})
作者:IOTOS  创建时间:2022-01-13 15:40
最后编辑:admin  更新时间:2023-11-29 09:38