1 前言
海康门禁的对接需要门禁所在的局域网内存在中台,这样才能通过中台对门禁进行一定的操作。否则对接很难实现,而且需要对Linux的基本操作和中台框架有一定了解的专业人员,不然很容易出现意料之外的问题导致部署的中台出现问题,务必仔细阅读以下内容,避免不必要的错误产生,请熟知。
2 驱动目的
通过中台直接下发远程采集指令到门禁,门禁可以采集在门禁前的人脸,并且保存采集到的人脸将其下发至系统内,通过此驱动,中台也可以直接获取到最新的出入情况和门禁的状态并且展示出来。
3 适用范围
具备人脸识别功能的海康门禁
4 使用示例
该驱动仅展示了门禁远程采集人脸并下发至门禁存储系统中的功能
4.1 下载门禁需要的依赖包
这些依赖包用于与海康门禁的通讯,必不可少,下载地址:海康门禁依赖包下载,选择Linux的SDK,需要登录该平台后才能下载,没有账号的可以注册。
下载完成后,解压下载的文件,在里面找到lib文件,将其复制一份并且改名为linux_lib
4.2 放置依赖包
上一步完成后,开始放置依赖包,该依赖包需要放置在两个地方。
①第一个地方
利用FileZilla或者MobaXterm或者xshell带有的xftp等可以连接虚拟主机的文件传输工具(可在网上自行下载),这里以FileZilla为例,打开此软件,上方输入中台主机IP、用户名、密码和端口(端口填22,账号密码若不清楚可咨询工作人员),点击快速连接即可进入中台主机
进入后依次点击【..】->【data】->【IOTOSDK_python】->【sdk】->【_example】或者在远程站点处输入/data/IOTOSDK_python/sdk/_examples后按回车进入_example文件目录下
在软件的左侧块打开刚才下载的linux_lib的文件夹下,将linux_lib整个文件夹拖到右边主机目录下即可。②第二个地方
利用PuTTy或者MobaXterm或者Xshell等软件连接到主机,这里以PuTTy为例,输入主机名点击Open
进入主机后输入账号密码即可进入(与进入FileZilla的账号密码一致),在命令行输入
docker exec -it iotos_paas bash
然后再输入
cp -r /home/IOTOSDK_python/sdk/\_examples/linux_lib/ /
最后输入ls
检查linux_lib是否已经存在了。
4.3 添加映射
为了使捕捉到的人脸展示出来,需要在内部进行一定的映射将连接开放出来用于展示、观察。
①创建映射文件
承接放置依赖包那块,进入主机后输入cd /data/iotos-paas-v2/media
,然后再输入mkdir mj
,创建完成。最后输入cd /
回到根目录
①进入需求目录
在命令行输入cd /data/IOTOS_PaaS_Docker_Script/
。
②修改配置文件
进入需求目录后,再在命令行输入vim docker-compose.yml
,进入文件后按键盘i键即可进入编辑状态,在其中加入一行命令,添加完成后退出文件。
退出刚才编辑的文件之后在命令行输入docker-compose stop
,等该命令的运行过程结束后再输入docker-compose up -d
,该命令结束后即添加映射完成。
4.4 编写驱动
在自己的本机任意一个位置建立一个文件,命名为dr_hikvision_collectingface_test.py,其中的代码如下:
#!coding:utf8
import json
import sys
sys.path.append("..")
from driver import *
import os
import platform
import time
import requests
from requests.auth import HTTPDigestAuth
import ctypes
reload(sys)
sys.setdefaultencoding('utf8')
from ctypes import *
# 定义登录结构体
class NET_DVR_DEVICEINFO_V30(Structure):
pass
LPNET_DVR_DEVICEINFO_V30 = POINTER(NET_DVR_DEVICEINFO_V30)
NET_DVR_DEVICEINFO_V30._fields_ = [
('sSerialNumber', c_ubyte * 48),
('byAlarmInPortNum', c_ubyte),
('byAlarmOutPortNum', c_ubyte),
('byDiskNum', c_ubyte),
('byDVRType', c_ubyte),
('byChanNum', c_ubyte),
('byStartChan', c_ubyte),
('byAudioChanNum', c_ubyte),
('byIPChanNum', c_ubyte),
('byZeroChanNum', c_ubyte),
('byMainProto', c_ubyte),
('bySubProto', c_ubyte),
('bySupport', c_ubyte),
('bySupport1', c_ubyte),
('bySupport2', c_ubyte),
('wDevType', c_ushort),
('bySupport3', c_ubyte),
('byMultiStreamProto', c_ubyte),
('byStartDChan', c_ubyte),
('byStartDTalkChan', c_ubyte),
('byHighDChanNum', c_ubyte),
('bySupport4', c_ubyte),
('byLanguageType', c_ubyte),
('byVoiceInChanNum', c_ubyte),
('byStartVoiceInChanNo', c_ubyte),
('bySupport5', c_ubyte),
('bySupport6', c_ubyte),
('byMirrorChanNum', c_ubyte),
('wStartMirrorChanNo', c_ushort),
('bySupport7', c_ubyte),
('byRes2', c_ubyte),
]
# 定义组件库加载路径信息结构体
class NET_DVR_LOCAL_SDK_PATH(Structure):
pass
LPNET_DVR_LOCAL_SDK_PATH = POINTER(NET_DVR_LOCAL_SDK_PATH)
NET_DVR_LOCAL_SDK_PATH._fields_ = [
('sPath', c_char * 256),
('byRes', c_ubyte * 128),
]
# 抓图数据结构体
class NET_DVR_JPEGPARA(Structure):
_fields_ = [
("wPicSize", c_ushort),
("wPicQuality", c_ushort)]
# 定义预览参数结构体
class NET_DVR_PREVIEWINFO(Structure):
pass
LPNET_DVR_PREVIEWINFO = POINTER(NET_DVR_PREVIEWINFO)
NET_DVR_PREVIEWINFO._fields_ = [
('lChannel', c_long),
('dwStreamType', c_ulong),
('dwLinkMode', c_ulong),
('hPlayWnd', c_void_p),
('bBlocked', c_ulong),
('bPassbackRecord', c_ulong),
('byPreviewMode', c_ubyte),
('byStreamID', c_ubyte * 32),
('byProtoType', c_ubyte),
('byRes1', c_ubyte),
('byVideoCodingType', c_ubyte),
('dwDisplayBufNum', c_ulong),
('byRes', c_ubyte * 216),
]
#采集人脸信息条件参数结构体
class NET_DVR_CAPTURE_FACE_COND(Structure):
pass
LPNET_DVR_CAPTURE_FACE_COND=POINTER(NET_DVR_CAPTURE_FACE_COND)
NET_DVR_CAPTURE_FACE_COND._fields_ = [
('dwSize', c_uint),
('byRes', c_ubyte * 128),
]
NET_DVR_CAPTURE_FACE_INFO=2510 #远程人脸采集
# 区域框参数结构体
class NET_VCA_RECT(Structure):
pass
LPNET_VCA_RECT=POINTER(NET_VCA_RECT)
NET_VCA_RECT._fields_ = [
('fX', c_float),
('fY', c_float),
('fWidth', c_float),
('fHeight', c_float),
]
# 点坐标参数结构体
class NET_VCA_POINT(Structure):
pass
LPNET_VCA_POINT=POINTER(NET_VCA_POINT)
NET_VCA_POINT._fields_ = [
('fX', c_float),
('fY', c_float),
]
# 人脸属性结构体
class NET_DVR_FACE_FEATURE(Structure):
pass
LPNET_DVR_FACE_FEATURE=POINTER(NET_DVR_FACE_FEATURE)
NET_DVR_FACE_FEATURE._fields_ = [
('struFace', NET_VCA_RECT),
('struLeftEye', NET_VCA_POINT),
('struRightEye', NET_VCA_POINT),
('struLeftMouth', NET_VCA_POINT),
('struRightMouth', NET_VCA_POINT),
('struNoseTip', NET_VCA_POINT),
]
# 人脸信息采集结果结构体
class NET_DVR_CAPTURE_FACE_CFG(Structure):
pass
LPNET_DVR_CAPTURE_FACE_CFG=POINTER(NET_DVR_CAPTURE_FACE_CFG)
NET_DVR_CAPTURE_FACE_CFG._fields_ = [
('dwSize', c_uint),
('dwFaceTemplate1Size', c_uint),
('pFaceTemplate1Buffer', c_char_p),
('dwFaceTemplate2Size', c_uint),
('pFaceTemplate2Buffer', c_char_p),
('dwFacePicSize', c_uint),
('pFacePicBuffer', c_char_p),
('byFaceQuality1', c_ubyte),
('byFaceQuality2', c_ubyte),
('byCaptureProgress', c_ubyte),
('byFacePicQuality', c_ubyte),
('dwInfraredFacePicSize', c_uint),
('pInfraredFacePicBuffer', c_char_p),
('byInfraredFacePicQuality', c_ubyte),
('byRes1', c_ubyte * 3),
('struFeature', NET_DVR_FACE_FEATURE),
('byRes', c_ubyte * 56),
]
NET_SDK_CALLBACK_TYPE_STATUS = 0
NET_SDK_CALLBACK_TYPE_PROGRESS = 1
NET_SDK_CALLBACK_TYPE_DATA = 2
# 状态回调函数
STATECALLBACK = CFUNCTYPE(None, c_uint, c_void_p, c_uint, c_void_p)
def stateback(dwType, lpBuffer, dwBufLen, pUserData):
'''
状态回调函数
'''
if dwType == NET_SDK_CALLBACK_TYPE_STATUS:
pass
elif dwType == NET_SDK_CALLBACK_TYPE_PROGRESS:
pass
elif dwType == NET_SDK_CALLBACK_TYPE_DATA:
lpBuffer = NET_DVR_CAPTURE_FACE_CFG()
cbStateCallback = STATECALLBACK(stateback)
win = None
funcRealDataCallBack_V30 = None
funcfRemoteConfigCallback = None
# HCNetSDK库文件路径
HCNETSDK_DLL_PATH = r'HCNetSDK.dll'
HCNETSDK_DLL_PATH_LINUX = r'/linux_lib/libhcnetsdk.so'
HCNETSDK_DLL_PATH_LINUX_one = r'/linux_lib/libcrypto.so'
#HCNETSDK_DLL_PATH_LINUX_two = r'/libssl.so'
HCNETSDK_DLL_PATH_LINUX_two = r'/linux_lib/libssl.so'
def GetPlatform():
sysstr = platform.system()
if sysstr != "Windows":
global WINDOWS_FLAG
WINDOWS_FLAG = False
def LoadHCNetSDK():
'''
加载HCNetSDK库
'''
if not WINDOWS_FLAG:
os.getcwd()
Objdll = cdll.LoadLibrary(os.getcwd() + HCNETSDK_DLL_PATH_LINUX)
else:
os.chdir('D:\iotos\hk_py\lib')
Objdll = WinDLL(HCNETSDK_DLL_PATH)
return Objdll
def InitHCNetSDK(self,Objdll):
'''
初始化HCNetSDK库
'''
if not WINDOWS_FLAG:
self.debug('loading')
sdk_path = NET_DVR_LOCAL_SDK_PATH()
sdk_path.sPath = os.path.dirname(HCNETSDK_DLL_PATH_LINUX).encode('utf-8')
flag1 = Objdll.NET_DVR_SetSDKInitCfg(2, byref(sdk_path))
self.debug("libhcnetsdk.so loads successfully")
sdk_path.sPath=(os.getcwd() + HCNETSDK_DLL_PATH_LINUX_one).encode('utf-8')
flag2 = Objdll.NET_DVR_SetSDKInitCfg(3, byref(sdk_path))
self.debug("libcrypto.so loads successfully")
sdk_path.sPath=("/usr/lib/x86_64-linux-gnu/libssl.so").encode('utf-8')
#Objdll.NET_DVR_SetSDKInitCfg(4, byref(sdk_path))
flag3 = Objdll.NET_DVR_SetSDKInitCfg(4, "/usr/lib/x86_64-linux-gnu/libssl.so")
self.debug("libssl.so loads successfully")
# 初始化DLL
falg = Objdll.NET_DVR_Init()
self.debug(falg)
self.debug(Objdll.NET_DVR_GetLastError())
# 设置日志路径
if not WINDOWS_FLAG:
Objdll.NET_DVR_SetLogToFile(3, create_string_buffer(b'/home/sdklog'), True)
else:
Objdll.NET_DVR_SetLogToFile(3, create_string_buffer(b'G:\\SdkLog'), True)
# 设置设备超时时间
# Objdll.NET_DVR_SetConnectTime(int(1000), 1)
def LoginDev(self,Objdll):
'''
登录设备
'''
device_info = NET_DVR_DEVICEINFO_V30()
self.debug(self.HK_username)
self.debug(self.HK_password)
os.chdir('/home/IOTOSDK_python/sdk/_examples/linux_lib')
lUserId = Objdll.NET_DVR_Login_V30(create_string_buffer(self.HK_ip), self.HK_port, create_string_buffer(self.HK_username) , create_string_buffer(self.HK_password), byref(device_info))
return (lUserId, device_info)
def RemoteConfigCallback(dwType, lpBuffer, dwBufLen, pUserData):
"""
远程配置回调函数
"""
print(pUserData)
print(dwBufLen)
# jpeg抓图hPlayWnd显示窗口能为none,存在缺点采集图片速度慢
def Get_JPEGpicture(self, lUserID, lChannel=1):
picPath = "../../../../iotos_paas"
picName = "/media/mj/"+ time.strftime('%Y-%m-%d_%H-%M-%S') + "_face.jpg"
picStr = picPath + picName
self.debug(picStr)
with open(picStr, mode="w") as f:
self.debug("success!!!!!!!!!!!!!!")
sJpegPicFileName = bytes(picStr)
lpJpegPara = NET_DVR_JPEGPARA()
lpJpegPara.wPicSize = 0
lpJpegPara.wPicQuality = 0
if (self.Objdll.NET_DVR_CaptureJPEGPicture(lUserID, lChannel, byref(lpJpegPara), sJpegPicFileName) == False):
error_info = self.Objdll.NET_DVR_GetLastError()
print("抓图失败:" + str(error_info))
else:
print("抓图成功")
picURL = "" + picName #需要在这里修改一下ip改为中台的ip
self.debug(picURL)
return picURL, picStr
def uploadImage(self,file_path, strname, streno):
file_name = file_path.split("/")[-1]
url = "http://"+self.hk_ip+"/ISAPI/Intelligent/FDLib/FaceDataRecord?format=json"
payload={'FaceDataRecord': '{"faceLibType": "blackFD","FDID": "1","FPID": "'+streno+'","name": "'+strname+'"}'}
files=[
('FaceImage',(file_name, open(file_path,'rb'), 'image/jpeg'))
]
headers = {}
response = requests.post(url, auth = HTTPDigestAuth(self.hk_name, self.hk_pwd), headers=headers, data=payload, files=files)
#return json.loads(response.text)["statusCode"]
return response
def newperson(self,pname, eno, Endtime=None):
if not Endtime:
Endtime = "2030-01-01T00:00:00"
data = {
"UserInfo":{
"Valid":{
"beginTime":time.strftime('%Y-%m-%dT%H:%M:%S'),
"enable":True,
"endTime":Endtime
},
"checkUser":False,
"doorRight":"1",
"RightPlan":[
{
"doorNo": 1,
"planTemplateNo": "1,3,5"
}
],
"employeeNo":eno,
"floorNumber":1,
"maxOpenDoorTime":0,
"name":pname,
"openDelayEnabled":False,
"password":"123456",
"roomNumber":1,
"userType":"normal"
}
}
r = requests.post("http://"+self.hk_ip+"/ISAPI/AccessControl/UserInfo/Record?format=json", auth = HTTPDigestAuth(self.hk_name, self.hk_pwd), json =data)
return json.loads(r.text)["statusCode"]
class FaceDriver(IOTOSDriverI):
# 1、通信初始化
def InitComm(self,attrs):
# 获取设备实例参数
try:
self.HK_ip = bytes(self.sysAttrs['config']['param']['HK_ip'].encode('utf-8'))
self.HK_port = self.sysAttrs['config']['param']['HK_port']
self.HK_username = bytes(self.sysAttrs['config']['param']['HK_username'].encode('utf-8'))
self.HK_password = bytes(self.sysAttrs['config']['param']['HK_password'].encode('utf-8'))
self.debug(type(self.HK_ip))
self.debug(self.HK_port)
self.online(True)
self.setPauseCollect(False)
self.setCollectingOneCircle(False)
self.picstr = ""
hip = self.sysAttrs['config']['param']['HK_ip']
self.hk_ip = hip.encode()
self.debug(type(self.hk_ip))
self.hk_name = self.sysAttrs['config']['param']['HK_username'].encode('utf-8')
self.hk_pwd = self.sysAttrs['config']['param']['HK_password'].encode('utf-8')
# 获取系统平台
GetPlatform()
# load library
(sdkPath, tempfilename) = os.path.split(HCNETSDK_DLL_PATH)
# 加载HCNetSDK库
self.Objdll = LoadHCNetSDK()
# 初始化HCNetSDK库
InitHCNetSDK(self, self.Objdll)
self.debug(self.Objdll.NET_DVR_GetLastError())
# 登录设备
(self.lUserId, device_info) = LoginDev(self,self.Objdll)
self.debug(self.lUserId)
if self.lUserId<0:
self.debug(self.Objdll.NET_DVR_GetLastError())
self.setValue(u'采集情况', '登录失败')
# 释放资源
self.Objdll.NET_DVR_Cleanup()
self.debug(self.lUserId)
return json.dumps({'code': 0, 'msg': '', 'data': ''})
else:
self.debug(self.lUserId)
self.setValue(u'采集情况', '登录成功')
self.debug(os.getcwd())
sdk_path = NET_DVR_LOCAL_SDK_PATH()
HCNETSDK_DLL_PATH_LINUX = r'/'
sdk_path.sPath=(os.getcwd()+HCNETSDK_DLL_PATH_LINUX).encode('utf-8')
flag1 = self.Objdll.NET_DVR_SetSDKInitCfg(2, byref(sdk_path))
self.debug("successfully")
except Exception,e:
self.debug('初始化失败')
self.Objdll.NET_DVR_Cleanup()
# #2、采集引擎回调
def Collecting(self, dataId):
for key, value in self.data2attrs.items(): # 拿出数据点的键和值
if "private" in value['config']['param']:
# 在private中识别开关控制的反控标签
if value['config']['param']['private'] == "switch":
self.key_s = key # 将下发数据点的dataid保存到self中,后续Event_setData中判断下发数据点时需要用到
# 开关赋值》》》》》》》》》》防止页面刷新而导致开关的值消失造成不能下发数据,利用memoryvalue判断是否有数据上传以及上一次上传的数据是多少,下同
if "memoryvalue" not in self.data2attrs[key] or self.data2attrs[key]["memoryvalue"]==None:
self.setValue(u'采集开关', 0)
else:
self.debug(self.data2attrs[key])
self.setValue(u'采集开关', int(self.data2attrs[key]["memoryvalue"]))
if value['config']['param']['private'] == "statue":
# 开关赋值》》》》》》》》》》防止页面刷新而导致开关的值消失造成不能下发数据,利用memoryvalue判断是否有数据上传以及上一次上传的数据是多少,下同
if "memoryvalue" not in self.data2attrs[key]:
pass
else:
self.setValue(u'采集情况', self.data2attrs[key]["memoryvalue"])
if value['config']['param']['private'] == "person":
self.key_p = key
if "memoryvalue" not in self.data2attrs[key]:
self.setValue(u"新建人员", "例:飞飞,20001,2030-08-01")
else:
self.setValue(u'新建人员', self.data2attrs[key]["memoryvalue"])
if value['config']['param']['private'] == "modifytime":
self.key_f = key
if "memoryvalue" not in self.data2attrs[key]:
self.setValue(u"修改权限时间", "例:工号,2021-09-12 14:00,2021-09-12 15:00")
else:
self.setValue(u'修改权限时间', self.data2attrs[key]["memoryvalue"])
time.sleep(0.5)
return ()
# 5、控制事件回调,数据点控制访问
def Event_setData(self, dataId, value):
try:
if dataId==self.key_s and value==1:
# 登录设备
lpInBuffer = NET_DVR_CAPTURE_FACE_COND()
lpInBuffer.dwSize = sizeof(lpInBuffer)
self.debug(lpInBuffer.dwSize)
dwInBufferLen = sizeof(lpInBuffer)
self.HK_ret = self.Objdll.NET_DVR_StartRemoteConfig(self.lUserId, NET_DVR_CAPTURE_FACE_INFO, byref(lpInBuffer), dwInBufferLen, None, None)
self.debug(self.HK_ret)
self.debug(self.Objdll.NET_DVR_GetLastError())
if self.HK_ret < 0:
#self.debug('fail, error code is:'+ self.Objdll.NET_DVR_GetLastError())
# 关闭长连接
rtu = self.Objdll.NET_DVR_StopRemoteConfig(self.HK_ret)
self.debug("关闭长连接!")
self.debug(self.Objdll.NET_DVR_GetLastError())
# 登出设备
self.Objdll.NET_DVR_Logout(self.lUserId)
# 释放资源
self.Objdll.NET_DVR_Cleanup()
return json.dumps({'code':0, 'msg':'', 'data':''})
try:
# 循环获取人脸采集结果
btime = time.time()
while True:
#btime = time.time()
lpOutBuff = NET_DVR_CAPTURE_FACE_CFG()
lpOutBuff.dwSize = sizeof(lpOutBuff)
dwOutBuffSize = sizeof(lpOutBuff)
res = self.Objdll.NET_DVR_GetNextRemoteConfig(self.HK_ret, byref(lpOutBuff), dwOutBuffSize)
self.debug(res)
self.debug(self.Objdll.NET_DVR_GetLastError())
if res == 1000:
self.setValue(u'采集情况', '采集成功')
picurl, picstr = Get_JPEGpicture(self, self.lUserId)
self.picstr = picstr
self.debug(self.picstr)
self.setValue(u'采集开关', 0)
self.setValue(u'人脸图片中人脸质量',lpOutBuff.byFacePicQuality)
self.setValue(u'人脸图片数据大小',lpOutBuff.dwFacePicSize)
self.setValue(u'人脸图片URL', picurl)
break
elif res == 1002:
# print("采集结束")
self.setValue(u'采集情况', '采集结束')
picurl, picstr = Get_JPEGpicture(self, self.lUserId)
self.picstr = picstr
self.debug(self.picstr)
self.setValue(u'采集开关', 0)
self.setValue(u'人脸图片中人脸质量', lpOutBuff.byFacePicQuality)
self.setValue(u'人脸图片数据大小', lpOutBuff.dwFacePicSize)
self.setValue(u'人脸图片URL', picurl)
break
elif res == 1003 or res == 1004:
self.setValue(u'采集情况', '采集超时,请重新开启采集')
self.setValue(u'采集开关', 0)
# self.setValue(u'人脸图片URL', picstr)\
self.setValue(u'人脸图片URL', "")
self.setValue(u'人脸图片中人脸质量', "")
self.setValue(u'人脸图片数据大小', "")
break
elif res == 0:
self.setValue(u'采集情况', '数据返回失败')
self.setValue(u'采集开关', 0)
elif res == 1001:
etime = time.time()
self.debug(type(btime))
self.debug(etime-btime)
if (etime - btime) > 5:
self.setValue(u'采集情况', '采集超时!请重新开启采集')
self.setValue(u'采集开关', 0)
self.setValue(u'人脸图片URL', "")
self.setValue(u'人脸图片中人脸质量', "")
self.setValue(u'人脸图片数据大小', "")
res = 1003
break
continue
except Exception, e:
self.debug(e)
return json.dumps({'code': 0, 'msg': '', 'data': ''})
# 关闭长连接
rtu = self.Objdll.NET_DVR_StopRemoteConfig(self.HK_ret)
self.debug("关闭长连接!")
self.debug(self.Objdll.NET_DVR_GetLastError())
# 登出设备
#self.Objdll.NET_DVR_Logout(self.lUserId)
self.setValue(u'采集开关',0)
if res == 1000 or res ==1002:
thistime = int(time.mktime(time.localtime()))
self.strname = str(thistime)
self.streno = str(thistime)
result1 = newperson(self,self.strname, self.streno)
self.debug(result1)
result2 = uploadImage(self ,self.picstr, self.strname, self.streno)
self.debug(result2)
return json.dumps({'code': 0, 'msg': '', 'data': ''})
except Exception,e:
self.debug(u'>>>>>>>>>error'+e)
return json.dumps({'code':0, 'msg':'', 'data':''})
在279行左右的地方(如下图所示),在””填入中台的IP地址即可
4.5 放置驱动
驱动编写完成后,重新回到FileZilla中,在远程站点的路径栏输入
/data/IOTOSDK_python/sdk/_drivers
点击回车后进入该目录,然后利用放入linux_lib相同的操作将刚才创建的驱动文件拖入此文件夹即可4.6 中台操作
进入IOTOS中台,测试账号为iotos_test,密码为iotos123(也可以创建自己的账号),依次点击【系统设备】->【模板驱动】->【我的模板】,点击右上角创建模板,填写模板名称和模板根配置里面的“driver”参数,该参数表示要运行的驱动,详见系统设备管理-模板驱动,填写刚才创建的驱动和驱动的类名,如下图(参数与图片中保持一致即可)
创建网关。依次点击【系统设备】 -> 【通信网关】,点击右上角创建网关,详见系统设备管理-通信网关,填写网关名称即可
创建设备实例。依次点击【系统设备】 -> 【通信网关】 -> 【设备实例】,点击右上角创建设备实例。详见系统设备管理-设备实例。填写设备名称,选择刚才创建的网关和模板即可(检查一下驱动根配置的“driver”参数,若在创建模板时已按要求配置,则现在无需修改,否则则需要在此进行配置,配置标准见创建模板),“driver”参数没有问题之后需要在“param”参数中填入海康门禁的相关信息,具体要求如图所示,其中”HK_PORT”为门禁的TCP端口一般都为8000可到门禁的配置里面查看。
创建数据点,点击【我的设备】 -> 【通信网关】 -> 【设备实例】 -> 【数据点】,并在【设备实例】下拉列表选择刚刚创建的设备实例,然后进行数据点的创建。详见系统设备管理-数据点。首先删除系统自带的那四个数据点(设备持续在线时长、网关持续运行时长、设备网络状态、网关网络状态),然后建立驱动需要的数据点,如下图注意数据点名称和数据类型:
另外还需要对数据点的属性进行配置以便于驱动识别,需要配置的是“采集情况”和“采集开关”两个数据点,分别点击这两个数据点操作栏的编辑按钮,再点击上面的高级配置栏,在“param”参数中添加一个名为“private”的参数,它的值分别配置为“status”和“switch”。如下图所示:
数据点采集情况的配置
数据点采集开关的配置
4.7 运行驱动
依次点击【系统设备】 -> 【通信网关】,点击刚才创建的网关操作栏中的“高级”按钮
开启云网关,输入账号的密码。开启成功后驱动已经正常运行
4.8 查看运行结果
查看数据点。点击【我的设备】 -> 【通信网关】 -> 【设备实例】 -> 【数据点】,并在【设备实例】下拉列表选择刚刚创建的设备实例,此时可以看到数据点“采集情况”显示登录成功,说明已经能够对门禁进行相应的操作了。
点击采集开关右侧的下发按钮,将开关设置为开启,即开始进行人脸采集,采集时长为5秒钟,我们最后在采集开关开启前站到m若此时采集成功则会自动下发至门禁设备中并且在中台中进行显示。
采集成功情况。
5 代码解析
5.1 完整代码
#!coding:utf8
import json
import sys
sys.path.append("..")
from driver import *
import os
import platform
import time
import requests
from requests.auth import HTTPDigestAuth
import ctypes
reload(sys)
sys.setdefaultencoding('utf8')
from ctypes import *
# 定义登录结构体
class NET_DVR_DEVICEINFO_V30(Structure):
pass
LPNET_DVR_DEVICEINFO_V30 = POINTER(NET_DVR_DEVICEINFO_V30)
NET_DVR_DEVICEINFO_V30._fields_ = [
('sSerialNumber', c_ubyte * 48),
('byAlarmInPortNum', c_ubyte),
('byAlarmOutPortNum', c_ubyte),
('byDiskNum', c_ubyte),
('byDVRType', c_ubyte),
('byChanNum', c_ubyte),
('byStartChan', c_ubyte),
('byAudioChanNum', c_ubyte),
('byIPChanNum', c_ubyte),
('byZeroChanNum', c_ubyte),
('byMainProto', c_ubyte),
('bySubProto', c_ubyte),
('bySupport', c_ubyte),
('bySupport1', c_ubyte),
('bySupport2', c_ubyte),
('wDevType', c_ushort),
('bySupport3', c_ubyte),
('byMultiStreamProto', c_ubyte),
('byStartDChan', c_ubyte),
('byStartDTalkChan', c_ubyte),
('byHighDChanNum', c_ubyte),
('bySupport4', c_ubyte),
('byLanguageType', c_ubyte),
('byVoiceInChanNum', c_ubyte),
('byStartVoiceInChanNo', c_ubyte),
('bySupport5', c_ubyte),
('bySupport6', c_ubyte),
('byMirrorChanNum', c_ubyte),
('wStartMirrorChanNo', c_ushort),
('bySupport7', c_ubyte),
('byRes2', c_ubyte),
]
# 定义组件库加载路径信息结构体
class NET_DVR_LOCAL_SDK_PATH(Structure):
pass
LPNET_DVR_LOCAL_SDK_PATH = POINTER(NET_DVR_LOCAL_SDK_PATH)
NET_DVR_LOCAL_SDK_PATH._fields_ = [
('sPath', c_char * 256),
('byRes', c_ubyte * 128),
]
# 抓图数据结构体
class NET_DVR_JPEGPARA(Structure):
_fields_ = [
("wPicSize", c_ushort),
("wPicQuality", c_ushort)]
# 定义预览参数结构体
class NET_DVR_PREVIEWINFO(Structure):
pass
LPNET_DVR_PREVIEWINFO = POINTER(NET_DVR_PREVIEWINFO)
NET_DVR_PREVIEWINFO._fields_ = [
('lChannel', c_long),
('dwStreamType', c_ulong),
('dwLinkMode', c_ulong),
('hPlayWnd', c_void_p),
('bBlocked', c_ulong),
('bPassbackRecord', c_ulong),
('byPreviewMode', c_ubyte),
('byStreamID', c_ubyte * 32),
('byProtoType', c_ubyte),
('byRes1', c_ubyte),
('byVideoCodingType', c_ubyte),
('dwDisplayBufNum', c_ulong),
('byRes', c_ubyte * 216),
]
#采集人脸信息条件参数结构体
class NET_DVR_CAPTURE_FACE_COND(Structure):
pass
LPNET_DVR_CAPTURE_FACE_COND=POINTER(NET_DVR_CAPTURE_FACE_COND)
NET_DVR_CAPTURE_FACE_COND._fields_ = [
('dwSize', c_uint),
('byRes', c_ubyte * 128),
]
NET_DVR_CAPTURE_FACE_INFO=2510 #远程人脸采集
# 区域框参数结构体
class NET_VCA_RECT(Structure):
pass
LPNET_VCA_RECT=POINTER(NET_VCA_RECT)
NET_VCA_RECT._fields_ = [
('fX', c_float),
('fY', c_float),
('fWidth', c_float),
('fHeight', c_float),
]
# 点坐标参数结构体
class NET_VCA_POINT(Structure):
pass
LPNET_VCA_POINT=POINTER(NET_VCA_POINT)
NET_VCA_POINT._fields_ = [
('fX', c_float),
('fY', c_float),
]
# 人脸属性结构体
class NET_DVR_FACE_FEATURE(Structure):
pass
LPNET_DVR_FACE_FEATURE=POINTER(NET_DVR_FACE_FEATURE)
NET_DVR_FACE_FEATURE._fields_ = [
('struFace', NET_VCA_RECT),
('struLeftEye', NET_VCA_POINT),
('struRightEye', NET_VCA_POINT),
('struLeftMouth', NET_VCA_POINT),
('struRightMouth', NET_VCA_POINT),
('struNoseTip', NET_VCA_POINT),
]
# 人脸信息采集结果结构体
class NET_DVR_CAPTURE_FACE_CFG(Structure):
pass
LPNET_DVR_CAPTURE_FACE_CFG=POINTER(NET_DVR_CAPTURE_FACE_CFG)
NET_DVR_CAPTURE_FACE_CFG._fields_ = [
('dwSize', c_uint),
('dwFaceTemplate1Size', c_uint),
('pFaceTemplate1Buffer', c_char_p),
('dwFaceTemplate2Size', c_uint),
('pFaceTemplate2Buffer', c_char_p),
('dwFacePicSize', c_uint),
('pFacePicBuffer', c_char_p),
('byFaceQuality1', c_ubyte),
('byFaceQuality2', c_ubyte),
('byCaptureProgress', c_ubyte),
('byFacePicQuality', c_ubyte),
('dwInfraredFacePicSize', c_uint),
('pInfraredFacePicBuffer', c_char_p),
('byInfraredFacePicQuality', c_ubyte),
('byRes1', c_ubyte * 3),
('struFeature', NET_DVR_FACE_FEATURE),
('byRes', c_ubyte * 56),
]
NET_SDK_CALLBACK_TYPE_STATUS = 0
NET_SDK_CALLBACK_TYPE_PROGRESS = 1
NET_SDK_CALLBACK_TYPE_DATA = 2
# 状态回调函数
STATECALLBACK = CFUNCTYPE(None, c_uint, c_void_p, c_uint, c_void_p)
def stateback(dwType, lpBuffer, dwBufLen, pUserData):
'''
状态回调函数
'''
if dwType == NET_SDK_CALLBACK_TYPE_STATUS:
pass
elif dwType == NET_SDK_CALLBACK_TYPE_PROGRESS:
pass
elif dwType == NET_SDK_CALLBACK_TYPE_DATA:
lpBuffer = NET_DVR_CAPTURE_FACE_CFG()
cbStateCallback = STATECALLBACK(stateback)
win = None
funcRealDataCallBack_V30 = None
funcfRemoteConfigCallback = None
# HCNetSDK库文件路径
HCNETSDK_DLL_PATH = r'HCNetSDK.dll'
HCNETSDK_DLL_PATH_LINUX = r'/linux_lib/libhcnetsdk.so'
HCNETSDK_DLL_PATH_LINUX_one = r'/linux_lib/libcrypto.so'
#HCNETSDK_DLL_PATH_LINUX_two = r'/libssl.so'
HCNETSDK_DLL_PATH_LINUX_two = r'/linux_lib/libssl.so'
def GetPlatform():
sysstr = platform.system()
if sysstr != "Windows":
global WINDOWS_FLAG
WINDOWS_FLAG = False
def LoadHCNetSDK():
'''
加载HCNetSDK库
'''
if not WINDOWS_FLAG:
os.getcwd()
Objdll = cdll.LoadLibrary(os.getcwd() + HCNETSDK_DLL_PATH_LINUX)
else:
os.chdir('D:\iotos\hk_py\lib')
Objdll = WinDLL(HCNETSDK_DLL_PATH)
return Objdll
def InitHCNetSDK(self,Objdll):
'''
初始化HCNetSDK库
'''
if not WINDOWS_FLAG:
self.debug('loading')
sdk_path = NET_DVR_LOCAL_SDK_PATH()
sdk_path.sPath = os.path.dirname(HCNETSDK_DLL_PATH_LINUX).encode('utf-8')
flag1 = Objdll.NET_DVR_SetSDKInitCfg(2, byref(sdk_path))
self.debug("libhcnetsdk.so loads successfully")
sdk_path.sPath=(os.getcwd() + HCNETSDK_DLL_PATH_LINUX_one).encode('utf-8')
flag2 = Objdll.NET_DVR_SetSDKInitCfg(3, byref(sdk_path))
self.debug("libcrypto.so loads successfully")
sdk_path.sPath=("/usr/lib/x86_64-linux-gnu/libssl.so").encode('utf-8')
#Objdll.NET_DVR_SetSDKInitCfg(4, byref(sdk_path))
flag3 = Objdll.NET_DVR_SetSDKInitCfg(4, "/usr/lib/x86_64-linux-gnu/libssl.so")
self.debug("libssl.so loads successfully")
# 初始化DLL
falg = Objdll.NET_DVR_Init()
self.debug(falg)
self.debug(Objdll.NET_DVR_GetLastError())
# 设置日志路径
if not WINDOWS_FLAG:
Objdll.NET_DVR_SetLogToFile(3, create_string_buffer(b'/home/sdklog'), True)
else:
Objdll.NET_DVR_SetLogToFile(3, create_string_buffer(b'G:\\SdkLog'), True)
# 设置设备超时时间
# Objdll.NET_DVR_SetConnectTime(int(1000), 1)
def LoginDev(self,Objdll):
'''
登录设备
'''
device_info = NET_DVR_DEVICEINFO_V30()
self.debug(self.HK_username)
self.debug(self.HK_password)
os.chdir('/home/IOTOSDK_python/sdk/_examples/linux_lib')
lUserId = Objdll.NET_DVR_Login_V30(create_string_buffer(self.HK_ip), self.HK_port, create_string_buffer(self.HK_username) , create_string_buffer(self.HK_password), byref(device_info))
return (lUserId, device_info)
def RemoteConfigCallback(dwType, lpBuffer, dwBufLen, pUserData):
"""
远程配置回调函数
"""
print(pUserData)
print(dwBufLen)
# jpeg抓图hPlayWnd显示窗口能为none,存在缺点采集图片速度慢
def Get_JPEGpicture(self, lUserID, lChannel=1):
picPath = "../../../../iotos_paas"
picName = "/media/mj/"+ time.strftime('%Y-%m-%d_%H-%M-%S') + "_face.jpg"
picStr = picPath + picName
self.debug(picStr)
with open(picStr, mode="w") as f:
self.debug("success!!!!!!!!!!!!!!")
sJpegPicFileName = bytes(picStr)
lpJpegPara = NET_DVR_JPEGPARA()
lpJpegPara.wPicSize = 0
lpJpegPara.wPicQuality = 0
if (self.Objdll.NET_DVR_CaptureJPEGPicture(lUserID, lChannel, byref(lpJpegPara), sJpegPicFileName) == False):
error_info = self.Objdll.NET_DVR_GetLastError()
print("抓图失败:" + str(error_info))
else:
print("抓图成功")
picURL = "" + picName #需要在这里修改一下ip改为中台的ip
self.debug(picURL)
return picURL, picStr
def uploadImage(self,file_path, strname, streno):
file_name = file_path.split("/")[-1]
url = "http://"+self.hk_ip+"/ISAPI/Intelligent/FDLib/FaceDataRecord?format=json"
payload={'FaceDataRecord': '{"faceLibType": "blackFD","FDID": "1","FPID": "'+streno+'","name": "'+strname+'"}'}
files=[
('FaceImage',(file_name, open(file_path,'rb'), 'image/jpeg'))
]
headers = {}
response = requests.post(url, auth = HTTPDigestAuth(self.hk_name, self.hk_pwd), headers=headers, data=payload, files=files)
#return json.loads(response.text)["statusCode"]
return response
def newperson(self,pname, eno, Endtime=None):
if not Endtime:
Endtime = "2030-01-01T00:00:00"
data = {
"UserInfo":{
"Valid":{
"beginTime":time.strftime('%Y-%m-%dT%H:%M:%S'),
"enable":True,
"endTime":Endtime
},
"checkUser":False,
"doorRight":"1",
"RightPlan":[
{
"doorNo": 1,
"planTemplateNo": "1,3,5"
}
],
"employeeNo":eno,
"floorNumber":1,
"maxOpenDoorTime":0,
"name":pname,
"openDelayEnabled":False,
"password":"123456",
"roomNumber":1,
"userType":"normal"
}
}
r = requests.post("http://"+self.hk_ip+"/ISAPI/AccessControl/UserInfo/Record?format=json", auth = HTTPDigestAuth(self.hk_name, self.hk_pwd), json =data)
return json.loads(r.text)["statusCode"]
class FaceDriver(IOTOSDriverI):
# 1、通信初始化
def InitComm(self,attrs):
# 获取设备实例参数
try:
self.HK_ip = bytes(self.sysAttrs['config']['param']['HK_ip'].encode('utf-8'))
self.HK_port = self.sysAttrs['config']['param']['HK_port']
self.HK_username = bytes(self.sysAttrs['config']['param']['HK_username'].encode('utf-8'))
self.HK_password = bytes(self.sysAttrs['config']['param']['HK_password'].encode('utf-8'))
self.debug(type(self.HK_ip))
self.debug(self.HK_port)
self.online(True)
self.setPauseCollect(False)
self.setCollectingOneCircle(False)
self.picstr = ""
hip = self.sysAttrs['config']['param']['HK_ip']
self.hk_ip = hip.encode()
self.debug(type(self.hk_ip))
self.hk_name = self.sysAttrs['config']['param']['HK_username'].encode('utf-8')
self.hk_pwd = self.sysAttrs['config']['param']['HK_password'].encode('utf-8')
# 获取系统平台
GetPlatform()
# load library
(sdkPath, tempfilename) = os.path.split(HCNETSDK_DLL_PATH)
# 加载HCNetSDK库
self.Objdll = LoadHCNetSDK()
# 初始化HCNetSDK库
InitHCNetSDK(self, self.Objdll)
self.debug(self.Objdll.NET_DVR_GetLastError())
# 登录设备
(self.lUserId, device_info) = LoginDev(self,self.Objdll)
self.debug(self.lUserId)
if self.lUserId<0:
self.debug(self.Objdll.NET_DVR_GetLastError())
self.setValue(u'采集情况', '登录失败')
# 释放资源
self.Objdll.NET_DVR_Cleanup()
self.debug(self.lUserId)
return json.dumps({'code': 0, 'msg': '', 'data': ''})
else:
self.debug(self.lUserId)
self.setValue(u'采集情况', '登录成功')
self.debug(os.getcwd())
sdk_path = NET_DVR_LOCAL_SDK_PATH()
HCNETSDK_DLL_PATH_LINUX = r'/'
sdk_path.sPath=(os.getcwd()+HCNETSDK_DLL_PATH_LINUX).encode('utf-8')
flag1 = self.Objdll.NET_DVR_SetSDKInitCfg(2, byref(sdk_path))
self.debug("successfully")
except Exception,e:
self.debug('初始化失败')
self.Objdll.NET_DVR_Cleanup()
# #2、采集引擎回调
def Collecting(self, dataId):
for key, value in self.data2attrs.items(): # 拿出数据点的键和值
if "private" in value['config']['param']:
# 在private中识别开关控制的反控标签
if value['config']['param']['private'] == "switch":
self.key_s = key # 将下发数据点的dataid保存到self中,后续Event_setData中判断下发数据点时需要用到
# 开关赋值》》》》》》》》》》防止页面刷新而导致开关的值消失造成不能下发数据,利用memoryvalue判断是否有数据上传以及上一次上传的数据是多少,下同
if "memoryvalue" not in self.data2attrs[key] or self.data2attrs[key]["memoryvalue"]==None:
self.setValue(u'采集开关', 0)
else:
self.debug(self.data2attrs[key])
self.setValue(u'采集开关', int(self.data2attrs[key]["memoryvalue"]))
if value['config']['param']['private'] == "statue":
# 开关赋值》》》》》》》》》》防止页面刷新而导致开关的值消失造成不能下发数据,利用memoryvalue判断是否有数据上传以及上一次上传的数据是多少,下同
if "memoryvalue" not in self.data2attrs[key]:
pass
else:
self.setValue(u'采集情况', self.data2attrs[key]["memoryvalue"])
if value['config']['param']['private'] == "person":
self.key_p = key
if "memoryvalue" not in self.data2attrs[key]:
self.setValue(u"新建人员", "例:飞飞,20001,2030-08-01")
else:
self.setValue(u'新建人员', self.data2attrs[key]["memoryvalue"])
if value['config']['param']['private'] == "modifytime":
self.key_f = key
if "memoryvalue" not in self.data2attrs[key]:
self.setValue(u"修改权限时间", "例:工号,2021-09-12 14:00,2021-09-12 15:00")
else:
self.setValue(u'修改权限时间', self.data2attrs[key]["memoryvalue"])
time.sleep(0.5)
return ()
# 5、控制事件回调,数据点控制访问
def Event_setData(self, dataId, value):
self.debug(123456456123)
self.debug(os.getcwd())
try:
if dataId==self.key_s and value==1:
# 登录设备
lpInBuffer = NET_DVR_CAPTURE_FACE_COND()
lpInBuffer.dwSize = sizeof(lpInBuffer)
self.debug(lpInBuffer.dwSize)
dwInBufferLen = sizeof(lpInBuffer)
self.HK_ret = self.Objdll.NET_DVR_StartRemoteConfig(self.lUserId, NET_DVR_CAPTURE_FACE_INFO, byref(lpInBuffer), dwInBufferLen, None, None)
self.debug(self.HK_ret)
self.debug(self.Objdll.NET_DVR_GetLastError())
if self.HK_ret < 0:
#self.debug('fail, error code is:'+ self.Objdll.NET_DVR_GetLastError())
# 关闭长连接
rtu = self.Objdll.NET_DVR_StopRemoteConfig(self.HK_ret)
self.debug("关闭长连接!")
self.debug(self.Objdll.NET_DVR_GetLastError())
# 登出设备
self.Objdll.NET_DVR_Logout(self.lUserId)
# 释放资源
self.Objdll.NET_DVR_Cleanup()
return json.dumps({'code':0, 'msg':'', 'data':''})
try:
# 循环获取人脸采集结果
btime = time.time()
while True:
#btime = time.time()
lpOutBuff = NET_DVR_CAPTURE_FACE_CFG()
lpOutBuff.dwSize = sizeof(lpOutBuff)
dwOutBuffSize = sizeof(lpOutBuff)
res = self.Objdll.NET_DVR_GetNextRemoteConfig(self.HK_ret, byref(lpOutBuff), dwOutBuffSize)
self.debug(res)
self.debug(self.Objdll.NET_DVR_GetLastError())
if res == 1000:
self.setValue(u'采集情况', '采集成功')
picurl, picstr = Get_JPEGpicture(self, self.lUserId)
self.picstr = picstr
self.debug(self.picstr)
self.setValue(u'采集开关', 0)
self.setValue(u'人脸图片中人脸质量',lpOutBuff.byFacePicQuality)
self.setValue(u'人脸图片数据大小',lpOutBuff.dwFacePicSize)
self.setValue(u'人脸图片URL', picurl)
break
elif res == 1002:
# print("采集结束")
self.setValue(u'采集情况', '采集结束')
picurl, picstr = Get_JPEGpicture(self, self.lUserId)
self.picstr = picstr
self.debug(self.picstr)
self.setValue(u'采集开关', 0)
self.setValue(u'人脸图片中人脸质量', lpOutBuff.byFacePicQuality)
self.setValue(u'人脸图片数据大小', lpOutBuff.dwFacePicSize)
self.setValue(u'人脸图片URL', picurl)
break
elif res == 1003 or res == 1004:
self.setValue(u'采集情况', '采集超时,请重新开启采集')
self.setValue(u'采集开关', 0)
# self.setValue(u'人脸图片URL', picstr)\
self.setValue(u'人脸图片URL', "")
self.setValue(u'人脸图片中人脸质量', "")
self.setValue(u'人脸图片数据大小', "")
break
elif res == 0:
self.setValue(u'采集情况', '数据返回失败')
self.setValue(u'采集开关', 0)
elif res == 1001:
etime = time.time()
self.debug(type(btime))
self.debug(etime-btime)
if (etime - btime) > 5:
self.setValue(u'采集情况', '采集超时!请重新开启采集')
self.setValue(u'采集开关', 0)
self.setValue(u'人脸图片URL', "")
self.setValue(u'人脸图片中人脸质量', "")
self.setValue(u'人脸图片数据大小', "")
res = 1003
break
continue
except Exception, e:
self.debug(e)
return json.dumps({'code': 0, 'msg': '', 'data': ''})
# 关闭长连接
rtu = self.Objdll.NET_DVR_StopRemoteConfig(self.HK_ret)
self.debug("关闭长连接!")
self.debug(self.Objdll.NET_DVR_GetLastError())
# 登出设备
#self.Objdll.NET_DVR_Logout(self.lUserId)
self.setValue(u'采集开关',0)
if res == 1000 or res ==1002:
thistime = int(time.mktime(time.localtime()))
self.strname = str(thistime)
self.streno = str(thistime)
result1 = newperson(self,self.strname, self.streno)
self.debug(result1)
result2 = uploadImage(self ,self.picstr, self.strname, self.streno)
self.debug(result2)
return json.dumps({'code': 0, 'msg': '', 'data': ''})
except Exception,e:
self.debug(u'>>>>>>>>>error'+e)
return json.dumps({'code':0, 'msg':'', 'data':''})
- 导入包
导入IOTOS的驱动包和海康门禁所需要的辅助通讯包
#!coding:utf8
import json
import sys
sys.path.append("..")
from driver import *
import os
import platform
import time
import requests
from requests.auth import HTTPDigestAuth
import ctypes
reload(sys)
sys.setdefaultencoding('utf8')
from ctypes import *
- 定义结构体
定义与海康门禁所需要的结构体和全局变量,包括登录、初始化等
# 定义登录结构体
class NET_DVR_DEVICEINFO_V30(Structure):
pass
LPNET_DVR_DEVICEINFO_V30 = POINTER(NET_DVR_DEVICEINFO_V30)
NET_DVR_DEVICEINFO_V30._fields_ = [
('sSerialNumber', c_ubyte * 48),
('byAlarmInPortNum', c_ubyte),
('byAlarmOutPortNum', c_ubyte),
('byDiskNum', c_ubyte),
('byDVRType', c_ubyte),
('byChanNum', c_ubyte),
('byStartChan', c_ubyte),
('byAudioChanNum', c_ubyte),
('byIPChanNum', c_ubyte),
('byZeroChanNum', c_ubyte),
('byMainProto', c_ubyte),
('bySubProto', c_ubyte),
('bySupport', c_ubyte),
('bySupport1', c_ubyte),
('bySupport2', c_ubyte),
('wDevType', c_ushort),
('bySupport3', c_ubyte),
('byMultiStreamProto', c_ubyte),
('byStartDChan', c_ubyte),
('byStartDTalkChan', c_ubyte),
('byHighDChanNum', c_ubyte),
('bySupport4', c_ubyte),
('byLanguageType', c_ubyte),
('byVoiceInChanNum', c_ubyte),
('byStartVoiceInChanNo', c_ubyte),
('bySupport5', c_ubyte),
('bySupport6', c_ubyte),
('byMirrorChanNum', c_ubyte),
('wStartMirrorChanNo', c_ushort),
('bySupport7', c_ubyte),
('byRes2', c_ubyte),
]
# 定义组件库加载路径信息结构体
class NET_DVR_LOCAL_SDK_PATH(Structure):
pass
LPNET_DVR_LOCAL_SDK_PATH = POINTER(NET_DVR_LOCAL_SDK_PATH)
NET_DVR_LOCAL_SDK_PATH._fields_ = [
('sPath', c_char * 256),
('byRes', c_ubyte * 128),
]
# 抓图数据结构体
class NET_DVR_JPEGPARA(Structure):
_fields_ = [
("wPicSize", c_ushort),
("wPicQuality", c_ushort)]
# 定义预览参数结构体
class NET_DVR_PREVIEWINFO(Structure):
pass
LPNET_DVR_PREVIEWINFO = POINTER(NET_DVR_PREVIEWINFO)
NET_DVR_PREVIEWINFO._fields_ = [
('lChannel', c_long),
('dwStreamType', c_ulong),
('dwLinkMode', c_ulong),
('hPlayWnd', c_void_p),
('bBlocked', c_ulong),
('bPassbackRecord', c_ulong),
('byPreviewMode', c_ubyte),
('byStreamID', c_ubyte * 32),
('byProtoType', c_ubyte),
('byRes1', c_ubyte),
('byVideoCodingType', c_ubyte),
('dwDisplayBufNum', c_ulong),
('byRes', c_ubyte * 216),
]
#采集人脸信息条件参数结构体
class NET_DVR_CAPTURE_FACE_COND(Structure):
pass
LPNET_DVR_CAPTURE_FACE_COND=POINTER(NET_DVR_CAPTURE_FACE_COND)
NET_DVR_CAPTURE_FACE_COND._fields_ = [
('dwSize', c_uint),
('byRes', c_ubyte * 128),
]
NET_DVR_CAPTURE_FACE_INFO=2510 #远程人脸采集
# 区域框参数结构体
class NET_VCA_RECT(Structure):
pass
LPNET_VCA_RECT=POINTER(NET_VCA_RECT)
NET_VCA_RECT._fields_ = [
('fX', c_float),
('fY', c_float),
('fWidth', c_float),
('fHeight', c_float),
]
# 点坐标参数结构体
class NET_VCA_POINT(Structure):
pass
LPNET_VCA_POINT=POINTER(NET_VCA_POINT)
NET_VCA_POINT._fields_ = [
('fX', c_float),
('fY', c_float),
]
# 人脸属性结构体
class NET_DVR_FACE_FEATURE(Structure):
pass
LPNET_DVR_FACE_FEATURE=POINTER(NET_DVR_FACE_FEATURE)
NET_DVR_FACE_FEATURE._fields_ = [
('struFace', NET_VCA_RECT),
('struLeftEye', NET_VCA_POINT),
('struRightEye', NET_VCA_POINT),
('struLeftMouth', NET_VCA_POINT),
('struRightMouth', NET_VCA_POINT),
('struNoseTip', NET_VCA_POINT),
]
# 人脸信息采集结果结构体
class NET_DVR_CAPTURE_FACE_CFG(Structure):
pass
LPNET_DVR_CAPTURE_FACE_CFG=POINTER(NET_DVR_CAPTURE_FACE_CFG)
NET_DVR_CAPTURE_FACE_CFG._fields_ = [
('dwSize', c_uint),
('dwFaceTemplate1Size', c_uint),
('pFaceTemplate1Buffer', c_char_p),
('dwFaceTemplate2Size', c_uint),
('pFaceTemplate2Buffer', c_char_p),
('dwFacePicSize', c_uint),
('pFacePicBuffer', c_char_p),
('byFaceQuality1', c_ubyte),
('byFaceQuality2', c_ubyte),
('byCaptureProgress', c_ubyte),
('byFacePicQuality', c_ubyte),
('dwInfraredFacePicSize', c_uint),
('pInfraredFacePicBuffer', c_char_p),
('byInfraredFacePicQuality', c_ubyte),
('byRes1', c_ubyte * 3),
('struFeature', NET_DVR_FACE_FEATURE),
('byRes', c_ubyte * 56),
]
- 定义初始化、登录、回调、抓图等函数
NET_SDK_CALLBACK_TYPE_STATUS = 0
NET_SDK_CALLBACK_TYPE_PROGRESS = 1
NET_SDK_CALLBACK_TYPE_DATA = 2
# 状态回调函数
STATECALLBACK = CFUNCTYPE(None, c_uint, c_void_p, c_uint, c_void_p)
def stateback(dwType, lpBuffer, dwBufLen, pUserData):
'''
状态回调函数
'''
if dwType == NET_SDK_CALLBACK_TYPE_STATUS:
pass
elif dwType == NET_SDK_CALLBACK_TYPE_PROGRESS:
pass
elif dwType == NET_SDK_CALLBACK_TYPE_DATA:
lpBuffer = NET_DVR_CAPTURE_FACE_CFG()
cbStateCallback = STATECALLBACK(stateback)
win = None
funcRealDataCallBack_V30 = None
funcfRemoteConfigCallback = None
# HCNetSDK库文件路径
HCNETSDK_DLL_PATH = r'HCNetSDK.dll'
HCNETSDK_DLL_PATH_LINUX = r'/linux_lib/libhcnetsdk.so'
HCNETSDK_DLL_PATH_LINUX_one = r'/linux_lib/libcrypto.so'
#HCNETSDK_DLL_PATH_LINUX_two = r'/libssl.so'
HCNETSDK_DLL_PATH_LINUX_two = r'/linux_lib/libssl.so'
def GetPlatform():
sysstr = platform.system()
if sysstr != "Windows":
global WINDOWS_FLAG
WINDOWS_FLAG = False
def LoadHCNetSDK():
'''
加载HCNetSDK库
'''
if not WINDOWS_FLAG:
os.getcwd()
Objdll = cdll.LoadLibrary(os.getcwd() + HCNETSDK_DLL_PATH_LINUX)
else:
os.chdir('D:\iotos\hk_py\lib')
Objdll = WinDLL(HCNETSDK_DLL_PATH)
return Objdll
def InitHCNetSDK(self,Objdll):
'''
初始化HCNetSDK库
'''
if not WINDOWS_FLAG:
self.debug('loading')
sdk_path = NET_DVR_LOCAL_SDK_PATH()
sdk_path.sPath = os.path.dirname(HCNETSDK_DLL_PATH_LINUX).encode('utf-8')
flag1 = Objdll.NET_DVR_SetSDKInitCfg(2, byref(sdk_path))
self.debug("libhcnetsdk.so loads successfully")
sdk_path.sPath=(os.getcwd() + HCNETSDK_DLL_PATH_LINUX_one).encode('utf-8')
flag2 = Objdll.NET_DVR_SetSDKInitCfg(3, byref(sdk_path))
self.debug("libcrypto.so loads successfully")
sdk_path.sPath=("/usr/lib/x86_64-linux-gnu/libssl.so").encode('utf-8')
#Objdll.NET_DVR_SetSDKInitCfg(4, byref(sdk_path))
flag3 = Objdll.NET_DVR_SetSDKInitCfg(4, "/usr/lib/x86_64-linux-gnu/libssl.so")
self.debug("libssl.so loads successfully")
# 初始化DLL
falg = Objdll.NET_DVR_Init()
self.debug(falg)
self.debug(Objdll.NET_DVR_GetLastError())
# 设置日志路径
if not WINDOWS_FLAG:
Objdll.NET_DVR_SetLogToFile(3, create_string_buffer(b'/home/sdklog'), True)
else:
Objdll.NET_DVR_SetLogToFile(3, create_string_buffer(b'G:\\SdkLog'), True)
# 设置设备超时时间
# Objdll.NET_DVR_SetConnectTime(int(1000), 1)
def LoginDev(self,Objdll):
'''
登录设备
'''
device_info = NET_DVR_DEVICEINFO_V30()
self.debug(self.HK_username)
self.debug(self.HK_password)
os.chdir('/home/IOTOSDK_python/sdk/_examples/linux_lib')
lUserId = Objdll.NET_DVR_Login_V30(create_string_buffer(self.HK_ip), self.HK_port, create_string_buffer(self.HK_username) , create_string_buffer(self.HK_password), byref(device_info))
return (lUserId, device_info)
def RemoteConfigCallback(dwType, lpBuffer, dwBufLen, pUserData):
"""
远程配置回调函数
"""
print(pUserData)
print(dwBufLen)
# jpeg抓图hPlayWnd显示窗口能为none,存在缺点采集图片速度慢
def Get_JPEGpicture(self, lUserID, lChannel=1):
picPath = "../../../../iotos_paas"
picName = "/media/mj/"+ time.strftime('%Y-%m-%d_%H-%M-%S') + "_face.jpg"
picStr = picPath + picName
self.debug(picStr)
with open(picStr, mode="w") as f:
self.debug("success!!!!!!!!!!!!!!")
sJpegPicFileName = bytes(picStr)
lpJpegPara = NET_DVR_JPEGPARA()
lpJpegPara.wPicSize = 0
lpJpegPara.wPicQuality = 0
if (self.Objdll.NET_DVR_CaptureJPEGPicture(lUserID, lChannel, byref(lpJpegPara), sJpegPicFileName) == False):
error_info = self.Objdll.NET_DVR_GetLastError()
print("抓图失败:" + str(error_info))
else:
print("抓图成功")
picURL = "" + picName #需要在这里修改一下ip改为中台的ip
self.debug(picURL)
return picURL, picStr
- 定义图片上传和人脸创建的函数
def uploadImage(self,file_path, strname, streno):
file_name = file_path.split("/")[-1]
url = "http://"+self.hk_ip+"/ISAPI/Intelligent/FDLib/FaceDataRecord?format=json"
payload={'FaceDataRecord': '{"faceLibType": "blackFD","FDID": "1","FPID": "'+streno+'","name": "'+strname+'"}'}
files=[
('FaceImage',(file_name, open(file_path,'rb'), 'image/jpeg'))
]
headers = {}
response = requests.post(url, auth = HTTPDigestAuth(self.hk_name, self.hk_psw), headers=headers, data=payload, files=files)
#return json.loads(response.text)["statusCode"]
return response
def newperson(self,pname, eno, Endtime=None):
if not Endtime:
Endtime = "2030-01-01T00:00:00"
data = {
"UserInfo":{
"Valid":{
"beginTime":time.strftime('%Y-%m-%dT%H:%M:%S'),
"enable":True,
"endTime":Endtime
},
"checkUser":False,
"doorRight":"1",
"RightPlan":[
{
"doorNo": 1,
"planTemplateNo": "1,3,5"
}
],
"employeeNo":eno,
"floorNumber":1,
"maxOpenDoorTime":0,
"name":pname,
"openDelayEnabled":False,
"password":"123456",
"roomNumber":1,
"userType":"normal"
}
}
r = requests.post("http://"+self.hk_ip+"/ISAPI/AccessControl/UserInfo/Record?format=json", auth = HTTPDigestAuth(self.hk_name, self.hk_psw), json =data)
return json.loads(r.text)["statusCode"]
- 定义驱动函数,对通讯进行初始化。
获取中台配置的参数,对通讯资源进行加载和初始化,进行登录,为后续对门禁进行操作做初始化。
class FaceDriver(IOTOSDriverI):
# 1、通信初始化
def InitComm(self,attrs):
# 获取设备实例参数
try:
self.HK_ip = bytes(self.sysAttrs['config']['param']['HK_ip'].encode('utf-8'))
self.HK_port = self.sysAttrs['config']['param']['HK_port']
self.HK_username = bytes(self.sysAttrs['config']['param']['HK_username'].encode('utf-8'))
self.HK_password = bytes(self.sysAttrs['config']['param']['HK_password'].encode('utf-8'))
self.debug(type(self.HK_ip))
self.debug(self.HK_port)
self.online(True)
self.setPauseCollect(False)
self.setCollectingOneCircle(False)
self.picstr = ""
hip = self.sysAttrs['config']['param']['HK_ip']
self.hk_ip = hip.encode()
self.debug(type(self.hk_ip))
self.hk_name = self.sysAttrs['config']['param']['HK_username'].encode('utf-8')
self.hk_pwd = self.sysAttrs['config']['param']['HK_password'].encode('utf-8')
# 获取系统平台
GetPlatform()
# load library
(sdkPath, tempfilename) = os.path.split(HCNETSDK_DLL_PATH)
# 加载HCNetSDK库
self.Objdll = LoadHCNetSDK()
# 初始化HCNetSDK库
InitHCNetSDK(self, self.Objdll)
self.debug(self.Objdll.NET_DVR_GetLastError())
# 登录设备
(self.lUserId, device_info) = LoginDev(self,self.Objdll)
self.debug(self.lUserId)
if self.lUserId<0:
self.debug(self.Objdll.NET_DVR_GetLastError())
self.setValue(u'采集情况', '登录失败')
# 释放资源
self.Objdll.NET_DVR_Cleanup()
self.debug(self.lUserId)
return json.dumps({'code': 0, 'msg': '', 'data': ''})
else:
self.debug(self.lUserId)
self.setValue(u'采集情况', '登录成功')
self.debug(os.getcwd())
sdk_path = NET_DVR_LOCAL_SDK_PATH()
HCNETSDK_DLL_PATH_LINUX = r'/'
sdk_path.sPath=(os.getcwd()+HCNETSDK_DLL_PATH_LINUX).encode('utf-8')
flag1 = self.Objdll.NET_DVR_SetSDKInitCfg(2, byref(sdk_path))
self.debug("successfully")
except Exception,e:
self.debug('初始化失败')
self.Objdll.NET_DVR_Cleanup()
循环采集
对数据点进行初始化赋值,方便进行下发。# #2、采集引擎回调 def Collecting(self, dataId): for key, value in self.data2attrs.items(): # 拿出数据点的键和值 if "private" in value['config']['param']: # 在private中识别开关控制的反控标签 if value['config']['param']['private'] == "switch": self.key_s = key # 将下发数据点的dataid保存到self中,后续Event_setData中判断下发数据点时需要用到 # 开关赋值》》》》》》》》》》防止页面刷新而导致开关的值消失造成不能下发数据,利用memoryvalue判断是否有数据上传以及上一次上传的数据是多少,下同 if "memoryvalue" not in self.data2attrs[key] or self.data2attrs[key]["memoryvalue"]==None: self.setValue(u'采集开关', 0) else: self.debug(self.data2attrs[key]) self.setValue(u'采集开关', int(self.data2attrs[key]["memoryvalue"])) if value['config']['param']['private'] == "statue": # 开关赋值》》》》》》》》》》防止页面刷新而导致开关的值消失造成不能下发数据,利用memoryvalue判断是否有数据上传以及上一次上传的数据是多少,下同 if "memoryvalue" not in self.data2attrs[key]: pass else: self.setValue(u'采集情况', self.data2attrs[key]["memoryvalue"]) time.sleep(0.5) return ()
定义控制事件
主要是针对采集按钮的控制,当中台开启采集下发按钮时,触发控制事件,使门禁进行长连接,开启远程采集功能,并且调用之前的人员创建和人脸下发的函数。
# 5、控制事件回调,数据点控制访问
def Event_setData(self, dataId, value):
try:
if dataId==self.key_s and value==1:
# 登录设备
lpInBuffer = NET_DVR_CAPTURE_FACE_COND()
lpInBuffer.dwSize = sizeof(lpInBuffer)
self.debug(lpInBuffer.dwSize)
dwInBufferLen = sizeof(lpInBuffer)
self.HK_ret = self.Objdll.NET_DVR_StartRemoteConfig(self.lUserId, NET_DVR_CAPTURE_FACE_INFO, byref(lpInBuffer), dwInBufferLen, None, None)
self.debug(self.HK_ret)
self.debug(self.Objdll.NET_DVR_GetLastError())
if self.HK_ret < 0:
#self.debug('fail, error code is:'+ self.Objdll.NET_DVR_GetLastError())
# 关闭长连接
rtu = self.Objdll.NET_DVR_StopRemoteConfig(self.HK_ret)
self.debug("关闭长连接!")
self.debug(self.Objdll.NET_DVR_GetLastError())
# 登出设备
self.Objdll.NET_DVR_Logout(self.lUserId)
# 释放资源
self.Objdll.NET_DVR_Cleanup()
return json.dumps({'code':0, 'msg':'', 'data':''})
try:
# 循环获取人脸采集结果
btime = time.time()
while True:
#btime = time.time()
lpOutBuff = NET_DVR_CAPTURE_FACE_CFG()
lpOutBuff.dwSize = sizeof(lpOutBuff)
dwOutBuffSize = sizeof(lpOutBuff)
res = self.Objdll.NET_DVR_GetNextRemoteConfig(self.HK_ret, byref(lpOutBuff), dwOutBuffSize)
self.debug(res)
self.debug(self.Objdll.NET_DVR_GetLastError())
if res == 1000:
self.setValue(u'采集情况', '采集成功')
picurl, picstr = Get_JPEGpicture(self, self.lUserId)
self.picstr = picstr
self.debug(self.picstr)
self.setValue(u'采集开关', 0)
self.setValue(u'人脸图片中人脸质量',lpOutBuff.byFacePicQuality)
self.setValue(u'人脸图片数据大小',lpOutBuff.dwFacePicSize)
self.setValue(u'人脸图片URL', picurl)
break
elif res == 1002:
# print("采集结束")
self.setValue(u'采集情况', '采集结束')
picurl, picstr = Get_JPEGpicture(self, self.lUserId)
self.picstr = picstr
self.debug(self.picstr)
self.setValue(u'采集开关', 0)
self.setValue(u'人脸图片中人脸质量', lpOutBuff.byFacePicQuality)
self.setValue(u'人脸图片数据大小', lpOutBuff.dwFacePicSize)
self.setValue(u'人脸图片URL', picurl)
break
elif res == 1003 or res == 1004:
self.setValue(u'采集情况', '采集超时,请重新开启采集')
self.setValue(u'采集开关', 0)
# self.setValue(u'人脸图片URL', picstr)\
self.setValue(u'人脸图片URL', "")
self.setValue(u'人脸图片中人脸质量', "")
self.setValue(u'人脸图片数据大小', "")
break
elif res == 0:
self.setValue(u'采集情况', '数据返回失败')
self.setValue(u'采集开关', 0)
elif res == 1001:
etime = time.time()
self.debug(type(btime))
self.debug(etime-btime)
if (etime - btime) > 5:
self.setValue(u'采集情况', '采集超时!请重新开启采集')
self.setValue(u'采集开关', 0)
self.setValue(u'人脸图片URL', "")
self.setValue(u'人脸图片中人脸质量', "")
self.setValue(u'人脸图片数据大小', "")
res = 1003
break
continue
except Exception, e:
self.debug(e)
return json.dumps({'code': 0, 'msg': '', 'data': ''})
# 关闭长连接
rtu = self.Objdll.NET_DVR_StopRemoteConfig(self.HK_ret)
self.debug("关闭长连接!")
self.debug(self.Objdll.NET_DVR_GetLastError())
# 登出设备
#self.Objdll.NET_DVR_Logout(self.lUserId)
self.setValue(u'采集开关',0)
if res == 1000 or res ==1002:
thistime = int(time.mktime(time.localtime()))
self.strname = str(thistime)
self.streno = str(thistime)
result1 = newperson(self,self.strname, self.streno)
self.debug(result1)
result2 = uploadImage(self ,self.picstr, self.strname, self.streno)
self.debug(result2)
return json.dumps({'code': 0, 'msg': '', 'data': ''})
except Exception,e:
self.debug(u'>>>>>>>>>error'+e)
return json.dumps({'code':0, 'msg':'', 'data':''})
该实例只是展示了门禁的远程采集储存人脸的功能,还有展示门禁开关门状态和下发人员信息等操作的功能,这些功能可以根据上述的驱动进行扩展操作。
最后编辑:admin 更新时间:2023-11-29 09:38