深入理解高并发技术dpdk无锁队列
前两周给大家直播分享,并发技术全景(从硬件,操作系统,虚拟机/标准库,编程语言等)上半场(5个小时):并发/并行技术全景指南下半场(5个小时):人生的下半场,你准备好了吗最后我上周还布置了一个作业,让大家动手加深一下技术的理解今天给大家分享一篇,DPDK高性能无锁队列的实现,这才是真正实用,非常考验大家的工程能力的高级数据结构,很多人说算法和数据结构没有用,可能你只知道做...
前两周给大家直播分享,并发技术全景(从硬件,操作系统,虚拟机/标准库,编程语言等)
下半场(5个小时):人生的下半场,你准备好了吗
最后我上周还布置了一个作业,让大家动手加深一下技术的理解
今天给大家分享一篇,DPDK高性能无锁队列的实现,这才是真正实用,非常考验大家的工程能力的高级数据结构,很多人说算法和数据结构没有用,可能你只知道做算法题里面那些理想数据结构,不知道工作中的各种框架,虚拟机,标准库,操作系统为了高性能帮你做好了这一切。
一、dpdk的rte_ring简介
rte_ring的实质是FIFO的无锁环形队列,无锁队列的出队入队操作是rte_ring实现的关键。常用于多线程/多进程之间的通信。
ring的特点:
无锁出入队(除了cas(compare and swap)操作)
多消费/生产者同时出入队
使用方法:
1.创建一个ring对象。
2. 出入队
有不同的出入队方式(单、bulk、burst)都在rte_ring.h中。
例如:rte_ring_enqueue
和rte_ring_dequeue
这种数据结构与链表队列相比:
优点如下:
更快:比较void *大小的数据,只需要执行单次Compare-And-Swap指令,而不需要执行2次Compare-And-Swap指令
比完全无锁队列简单
适用于批量入队/出队操作。因为指针存储在表中,多个对象出队并不会像链表队列那样产生大量的缓存未命中,此外,多个对象批量出队不会比单个对象出队开销大
CAS(Compare and Swap)是个原子操作
缺点如下:
大小固定
许多环在内存方面的成本比链表列表的成本更高。空环至少包含N个指针。
二、rte_ring结构体分析
无锁环形队列的结构体如下:
struct rte_ring {
TAILQ_ENTRY(rte_ring) next;
char name[RTE_MEMZONE_NAMESIZE];
int flags;
const struct rte_memzone *memzone;
struct prod {
uint32_t watermark;
uint32_t sp_enqueue;
uint32_t size;
uint32_t mask;
volatile uint32_t head;
volatile uint32_t tail;
} prod __rte_cache_aligned;
struct cons {
uint32_t sc_dequeue;
uint32_t size;
uint32_t mask;
volatile uint32_t head;
volatile uint32_t tail;
#ifdef RTE_RING_SPLIT_PROD_CONS
} cons __rte_cache_aligned;
#else
} cons;
#endif
#ifdef RTE_LIBRTE_RING_DEBUG
struct rte_ring_debug_stats stats[RTE_MAX_LCORE];
#endif
void *ring[] __rte_cache_aligned;
};
dpdk
在rte_ring_list
链表中创建一个rte_tailq_entry
节点,在memzone
中根据队列的大小count申请一块内存(rte_ring
的大小加上count*sizeof(void *)
)。紧邻着rte_ring
结构的void *
数组用于放置入队的对象(单纯的赋值指针值)。rte_ring
结构中有生产者结构prod
、消费者结构cons
,初始化参数之后,把rte_tailq_entry
的data
节点指向rte_ring
结构地址。
可以注意到cons.head、cons.tail、prod.head、prod.tail
的类型都是uint32_t。除此之外,队列的大小count被限制为2的幂次方。这两个条件放到一起构成了一个很巧妙的情景。因为队列的大小一般不会有2的32次方那么大,所以,把队列取为32位的一个窗口,当窗口的大小是2的幂次方,则32位包含整数个窗口。这样,用来存放ring对象的void *指针数组空间就可只申请一个窗口大小即可。根据二进制的回环性,可以直接用(uint32_t)( prod_tail - cons_tail)
计算队列中有多少生产的产品(即使溢出了也不会出错,如(uint32_t)5-65535 = 6
)。
三、rte_ring实现多进程间通信
rte_ring需要与rte_mempool配合使用,通过rte_mempool来共享内存。dpdk多进程示例解读(examples/multi_process/simple_mp),实现进程之间的master和slave线程互发字串 :
int
main(int argc, char **argv)
{
const unsigned flags = 0;
const unsigned ring_size = 64;
const unsigned pool_size = 1024;
const unsigned pool_cache = 32;
const unsigned priv_data_sz = 0;
int ret;
unsigned lcore_id;
ret = rte_eal_init(argc, argv);
if (ret < 0)
rte_exit(EXIT_FAILURE, "Cannot init EAL\n");
if (rte_eal_process_type() == RTE_PROC_PRIMARY){
send_ring = rte_ring_create(_PRI_2_SEC, ring_size, rte_socket_id(), flags);
recv_ring = rte_ring_create(_SEC_2_PRI, ring_size, rte_socket_id(), flags);
message_pool = rte_mempool_create(_MSG_POOL, pool_size,
STR_TOKEN_SIZE, pool_cache, priv_data_sz,
NULL, NULL, NULL, NULL,
rte_socket_id(), flags);
} else {
recv_ring = rte_ring_lookup(_PRI_2_SEC);
send_ring = rte_ring_lookup(_SEC_2_PRI);
message_pool = rte_mempool_lookup(_MSG_POOL);
}
if (send_ring == NULL)
rte_exit(EXIT_FAILURE, "Problem getting sending ring\n");
if (recv_ring == NULL)
rte_exit(EXIT_FAILURE, "Problem getting receiving ring\n");
if (message_pool == NULL)
rte_exit(EXIT_FAILURE, "Problem getting message pool\n");
RTE_LOG(INFO, APP, "Finished Process Init.\n");
RTE_LCORE_FOREACH_SLAVE(lcore_id) {
rte_eal_remote_launch(lcore_recv, NULL, lcore_id);
}
struct cmdline *cl = cmdline_stdin_new(simple_mp_ctx, "\nsimple_mp > ");
if (cl == NULL)
rte_exit(EXIT_FAILURE, "Cannot create cmdline instance\n");
cmdline_interact(cl);
cmdline_stdin_exit(cl);
rte_eal_mp_wait_lcore();
return 0;
}
使用时,rte_mempool_get
从mempoo
l中获取一个对象,然后使用rte_ring_enqueue
入队列,另一个进程通过rte_ring_dequeue
来出队列,使用完成后需要rte_mempool_put
将对象放回mempool
:
send:
static void cmd_send_parsed(void *parsed_result,
__attribute__((unused)) struct cmdline *cl,
__attribute__((unused)) void *data)
{
void *msg = NULL;
struct cmd_send_result *res = parsed_result;
if (rte_mempool_get(message_pool, &msg) < 0)
rte_panic("Failed to get message buffer\n");
strlcpy((char *)msg, res->message, STR_TOKEN_SIZE);
if (rte_ring_enqueue(send_ring, msg) < 0) {
printf("Failed to send message - message discarded\n");
rte_mempool_put(message_pool, msg);
}
}
receive:
static int
lcore_recv(__attribute__((unused)) void *arg)
{
unsigned lcore_id = rte_lcore_id();
printf("Starting core %u\n", lcore_id);
while (!quit){
void *msg;
if (rte_ring_dequeue(recv_ring, &msg) < 0){
usleep(5);
continue;
}
printf("core %u: Received '%s'\n", lcore_id, (char *)msg);
rte_mempool_put(message_pool, msg);
}
return 0;
}
四、实现多生产/消费者同时生产/消费(同时出入队)
移动prod.head表示生产者预定的生产数量
当该生产者生产结束,且在此之前的生产也都结束后,移动prod.tail表示实际生产的位置
同样,移动cons.head表示消费者预定的消费数量
当该消费者消费结束,且在此之前的消费也都结束后,移动cons.tail表示实际消费的位置
1、多生产者入队流程:
/**
* @internal Enqueue several objects on the ring (multi-producers safe).
*
* This function uses a "compare and set" instruction to move the
* producer index atomically.
*
* @param r
* A pointer to the ring structure.
* @param obj_table
* A pointer to a table of void * pointers (objects).
* @param n
* The number of objects to add in the ring from the obj_table.
* @param behavior
* RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items from a ring
* RTE_RING_QUEUE_VARIABLE: Enqueue as many items a possible from ring
* @return
* Depend on the behavior value
* if behavior = RTE_RING_QUEUE_FIXED
* - 0: Success; objects enqueue.
* - -EDQUOT: Quota exceeded. The objects have been enqueued, but the
* high water mark is exceeded.
* - -ENOBUFS: Not enough room in the ring to enqueue, no object is enqueued.
* if behavior = RTE_RING_QUEUE_VARIABLE
* - n: Actual number of objects enqueued.
*/
static inline int __attribute__((always_inline))
__rte_ring_mp_do_enqueue(struct rte_ring *r, void * const *obj_table,
unsigned n, enum rte_ring_queue_behavior behavior)
{
uint32_t prod_head, prod_next;
uint32_t cons_tail, free_entries;
const unsigned max = n;
int success;
unsigned i, rep = 0;
uint32_t mask = r->prod.mask;
int ret;
/* Avoid the unnecessary cmpset operation below, which is also
* potentially harmful when n equals 0. */
if (n == 0)
return 0;
/* move prod.head atomically */
do {
/* Reset n to the initial burst count */
n = max;
/* 1. 抢占移动prod.head */
prod_head = r->prod.head;
cons_tail = r->cons.tail;
/* The subtraction is done between two unsigned 32bits value
* (the result is always modulo 32 bits even if we have
* prod_head > cons_tail). So 'free_entries' is always between 0
* and size(ring)-1. */
/* 2.检查free空间是否足够 */
free_entries = (mask + cons_tail - prod_head);
/* check that we have enough room in ring */
if (unlikely(n > free_entries)) {
if (behavior == RTE_RING_QUEUE_FIXED) {
__RING_STAT_ADD(r, enq_fail, n);
return -ENOBUFS;
}
else {
/* No free entry available */
if (unlikely(free_entries == 0)) {
__RING_STAT_ADD(r, enq_fail, n);
return 0;
}
n = free_entries;
}
}
/* 3.利用cas操作,移动r->prod.head,预约生产*/
prod_next = prod_head + n;
success = rte_atomic32_cmpset(&r->prod.head, prod_head,
prod_next);
} while (unlikely(success == 0));
/* write entries in ring */
ENQUEUE_PTRS();
rte_smp_wmb();
/* if we exceed the watermark */
/*4.检查是否到了阈值,并添加到统计中*/
if (unlikely(((mask + 1) - free_entries + n) > r->prod.watermark)) {
ret = (behavior == RTE_RING_QUEUE_FIXED) ? -EDQUOT :
(int)(n | RTE_RING_QUOT_EXCEED);
__RING_STAT_ADD(r, enq_quota, n);
}
else {
ret = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : n;
__RING_STAT_ADD(r, enq_success, n);
}
/*
* If there are other enqueues in progress that preceded us,
* we need to wait for them to complete
*/
/*5.等待之前的入队操作完成,移动实际位置*/
while (unlikely(r->prod.tail != prod_head)) {
rte_pause();
/* Set RTE_RING_PAUSE_REP_COUNT to avoid spin too long waiting
* for other thread finish. It gives pre-empted thread a chance
* to proceed and finish with ring dequeue operation. */
if (RTE_RING_PAUSE_REP_COUNT &&
++rep == RTE_RING_PAUSE_REP_COUNT) {
rep = 0;
sched_yield();
}
}
r->prod.tail = prod_next;
return ret;
}
下面介绍当两个生产者同时添加对象到ring时发生了什么。
1)在初始状态, prod_head 和 prod_tail指向相同的位置:
在两个生产者core中(这个core可以理解成同时运行的线程或进程),各自的局部变量都保存ring->prod_head 和 ring->cons_tail。各自的局部变量prod_next索引指向ring->prod_head的下一个元素,如果是批量入队,指向下几个元素。假如ring里没有足够的空间(检查cons_tail获知),入队函数将返回error:
prod_head = r->prod.head;
cons_tail = r->cons.tail;
...
free_entries = (mask + cons_tail - prod_head);
...
prod_next = prod_head + n;
2)第二步是修改ring结构体里的ring->prod_head 索引,将它指向上面提到的局部变量prod_next指向的位置:
这个操作是通过使用 Compare And Swap (CAS)执行完成的,rte_atomic32_cmpset()所做的就是CAS(compare and set)操作,是无锁队列实现的关键。Compare And Swap (CAS)包含以下原子操作:
如果ring->prod_head索引和局部变量prod_head索引不相等,CAS操作失败,代码将从新从第一步开始执行。
若相等,将ring->prod_head索引指向局部变量prod_next的位置,CAS操作成功,继续下一步处理。
在上图中,生产者core1执行成功后,生产者core2重新运行后成功。
do {
...
prod_head = r->prod.head;
cons_tail = r->cons.tail;
...
success = rte_atomic32_cmpset(&r->prod.head, prod_head, prod_next);
...
} while (unlikely(success == 0));
3)生产者core2中CAS指令重试成功
生产者core1更新对象obj4到ring中,生产者core2更新对象obj5到ring中(CAS指令重试后执行成功的)。
/* write entries in ring */
ENQUEUE_PTRS();
rte_smp_wmb();//内存屏障,防止乱序
4)现在每个生产者core都想更新 ring->prod_tail索引。生产者core代码中,只有ring->prod_tail等于自己局部变量prod_head才能被更新,显然从上图中可知,只有生产者core1才能满足,生产者core1完成了入队操作。
一旦生产者core1更新了ring->prod_tail后,生产者core2也可以更新ring->prod_tail了。生产者core2也完成了入队操作
(4)(5)两步对应代码:
/*
* If there are other enqueues in progress that preceded us,
* we need to wait for them to complete
*/
while (unlikely(r->prod.tail != prod_head)) {
rte_pause();
/* Set RTE_RING_PAUSE_REP_COUNT to avoid spin too long waiting
* for other thread finish. It gives pre-empted thread a chance
* to proceed and finish with ring dequeue operation. */
if (RTE_RING_PAUSE_REP_COUNT &&
++rep == RTE_RING_PAUSE_REP_COUNT) {
rep = 0;
sched_yield();
}
}
r->prod.tail = prod_next;
多消费者出队流程:
static inline int __attribute__((always_inline))
__rte_ring_mc_do_dequeue(struct rte_ring *r, void **obj_table,
unsigned n, enum rte_ring_queue_behavior behavior)
{
uint32_t cons_head, prod_tail;
uint32_t cons_next, entries;
const unsigned max = n;
int success;
unsigned i, rep = 0;
uint32_t mask = r->prod.mask;
/* Avoid the unnecessary cmpset operation below, which is also
* potentially harmful when n equals 0. */
if (n == 0)
return 0;
/* move cons.head atomically
cgm
1.检查可消费空间是否足够
2.cms消费预约*/
do {
/* Restore n as it may change every loop */
n = max;
cons_head = r->cons.head;
prod_tail = r->prod.tail;
/* The subtraction is done between two unsigned 32bits value
* (the result is always modulo 32 bits even if we have
* cons_head > prod_tail). So 'entries' is always between 0
* and size(ring)-1. */
entries = (prod_tail - cons_head);
/* Set the actual entries for dequeue */
if (n > entries) {
if (behavior == RTE_RING_QUEUE_FIXED) {
__RING_STAT_ADD(r, deq_fail, n);
return -ENOENT;
}
else {
if (unlikely(entries == 0)){
__RING_STAT_ADD(r, deq_fail, n);
return 0;
}
n = entries;
}
}
cons_next = cons_head + n;
success = rte_atomic32_cmpset(&r->cons.head, cons_head,
cons_next);
} while (unlikely(success == 0));
/* copy in table */
DEQUEUE_PTRS();
rte_smp_rmb();
/*
* If there are other dequeues in progress that preceded us,
* we need to wait for them to complete
cgm 等待之前的出队操作完成
*/
while (unlikely(r->cons.tail != cons_head)) {
rte_pause();
/* Set RTE_RING_PAUSE_REP_COUNT to avoid spin too long waiting
* for other thread finish. It gives pre-empted thread a chance
* to proceed and finish with ring dequeue operation. */
if (RTE_RING_PAUSE_REP_COUNT &&
++rep == RTE_RING_PAUSE_REP_COUNT) {
rep = 0;
sched_yield();
}
}
__RING_STAT_ADD(r, deq_success, n);
r->cons.tail = cons_next;
return behavior == RTE_RING_QUEUE_FIXED ? 0 : n;
}
同生产者一个道理,代码中加了点注释,可以再看一下上面的代码。
参考:
https://blog.csdn.net/qq_15437629/article/details/78147874
欢迎加入极客星球,分享多年工作经验和技术理解,扩展视野(很多你不知道你不知道的东西),直播分享,一起做项目,经典面试题,帮助有想进大厂(在校大学生校招和社招镀金的同学找到最佳学习路线,针对性突破); 帮助想提高技术实力的,制定技术成长路线,分享各种宝贵的职场经验和人生经验,希望你们站在前辈们的肩膀上。
详细点击查看-> 极客星球。
这里我正在准备搞一个后端集训营,当前招聘要求越来越高,要想获得高新offer,必须拿出自己的实力,尤其是背景不怎么好的同学,技术实力就是最好的竞争力,但很多知识需要历练才能理解深刻,所以需要有人指导才行,这样才能快速崛起,从上到下打通整个技术链条(从编程语言,算法,应用框架,中间件,到底层内核(Linux内核),甚至到底层硬件等),加强内功修炼(硬件+软件),加强基本功, 让自己上升几个level,这个事本身是很费力的,但我希望尽最大努力帮助大家。
可以加我微信,交个朋友,技术交流
- END -
看完一键三连在看,转发,点赞
是对文章最大的赞赏,极客重生感谢你
推荐阅读
你好,这里是极客重生,我是阿荣,大家都叫我荣哥,从华为->外企->到互联网大厂,目前是大厂资深工程师,多次获得五星员工,多年职场经验,技术扎实,专业后端开发和后台架构设计,热爱底层技术,丰富的实战经验,分享技术的本质原理,希望帮助更多人蜕变重生,拿BAT大厂offer,培养高级工程师能力,成为技术专家,实现高薪梦想,期待你的关注!点击蓝字查看我的成长之路。
校招/社招/简历/面试技巧/大厂技术栈分析/后端开发进阶/优秀开源项目/直播分享/技术视野/实战高手等, 极客星球希望成为最有技术价值星球,尽最大努力为星球的同学提供面试,跳槽,技术成长帮助!详情查看->极客星球
求点赞,求在看,求分享三连
更多推荐
所有评论(0)