导入模块

注意这里模块名是pyserial

pip install pyserial

1. 打开串口

import serial
com = serial.Serial('COM3', 9600)
print com

2. 发送数据

import serial
com = serial.Serial('COM3', 9600)
success_bytes = com.write('This is data for test')
print success_bytes

3. 接收数据(固定长度)

import serial
com = serial.Serial('COM3', 9600)
data = com.read(10)
print data

接收10个字节,不接收到就一直等待接收。

4. 接收数据(某时间内一直接收,超过则不接收)

import serial
import time
 
com = serial.Serial('COM3', 115200)
over_time = 30  # 设置的时间
start_time = time.time()
while True:
  end_time = time.time()
  if end_time - start_time < over_time:
    data = com.read(com.inWaiting())
    data = str(data)
    if data != '':
      print data

5. 封装成类

# -*- encoding=utf-8 -*-
import serial
import time

import WriteLog


class COM:
  def __init__(self, port, baud):
    self.port = port
    self.baud = int(baud)
    self.open_com = None
    self.log = WriteLog.WriteLog('ITC_LOG.LOG')
    self.get_data_flag = True
    self.real_time_data = ''

  # return real time data form com
  def get_real_time_data(self):
    return self.real_time_data

  def clear_real_time_data(self):
    self.real_time_data = ''

  # set flag to receive data or not
  def set_get_data_flag(self, get_data_flag):
    self.get_data_flag = get_data_flag

  def open(self):
    try:
      self.open_com = serial.Serial(self.port, self.baud)
    except Exception as e:
      self.log.error('Open com fail:{}/{}'.format(self.port, self.baud))
      self.log.error('Exception:{}'.format(e))

  def close(self):
    if self.open_com is not None and self.open_com.isOpen:
      self.open_com.close()

  def send_data(self, data):
    if self.open_com is None:
      self.open()
    success_bytes = self.open_com.write(data.encode('UTF-8'))
    return success_bytes

  def get_data(self, over_time=30):
    all_data = ''
    if self.open_com is None:
      self.open()
    start_time = time.time()
    while True:
      end_time = time.time()
      if end_time - start_time < over_time and self.get_data_flag:
        data = self.open_com.read(self.open_com.inWaiting())
        # data = self.open_com.read() # read 1 size
        data = str(data)
        if data != '':
          self.log.info('Get data is:{}'.format(data))
          all_data = all_data + data
          print data
          self.real_time_data = all_data
      else:
        self.set_get_data_flag(True)
        break
    return all_data
if __name__ == '__main__':
  pass
  com = COM('com3', 9600)
  com.open()
  print com.send_data('data')
  com.get_data(50)
  com.close()
Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐