1 前言

Modbus协议是应用于电子控制器上的一种通用语言。通过此协议,控制器相互之间、控制器经由网络例如以太网)和其它设备之间可以通信。它已经成为一通用工业标准。有了它,不同厂商生产的控制设备可以连成工业网络,进行集中监控此协议定义了一个控制器能认识使用的消息结构,而不管它们是经过何种网络进行通信的。它描述了一控制器请求访问其它设备的过程,如果回应来自其它设备的请求,以及怎样侦测错误并记录。它制定了消息域格局和内容的公共格式。Modbus具有两种串行传输模式:分别为ASCII和RTU。Modbus是一种单主站的主从通信模式,Modbus网络上只能有一个主站存在,主站在Modbus网络上没有地址,每个从站必须有唯一的地址,从站的地址范围为0 - 247,其中0为广播地址,从站的实际地址范围为1 - 247。Modbus RTU通信以主从的方式进行数据传输,在传输的过程中Modbus RTU主站是主动方,即主站发送数据请求报文到从站,Modbus RTU从站返回响应报文。

2 驱动目的

modbus RTU 该驱动是将中台(IOTOS物联中台)作为服务端(上位机)向客户端(下位机)/modbus 485通讯的前端设备发送询问帧,客户端(下位机)/modbus 485通讯的前端设备接收到询问帧并返回应答帧给到服务端(上位机)进行解析工作并展示数据

3 适用范围

凡是走标准的modbus_rtu协议的设备,例如烟感、光感、PLC等,需要注意的是,若客户端(下位机)/modbus 485通讯的前端设备只有485/232 等的串口通讯不具备上网功能,需增加一个外接模块(如485 转4g /485 转wifi 模块)与服务器(上位机进行网络通讯)。使用者或用户可在生产生活中使用串口通讯测试软件测试设备是否通讯正常。

4 使用示例

以光感传感器(威海晶合数字矿山技术有限公司的光照传感器)为示例进行演示,其他同类型协议的设备一样
图片名称

4.1 连接设备

  • 首先,连接光感和485模块,光感的485线的A,B级分别连接模块的485接口AB级;其次光感的电源线连接符合光感正常运转的电源(可用变压器控制电源大小),然后模块网口连接交换机,最后分别给模块和光感通上电,具体连接方式如下图:图片名称

4.2 配置模块

  • 进入模块的IP,进行配置,将模式改为TCP Client格式,地址改为IOTOS中台的IP地址,端口改为被分配的端口,注册包方式改为云转发图片名称

4.3 创建模板

  • 进入IOTOS中台,测试账号为iotos_test,密码为iotos123(也可以创建自己的账号),依次点击【系统设备】->【模板驱动】->【我的模板】,点击右上角创建模板,填写模板名称和模板根配置里面的“driver”参数,该参数表示要运行的驱动,本实例用到的是modbus驱动(该模板中台自带,参数与图片中保持一致即可)

4.4 创建网关

  • 依次点击【系统设备】 -> 【通信网关】,创建网关

    图片名称
  • 填好网关名称后点击确认

    图片名称

4.5 创建设备

  • 创建设备示例点击【系统设备】 -> 【通信网关】 -> 【设备实例】

    图片名称
  • 填写【设备名称】、选择刚刚创建的【模板驱动】和【通信网关】。参数tcp为中台需要开发的端口,用来与模块进行通讯,与模块配置里面的端口保持一致(上文模块配置的端口为123,所以这里也填123),参数serial为设备的相关信息,根据设备的具体情况进行修改,其中(1,9600,N,8,1,0)从左到右分别表示串口号(此处没用串口所以填1)、波特率、奇偶校验、数据位、停止位、超时设置(可不填)

    图片名称

4.6 创建数据点

  • 创建数据点,点击【系统设备】 -> 【通信网关】 -> 【设备实例】 -> 【数据点】,并在【设备实例】下拉列表选择刚刚创建的设备实例

    图片名称
  • 点击右边的创建数据点,填写名称

    图片名称
  • 并在高级配置中配置需要给光感发送的指令,以下为光感的询问帧和中台数据点的配置:

    图片名称
  • 中台数据点的配置:

    图片名称

4.7 开启采集

  • 都配置完成后,即可启动驱动进行数据的采集。方法:在【我的设备】 -> 【通信网关】中找到刚才创建的网关,点击【高级】

    图片名称
  • 点击开启云网关,密码为账号密码,开启后即表示驱动已经开始运行,数据正在进行采集

    图片名称
  • 点击 【我的设备】 -> 【通信网关】 -> 【设备实例】->【数据点】,选择刚才创建的设备实例

    图片名称
  • 即可查看数据已经上报成功,light即为此时的光照强度

    图片名称

5 完整驱动代码


#!coding:utf8
import json
import sys

sys.path.append("..")
from driver import *

import time
import modbus_tk
import modbus_tk.defines as cst
import modbus_tk.modbus as modbus
import modbus_tk.modbus_rtu as modbus_rtu
from modbus_tk.exceptions import ModbusInvalidResponseError
import serial
import signal
import traceback
from jcomm import *
import re
import struct
import math

#硬件心跳线程
class RunHardwareHeartbeatThread(threading.Thread,JLib):
    def __init__(self, driver):
        threading.Thread.__init__(self)
        JLib.__init__(self)
        self.driver = driver
    def run(self):
        statetmp = False
        dataIdTmp = ''
        recycletmp = 0
        for dataId,attrs in self.driver.data2attrs.items():
            if 'param' not in attrs['config']:
                self.error(attrs['config'])
                break
            if 'hbt' in attrs['config']['param']:
                dataIdTmp = dataId
                recycletmp = attrs['config']['param']['hbt']
                break
        while True:
            try:
                if not self.driver.startHeartbeat:
                    return

                #状态反转及延时
                if statetmp == False:
                    statetmp = True
                else:
                    statetmp = False
                time.sleep(recycletmp)

                # self.warn('HARDWARE HEATBEAT ' + dataIdTmp + u'硬件心跳:' + str(statetmp))
                #控制执行
                rettmp = ''
                if statetmp:
                    rettmp = self.driver.Event_setData(dataIdTmp,'true')
                else:
                    rettmp = self.driver.Event_setData(dataIdTmp,'false')
                if json.loads(rettmp)["code"] == 0:
                    self.driver.setValue(self.driver.name(dataIdTmp), statetmp)
            except Exception,e:
                traceback.print_exc(e.message)
                continue

class ModbusDriver(IOTOSDriverI):
    def __init__(self):
        IOTOSDriverI.__init__(self)
        self.master = None
        # 心跳开关
        self.startHeartbeat = False
        self.bitsState = [0,0,0,0,0,0,0,0]
        self.sourceDataIn = []

    # 1、通信初始化
    def InitComm(self, attrs = None):
        try:
            #一、tcp端口监听
            self.__port = self.sysAttrs['config']['param']['tcp']
            self.__tcpServer = TcpServerThread(self,self.__port)
            self.__tcpServer.setDaemon(True)
            self.__tcpServer.start()
            self.debug(self.sysAttrs['name'] + u' TCP端口' + str(self.__port) + u"已启动监听!")

            #二、创建串口1 <=> 串口2
            serialtmp = self.sysAttrs['config']['param']['serial']
            self.__serial = SerialDtu(serialtmp)
            self.__serial.setCallback(self.serialCallback)
            self.__serial.open()

            #三、串口1 <=> modbus_tk
            self.master = modbus_rtu.RtuMaster(self.__serial.serial)
            self.master.set_timeout(5)
            self.master.set_verbose(False)
            self.debug(self.sysAttrs['name'] + u' 串口' + self.__serial.portName() + u'已打开!')

            self.zm.pauseCollect = True
            # 实例化硬件心跳线程
            RunHardwareHeartbeatThread(self).start()

        except Exception,e:
            self.online(False)
            traceback.print_exc(u'通信初始化失败' + e.message)

    #四、串口2 <=> tcp
    #tcp => 串口2
    def tcpCallback(self,data):
        datastr = self.str2hex(data)
        self.sourceDataIn = data
        self.info("Master < < < < < < Device: " + datastr)
        self.__serial.send(data)

    #tcp <= 串口2
    def serialCallback(self,data):
        self.info("Master > > > > > > Device: " + self.str2hex(data))
        self.__tcpServer.send(data)

    #连接状态回调
    def connectEvent(self,state):
        self.online(state)
        try:
            if state == True:
                self.warn('连接成功,启动采集、心跳')
                self.pauseCollect = False
                #启动软件看门狗
                self.startHeartbeat = True
            else:
                self.warn('连接断开,将关闭采集和心跳!')
                self.startHeartbeat = False
                self.pauseCollect = True
        except Exception,e:
            self.error(u'硬件心跳错误, ' + e.message)

    # 2、采集
    def Collecting(self, dataId):
        try:
            rtu_ret = ()
            cfgtmp = self.data2attrs[dataId]['config']

            #added by lrq,过滤非modbus rtu配置的点
            if not cfgtmp.has_key('param') or not cfgtmp.has_key('proxy'):
                return ()

            #当是新一组功能号时;当没有proxy.pointer,或者有,但是值为null时,就进行采集!否则(有pointer且值不为null,表明设置了采集代理,那么自己自然就被略过了,因为被代理了)当前数据点遍历轮询会被略过!
            if 'pointer' not in cfgtmp['proxy'] or cfgtmp['proxy']['pointer'] == None or cfgtmp['proxy']['pointer'] == '':

                #added by lrq,某些过滤掉不采集,因为有的地址的设备不在线,只要在proxy下面配置disabled:true,这样就不会轮训到它!
                if 'disabled' in cfgtmp['proxy'] and cfgtmp['proxy']['disabled'] == True:
                    return ()
                else:
                    self.warn(self.name(dataId))

                # added by lrq,过滤非modbus rtu配置的点
                if not cfgtmp['param'].has_key('funid'):
                    return ()

                funid = cfgtmp['param']['funid']
                devid = cfgtmp['param']['devid']
                regad = cfgtmp['param']['regad']
                format = cfgtmp['param']['format']
                quantity = re.findall(r"\d+\.?\d*", format)
                if len(quantity):
                    quantity = int(quantity[0])
                else:
                    quantity = 1
                if format.lower().find('i') != -1:       #I、i类型数据为4个字节,所以n个数据,就是4n字节,除一般应对modbus标准协议的2字节一个数据的个数单位!
                    quantity *= 4/2
                elif format.lower().find('h') != -1:
                    quantity *= 2/2
                elif format.lower().find('b') != -1:
                    quantity *= 1/2
                elif format.find('d') != -1:
                    quantity *= 8/2
                elif format.find('f') != -1:
                    quantity *= 4/2
                elif format.find('?') != -1:           #对于功能号1、2的开关量读,开关个数,对于这种bool开关型,个数就不是返回字节数的两倍了!返回的字节个数是动态的,要字节数对应的位数总和,能覆盖传入的个数数值!
                    quantity *= 1
                    format = ''                        #实践发现,对于bool开关型,传入开关量个数就行,format保留为空!如果format设置为 "?"或"8?"、">?"等,都会解析不正确!!
                self.debug('>>>>>>' + '(PORT-' + str(self.__port) + ')' + str(devid) + ' ' + str(funid) + ' ' + str(regad) + ' ' + str(quantity) + ' ' + str(format))
                rtu_ret = self.master.execute(devid, funid, regad, quantity,data_format=format)

                if funid == 3:
                    retlist = []
                    for i in range(len(rtu_ret)):
                        retlist.append(rtu_ret[i])
                    rtu_ret = tuple(retlist)

                #周期查询的开关量输出状态进行备份,用来给控制用
                if funid == 1:
                    self.bitsState = list(rtu_ret)
                self.debug(rtu_ret)
                return rtu_ret
            # 一组功能号内的数据点,不进行遍历采集!跳过!
            else:
                return ()   #注意,这种情况下不是采集错误,如果返回None,那么会当作采集错误处理,进行采集错误计数了!!
        except ModbusInvalidResponseError, e:
            self.error(u'MODBUS响应超时, ' + e.message)
            return None
        except Exception, e:
            traceback.print_exc(e.message)
            self.error(u'采集解析参数错误:' + e.message)
            return None

    # 3、控制 数据点配置
    # 事件回调接口,监测点操作访问
    def Event_getData(self, dataId, condition=''):

        return json.dumps({'code': 0, 'msg': '', 'data': new_val})

    # 事件回调接口,监测点操作访问
    def Event_setData(self, dataId, value):
        self.warn(value)
        try:
            if self.master == None:
                self.InitComm()
            data_config = self.data2attrs[dataId]['config']
            bit = 0
            if 'proxy' in data_config.keys() and 'pointer' in data_config['proxy'] and data_config['proxy']['pointer'] != None:
                bit = data_config['proxy']['index']
            if self.valueTyped(dataId,value) == True:
                self.bitsState[bit] = 1
            else:
                self.bitsState[bit] = 0
            self.warn(self.bitsState)

            #注意,这里地址是1,但是再huaihua等用了3合一设备的,地址是2,接下来需要这里也做个区分,按照当前操作的数据点对应的实际数据点来!
            ret = self.master.execute(1, cst.WRITE_MULTIPLE_COILS, 0, output_value=self.bitsState)
            self.warn(ret)
            return json.dumps({'code': 0, 'msg': u'操作成功!', 'data': list(ret)})
        except Exception,e:
            return json.dumps({'code': 501, 'msg': u'操作失败,错误码501,' + e.message, 'data': None})

6 驱动解析

  • 编写环境为python2,首先需要导入modbus、数据解析和IOTOS中台驱动文件(driver)的相关包
#!coding:utf8
import json
import sys

sys.path.append("..")
from driver import *

import time
import modbus_tk
import modbus_tk.defines as cst
import modbus_tk.modbus as modbus
import modbus_tk.modbus_rtu as modbus_rtu
from modbus_tk.exceptions import ModbusInvalidResponseError
import serial
import signal
import traceback
from jcomm import *
import re
import struct
import math
  • 创建硬件心跳进程,判断中台的数据点是否含有必要的属性,如果没有则提示error,防止后续过程报错,开启心跳进程则启动中台的通讯
class RunHardwareHeartbeatThread(threading.Thread,JLib):
    def __init__(self, driver):
        threading.Thread.__init__(self)
        JLib.__init__(self)
        self.driver = driver
    def run(self):
        statetmp = False
        dataIdTmp = ''
        recycletmp = 0
        for dataId,attrs in self.driver.data2attrs.items():
            if 'param' not in attrs['config']:
                self.error(attrs['config'])
                break
            if 'hbt' in attrs['config']['param']:
                dataIdTmp = dataId
                recycletmp = attrs['config']['param']['hbt']
                break
        while True:
            try:
                if not self.driver.startHeartbeat:
                    return

                #状态反转及延时
                if statetmp == False:
                    statetmp = True
                else:
                    statetmp = False
                time.sleep(recycletmp)

                # self.warn('HARDWARE HEATBEAT ' + dataIdTmp + u'硬件心跳:' + str(statetmp))
                #控制执行
                rettmp = ''
                if statetmp:
                    rettmp = self.driver.Event_setData(dataIdTmp,'true')
                else:
                    rettmp = self.driver.Event_setData(dataIdTmp,'false')
                if json.loads(rettmp)["code"] == 0:
                    self.driver.setValue(self.driver.name(dataIdTmp), statetmp)
            except Exception,e:
                traceback.print_exc(e.message)
                continue
  • 继承IOTOSDriverI类,进行初始化,设置心跳开关
class ModbusDriver(IOTOSDriverI):
    def __init__(self):
        IOTOSDriverI.__init__(self)
        self.master = None
        # 心跳开关
        self.startHeartbeat = False
        self.bitsState = [0,0,0,0,0,0,0,0]
        self.sourceDataIn = []
  • 进行通讯初始化,获取IOTOS中台设备实例中配置的端口和serial属性并且启动tcp监听,实例化心跳进程
# 1、通信初始化
    def InitComm(self, attrs = None):
        try:
            #一、tcp端口监听
            self.__port = self.sysAttrs['config']['param']['tcp']
            self.__tcpServer = TcpServerThread(self,self.__port)
            self.__tcpServer.setDaemon(True)
            self.__tcpServer.start()
            self.debug(self.sysAttrs['name'] + u' TCP端口' + str(self.__port) + u"已启动监听!")

            #二、创建串口1 <=> 串口2
            serialtmp = self.sysAttrs['config']['param']['serial']
            self.__serial = SerialDtu(serialtmp)
            self.__serial.setCallback(self.serialCallback)
            self.__serial.open()

            #三、串口1 <=> modbus_tk
            self.master = modbus_rtu.RtuMaster(self.__serial.serial)
            self.master.set_timeout(5)
            self.master.set_verbose(False)
            self.debug(self.sysAttrs['name'] + u' 串口' + self.__serial.portName() + u'已打开!')

            self.zm.pauseCollect = True
            # 实例化硬件心跳线程
            RunHardwareHeartbeatThread(self).start()

        except Exception,e:
            self.online(False)
            traceback.print_exc(u'通信初始化失败' + e.message)
  • tcp回调,可以查看设备是否与中台以及连接成功
#四、串口2 <=> tcp
    #tcp => 串口2
    def tcpCallback(self,data):
        datastr = self.str2hex(data)
        self.sourceDataIn = data
        self.info("Master < < < < < < Device: " + datastr)
        self.__serial.send(data)

#tcp <= 串口2
    def serialCallback(self,data):
        self.info("Master > > > > > > Device: " + self.str2hex(data))
        self.__tcpServer.send(data)
  • 连接状态回调,连接成功则启动硬件心跳进程并且设置中台的网关状态
#连接状态回调
    def connectEvent(self,state):
        self.online(state)
        try:
            if state == True:
                self.warn('连接成功,启动采集、心跳')
                self.pauseCollect = False
                #启动软件看门狗
                self.startHeartbeat = True
            else:
                self.warn('连接断开,将关闭采集和心跳!')
                self.startHeartbeat = False
                self.pauseCollect = True
        except Exception,e:
            self.error(u'硬件心跳错误, ' + e.message)
  • 最后是采集函数,先过滤掉中台非modbus rtu配置的点,再拿到数据点属性中的参数,将参数进行处理后可以拿到需要给设备发送的指令,发送过去后对接收过来的数据进行进制转换和处理后再上传至中台即可将设备的数据上云
# 2、采集
    def Collecting(self, dataId):
        try:
            rtu_ret = ()
            cfgtmp = self.data2attrs[dataId]['config']

            #added by lrq,过滤非modbus rtu配置的点
            if not cfgtmp.has_key('param') or not cfgtmp.has_key('proxy'):
                return ()

            #当是新一组功能号时;当没有proxy.pointer,或者有,但是值为null时,就进行采集!否则(有pointer且值不为null,表明设置了采集代理,那么自己自然就被略过了,因为被代理了)当前数据点遍历轮询会被略过!
            if 'pointer' not in cfgtmp['proxy'] or cfgtmp['proxy']['pointer'] == None or cfgtmp['proxy']['pointer'] == '':

                #added by lrq,某些过滤掉不采集,因为有的地址的设备不在线,只要在proxy下面配置disabled:true,这样就不会轮训到它!
                if 'disabled' in cfgtmp['proxy'] and cfgtmp['proxy']['disabled'] == True:
                    return ()
                else:
                    self.warn(self.name(dataId))

                # added by lrq,过滤非modbus rtu配置的点
                if not cfgtmp['param'].has_key('funid'):
                    return ()

                funid = cfgtmp['param']['funid']
                devid = cfgtmp['param']['devid']
                regad = cfgtmp['param']['regad']
                format = cfgtmp['param']['format']
                quantity = re.findall(r"\d+\.?\d*", format)
                if len(quantity):
                    quantity = int(quantity[0])
                else:
                    quantity = 1
                if format.lower().find('i') != -1:       #I、i类型数据为4个字节,所以n个数据,就是4n字节,除一般应对modbus标准协议的2字节一个数据的个数单位!
                    quantity *= 4/2
                elif format.lower().find('h') != -1:
                    quantity *= 2/2
                elif format.lower().find('b') != -1:
                    quantity *= 1/2
                elif format.find('d') != -1:
                    quantity *= 8/2
                elif format.find('f') != -1:
                    quantity *= 4/2
                elif format.find('?') != -1:           #对于功能号1、2的开关量读,开关个数,对于这种bool开关型,个数就不是返回字节数的两倍了!返回的字节个数是动态的,要字节数对应的位数总和,能覆盖传入的个数数值!
                    quantity *= 1
                    format = ''                        #实践发现,对于bool开关型,传入开关量个数就行,format保留为空!如果format设置为 "?"或"8?"、">?"等,都会解析不正确!!
                self.debug('>>>>>>' + '(PORT-' + str(self.__port) + ')' + str(devid) + ' ' + str(funid) + ' ' + str(regad) + ' ' + str(quantity) + ' ' + str(format))
                rtu_ret = self.master.execute(devid, funid, regad, quantity,data_format=format)


                if funid == 3:
                    retlist = []
                    for i in range(len(rtu_ret)):
                        retlist.append(rtu_ret[i])
                    rtu_ret = tuple(retlist)

                #周期查询的开关量输出状态进行备份,用来给控制用
                if funid == 1:
                    self.bitsState = list(rtu_ret)
                self.debug(rtu_ret)
                return rtu_ret
            # 一组功能号内的数据点,不进行遍历采集!跳过!
            else:
                return ()   #注意,这种情况下不是采集错误,如果返回None,那么会当作采集错误处理,进行采集错误计数了!!
        except ModbusInvalidResponseError, e:
            self.error(u'MODBUS响应超时, ' + e.message)
            return None
        except Exception, e:
            traceback.print_exc(e.message)
            self.error(u'采集解析参数错误:' + e.message)
            return None
  • 部分设备可以进行数据的下发来控制设备的状态或者配置设备的参数,可以利用如下的函数
# 事件回调接口,监测点操作访问
    def Event_setData(self, dataId, value):
        self.warn(value)
        try:
            if self.master == None:
                self.InitComm()
            data_config = self.data2attrs[dataId]['config']
            bit = 0
            if 'proxy' in data_config.keys() and 'pointer' in data_config['proxy'] and data_config['proxy']['pointer'] != None:
                bit = data_config['proxy']['index']
            if self.valueTyped(dataId,value) == True:
                self.bitsState[bit] = 1
            else:
                self.bitsState[bit] = 0
            self.warn(self.bitsState)

            #注意,这里地址是1,但是再huaihua等用了3合一设备的,地址是2,接下来需要这里也做个区分,按照当前操作的数据点对应的实际数据点来!
            ret = self.master.execute(1, cst.WRITE_MULTIPLE_COILS, 0, output_value=self.bitsState)
            self.warn(ret)
            return json.dumps({'code': 0, 'msg': u'操作成功!', 'data': list(ret)})
        except Exception,e:
            return json.dumps({'code': 501, 'msg': u'操作失败,错误码501,' + e.message, 'data': None})
作者:admin  创建时间:2021-10-12 09:50
最后编辑:admin  更新时间:2023-11-29 09:38