本篇文章主要介绍dpdk-l3fwd实例源码,通过分析代码逻辑,学习DPDK中几个API接口作用以及如何使用?

操作系统版本:CentOS 8.4

DPDK版本:dpdk-20.11.3

如何单独创建dpdk-l3fwd工程项目,参考链接:【DPDK】dpdk-l3fwd测试用例单独编译

功能模块分析

0、启动参数介绍

启动dpdk-l3fwd程序命令为:

[root@LFTF dpdk-l3fwd]# ./build/l3fwd_app-static -l 1-2 -n 4 -- -p 0x3 --config="(0,0,1),(1,0,2)" --parse-ptype

--符号之前的为EAL初始化加载参数,--之后的为程序端口队列与核心绑定初始化所需要参数。其他参数解释可参考连接:

注意:VMWare虚拟机上默认的网卡驱动为e1000,启动之后可能会出现如下报错:

[root@LFTF dpdk-l3fwd]# ./build/l3fwd_app-static -l 1-3 -n 4 -- -p 0x3 --config="(0,0,1),(1,0,2)(0,1,3)" --parse-ptype
EAL: Detected 8 lcore(s)
EAL: Detected 1 NUMA nodes
EAL: Detected static linkage of DPDK
EAL: Multi-process socket /var/run/dpdk/rte/mp_socket
EAL: Selected IOVA mode 'PA'
EAL: No available hugepages reported in hugepages-2048kB
EAL: Probing VFIO support...
EAL: VFIO support initialized
EAL:   using IOMMU type 8 (No-IOMMU)
EAL: Ignore mapping IO port bar(4)
EAL: Probe PCI driver: net_e1000_em (8086:100f) device: 0000:02:05.0 (socket 0)
EAL: Ignore mapping IO port bar(4)
EAL: Probe PCI driver: net_e1000_em (8086:100f) device: 0000:02:07.0 (socket 0)
EAL: No legacy callbacks, legacy socket not created
soft parse-ptype is enabled
LPM or EM none selected, default LPM on
Initializing port 0 ... Creating queues: nb_rxq=2 nb_txq=3... Port 0 modified RSS hash function based on hardware support,requested:0x104 configured:0
Ethdev port_id=0 nb_rx_queues=2 > 1
EAL: Error - exiting with code: 1
  Cause: Cannot configure device: err=-22, port=0

修改网卡驱动为vmnet3,虚拟机网卡配置vmnet3链接:DPDK-虚拟机配置网卡多队列

Note:设置完网卡多队列之后网卡驱动发生改变,需要重新修改网卡绑定脚本和解绑脚本。

设置完成之后启动打印如下:

[root@LFTF dpdk-l3fwd]# ./build/l3fwd_app-static -l 1-3 -n 4 -- -p 0x3 --config="(0,0,1),(1,0,2)(0,1,3)" --parse-ptype
EAL: Detected 8 lcore(s)
EAL: Detected 1 NUMA nodes
EAL: Detected static linkage of DPDK
EAL: Multi-process socket /var/run/dpdk/rte/mp_socket
EAL: Selected IOVA mode 'PA'
EAL: No available hugepages reported in hugepages-2048kB
EAL: Probing VFIO support...
EAL: VFIO support initialized
EAL:   using IOMMU type 8 (No-IOMMU)
EAL: Ignore mapping IO port bar(3)
EAL: Probe PCI driver: net_vmxnet3 (15ad:7b0) device: 0000:03:00.0 (socket 0)
EAL: Ignore mapping IO port bar(3)
EAL: Probe PCI driver: net_vmxnet3 (15ad:7b0) device: 0000:0b:00.0 (socket 0)
EAL: No legacy callbacks, legacy socket not created
soft parse-ptype is enabled
LPM or EM none selected, default LPM on
Initializing port 0 ... Creating queues: nb_rxq=2 nb_txq=3...  Address:00:0C:29:4C:18:9B, Destination:02:00:00:00:00:00, Allocated mbuf pool on socket 0
LPM: Adding route 198.16.0.0 / 24 (0)
LPM: Adding route 198.16.1.0 / 24 (1)
LPM: Adding route 8.8.8.0 / 24 (1)
LPM: Adding route 172.16.0.0 / 24 (0)
LPM: Adding route 192.168.154.0 / 24 (1)
LPM: Adding route 123.125.104.0 / 24 (1)
LPM: Adding route 2001:200:: / 48 (0)
LPM: Adding route 2001:200:0:0:1:: / 48 (1)
txq=1,0,0 txq=2,1,0 txq=3,2,0 
Initializing port 1 ... Creating queues: nb_rxq=1 nb_txq=3...  Address:00:0C:29:4C:18:A5, Destination:02:00:00:00:00:01, txq=1,0,0 txq=2,1,0 txq=3,2,0 

Initializing rx queues on lcore 1 ... rxq=0,0,0 
Initializing rx queues on lcore 2 ... rxq=1,0,0 
Initializing rx queues on lcore 3 ... rxq=0,1,0 
Port 0: softly parse packet type info
Port 1: softly parse packet type info
Port 0: softly parse packet type info

Checking link statusdone
Port 0 Link up at 10 Gbps FDX Fixed
Port 1 Link up at 10 Gbps FDX Fixed
L3FWD: entering main loop on lcore 2
L3FWD:  -- lcoreid=2 portid=1 rxqueueid=0
L3FWD: entering main loop on lcore 3
L3FWD:  -- lcoreid=3 portid=0 rxqueueid=1
L3FWD: entering main loop on lcore 1
L3FWD:  -- lcoreid=1 portid=0 rxqueueid=0

可以看到EAL: Probe PCI driver: net_vmxnet3 (15ad:7b0) device: 0000:03:00.0 (socket 0)发生了改变,程序也成功启动。

1、环境配置模块

1.1、EAL初始化

代码接口为:

/* init EAL */
ret = rte_eal_init(argc, argv);

EAL全称为Environment Abstraction Layer环境抽象层rte_eal_init初始化主要决定如何分配操作系统的资源(即内存空间、设备、定时器、控制台等等)。

1.1.1、CPU核检测

检测系统逻辑CPU个数,判断传入的参数是否超过可用的CPU个数,如果超过则出错返回。启动命令如下:

[root@LFTF dpdk-l3fwd]# ./build/l3fwd_app-static -l 1-10 -n 4 -- -p 0x3 --config="(0,0,1),(1,0,2)" --parse-ptype
EAL: Detected 8 lcore(s)
EAL: Detected 1 NUMA nodes
EAL: lcore 9 unavailable
EAL: lcore 10 unavailable

EAL: invalid core list, please check specified cores are part of 0-8
.
.
.
EAL: FATAL: Invalid 'command line' arguments.
EAL: Invalid 'command line' arguments.
EAL: Error - exiting with code: 1
  Cause: Invalid EAL parameters

打印可用的cores数是8,如果超过则报错退出。

1.1.2、大页内存检测

检测系统配置的巨页内存知否可用,如果不可用则出错返回。程序启动前可查看巨页内存配置,命令如下:

[root@LFTF dpdk-l3fwd]# cat /proc/meminfo | grep Huge
AnonHugePages:         0 kB
ShmemHugePages:        0 kB
FileHugePages:         0 kB
HugePages_Total:       1
HugePages_Free:        1
HugePages_Rsvd:        0
HugePages_Surp:        0
Hugepagesize:    1048576 kB
Hugetlb:         1048576 kB

可以看出,配置的巨页内存格式为1G * 1的,程序启动之后打印如下:

[root@LFTF dpdk-l3fwd]# ./build/l3fwd_app-static -l 1-2 -n 4 -- -p 0x3 --config="(0,0,1),(1,0,2)" --parse-ptype
EAL: Detected 8 lcore(s)
EAL: Detected 1 NUMA nodes
EAL: Multi-process socket /var/run/dpdk/rte/mp_socket
EAL: Selected IOVA mode 'VA'
EAL: No available hugepages reported in hugepages-2048kB
.
.

EAL: No available hugepages reported in hugepages-2048kB,打印提示2M的巨页不可用,这个可能就是由于设置巨页内存的格式是1G大小的,因此程序也能正常启动。

1.1.3、网卡驱动检测

使用DPDK脚本绑定指定网卡之后,启动程序时会通过EAL初始化接口将网卡信息加载到程序中,未绑定网卡时启动dpdk-l3fwd程序,打印如下:

[root@LFTF dpdk-l3fwd]# ./build/l3fwd_app-static -l 1-2 -n 4 -- -p 0x3 --config="(0,0,1),(1,0,2)" --parse-ptype
EAL: Detected 8 lcore(s)
EAL: Detected 1 NUMA nodes
EAL: Multi-process socket /var/run/dpdk/rte/mp_socket
EAL: Selected IOVA mode 'PA'
EAL: No available hugepages reported in hugepages-2048kB
EAL: Probing VFIO support...
EAL:   Invalid NUMA socket, default to 0
EAL:   Invalid NUMA socket, default to 0
EAL:   Invalid NUMA socket, default to 0
EAL: No legacy callbacks, legacy socket not created
soft parse-ptype is enabled
LPM or EM none selected, default LPM on
port 0 is not present on the board
EAL: Error - exiting with code: 1
  Cause: check_port_config failed

绑定网卡之后,启动dpdk-l3fwd程序,打印如下:

·[root@LFTF dpdk-l3fwd]# ./build/l3fwd_app-static -l 1-2 -n 4 -- -p 0x3 --config="(0,0,1),(1,0,2)" --parse-ptype
EAL: Detected 8 lcore(s)
EAL: Detected 1 NUMA nodes
EAL: Multi-process socket /var/run/dpdk/rte/mp_socket
EAL: Selected IOVA mode 'PA'
EAL: No available hugepages reported in hugepages-2048kB
EAL: Probing VFIO support...
EAL: VFIO support initialized
EAL:   Invalid NUMA socket, default to 0
EAL:   Invalid NUMA socket, default to 0
EAL:   using IOMMU type 8 (No-IOMMU)
EAL: Ignore mapping IO port bar(3)
EAL: Probe PCI driver: net_vmxnet3 (15ad:7b0) device: 0000:03:00.0 (socket 0)
EAL:   Invalid NUMA socket, default to 0
EAL: Ignore mapping IO port bar(3)
EAL: Probe PCI driver: net_vmxnet3 (15ad:7b0) device: 0000:0b:00.0 (socket 0)
EAL: No legacy callbacks, legacy socket not created
soft parse-ptype is enabled
LPM or EM none selected, default LPM on
.

EAL: VFIO support initialized表示VFIO驱动安装成功,对应的网卡信息打印为:

EAL: Probe PCI driver: net_vmxnet3 (15ad:7b0) device: 0000:03:00.0 (socket 0)
EAL: Probe PCI driver: net_vmxnet3 (15ad:7b0) device: 0000:0b:00.0 (socket 0)

DPDK网卡绑定步骤可参考链接:

小结:EAL层初始化失败原因可通过输出的错误打印进行判断,经常出现的错误有:

1、巨页内存未配置成功导致程序启动失败;

2、逻辑核数量配置错误导致启动失败;

3、如果使用-c掩码模式配置核心,请检查是否配置正确;

4、网卡未绑定启动失败;

5、重复启动程序。

注意:EAL层初始化加载的部分内容为用户配置的,如1、要使用的核心ID(通过-l参数配置);2、加载的驱动为VFIO还是UIO驱动(通过绑卡绑定步骤里的insmod命令挂载;3、加载的巨野内存大小(通过修改grub配置文件进行配置);4、程序加载的网卡(通过dpdk脚本进行绑定)。

1.2、应用程序参数初始化

通过应用参数初始化可以进行如:(网卡端口,队列id,处理核心)之间绑定关系;各个端口转发报文目的mac值;网卡接收数据模式(是否开启混杂模式);匹配模式(LPMEM模式);加载绑定网卡id掩码等等信息的初始化。接口api如下所示:

/* parse application arguments (after the EAL ones) */
ret = parse_args(argc, argv);
if (ret < 0)
    rte_exit(EXIT_FAILURE, "Invalid L3FWD parameters\n");

参数解释之前已经解释,这里不在赘述,后面使用过程中用到了哪一个再单独介绍,各个参数解释可参考链接:【DPDK】dpdk样例源码解析之二:dpdk-helloworld

1.3、网卡初始化

1.3.1、获取驱动信息
ret = rte_eth_dev_info_get(portid, &dev_info);
if (ret != 0)
    rte_exit(EXIT_FAILURE,
             "Error during getting device (port %u) info: %s\n",
             portid, strerror(-ret));

获取设备的默认信息,然后通过后续赋值的方式,修改设备的默认信息,如下:

// 判断网卡驱动是否支持多分片转发数据包
if (dev_info.tx_offload_capa & DEV_TX_OFFLOAD_MBUF_FAST_FREE)
    local_port_conf.txmode.offloads |=
    DEV_TX_OFFLOAD_MBUF_FAST_FREE;
// 配置的rss_conf.rss_hf和默认支持的offloads进行取与运算赋值
local_port_conf.rx_adv_conf.rss_conf.rss_hf &=
    dev_info.flow_type_rss_offloads;
// 判断配置的rss_conf.rss_hf值和默认配置的offloads值是否一样 // 取与操作,不一样也没问题
if (local_port_conf.rx_adv_conf.rss_conf.rss_hf !=
    port_conf.rx_adv_conf.rss_conf.rss_hf) {
    printf("Port %u modified RSS hash function based on hardware support,"
           "requested:%#"PRIx64" configured:%#"PRIx64"\n",
           portid,
           port_conf.rx_adv_conf.rss_conf.rss_hf,
           local_port_conf.rx_adv_conf.rss_conf.rss_hf);
}
1.3.2、网卡驱动配置

配置网卡端口的收发队列数量,以及端口信息,最后一个参数还可以确定队里使用模式,多队列信息以及计算接收队列(RX queue)负载均衡和同源同宿所用到的RSS值所需相关参数,包括keykey_lenhash_function等信息。TX队列一般不做多余设置。

ret = rte_eth_dev_configure(portid, nb_rx_queue,
                            (uint16_t)n_tx_queue, &local_port_conf);
if (ret < 0)
    rte_exit(EXIT_FAILURE,
             "Cannot configure device: err=%d, port=%d\n",
             ret, portid);

详细介绍一下struct rte_eth_conf port_conf结构体参数作用:

static struct rte_eth_conf port_conf = {
	.rxmode = {
		.mq_mode = ETH_MQ_RX_RSS, // 网卡 Multi Queue
		.max_rx_pkt_len = RTE_ETHER_MAX_LEN, // 接收最大数据包长度
		.split_hdr_size = 0, 
		.offloads = DEV_RX_OFFLOAD_CHECKSUM, //网卡驱动的OFFLOADS配置
	},
	.rx_adv_conf = {
		.rss_conf = {
			.rss_key = rss_intel_key,	// 计算RSS所需要的KEY
			.rss_key_len = 40,			// KEY的长度
			.rss_hf = ETH_RSS_IP,		// hash依据
		},
	},
	.txmode = {
		.mq_mode = ETH_MQ_TX_NONE,
	},
};

注意:port_conf.rxmode.offloads值需要设置为网卡类型所支持并开启的OFFLOADS。否则会报错如下:

在这里插入图片描述

代码中配置的RX offloads值为0xe,也就是DEV_RX_OFFLOAD_CHECKSUM的值,而网卡所支持的值为0x82a1d

其中0xe0x82a1d的二进制表示如下:

在这里插入图片描述

可以看出网卡驱动不支持1<<2值对应的OFFLOAD,即为DEV_RX_OFFLOAD_IPV4_CKSUM的值

#define DEV_RX_OFFLOAD_IPV4_CKSUM  0x00000002

port_conf.rxmode.offloads值设置为DEV_RX_OFFLOAD_UDP_CKSUM | DEV_RX_OFFLOAD_TCP_CKSUM,再次启动程序则可以正常启动。

在这里插入图片描述

小结:rxmode.offloads值需要设置为网卡驱动所支持的offloads,否则程序将会报错退出。

1.4、队列初始化

启动命令:

[root@LFTF dpdk-l3fwd]# ./build/l3fwd_app-static -l 1-2 -n 4 -- -p 0x3 --config="(0,0,1),(1,0,2)" --parse-ptype

启动"(0,0,1),(1,0,2)"对应的关系为(port,queue,locre)(0,0,1)即端口0接收数据,绑定的核心为1,网卡队列id0(1,0,2)即端口1接收数据,绑定的核心为2,网卡队列id0

如果配置网卡多队列接收数据,可采用以下启动命令:

[root@LFTF dpdk-l3fwd]# ./build/l3fwd_app-static -l 1-3 -n 4 -- -p 0x3 --config="(0,0,1),(1,0,2)(0,1,3)" --parse-ptype

截取部分打印,解释如下:

# 注意:由于是多线程打印,实际可能出现内容错乱现象。以下为整理后的内容

# 初始化端口0网卡, 创建队列:接收队列2个,发送队列3个.  转发源MAC地址为该网卡MAC,目的MAC地址为默认00(可能通过启动参数修改),txq=1,0,0 对应参数为lcore_id, queueid, socketid
Initializing port 0 ... Creating queues: nb_rxq=2 nb_txq=3...  Address:00:0C:29:4C:18:9B, Destination:02:00:00:00:00:00,txq=1,0,0 txq=2,1,0 txq=3,2,0 

# 初始化端口1网卡, 创建队列:接收队列1个,发送队列3个.  转发源MAC地址为该网卡MAC,目的MAC地址为默认01(可能通过启动参数修改),txq=1,0,0 对应参数为lcore_id, queueid, socketid
Initializing port 1 ... Creating queues: nb_rxq=1 nb_txq=3...  Address:00:0C:29:4C:18:A5, Destination:02:00:00:00:00:01, txq=1,0,0 txq=2,1,0 txq=3,2,0 

# 接收队列,通过启动参数进行绑定核心,rxq=0,0,0 对应参数portid, queueid, socketid
# 1. 网卡端口0的队列id_0的数据有cpu核心1进行解析处理
Initializing rx queues on lcore 1 ... rxq=0,0,0 
# 2. 网卡端口1的队列id_0的数据有cpu核心2进行解析处理
Initializing rx queues on lcore 2 ... rxq=1,0,0 
# 3. 网卡端口0的队列id_1的数据有cpu核心3进行解析处理
Initializing rx queues on lcore 3 ... rxq=0,1,0 

示意图如下:

在这里插入图片描述

图中左右NIC 0为一个网卡,这样画只是为了图解方便,NIC 1同理。

一共绑定了3个核,接收队列:网卡0设置了2个接收队列,队列ID 0的数据包由核心1解析处理,队列ID 1的数据包由由核心3解析处理,网卡0设置了1个接收队列,队列ID 0的数据包由核心2解析处理。发送队列:每个网卡都设置了3个发送队列(有几个解析核心就需要有几个发送队列),原因是接收队列接收的数据包经过LPM或者HASH匹配之后才能确定往哪一个网卡端口转发,因此需要对每一个核心都设置两个发送队列与网卡发送队列对应起来。

CPU利用率如下图所示:

在这里插入图片描述

NOTE:核心和接收队列对应关系并非只能1-1对应,也可以通过下列命令启动程序

[root@LFTF dpdk-l3fwd]# ./l3fwd_app-static -l 1-6 -n 4 -- -p 0x3 -P -L --eth-dest 0,00:0c:29:4c:18:7d --eth-dest 1,52:54:00:9d:ae:51 --config="(0,0,1),(1,0,2),(0,1,3),(1,1,4),(0,2,5),(0,3,6),(1,2,1),(1,3,1),(1,4,1),(1,5,1),(1,6,1),(1,7,1),(1,8,1),(1,9,1),(1,10,1),(1,11,1),(1,12,1),(1,13,1),(1,14,1),(1,15,1)" --parse-ptype

注意:RX queues需要为2的幂次方,否则会出现报错,如下:

在这里插入图片描述

1.4.1、接收队列初始化

首先看看如下命令启动之后接收队列打印:

[root@LFTF dpdk-l3fwd]# ./l3fwd_app-static -l 1-6 -n 4 -- -p 0x3 -P -L --eth-dest 0,ac:f9:70:83:b6:61 --eth-dest 1,74:a4:b5:01:8e:1a --config="(0,0,1),(1,0,2),(0,1,3),(1,1,4),(0,2,5),(0,3,6),(1,2,1),(1,3,1),(1,4,1),(1,5,1),(1,6,1),(1,7,1),(1,8,1),(1,9,1),(1,10,1),(1,11,1),(1,12,1),(1,13,1),(1,14,1),(1,15,1)" --parse-ptype

接收队列初始化打印如下:

Initializing rx queues on lcore 1 ... rxq=0,0,0 rxq=1,2,0 rxq=1,3,0 rxq=1,4,0 rxq=1,5,0 rxq=1,6,0 rxq=1,7,0 rxq=1,8,0 rxq=1,9,0 rxq=1,10,0 rxq=1,11,0 rxq=1,12,0 rxq=1,13,0 rxq=1,14,0 rxq=1,15,0 
Initializing rx queues on lcore 2 ... rxq=1,0,0 
Initializing rx queues on lcore 3 ... rxq=0,1,0 
Initializing rx queues on lcore 4 ... rxq=1,1,0 
Initializing rx queues on lcore 5 ... rxq=0,2,0 
Initializing rx queues on lcore 6 ... rxq=0,3,0 

# 参数对应的含义 printf("rxq=%d,%d,%d ", portid, queueid, socketid);

核心1初始化创建了15个接收队列,其他核心均设置了1个接收队列。队列ID连续是和网卡端口相对应的,比如

网卡端口0: 有4个接收队列,其中1个接收队列(队列id 0)绑定在了核心1(即这一个队列里的数据包有核心1进行处理),队列id 1的数据则绑定在了核心3上面;队列id 2的数据则绑定在了核心5上面;队列id 3的数据则绑定在了核心6上面。

网卡端口1: 有16个接收队列,其中14个接收队列(队列id 2-15)绑定在了核心1(即这些队列里的数据包均有核心1进行处理),队列id 0的数据则绑定在了核心2上面;队列id 1的数据则绑定在了核心4上面。

接受队列初始化,伪代码如下:

// 循环遍历每一个核心
for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++) {
    /* init RX queues */ // qconf->n_rx_queue表示该核心绑定的接收队列个数(有“应用程序参数初始化”时赋值)
    for(queue = 0; queue < qconf->n_rx_queue; ++queue) {
        struct rte_eth_rxconf rxq_conf;
        portid = qconf->rx_queue_list[queue].port_id; // 当前队列绑定的网卡端口ID值
        queueid = qconf->rx_queue_list[queue].queue_id; // 队列ID值,不一定连续
        
        printf("rxq=%d,%d,%d ", portid, queueid, socketid);

        ret = rte_eth_dev_info_get(portid, &dev_info); // 获取网卡驱动信息
        if (ret != 0)
            rte_exit(EXIT_FAILURE,
                     "Error during getting device (port %u) info: %s\n",
                     portid, strerror(-ret));

        rxq_conf = dev_info.default_rxconf;
        rxq_conf.offloads = port_conf.rxmode.offloads;

        // 使用之前创建的pktmbuf_pool初始化每一个接收队列
        ret = rte_eth_rx_queue_setup(portid, queueid,
                                     nb_rxd, socketid,
                                     &rxq_conf,
                                     pktmbuf_pool[portid][socketid]);
        if (ret < 0)
            rte_exit(EXIT_FAILURE,
                     "rte_eth_rx_queue_setup: err=%d, port=%d\n",
                     ret, portid);
    }
}

rte_eth_rx_queue_setup参数解释

/* @param port_id
 *   网卡端口ID
 * @param rx_queue_id
 *   接收队列ID
 * @param nb_rx_desc
 *   接收RING SIZE
 * @param socket_id
 *   NUMA架构下, 标识该lcore对应的NUMA node, 非NUMA架构传入值没有约束
 * @param rx_conf
 *   dpdk-l3fwd样例主要用于激活网卡硬件的OFFLOAD功能
 * @param mb_pool
 *   用于指向存放网卡接收到的rte_mbuf*的内存池,
 * @return 返回值信息
 *   - 0: Success, receive queue correctly set up.
 *   - -EIO: if device is removed.
 *   - -ENODEV: if *port_id* is invalid.
 *   - -EINVAL: The memory pool pointer is null or the size of network buffers
 *      which can be allocated from this memory pool does not fit the various
 *      buffer sizes allowed by the device controller.
 *   - -ENOMEM: Unable to allocate the receive ring descriptors or to
 *      allocate network memory buffers from the memory pool when
 *      initializing receive descriptors.
 */

int rte_eth_rx_queue_setup(uint16_t port_id, uint16_t rx_queue_id,
		uint16_t nb_rx_desc, unsigned int socket_id,
		const struct rte_eth_rxconf *rx_conf,
		struct rte_mempool *mb_pool);
1.4.2、发送队列初始化

DPDK-L3FWD样例发送队列初始化遍历的时候,外层是网卡端口,内层是绑定的core id,且队列id的值是自增的,这样初始化就可以保证,每一个绑定发的核心core均可以创建DPDK使用网卡端口个数的队列,便于转发使用。(通俗一点讲就是发送队列的配置是代码里写死的,不能通过启动参数进行配置)

首先看看如下命令启动之后发送队列打印:

[root@LFTF dpdk-l3fwd]# ./l3fwd_app-static -l 1-6 -n 4 -- -p 0x3 -P -L --eth-dest 0,ac:f9:70:83:b6:61 --eth-dest 1,74:a4:b5:01:8e:1a --config="(0,0,1),(1,0,2),(0,1,3),(1,1,4),(0,2,5),(0,3,6),(1,2,1),(1,3,1),(1,4,1),(1,5,1),(1,6,1),(1,7,1),(1,8,1),(1,9,1),(1,10,1),(1,11,1),(1,12,1),(1,13,1),(1,14,1),(1,15,1)" --parse-ptype

发送队列初始化打印如下:

Port 0, txq=1,0,0 txq=2,1,0 txq=3,2,0 txq=4,3,0 txq=5,4,0 txq=6,5,0

Port 1, txq=1,0,0 txq=2,1,0 txq=3,2,0 txq=4,3,0 txq=5,4,0 txq=6,5,0 

#参数对应含义: printf("txq=%u,%d,%d ", lcore_id, queueid, socketid);

注意:队列ID连续是和网卡端口相对应的

发送队列初始化,伪代码如下:

RTE_ETH_FOREACH_DEV(portid) {
    struct rte_eth_conf local_port_conf = port_conf;
    /*去掉了网卡初始化相关代码*/
    
    /* init one TX queue per couple (lcore,port) */
    queueid = 0;
    for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++) {
        if (rte_lcore_is_enabled(lcore_id) == 0)
            continue;

        printf("txq=%u,%d,%d ", lcore_id, queueid, socketid);

        txconf = &dev_info.default_txconf;
        txconf->offloads = local_port_conf.txmode.offloads;
        // 发送队列设置
        ret = rte_eth_tx_queue_setup(portid, queueid, nb_txd,
                                     socketid, txconf);
        if (ret < 0)
            rte_exit(EXIT_FAILURE,
                     "rte_eth_tx_queue_setup: err=%d, "
                     "port=%d\n", ret, portid);
        
        qconf = &lcore_conf[lcore_id];
        qconf->tx_queue_id[portid] = queueid;
        queueid++;

        qconf->tx_port_id[qconf->n_tx_port] = portid;
        qconf->n_tx_port++;
    }
    printf("\n");
}

rte_eth_tx_queue_setup参数解释

/* @param port_id
 *   网卡端口ID
 * @param tx_queue_id
 *   发送队列ID
 * @param nb_tx_desc
 *   发送RING SIZE
 * @param socket_id
 *   NUMA架构下, 标识该lcore对应的NUMA node, 非NUMA架构传入值没有约束
 * @param tx_conf
 *   用于配置发送队列,如果传入NULL值则会使用默认配置 返回值
 * @return 返回值
 *   - 0: Success, the transmit queue is correctly set up.
 *   - -ENOMEM: Unable to allocate the transmit ring descriptors.
 */
int rte_eth_tx_queue_setup(uint16_t port_id, uint16_t tx_queue_id,
		uint16_t nb_tx_desc, unsigned int socket_id,
		const struct rte_eth_txconf *tx_conf);

1.5、添加队列打印

队列初始化完成之后,添加一个函数,功能是打印每个核所绑定的网卡端口及队列ID信息。代码添加位置如下:

在这里插入图片描述

具体代码如下:

static void print_queue_info( void )
{
	unsigned int lcore_id;
	struct lcore_conf *qconf;

	/* printf all lcore */
	printf("\n########################################################\n");
	for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++) {
		if (rte_lcore_is_enabled(lcore_id) == 0)
			continue;
		printf("Core Id: %u\n", lcore_id);
		qconf = &lcore_conf[lcore_id];
		for(int i = 0; i < qconf->n_rx_queue; i++) {
			printf("port_id = %u, rx_queue_id = %u, lcore_id = %u\n", \
				qconf->rx_queue_list[i].port_id, qconf->rx_queue_list[i].queue_id, lcore_id);
		}

		for(int j = 0; j < qconf->n_tx_port; j++) {
			printf("port_id = %u, tx_queue_id = %u, lcore_id = %u\n", \
				qconf->tx_port_id[j], qconf->tx_queue_id[j], lcore_id);
		}
		printf("########################################################\n");
	}
}

启动程序参数及打印信息如下:

[root@LFTF dpdk-l3fwd]# ./build/l3fwd_app-static -l 1-3 -n 4 -- -p 0x3 --config="(0,0,1),(1,0,2)(0,1,3)" --parse-ptype

在这里插入图片描述

[root@LFTF dpdk-l3fwd]# ./l3fwd_app-static -l 1-6 -n 4 -- -p 0x3 -P -L --eth-dest 0,ac:f9:70:83:b6:61 --eth-dest 1,74:a4:b5:01:8e:1a --config="(0,0,1),(1,0,2),(0,1,3),(1,1,4),(0,2,5),(0,3,6),(1,2,1),(1,3,1),(1,4,1),(1,5,1),(1,6,1),(1,7,1),(1,8,1),(1,9,1),(1,10,1),(1,11,1),(1,12,1),(1,13,1),(1,14,1),(1,15,1)" --parse-ptype

在这里插入图片描述

可以看出,rx_queuelcore之间的绑定是通过启动命令参数加载配置的,而tx_queuelcore之间的绑定是默认每一个lcore均配置网卡端口数量(2)个tx_queue队列数。

1.6、网卡驱动启动

这里逻辑上没有什么问题,直接调用相关API使用即可

在这里插入图片描述

**小结:**至此,dpdk-l3fwd程序初始化基本完成,后面则循环遍历每一个lcore并生成相应的线程进行数据包的解析处理转发等功能,调用的接口函数如下:

/* launch per-lcore init on every lcore */
rte_eal_mp_remote_launch(l3fwd_lkp.main_loop, NULL, CALL_MAIN);

main_loop代码中涉及到HASH lookup functionsLPM lookup functions,主要区别在于程序解析到L3数据时,使用什么匹配逻辑进行转发,因此这个不作为介绍的重点,后续有需求了在详细了解即可。本文按照lpm_main_loop接口进行后面代码的介绍。

数据解析转发的整体处理流程图如下所示:

在这里插入图片描述

lpm_main_loop源码:

/* main processing loop */
int
lpm_main_loop(__rte_unused void *dummy)
{
	struct rte_mbuf *pkts_burst[MAX_PKT_BURST];
	unsigned lcore_id;
	uint64_t prev_tsc, diff_tsc, cur_tsc;
	int i, nb_rx;
	uint16_t portid;
	uint8_t queueid;
	struct lcore_conf *qconf;
	const uint64_t drain_tsc = (rte_get_tsc_hz() + US_PER_S - 1) /
		US_PER_S * BURST_TX_DRAIN_US;
	prev_tsc = 0;
	lcore_id = rte_lcore_id();
	qconf = &lcore_conf[lcore_id];
	if (qconf->n_rx_queue == 0) {
		RTE_LOG(INFO, L3FWD, "lcore %u has nothing to do\n", lcore_id);
		return 0;
	}
	RTE_LOG(INFO, L3FWD, "entering main loop on lcore %u\n", lcore_id);
	for (i = 0; i < qconf->n_rx_queue; i++) {
		portid = qconf->rx_queue_list[i].port_id;
		queueid = qconf->rx_queue_list[i].queue_id;
		RTE_LOG(INFO, L3FWD,
			" -- lcoreid=%u portid=%u rxqueueid=%hhu\n",
			lcore_id, portid, queueid);
	}
	
	while (!force_quit) {
		cur_tsc = rte_rdtsc();
		// 定时将TX队列中的数据转发出去
		/*
		 * TX burst queue drain
		 */
		diff_tsc = cur_tsc - prev_tsc;
		if (unlikely(diff_tsc > drain_tsc)) {
			for (i = 0; i < qconf->n_tx_port; ++i) {
				portid = qconf->tx_port_id[i];
				if (qconf->tx_mbufs[portid].len == 0)
					continue;
				send_burst(qconf,
					qconf->tx_mbufs[portid].len,
					portid);
				qconf->tx_mbufs[portid].len = 0;
			}
			prev_tsc = cur_tsc;
		}
		
		/*
		 * Read packet from RX queues
		 */
		for (i = 0; i < qconf->n_rx_queue; ++i) {
			portid = qconf->rx_queue_list[i].port_id;
			queueid = qconf->rx_queue_list[i].queue_id;
            // 读取RX队列数据包,返回值为获取到数据包的个数,pkts_burst只想第一个rte_mbuf *
			nb_rx = rte_eth_rx_burst(portid, queueid, pkts_burst,
				MAX_PKT_BURST);
            // 如果返回值为0,说明网卡未接受到数据
			if (nb_rx == 0)
				continue;
			// 测试机在LINUX x86_64服务器上, 可通过arch命令查看服务器架构
#if defined RTE_ARCH_X86 || defined __ARM_NEON || defined RTE_ARCH_PPC_64
			l3fwd_lpm_send_packets(nb_rx, pkts_burst,
						portid, qconf);
#else
			l3fwd_lpm_no_opt_send_packets(nb_rx, pkts_burst,
							portid, qconf);
#endif /* X86 */
		}
	}
	return 0;
}

大致逻辑就是每一个LCORE创建一个线程,然后循环处理,定时遍历TX queue,如果队列中有数据则转发到相应的网卡端口,然后遍历接收处理RX queue队列,将接收到的数据包传入到l3fwd_lpm_send_packets进行解析处理。

2、数据解析转发模块

数据接收主要调用rte_eth_rx_burst接口从接收队列中获取rte_mbuf *数据包然后调用l3fwd_lpm_send_packets对数据包进行解析,下面进入到l3fwd_lpm_send_packets函数查看如何进行解析处理的。根据不同的服务器架构,调用的函数也不同,这里x86架构下调用的API接口为l3fwd_lpm_sse.h里的,后面所涉及到不同架构下的API接口,这里均为xxxx_sse.h文件里的调用。如下:

在这里插入图片描述

方便理解,先看一下数据包传入l3fwd_lpm_send_packets接口后的处理流程图及对应代码:

在这里插入图片描述

涉及到的函数名及功能:

函数名功能
processx4_step1循环一次获取4个数据包的目的IPIP类型
processx4_step2循环一次获取4个数据包经过LPM匹配之后需要转发的网卡端口ID
lpm_get_dst_port获取单个数据包经过LPM匹配之后需要转发的网卡端口ID
processx4_step3循环一次修改4个数据包的源和目的MAC
port_groupx4循环一次记录连续目的网卡端口ID相同数据包的个数
process_packet修改单个数据包源和目的MAC
GROUP_PORT_STEP记录单个连续目的网卡端口ID相同数据包的个数
send_packetsx4一次性转发处理多个目的端口ID相同且连续的数据包

l3fwd_lpm_send_packets源码解析如下:

static inline void
l3fwd_lpm_send_packets(int nb_rx, struct rte_mbuf **pkts_burst,
			uint16_t portid, struct lcore_conf *qconf)
{
	int32_t j;
	uint16_t dst_port[MAX_PKT_BURST];
	__m128i dip[MAX_PKT_BURST / FWDSTEP];
	uint32_t ipv4_flag[MAX_PKT_BURST / FWDSTEP];
    // 获取接收数据包数量4的倍数最近的值(k值需小于等于nb_rx),如:接收30个数据包,那么K值为28
    // 调用下面的for循环可以一次性处理4个数据包,然后将剩余的2个数据包单独进行处理,可以提高数据处理效率
	const int32_t k = RTE_ALIGN_FLOOR(nb_rx, FWDSTEP);
	// 一次性处理4个数据包,解析出数据包L3层的目的IP地址
	for (j = 0; j != k; j += FWDSTEP)
		processx4_step1(&pkts_burst[j], &dip[j / FWDSTEP],
				&ipv4_flag[j / FWDSTEP]);
	// 将解析出的4个数据包的目的IP地址进行LPM匹配,获取需要将该数据包转发到哪个网卡端口id中,记录到dst_port带出
	for (j = 0; j != k; j += FWDSTEP)
		processx4_step2(qconf, dip[j / FWDSTEP],
				ipv4_flag[j / FWDSTEP], portid, &pkts_burst[j], &dst_port[j]);

	/* Classify last up to 3 packets one by one */
    // 对不足4个数据包的数据进行单独解析处理
	switch (nb_rx % FWDSTEP) {
	case 3:
		dst_port[j] = lpm_get_dst_port(qconf, pkts_burst[j], portid);
		j++;
		/* fall-through */
	case 2:
		dst_port[j] = lpm_get_dst_port(qconf, pkts_burst[j], portid);
		j++;
		/* fall-through */
	case 1:
		dst_port[j] = lpm_get_dst_port(qconf, pkts_burst[j], portid);
		j++;
	}
	// 将数据包发送匹配得到的det_port中
	send_packets_multi(qconf, pkts_burst, dst_port, nb_rx);
}

send_packets_multi源码注释如下:

/**
 * Send packets burst from pkts_burst to the ports in dst_port array
 */
static __rte_always_inline void
send_packets_multi(struct lcore_conf *qconf, struct rte_mbuf **pkts_burst,
		uint16_t dst_port[MAX_PKT_BURST], int nb_rx)
{
	int32_t k;
	int j = 0;
	uint16_t dlp;
	uint16_t *lp;
    // 用于记录目的端口相同且连续数据包的个数
	uint16_t pnum[MAX_PKT_BURST + 1];

    /*************************START***************************************/
    // 获取目的端口相同且连续数据包的个数;同时更新数据包源/目的MAC地址
	/*
	 * Finish packet processing and group consecutive
	 * packets with the same destination port.
	 */
	k = RTE_ALIGN_FLOOR(nb_rx, FWDSTEP);
	if (k != 0) {
		__m128i dp1, dp2;

		lp = pnum;
		lp[0] = 1;

		processx4_step3(pkts_burst, dst_port);

		/* dp1: <d[0], d[1], d[2], d[3], ... > */
		dp1 = _mm_loadu_si128((__m128i *)dst_port);

		for (j = FWDSTEP; j != k; j += FWDSTEP) {
			processx4_step3(&pkts_burst[j], &dst_port[j]);

			/*
			 * dp2:
			 * <d[j-3], d[j-2], d[j-1], d[j], ... >
			 */
			dp2 = _mm_loadu_si128((__m128i *)
					&dst_port[j - FWDSTEP + 1]);
			lp  = port_groupx4(&pnum[j - FWDSTEP], lp, dp1, dp2);

			/*
			 * dp1:
			 * <d[j], d[j+1], d[j+2], d[j+3], ... >
			 */
			dp1 = _mm_srli_si128(dp2, (FWDSTEP - 1) *
						sizeof(dst_port[0]));
		}

		/*
		 * dp2: <d[j-3], d[j-2], d[j-1], d[j-1], ... >
		 */
		dp2 = _mm_shufflelo_epi16(dp1, 0xf9);
		lp  = port_groupx4(&pnum[j - FWDSTEP], lp, dp1, dp2);

		/*
		 * remove values added by the last repeated
		 * dst port.
		 */
		lp[0]--;
		dlp = dst_port[j - 1];
	} else {
		/* set dlp and lp to the never used values. */
		dlp = BAD_PORT - 1;
		lp = pnum + MAX_PKT_BURST;
	}

	/* Process up to last 3 packets one by one. */
	switch (nb_rx % FWDSTEP) {
	case 3:
		process_packet(pkts_burst[j], dst_port + j);
		GROUP_PORT_STEP(dlp, dst_port, lp, pnum, j);
		j++;
		/* fall-through */
	case 2:
		process_packet(pkts_burst[j], dst_port + j);
		GROUP_PORT_STEP(dlp, dst_port, lp, pnum, j);
		j++;
		/* fall-through */
	case 1:
		process_packet(pkts_burst[j], dst_port + j);
		GROUP_PORT_STEP(dlp, dst_port, lp, pnum, j);
		j++;
	}
	/***************************END*************************************/
	
    /*
	 * Send packets out, through destination port.
	 * Consecutive packets with the same destination port
	 * are already grouped together.
	 * If destination port for the packet equals BAD_PORT,
	 * then free the packet without sending it out.
	 */
    // k表示当前数据包开始后面需要转发的目的端口ID相同的数据包的个数
	for (j = 0; j < nb_rx; j += k) {

		int32_t m;
		uint16_t pn;

		pn = dst_port[j];
		k = pnum[j];

		if (likely(pn != BAD_PORT)){
            // 一次性处理K个数据包
			send_packetsx4(qconf, pn, pkts_burst + j, k);
		}
		else
			for (m = j; m != j + k; m++)
				rte_pktmbuf_free(pkts_burst[m]);

	}
}

**Note:**上述代码中pnum数组如何维护的没有看太懂,但是把相关代码给扒了下来,有兴趣的朋友可以通过下面样例代码自己调试学习下:

// 编译命令: gcc -o main main.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdint.h>
#include <inttypes.h>
#include <sys/types.h>
#include <sys/queue.h>
#include <netinet/in.h>
#include <setjmp.h>
#include <stdarg.h>
#include <ctype.h>
#include <errno.h>
#include <getopt.h>
#include <signal.h>
#include <stdbool.h>
#include <emmintrin.h>

#define FWDSTEP				4
#define MAX_PKT_BURST     	32
#define	BAD_PORT ((uint16_t)-1)

#ifndef likely
#define likely(x)	__builtin_expect(!!(x), 1)
#endif /* likely */

/**
 * Check if a branch is unlikely to be taken.
 *
 * This compiler builtin allows the developer to indicate if a branch is
 * unlikely to be taken. Example:
 *
 *   if (unlikely(x < 1))
 *      do_stuff();
 *
 */
#ifndef unlikely
#define unlikely(x)	__builtin_expect(!!(x), 0)
#endif /* unlikely */

/**
 * Macro to align a value to a given power-of-two. The resultant value
 * will be of the same type as the first parameter, and will be no
 * bigger than the first parameter. Second parameter must be a
 * power-of-two value.
 */
#define RTE_ALIGN_FLOOR(val, align) \
	(typeof(val))((val) & (~((typeof(val))((align) - 1))))

/*
 * We group consecutive packets with the same destionation port into one burst.
 * To avoid extra latency this is done together with some other packet
 * processing, but after we made a final decision about packet's destination.
 * To do this we maintain:
 * pnum - array of number of consecutive packets with the same dest port for
 * each packet in the input burst.
 * lp - pointer to the last updated element in the pnum.
 * dlp - dest port value lp corresponds to.
 */

#define	GRPSZ	(1 << FWDSTEP)
#define	GRPMSK	(GRPSZ - 1)

#define GROUP_PORT_STEP(dlp, dcp, lp, pn, idx)	do { \
	if (likely((dlp) == (dcp)[(idx)])) {             \
		(lp)[0]++;                                   \
	} else {                                         \
		(dlp) = (dcp)[idx];                          \
		(lp) = (pn) + (idx);                         \
		(lp)[0] = 1;                                 \
	}                                                \
} while (0)

static const struct {
	uint64_t pnum; /* prebuild 4 values for pnum[]. */
	int32_t  idx;  /* index for new last updated elemnet. */
	uint16_t lpv;  /* add value to the last updated element. */
} gptbl[GRPSZ] = {
	{
		/* 0: a != b, b != c, c != d, d != e */
		.pnum = UINT64_C(0x0001000100010001),
		.idx = 4,
		.lpv = 0,
	},
	{
		/* 1: a == b, b != c, c != d, d != e */
		.pnum = UINT64_C(0x0001000100010002),
		.idx = 4,
		.lpv = 1,
	},
	{
		/* 2: a != b, b == c, c != d, d != e */
		.pnum = UINT64_C(0x0001000100020001),
		.idx = 4,
		.lpv = 0,
	},
	{
		/* 3: a == b, b == c, c != d, d != e */
		.pnum = UINT64_C(0x0001000100020003),
		.idx = 4,
		.lpv = 2,
	},
	{
		/* 4: a != b, b != c, c == d, d != e */
		.pnum = UINT64_C(0x0001000200010001),
		.idx = 4,
		.lpv = 0,
	},
	{
		/* 5: a == b, b != c, c == d, d != e */
		.pnum = UINT64_C(0x0001000200010002),
		.idx = 4,
		.lpv = 1,
	},
	{
		/* 6: a != b, b == c, c == d, d != e */
		.pnum = UINT64_C(0x0001000200030001),
		.idx = 4,
		.lpv = 0,
	},
	{
		/* 7: a == b, b == c, c == d, d != e */
		.pnum = UINT64_C(0x0001000200030004),
		.idx = 4,
		.lpv = 3,
	},
	{
		/* 8: a != b, b != c, c != d, d == e */
		.pnum = UINT64_C(0x0002000100010001),
		.idx = 3,
		.lpv = 0,
	},
	{
		/* 9: a == b, b != c, c != d, d == e */
		.pnum = UINT64_C(0x0002000100010002),
		.idx = 3,
		.lpv = 1,
	},
	{
		/* 0xa: a != b, b == c, c != d, d == e */
		.pnum = UINT64_C(0x0002000100020001),
		.idx = 3,
		.lpv = 0,
	},
	{
		/* 0xb: a == b, b == c, c != d, d == e */
		.pnum = UINT64_C(0x0002000100020003),
		.idx = 3,
		.lpv = 2,
	},
	{
		/* 0xc: a != b, b != c, c == d, d == e */
		.pnum = UINT64_C(0x0002000300010001),
		.idx = 2,
		.lpv = 0,
	},
	{
		/* 0xd: a == b, b != c, c == d, d == e */
		.pnum = UINT64_C(0x0002000300010002),
		.idx = 2,
		.lpv = 1,
	},
	{
		/* 0xe: a != b, b == c, c == d, d == e */
		.pnum = UINT64_C(0x0002000300040001),
		.idx = 1,
		.lpv = 0,
	},
	{
		/* 0xf: a == b, b == c, c == d, d == e */
		.pnum = UINT64_C(0x0002000300040005),
		.idx = 0,
		.lpv = 4,
	},
};

static inline uint16_t *
port_groupx4(uint16_t pn[FWDSTEP + 1], uint16_t *lp, __m128i dp1, __m128i dp2)
{
	union {
		uint16_t u16[FWDSTEP + 1];
		uint64_t u64;
	} *pnum = (void *)pn;

	int32_t v;

	dp1 = _mm_cmpeq_epi16(dp1, dp2);
	dp1 = _mm_unpacklo_epi16(dp1, dp1);
	v = _mm_movemask_ps((__m128)dp1);

	/* update last port counter. */
	lp[0] += gptbl[v].lpv;

	/* if dest port value has changed. */
	if (v != GRPMSK) {
		pnum->u64 = gptbl[v].pnum;
		pnum->u16[FWDSTEP] = 1;
		lp = pnum->u16 + gptbl[v].idx;
	}

	return lp;
}

static void send_packets_multi(uint16_t dst_port[MAX_PKT_BURST], int nb_rx)
{
	int32_t k;
	int j = 0;
	uint16_t dlp;
	uint16_t *lp;
	uint16_t pnum[MAX_PKT_BURST + 1];

	/*
	 * Finish packet processing and group consecutive
	 * packets with the same destination port.
	 */
	k = RTE_ALIGN_FLOOR(nb_rx, FWDSTEP);
	if (k != 0) {
		__m128i dp1, dp2;

		lp = pnum;
		lp[0] = 1;

		/* dp1: <d[0], d[1], d[2], d[3], ... > */
		dp1 = _mm_loadu_si128((__m128i *)dst_port);

		for (j = FWDSTEP; j != k; j += FWDSTEP) {
			/*
			 * dp2:
			 * <d[j-3], d[j-2], d[j-1], d[j], ... >
			 */
			dp2 = _mm_loadu_si128((__m128i *)
					&dst_port[j - FWDSTEP + 1]);
			lp  = port_groupx4(&pnum[j - FWDSTEP], lp, dp1, dp2);

			/*
			 * dp1:
			 * <d[j], d[j+1], d[j+2], d[j+3], ... >
			 */
			dp1 = _mm_srli_si128(dp2, (FWDSTEP - 1) *
						sizeof(dst_port[0]));
		}

		/*
		 * dp2: <d[j-3], d[j-2], d[j-1], d[j-1], ... >
		 */
		dp2 = _mm_shufflelo_epi16(dp1, 0xf9);
		lp  = port_groupx4(&pnum[j - FWDSTEP], lp, dp1, dp2);

		/*
		 * remove values added by the last repeated
		 * dst port.
		 */
		lp[0]--;
		dlp = dst_port[j - 1];
	} else {
		/* set dlp and lp to the never used values. */
		dlp = BAD_PORT - 1;
		lp = pnum + MAX_PKT_BURST;
	}

	/* Process up to last 3 packets one by one. */
	switch (nb_rx % FWDSTEP) {
	case 3:
		GROUP_PORT_STEP(dlp, dst_port, lp, pnum, j);
		j++;
		/* fall-through */
	case 2:
		GROUP_PORT_STEP(dlp, dst_port, lp, pnum, j);
		j++;
		/* fall-through */
	case 1:
		GROUP_PORT_STEP(dlp, dst_port, lp, pnum, j);
		j++;
	}
	for (j = 0; j < nb_rx; j += k) {

		int32_t m;
		uint16_t pn;
		pn = dst_port[j];
		k = pnum[j];	
        // 打印输出K是不是所理解的 [7, 2, 2]
		if (likely(pn != BAD_PORT)){
			printf("k = %u\n", k);
		}
		else
			printf("j = %u\n", j);
	}
}

int main(int argc, char **argv)
{
    // 假装接收了11个数据包
    int nb_rx = 11;
	// 初始化以下接收到的11个数据包要转发的端口ID
    uint16_t dst_port[MAX_PKT_BURST] = {1,1,1,1,1,1,1,0,0,1,1};

	send_packets_multi(dst_port, nb_rx);

	return 0;
}

3、添加计数打印

程序退出时,将每一个RX queueTX queue所接收和转发的计数打印出来,添加位置如下,在main_loop循环退出之后:

在这里插入图片描述

struct lcore_conf结构体添加计数变量,更新前后对比如下:

在这里插入图片描述

struct lcore_conf结构体更新后如下:

struct lcore_rx_queue {
	uint16_t port_id;
	uint8_t queue_id;
	uint64_t rx_packet_cnts;
} __rte_cache_aligned;

struct lcore_conf {
	uint16_t n_rx_queue;
	struct lcore_rx_queue rx_queue_list[MAX_RX_QUEUE_PER_LCORE];
	uint16_t n_tx_port;
	uint16_t tx_port_id[RTE_MAX_ETHPORTS];
	uint16_t tx_queue_id[RTE_MAX_ETHPORTS];
	uint64_t tx_packet_cnts[RTE_MAX_ETHPORTS];
	struct mbuf_table tx_mbufs[RTE_MAX_ETHPORTS];
	void *ipv4_lookup_struct;
	void *ipv6_lookup_struct;
	
} __rte_cache_aligned;

rx queue计数添加位置如下:

在这里插入图片描述

tx queue计数添加位置如下:

在这里插入图片描述

display_cnt代码如下:

static void display_cnt( void )
{
	unsigned int lcore_id;
	struct lcore_conf *qconf;

	uint32_t rx_total = 0, tx_total = 0;

	/* printf all lcore */
	printf("\n***********************************************************\n");
	for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++) {
		if (rte_lcore_is_enabled(lcore_id) == 0)
			continue;
		printf("Core Id: %u\n", lcore_id);
		qconf = &lcore_conf[lcore_id];
		for(int i = 0; i < qconf->n_rx_queue; i++) {
			printf("port_id = %u, rx_queue_id = %u, lcore_id = %u, rx_cnt = %lu\n", \
				qconf->rx_queue_list[i].port_id, qconf->rx_queue_list[i].queue_id, lcore_id, qconf->rx_queue_list[qconf->rx_queue_list[i].queue_id].rx_packet_cnts);
			rx_total += qconf->rx_queue_list[qconf->rx_queue_list[i].queue_id].rx_packet_cnts;
		}

		for(int j = 0; j < qconf->n_tx_port; j++) {
			printf("port_id = %u, tx_queue_id = %u, lcore_id = %u, tx_cnt = %lu\n", \
				qconf->tx_port_id[j], qconf->tx_queue_id[j], lcore_id, qconf->tx_packet_cnts[qconf->tx_port_id[j]]);
			tx_total += qconf->tx_packet_cnts[qconf->tx_port_id[j]];
		}
		printf("\n***********************************************************\n");
	}
	printf("\nRX_total = %u, TX_total = %u\n", rx_total, tx_total);
	printf("\n***********************************************************\n");
}

启动命令如下:

[root@LFTF dpdk-l3fwd]# ./l3fwd_app-static -l 1-3 -n 4 -- -p 0x3 -P -L --config="(0,0,1),(1,0,2),(0,1,3)" --parse-ptype

打印输出如下:

在这里插入图片描述

4、总结

至此,dpdk-l3fwd源码基本梳理完毕,主要还是整理了一下DPDK绑定的网卡IDlcorequeue id这三者之间的关联以及数据包解析转发逻辑,基本都是代码实现功能的表层逻辑,比较简单。

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐