1. 共享Value、列表以及字典

import multiprocessing
import ctypes
import time


def process_write(int_data, str_data, list_data, dict_data):
	i = 1
	while True:
		int_data.value = i
		str_data.value = 'str' + str(i)
		list_data[0] = i
		dict_data['dict0'] = i
		print('write, index: ', i)
		i += 1

def process_read(int_data, str_data, list_data, dict_data):
	while True:
		print('read data: ', int_data.value, str_data.value, list_data[0], dict_data['dict0'])



if __name__=='__main__':
	int_data = multiprocessing.Manager().Value(ctypes.c_int, 0)
	str_data = multiprocessing.Manager().Value(ctypes.c_char_p, 'str0')

	list_data = multiprocessing.Manager().list()
	list_data.append(0)
	dict_data = multiprocessing.Manager().dict()
	dict_data['dict0'] = 0
	


	p1 = multiprocessing.Process(target=process_write, args=(int_data, str_data, list_data, dict_data,))
	p2 = multiprocessing.Process(target=process_read, args=(int_data, str_data, list_data, dict_data,))
	p1.start()
	p2.start()

	start = time.time()
	while time.time() - start < 1:
		pass
	p1.terminate()
	p2.terminate()

执行结果部分截图如下图。

 

2. Queue

数据先存先取,所以最后取到的数据可能是先前的数据,数据不是实时的,代码和执行结果如下图。

import multiprocessing
import ctypes
import time


def process_write(queue):
	i = 1
	while True:
		queue.put(i)
		print('put data: ', i)
		i += 1

def process_read(queue):
	while True:
		time.sleep(0.2)
		print('read data: ', queue.get())
		#print('read data: ', queue.get_nowait())
		print('get data wait end\n')



if __name__=='__main__':
	queue = multiprocessing.Manager().Queue()
	
	p1 = multiprocessing.Process(target=process_write, args=(queue,))
	p2 = multiprocessing.Process(target=process_read, args=(queue,))
	p1.start()
	p2.start()

	start = time.time()
	while time.time() - start < 5:
		pass
	p1.terminate()
	p2.terminate()

每次从queue中get数据时,都得等待queue有数据,也就是先执行put,再执行get。如果queue为空,直接使用get_nowait()方法取数据,那么抛异常,如下图。

 

3. 总结

3.1 对于先进先出的需求,取完前面的数据后,才能取后面的数据,这种场景使用Queue类,而且适用于更新数据进程比获取数据进程快,否则获取数据进程需要等待更新数据进程。

3.2 对于一般进程间共享数据来说,使用multiprocessing.Manager().Value和multiprocessing.Manager().list()和multiprocessing.Manager().dict()即可。

3.3 Value传递其它类型的参数对应表。

 

 

 

附录 使用multiprocessing.Value而不是multiprocessing.Manager().Value引起的问题

import multiprocessing
import ctypes
import time


def process_write(val):
	i = 1
	while True:
		val.value = ("str" + str(i)).encode('utf-8')
		print('write index: ', i)
		i += 1

def process_read(val):
	while True:
		print('read data: ', val.value)



if __name__=='__main__':
	val = multiprocessing.Value(ctypes.c_char_p, "str0".encode('utf-8'))
	
	p1 = multiprocessing.Process(target=process_write, args=(val,))
	p2 = multiprocessing.Process(target=process_read, args=(val,))
	p1.start()
	p2.start()

	start = time.time()
	while time.time() - start < 1:
		pass
	p1.terminate()
	p2.terminate()

附录1 错误:bytes or integer address expected instead of str instance

原因及解决方案:使用Value传递参数,和C函数接口有关,涉及类型转换。将字符串编码为utf-8,在字符串后面加上 .encode('utf-8'),如下图。

 

 

附录2 写进程正常工作,但是程序没有抛异常就强制停止

通过定位,发现是使用val.value获取数据有误,思考过后,可能是觉得对象读写发生冲突,于是想着使用copy包的copy方法进行克隆,然后再使用,结果报错如下。

经过搜索,才发现直接使用multiprocessing.Value是线程不安全的,需要使用multiprocessing.Manager().Value才行,所以更改即可。

 

 

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐