schedule模块定时执行任务:
python中有一个轻量级的定时任务调度的库:schedule。他可以完成每分钟,每小时,每天,周几,特定日期的定时任务。因此十分方便我们执行一些轻量级的定时任务。

安装:

pip3 install schedule

测试:

import schedule
import time
 
def run():
    print("I'm doing something...")
 
schedule.every(10).minutes.do(run)    # 每隔十分钟执行一次任务
schedule.every().hour.do(run)         # 每隔一小时执行一次任务
schedule.every().day.at("10:30").do(run)  # 每天的10:30执行一次任务
schedule.every().monday.do(run)  # 每周一的这个时候执行一次任务
schedule.every().wednesday.at("13:15").do(run) # 每周三13:15执行一次任务
 
while True:
    schedule.run_pending()  # run_pending:运行所有可以运行的任务

个人专属测试:

'''
Author: zhankun
Description: 
Date: 2021-08-02 10:11:00
LastEditTime: 2021-08-10 10:11:48
FilePath: /test/test.py
'''
import socket
import time
import json
import schedule

client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
host = "192.168.50.112"
port = 31001
client.connect((host, port))

def recv_save_data(client):
    msg = client.recv(1024)
    msg_info = msg.decode("utf-8")
    # print(type(msg_info))
    # print("receiving:", msg.decode("utf-8"))
    msg_dict = json.loads(msg_info)
    # print(type(msg_dict))
    try:
        print("x: ",msg_dict["results"]["current_pose"]["x"])
        print("y: ",msg_dict["results"]["current_pose"]["y"])
        print("theta: ",msg_dict["results"]["current_pose"]["theta"])
    except:
        pass
    
    with open('log.txt', mode='a', encoding='utf-8') as f:
        f.write(time.strftime('%Y-%m-%d %H:%M:%S ',time.localtime(time.time())) + msg.decode("utf-8"))

    return msg

def run_status():
    send_msg = "/api/robot_status"    # 获得机器人当前的全局状态
    client.send(send_msg.encode("utf-8"))
    msg = recv_save_data(client)

def run_loop():
    target_loop = "/api/move?markers=m1,m2,m3&distance_tolerance=1.0&count=-1"  # 多个点之间不间断的循环移动
    client.send(target_loop.encode("utf-8"))   

def run_charge():
    target_cancle = "/api/move/cancel"   # 取消当前正在进行的移动指令
    client.send(target_cancle.encode("utf-8"))
    target_charge = "/api/move?marker=target_name"   # 调用移动接口,移动至代号为"target_name"的目标点
    client.send(target_charge.encode("utf-8"))

    
if __name__ == '__main__':
    run_loop()  # 开始执行第一次巡检任务
    schedule.every(0.1).minutes.do(run_status)   # 每隔6s执行一次保存日志
    schedule.every().hour.do(run_loop)           # 每隔1hour执行一次巡检任务
    schedule.every(30).minutes.do(run_charge)    # 每隔30minutes执行一次回充电桩充电

    while True:
        schedule.run_pending()  # run_pending:运行所有可以运行的任务   

    client.close()

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐