python

超轻量级php框架startmvc

python获取微信企业号打卡数据并生成windows计划任务

更新时间:2020-07-01 15:24:01 作者:startmvc
由于公司的系统用的是Java版本,开通了企业号打卡之后又没有预算让供应商做数据对接,

由于公司的系统用的是Java版本,开通了企业号打卡之后又没有预算让供应商做数据对接,所以只能自己捣鼓这个,以下是个人设置的一些内容,仅供大家参考

安装python

python的安装,这里就不详细写了,大家可自行度娘或google。

安装第三方库

python安装好之后别忘记配置环境变量!另外,所以的内容都是安装在服务器上的,且服务器需要能够上外网,否则,只能配置在本地,因为需要外网连接微信企业号的接口。这里需要用到几个第三方库:

python的pip命令,一般python安装好之后都会默认有,如果不确定,可输入命令查询,通过cmd进入命令提示符,输入

pip list

如果提示你需要更新,你可以更新,也可以不更新,更新命令其实给到你了python -m pip install --upgrade pip

安装所需要的库

Step.1

pip install pymssql

如果安装pymssql出错,提示什么visual C++ 14,则先安装wheel,如不报错则忽略step2、step3

Step.2

pip install wheel

Step.3

下载pymssql-2.1.4.dev5-cp37-cp37m-win_amd64.whl

可去这里下载最新版本的。pymssql下载

下载好之后,进入该文件所在的目录,通过pip install安装即可cd D:\

pip install pymssql-2.1.4.dev5-cp37-cp37m-win_amd64.whl

step.4

pip install requests

至此,所有第三方库都配置好了。

写主程序


# !/usr/bin/python
# -*- coding:utf-8 -*-
# @Time: 2018/7/26 16:05
# @Author: hychen.cc
import json # 因微信企业号返回的格式为json,所以引入json
import requests
import pymssql
import math # 引入数学方法
import time
import datetime
server = 'XX.XX.XX.XX' # 数据库服务器地址
user = 'sa' # 数据库登录名,可以用sa
password = '******' # 数据库用户对应的密码
dbName = 'DBNAME' # 数据库名称
CORP_ID = 'XXXXXX' # 微信企业号提供的CORP_ID
CORP_SECRET = 'XXXXX' # 微信企业号提供的CORP_SECRET
"""

因微信接口所需要unix时间戳,所以需要把时间转为为Unix时间戳格式

定义时间转换为Unix时间方法


"""def datetime_timestamp(dt):
 # dt为字符串
 # 中间过程,一般都需要将字符串转化为时间数组
 time.strptime(dt, '%Y-%m-%d %H:%M:%S')
 ## time.struct_time(tm_year=2018, tm_mon=10, tm_mday=25, tm_hour=10, tm_min=0, tm_sec=0, tm_wday=0, tm_yday=88, tm_isdst=-1)
 # 将"2018-10-25 10:00:00"转化为时间戳
 s = time.mktime(time.strptime(dt, '%Y-%m-%d %H:%M:%S'))
 return int(s)
# 定义连接数据库方法
def get_link_server():
 connection = pymssql.connect(server, user, password, database=dbName)
 if connection:
 return connection
 else:
 raise ValueError('Connect DBServer failed.')
"""

定义获取用户列表,因为微信企业号一次最大只能获取100个,所以需要转换为列表格式,分批次获取

我这里设置是从DB中获取有权限微信打卡的人员(Select * From Table),换成自己的方式即可


"""
def get_userid_list():
 """
 获取用户列表
 :return:
 """
 conn = get_link_server()
 cursor = conn.cursor()
 sql = "Select * From Table"
 cursor.execute(sql)
 row = cursor.fetchone()
 userlist = []
 while row:
 userlist.append(row[0])
 row = cursor.fetchone()
 if userlist:
 return userlist
 else:
 raise ValueError('Get Userlist failed.')
 conn.close()
"""

获取Access_Token,因为Token有时效(2小时),所以需要存在本地,这样不需要频繁调用,所以我定义了存储过程(sP_GetWX_access_token)来判断之前存储的token是否有效,有效的话就不需要重复获取了


"""
def get_access_token(refresh=False):
 """
 获取Access Token
 :return:
 """
 if not refresh:
 API_ACCESS_TOKEN_URL = "https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid=%s&corpsecret=%s" % (
 CORP_ID, CORP_SECRET)
 response = requests.get(API_ACCESS_TOKEN_URL, verify=False)
 if response.status_code == 200:
 rep_dict = json.loads(response.text)
 errcode = rep_dict.get('errcode')
 if errcode:
 raise ValueError('Get wechat Access Token failed, errcode=%s.' % errcode)
 else:
 access_token = rep_dict.get('access_token')
 if access_token:
 conn = get_link_server()
 cursor = conn.cursor()
 cursor.execute('exec sP_GetWX_access_token @Access_Token=%s', access_token)
 conn.commit()
 conn.close()
 return access_token
 else:
 raise ValueError('Get wechat Access Token failed.')
 else:
 raise ValueError('Get wechat Access Token failed.')
 else:
 conn = get_link_server()
 cursor = conn.cursor()
 cursor.execute("Select Access_Token From wx_AccessToken Where ID=1")
 access_token = cursor.fetchone()
 if access_token:
 return access_token[0]
 conn.close()
 else:
 API_ACCESS_TOKEN_URL = "https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid=%s&corpsecret=%s" % (
 CORP_ID, CORP_SECRET)
 response = requests.get(API_ACCESS_TOKEN_URL, verify=False)
 if response.status_code == 200:
 rep_dict = json.loads(response.text)
 errcode = rep_dict.get('errcode')
 if errcode:
 raise ValueError('Get wechat Access Token failed, errcode=%s.' % errcode)
 else:
 access_token = rep_dict.get('access_token')
 if access_token:
 conn = get_link_server()
 cursor = conn.cursor()
 cursor.execute('exec sP_GetWX_access_token @Access_Token=%s', access_token)
 conn.commit()
 conn.close()
 return access_token
 else:
 raise ValueError('Get wechat Access Token failed.')
 else:
 raise ValueError('Get wechat Access Token failed.')
# 获取微信打卡的json格式
def get_punchcard_info(access_token, opencheckindatatype, starttime, endtime, useridlist):
 API_PUNCH_CARD_URL = 'https://qyapi.weixin.qq.com/cgi-bin/checkin/getcheckindata?access_token=' + access_token
 json_str = json.dumps(
 {'opencheckindatatype': opencheckindatatype, 'starttime': starttime, 'endtime': endtime, 'useridlist': useridlist})
 response = requests.post(API_PUNCH_CARD_URL, data=json_str, verify=False)
 if response.status_code == 200:
 rep_dic = json.loads(response.text)
 errcode = rep_dic.get('errcode')
 if errcode == 42001:
 access_token = get_access_token(True)
 API_PUNCH_CARD_URL = 'https://qyapi.weixin.qq.com/cgi-bin/checkin/getcheckindata?access_token=' + access_token
 json_str = json.dumps(
 {'opencheckindatatype': opencheckindatatype, 'starttime': starttime, 'endtime': endtime,
 'useridlist': useridlist})
 response = requests.post(API_PUNCH_CARD_URL, data=json_str, verify=False)
 rep_dic = json.loads(response.text)
 errcode = rep_dic.get('errcode')
 if errcode:
 raise ValueError('Get punch data failed1, errcode=%s' % errcode)
 else:
 value_str = rep_dic.get('checkindata')
 if value_str:
 return value_str
 else:
 raise ValueError('Get punch data failed2.')
 elif errcode:
 raise ValueError ('Get punch data failed3, errcode=%s' % errcode)
 else:
 value_str = rep_dic.get('checkindata')
 if value_str:
 return value_str
 else:
 raise ValueError('I do not find employee punch data.')
 else:
 raise ValueError ('Get punch data failed5.')
# 调用接口,获得数据
if __name__ == '__main__':
 today = datetime.date.today()
 oneday = datetime.timedelta(days=3) # days,即获取几天内的
 yesterday = today - oneday
 starttime = datetime_timestamp(yesterday.strftime('%Y-%m-%d') + ' 00:00:00')
 endtime = datetime_timestamp(today.strftime('%Y-%m-%d') + ' 23:59:59')
 opencheckindatatype = 3
 access_token = get_access_token()
 if access_token:
 useridlist = get_userid_list()
 if useridlist:
 step = 100
 total = len(useridlist)
 n = math.ceil(total/step)
 for i in range(n):
 # print (useridlist[i*step:(i+1)*step])
 punch_card = get_punchcard_info(access_token, opencheckindatatype, starttime, endtime,useridlist[i*step:(i+1)*step])
 # print (punch_card)
 if punch_card:
 conn = get_link_server()
 cursor = conn.cursor()
 for dic_obj in punch_card:
 cursor.execute('exec sp_AnalysisPunchCard @Json=%s',
 (json.dumps(dic_obj, ensure_ascii=False)))
 # print((json.dumps(dic_obj, ensure_ascii=False))),sp_AnalysisPunchCard把获取到的数据解析后存入数据库中
 conn.commit()
 conn.close()
 print ('Get punch card successed.')
 else:
 raise ValueError('No userlist exists')

设置Windows计划任务

通过控制面板-管理工具-任务计划程序,右击选择创建基本任务,这里注意的是路径和程序。

在这里插入图片描述 在这里插入图片描述 在这里插入图片描述 在这里插入图片描述 在这里插入图片描述

程序或脚本:python.exe

添加参数(可选)(A):你的py文件目录

起始于:python目录,如果不知道python安装到哪去了,按照下列cmd命令,输入python后进入python命令查询


import sys
sys.prefix,回车

到此,配置完成,可自行右击任务-执行查询效果,或者通过python命令执行py文件

进入到py文件目录

python xxx.py

总结

以上所述是小编给大家介绍的python获取微信企业号打卡数据并生成windows计划任务,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对脚本之家网站的支持! 如果你觉得本文对你有帮助,欢迎转载,烦请注明出处,谢谢!

python 微信打卡