python每隔10分钟执行一次函数
schedule模块定时执行任务:python中有一个轻量级的定时任务调度的库:schedule。他可以完成每分钟,每小时,每天,周几,特定日期的定时任务。因此十分方便我们执行一些轻量级的定时任务。安装:pip3 install schedule -i http://pypi.douban.com/simple测试:import scheduleimport timedef run():print
·
schedule模块定时执行任务:
python中有一个轻量级的定时任务调度的库:schedule。他可以完成每分钟,每小时,每天,周几,特定日期的定时任务。因此十分方便我们执行一些轻量级的定时任务。
安装:
pip3 install schedule
测试:
import schedule
import time
def run():
print("I'm doing something...")
schedule.every(10).minutes.do(run) # 每隔十分钟执行一次任务
schedule.every().hour.do(run) # 每隔一小时执行一次任务
schedule.every().day.at("10:30").do(run) # 每天的10:30执行一次任务
schedule.every().monday.do(run) # 每周一的这个时候执行一次任务
schedule.every().wednesday.at("13:15").do(run) # 每周三13:15执行一次任务
while True:
schedule.run_pending() # run_pending:运行所有可以运行的任务
个人专属测试:
'''
Author: zhankun
Description:
Date: 2021-08-02 10:11:00
LastEditTime: 2021-08-10 10:11:48
FilePath: /test/test.py
'''
import socket
import time
import json
import schedule
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
host = "192.168.50.112"
port = 31001
client.connect((host, port))
def recv_save_data(client):
msg = client.recv(1024)
msg_info = msg.decode("utf-8")
# print(type(msg_info))
# print("receiving:", msg.decode("utf-8"))
msg_dict = json.loads(msg_info)
# print(type(msg_dict))
try:
print("x: ",msg_dict["results"]["current_pose"]["x"])
print("y: ",msg_dict["results"]["current_pose"]["y"])
print("theta: ",msg_dict["results"]["current_pose"]["theta"])
except:
pass
with open('log.txt', mode='a', encoding='utf-8') as f:
f.write(time.strftime('%Y-%m-%d %H:%M:%S ',time.localtime(time.time())) + msg.decode("utf-8"))
return msg
def run_status():
send_msg = "/api/robot_status" # 获得机器人当前的全局状态
client.send(send_msg.encode("utf-8"))
msg = recv_save_data(client)
def run_loop():
target_loop = "/api/move?markers=m1,m2,m3&distance_tolerance=1.0&count=-1" # 多个点之间不间断的循环移动
client.send(target_loop.encode("utf-8"))
def run_charge():
target_cancle = "/api/move/cancel" # 取消当前正在进行的移动指令
client.send(target_cancle.encode("utf-8"))
target_charge = "/api/move?marker=target_name" # 调用移动接口,移动至代号为"target_name"的目标点
client.send(target_charge.encode("utf-8"))
if __name__ == '__main__':
run_loop() # 开始执行第一次巡检任务
schedule.every(0.1).minutes.do(run_status) # 每隔6s执行一次保存日志
schedule.every().hour.do(run_loop) # 每隔1hour执行一次巡检任务
schedule.every(30).minutes.do(run_charge) # 每隔30minutes执行一次回充电桩充电
while True:
schedule.run_pending() # run_pending:运行所有可以运行的任务
client.close()
更多推荐
已为社区贡献3条内容
所有评论(0)