我们以RHEL6的xen netfront前端驱动为例。RHEL5的前端驱动,有一个acceleration的特性,是由一家叫solarflare的公司为其网卡定制的plugin,基本架构就是直接从网卡拿了包扔给虚拟机,不经过网桥,当然这需要前端后端都要支持这个acceleration feature。感兴趣的可以去看这篇文章

http://www.tecnologika.co.uk/wp-content/uploads/2040_solarflare_Getting_10Gbps_from_Xen_cognidox.pdf

这个plugin到了RHEL6就没有了


言归正传,RHEL6的前端驱动就一个文件 drivers/net/xen-netfront.c,和块设备的IO ring不同的是,网络前后端有两种类型的IO ring,对应rx, tx两种场景。

tx场景代表了前端驱动发送数据包,对应的数据结构为

/* One slot on the tx ring: at most one granted page (or part of it) to send.
 * A scatter-gather skb therefore spans several consecutive tx requests. */
struct xen_netif_tx_request {
    grant_ref_t gref;      /* Reference to buffer page */
    uint16_t offset;       /* Offset within buffer page */
    uint16_t flags;        /* NETTXF_* */
    uint16_t id;           /* Echoed in response message. */
    uint16_t size;         /* Packet size in bytes.       */
};

/* Backend's completion status for the tx request carrying the same id. */
struct xen_netif_tx_response {
    uint16_t id;
    int16_t  status;       /* NETIF_RSP_* */
};

请求结构为xen_netif_tx_request,可以看出每一个xen_netif_tx_request最多只有一个page的grant reference,因此对于scatter-gather的skb,必然对应多个xen_netif_tx_request。offset, size对应于page内的偏移和数据长度,id用于标识请求,xen_netif_tx_response会用到相同的id。

xen_netif_tx_request的flags包含的标识有

/* NETTXF_* flag bits carried in xen_netif_tx_request.flags. */
/*
 * This is the 'wire' format for packets:
 *  Request 1: netif_tx_request -- NETTXF_* (any flags)
 * [Request 2: netif_tx_extra]  (only if request 1 has NETTXF_extra_info)
 * [Request 3: netif_tx_extra]  (only if request 2 has XEN_NETIF_EXTRA_MORE)
 *  Request 4: netif_tx_request -- NETTXF_more_data
 *  Request 5: netif_tx_request -- NETTXF_more_data
 *  ...
 *  Request N: netif_tx_request -- 0
 */

/* Protocol checksum field is blank in the packet (hardware offload)? */
#define _NETTXF_csum_blank     (0)
#define  NETTXF_csum_blank     (1U<<_NETTXF_csum_blank)

/* Packet data has been validated against protocol checksum. */
#define _NETTXF_data_validated (1)
#define  NETTXF_data_validated (1U<<_NETTXF_data_validated)

/* Packet continues in the next request descriptor. */
#define _NETTXF_more_data      (2)
#define  NETTXF_more_data      (1U<<_NETTXF_more_data)

/* Packet to be followed by extra descriptor(s). */
#define _NETTXF_extra_info     (3)
#define  NETTXF_extra_info     (1U<<_NETTXF_extra_info)
另一个结构体xen_netif_extra_info,当第一个xen_netif_tx_request包含NETTXF_extra_info时,下一个xen_netif_tx_request实际包含的是xen_netif_extra_info的内容。
/* Types of netif_extra_info descriptors. */
#define XEN_NETIF_EXTRA_TYPE_NONE  (0)  /* Never used - invalid */
#define XEN_NETIF_EXTRA_TYPE_GSO   (1)  /* u.gso */
#define XEN_NETIF_EXTRA_TYPE_MAX   (2)

/* netif_extra_info flags. */
#define _XEN_NETIF_EXTRA_FLAG_MORE (0)
#define XEN_NETIF_EXTRA_FLAG_MORE  (1U<<_XEN_NETIF_EXTRA_FLAG_MORE)  /* another extra_info slot follows */

/* GSO types - only TCPv4 currently supported. */
#define XEN_NETIF_GSO_TYPE_TCPV4        (1)

/*
 * This structure needs to fit within both netif_tx_request and
 * netif_rx_response for compatibility.
 */
/*
 * This structure needs to fit within both netif_tx_request and
 * netif_rx_response for compatibility.
 */
struct xen_netif_extra_info {
    uint8_t type;  /* XEN_NETIF_EXTRA_TYPE_* */
    uint8_t flags; /* XEN_NETIF_EXTRA_FLAG_* */

    union {
        struct {
            /*
             * Maximum payload size of each segment. For
             * example, for TCP this is just the path MSS.
             */
            uint16_t size;

            /*
             * GSO type. This determines the protocol of
             * the packet and any extra features required
             * to segment the packet properly.
             */
            uint8_t type; /* XEN_NETIF_GSO_TYPE_* */

            /* Future expansion. */
            uint8_t pad;

            /*
             * GSO features. This specifies any extra GSO
             * features required to process this packet,
             * such as ECN support for TCPv4.
             */
            uint16_t features; /* XEN_NETIF_GSO_FEAT_* */
        } gso;

        uint16_t pad[3];  /* sizes the union; keeps the struct ring-slot compatible */
    } u;
};
xen_netif_extra_info的type目前只有XEN_NETIF_EXTRA_TYPE_GSO


OK,对于GSO的scatter-gather的skb,下面的xen_netif_tx_request都会包含NETTXF_more_data标志,直到最后一个xen_netif_tx_request其flag为0

与xen_netif_tx_request对应的是xen_netif_tx_response,两者id相同,xen_netif_tx_response包含了该request是否被成功发送的状态(即NETIF_RSP_*状态码)。


xen_netif_tx_request, xen_netif_tx_response一起构成了tx IO ring,而xen_netif_rx_request,xen_netif_rx_response则构成了另一个rx IO ring。和一般网卡驱动不同的是,前端驱动接收时,会首先把page分配好, 然后把grant_ref和id做为xen_netif_rx_request传递到后端,后端当有网络包来了之后,copy到page中再通过xen_netif_rx_response交给前端。

/* One slot on the rx ring: the frontend pre-grants a page for the backend to
 * copy an incoming packet into. */
struct xen_netif_rx_request {
    uint16_t    id;        /* Echoed in response message.        */
    grant_ref_t gref;      /* Reference to incoming granted frame */
};

/* Backend's answer for a posted rx request: where in the granted page the
 * packet was placed and how large it is (or a negative error status). */
struct xen_netif_rx_response {
    uint16_t id;
    uint16_t offset;       /* Offset in page of start of received packet  */
    uint16_t flags;        /* NETRXF_* */
    int16_t  status;       /* -ve: NETIF_RSP_* ; +ve: Rx'ed pkt size. */
};

/* NETRXF_* flag bits in xen_netif_rx_response.flags, plus the NETIF_RSP_*
 * status codes shared by tx and rx responses. */

/* Packet data has been validated against protocol checksum. */
#define _NETRXF_data_validated (0)
#define  NETRXF_data_validated (1U<<_NETRXF_data_validated)

/* Protocol checksum field is blank in the packet (hardware offload)? */
#define _NETRXF_csum_blank     (1)
#define  NETRXF_csum_blank     (1U<<_NETRXF_csum_blank)

/* Packet continues in the next request descriptor. */
#define _NETRXF_more_data      (2)
#define  NETRXF_more_data      (1U<<_NETRXF_more_data)

/* Packet to be followed by extra descriptor(s). */
#define _NETRXF_extra_info     (3)
#define  NETRXF_extra_info     (1U<<_NETRXF_extra_info)

#define NETIF_RSP_DROPPED         -2
#define NETIF_RSP_ERROR           -1
#define NETIF_RSP_OKAY             0
/* No response: used for auxiliary requests (e.g., netif_tx_extra). */
#define NETIF_RSP_NULL             1

下面是前端驱动的核心数据结构,netfront_info,定义如下

struct netfront_info {
    struct list_head list;
    struct net_device *netdev;  /* the frontend net device backed by this info */

    struct napi_struct napi;  /* NAPI context used by the poll interface */

    unsigned int evtchn;
    struct xenbus_device *xbdev;

    spinlock_t   tx_lock;
    struct xen_netif_tx_front_ring tx; /* frontend half of the tx IO ring */
    int tx_ring_ref;

    /*
     * {tx,rx}_skbs store outstanding skbuffs. Free tx_skb entries
     * are linked from tx_skb_freelist through skb_entry.link.
     *
     *  NB. Freelist index entries are always going to be less than
     *  PAGE_OFFSET, whereas pointers to skbs will always be equal or
     *  greater than PAGE_OFFSET: we use this property to distinguish
     *  them.
     */

    /* tx_skbs[]: each slot holds either an skb waiting to be sent or a free
     * slot index; all free slots are chained through skb_entry.link into a
     * list headed by tx_skb_freelist. */
    union skb_entry {
        struct sk_buff *skb;
        unsigned long link;
    } tx_skbs[NET_TX_RING_SIZE];

    grant_ref_t gref_tx_head;
    grant_ref_t grant_tx_ref[NET_TX_RING_SIZE];  /* grant ref for each tx skb */
    unsigned tx_skb_freelist;

    spinlock_t   rx_lock ____cacheline_aligned_in_smp;
    struct xen_netif_rx_front_ring rx; /* frontend half of the rx IO ring */
    int rx_ring_ref;

    /* Receive-ring batched refills. */
#define RX_MIN_TARGET 8
#define RX_DFL_MIN_TARGET 64
#define RX_MAX_TARGET min_t(int, NET_RX_RING_SIZE, 256)
    unsigned rx_min_target, rx_max_target, rx_target;  /* size each batch of rx slots handed to the backend for receiving */
    struct sk_buff_head rx_batch;

    struct timer_list rx_refill_timer;

    struct sk_buff *rx_skbs[NET_RX_RING_SIZE];  /* one skb per rx ring slot */
    grant_ref_t gref_rx_head;
    grant_ref_t grant_rx_ref[NET_RX_RING_SIZE];  /* grant ref for each rx skb */

    unsigned long rx_pfn_array[NET_RX_RING_SIZE];
    struct multicall_entry rx_mcl[NET_RX_RING_SIZE+1];
    struct mmu_update rx_mmu[NET_RX_RING_SIZE];
};

对于tx场景,注意这里有个skb_entry结构的数组

    /* Each slot is either a pointer to a pending skb or a freelist index;
     * the two cases are told apart by comparing the value with PAGE_OFFSET. */
    union skb_entry {
        struct sk_buff *skb;
        unsigned long link;
    } tx_skbs[NET_TX_RING_SIZE];

每一个tx_skbs的slot,要么是等待发送的skb指针,要么是已经空闲的slot index。所有空闲的slot index组成了一个list,list head是tx_skb_freelist,那么第二个则是

tx_skbs[tx_skb_freelist],以此类推

注意,这里skb_entry是一个union,意味着要么是一个pointer,要么是一个index,这是通过这个unsigned long是否大于等于PAGE_OFFSET来确定的,如果是那么这就是一个skb指针,否则就是一个array index


对于rx场景,只是一个struct sk_buff* rx_skbs[NET_RX_RING_SIZE] 数组,函数xennet_get_rx_skb用来获取index指定的skb,xennet_get_rx_ref则用来获取index指定的grant_ref_t

xennet_alloc_rx_buffers是用来预先分配接收数据包的skb结构和对应page的,由于受限于grant table的机制,每个skb只能对应一个frag[0].page,其性能必然相比于真正的物理驱动差很多。同时可以看出,grant_ref指向的是frag[0].page,后续还要调用一次pskb_may_pull来生成skb的包头。函数代码如下:

/* Pre-allocate skbs (each backed by a single frag page) for reception, park
 * them in rx_skbs[]/grant_rx_ref[], and post matching xen_netif_rx_request
 * slots so the backend has granted pages to copy incoming packets into. */
static void xennet_alloc_rx_buffers(struct net_device *dev)
{
    unsigned short id;
    struct netfront_info *np = netdev_priv(dev);
    struct sk_buff *skb;
    struct page *page;
    int i, batch_target, notify;
    RING_IDX req_prod = np->rx.req_prod_pvt;
    grant_ref_t ref;
    unsigned long pfn;
    void *vaddr;
    struct xen_netif_rx_request *req;

    if (unlikely(!netif_carrier_ok(dev)))
        return;

    /*
     * Allocate skbuffs greedily, even though we batch updates to the
     * receive ring. This creates a less bursty demand on the memory
     * allocator, so should reduce the chance of failed allocation requests
     * both for ourself and for other kernel subsystems.
     */
    /*
    Allocate a batch of single-frag-page skbs for receiving and append them
    to the netfront_info->rx_batch list. If __netdev_alloc_skb() or
    alloc_page() fails, mod_timer() arms rx_refill_timer to retry 100ms
    later. (Review note: the fixed 100ms delay looks like a pitfall, and the
    "retry" really just re-runs napi_schedule, which in turn invokes
    xennet_poll to attempt reception again.)
    */
    batch_target = np->rx_target - (req_prod - np->rx.rsp_cons);
    for (i = skb_queue_len(&np->rx_batch); i < batch_target; i++) {
        skb = __netdev_alloc_skb(dev, RX_COPY_THRESHOLD + NET_IP_ALIGN,
                     GFP_ATOMIC | __GFP_NOWARN);
        if (unlikely(!skb))
            goto no_skb;

        /* Align ip header to a 16 bytes boundary */
        skb_reserve(skb, NET_IP_ALIGN);

        page = alloc_page(GFP_ATOMIC | __GFP_NOWARN);
        if (!page) {
            kfree_skb(skb);
no_skb:
            /* Any skbuffs queued for refill? Force them out. */
            if (i != 0)
                goto refill;
            /* Could not allocate any skbuffs. Try again later. */
            mod_timer(&np->rx_refill_timer,
                  jiffies + (HZ/10));
            break;
        }

        skb_shinfo(skb)->frags[0].page = page;
        skb_shinfo(skb)->nr_frags = 1;
        __skb_queue_tail(&np->rx_batch, skb);
    }

    /* Is the batch large enough to be worthwhile? */
    if (i < (np->rx_target/2)) {
        if (req_prod > np->rx.sring->req_prod)
            goto push;
        return;
    }

    /* Adjust our fill target if we risked running out of buffers. */
    if (((req_prod - np->rx.sring->rsp_prod) < (np->rx_target / 4)) &&
        ((np->rx_target *= 2) > np->rx_max_target))
        np->rx_target = np->rx_max_target;

refill:
    /*
    For each skb on netfront_info->rx_batch: derive the ring index from
    ring->req_prod and store the skb at that slot of
    netfront_info->rx_skbs. Then gnttab_claim_grant_reference() takes an
    unused ref from the grant_ref_t pool, which is recorded at the matching
    slot of netfront_info->grant_rx_ref, and
    gnttab_grant_foreign_access_ref() grants the backend access to the page.
    */
    for (i = 0; ; i++) {
        skb = __skb_dequeue(&np->rx_batch);
        if (skb == NULL)
            break;

        skb->dev = dev;

        id = xennet_rxidx(req_prod + i);

        BUG_ON(np->rx_skbs[id]);
        np->rx_skbs[id] = skb;

        ref = gnttab_claim_grant_reference(&np->gref_rx_head);
        BUG_ON((signed short)ref < 0);
        np->grant_rx_ref[id] = ref;

        pfn = page_to_pfn(skb_shinfo(skb)->frags[0].page);
        vaddr = page_address(skb_shinfo(skb)->frags[0].page);

        req = RING_GET_REQUEST(&np->rx, req_prod + i);
        gnttab_grant_foreign_access_ref(ref,
                        np->xbdev->otherend_id,
                        pfn_to_mfn(pfn),
                        0);

        req->id = id;
        req->gref = ref;
    }

    wmb();      /* barrier so backend sees requests */

    /* Above is a suitable barrier to ensure backend will see requests. */
    np->rx.req_prod_pvt = req_prod + i;
push:
    /*
    RING_PUSH_REQUESTS_AND_CHECK_NOTIFY checks whether the rx front ring in
    netfront_info holds new requests the backend has not yet seen; if so,
    notify the backend over the event channel via notify_remote_via_irq().
    */
    RING_PUSH_REQUESTS_AND_CHECK_NOTIFY(&np->rx, notify);
    if (notify)
        notify_remote_via_irq(np->netdev->irq);
}

xennet_poll是驱动提供给napi的poll调用接口,用于协议栈以poll的方式从驱动收包。首先介绍几个xennet_poll里用到的util函数,

/* xennet_get_extras: consume the xen_netif_extra_info slot(s) that follow a
 * xen_netif_rx_response on the rx IO ring, copying each one into extras[]
 * indexed by (type - 1). */
static int xennet_get_extras(struct netfront_info *np,
                 struct xen_netif_extra_info *extras,
                 RING_IDX rp)
{
    struct xen_netif_extra_info *extra;
    struct device *dev = &np->netdev->dev;
    RING_IDX cons = np->rx.rsp_cons;
    int err = 0;

    do {
        struct sk_buff *skb;
        grant_ref_t ref;

        /* Here np->rx.rsp_cons + 1 == np->rx.rsp_prod: the ring holds no
         * further slot, so the promised xen_netif_extra_info is missing */
        if (unlikely(cons + 1 == rp)) {
            if (net_ratelimit())
                dev_warn(dev, "Missing extra info\n");
            err = -EBADR;
            break;
        }

        extra = (struct xen_netif_extra_info *)
            RING_GET_RESPONSE(&np->rx, ++cons);
        if (unlikely(!extra->type ||
                 extra->type >= XEN_NETIF_EXTRA_TYPE_MAX)) {
            if (net_ratelimit())
                dev_warn(dev, "Invalid extra type: %d\n",
                    extra->type);
            err = -EINVAL;
        } else {
            memcpy(&extras[extra->type - 1], extra,
                   sizeof(*extra));
        }

        /*
        Pull the skb and grant ref parked at ring index 'cons':
        xennet_get_rx_skb() copies netfront_info->rx_skbs[i] out and clears
        the slot; xennet_get_rx_ref() copies netfront_info->grant_rx_ref[i]
        out and zeroes it. xennet_move_rx_slot() then derives a fresh request
        index from netfront->rx.req_prod_pvt (that rx request slot is free),
        stores skb and ref back into the rx_skbs / grant_rx_ref arrays at
        that index, and writes index and ref into the xen_netif_rx_request
        (obtained via RING_GET_REQUEST on netfront_info->rx.req_prod_pvt).
        This releases the rx_skbs / grant_rx_ref entries occupied by the
        extra-info slot and recycles them for a future xen_netif_rx_request.
        */
        skb = xennet_get_rx_skb(np, cons);
        ref = xennet_get_rx_ref(np, cons);
        xennet_move_rx_slot(np, skb, ref);


    } while (extra->flags & XEN_NETIF_EXTRA_FLAG_MORE);

    np->rx.rsp_cons = cons;
    return err;
}

xennet_get_responses用来生成一个skb list,代表一个完整的skb:skb头部+skb所有分片。rinfo存放着skb头部和skb分片信息

/* xennet_get_responses: collect the responses making up one complete packet
 * (head plus all frags) onto 'list'; rinfo receives the head response and
 * any extra info. */
static int xennet_get_responses(struct netfront_info *np,
                struct netfront_rx_info *rinfo, RING_IDX rp,
                struct sk_buff_head *list)
{
    struct xen_netif_rx_response *rx = &rinfo->rx;
    struct xen_netif_extra_info *extras = rinfo->extras;
    struct device *dev = &np->netdev->dev;
    RING_IDX cons = np->rx.rsp_cons;
    struct sk_buff *skb = xennet_get_rx_skb(np, cons);
    grant_ref_t ref = xennet_get_rx_ref(np, cons);
    int max = MAX_SKB_FRAGS + (rx->status <= RX_COPY_THRESHOLD);
    int frags = 1;
    int err = 0;
    unsigned long ret;

    /* If the first xen_netif_rx_response carries NETRXF_extra_info, let
     * xennet_get_extras() consume the following xen_netif_extra_info ring
     * slots first */
    if (rx->flags & NETRXF_extra_info) {
        err = xennet_get_extras(np, extras, rp);
        cons = np->rx.rsp_cons;
    }

    for (;;) {
        if (unlikely(rx->status < 0 ||
                 rx->offset + rx->status > PAGE_SIZE)) {
            if (net_ratelimit())
                dev_warn(dev, "rx->offset: %x, size: %u\n",
                     rx->offset, rx->status);
            xennet_move_rx_slot(np, skb, ref);
            err = -EINVAL;
            goto next;
        }
        /*
         * This definitely indicates a bug, either in this driver or in
         * the backend driver. In future this should flag the bad
         * situation to the system controller to reboot the backend.
         */
        if (ref == GRANT_INVALID_REF) {
            if (net_ratelimit())
                dev_warn(dev, "Bad rx response id %d.\n",
                     rx->id);
            err = -EINVAL;
            goto next;
        }

        /* End the grant ref's foreign access and release the ref: from now
         * on the backend can no longer touch this page, only the frontend
         * can */
        ret = gnttab_end_foreign_access_ref(ref, 0);
        BUG_ON(!ret);

        gnttab_release_grant_reference(&np->gref_rx_head, ref);

        /* queue the skb onto the caller's temporary list (tmpq) */
        __skb_queue_tail(list, skb);

next:

        /* No NETRXF_more_data flag left: the skb has been received in full;
         * otherwise keep looping to collect the remaining frags */
        if (!(rx->flags & NETRXF_more_data))
            break;

        if (cons + frags == rp) {
            if (net_ratelimit())
                dev_warn(dev, "Need more frags\n");
            err = -ENOENT;
            break;
        }

        /*
        Fetch the next xen_netif_rx_response slot along with the matching
        netfront_info->rx_skbs / grant_rx_ref slots, releasing those slots'
        resources as we go. Note that indexing uses cons + frags and never
        advances cons itself: xennet_poll still needs rsp_cons afterwards,
        so it is left untouched here.
        */
        rx = RING_GET_RESPONSE(&np->rx, cons + frags);
        skb = xennet_get_rx_skb(np, cons + frags);
        ref = xennet_get_rx_ref(np, cons + frags);
        frags++;
    }

    if (unlikely(frags > max)) {
        if (net_ratelimit())
            dev_warn(dev, "Too many frags\n");
        err = -E2BIG;
    }

    if (unlikely(err))
        np->rx.rsp_cons = cons + frags;

    return err;
}

xennet_fill_frags就是为了把gso发送的多个skb合并到一个skb结构里,从第二个开始的skb统统作为第一个skb的skb_frag_t保存。

/* xennet_fill_frags: merge the skbs that made up one GSO packet into a
 * single skb; every skb after the first is stored as an skb_frag_t of the
 * first. Returns the ring index consumed up to. */
static RING_IDX xennet_fill_frags(struct netfront_info *np,
                  struct sk_buff *skb,
                  struct sk_buff_head *list)
{
    struct skb_shared_info *shinfo = skb_shinfo(skb);
    int nr_frags = shinfo->nr_frags;
    RING_IDX cons = np->rx.rsp_cons;
    skb_frag_t *frag = shinfo->frags + nr_frags;
    struct sk_buff *nskb;

    while ((nskb = __skb_dequeue(list))) {
        struct xen_netif_rx_response *rx =
            RING_GET_RESPONSE(&np->rx, ++cons);

        /* Build an skb_frag_t: rx->offset is the page offset, rx->status
         * the received pkt size. skb->data_len grows by rx->status
         * accordingly */
        frag->page = skb_shinfo(nskb)->frags[0].page;
        frag->page_offset = rx->offset;
        frag->size = rx->status;

        skb->data_len += rx->status;

        /* skb_shinfo(nskb)->frags[0].page has been handed over to
         * frag->page, so just zero nr_frags here and free the skb */
        skb_shinfo(nskb)->nr_frags = 0;
        kfree_skb(nskb);

        /* advance frag each round; it started at shinfo->frags + nr_frags,
         * where nr_frags is whatever the first skb already carried */
        frag++;
        nr_frags++;
    }

    shinfo->nr_frags = nr_frags;
    return cons;
}

handle_incoming_queue用来真正接收rxq队列中的skb

/* handle_incoming_queue: finish reception of each skb on rxq and pass it up
 * the stack; returns the number of packets dropped. */
static int handle_incoming_queue(struct net_device *dev,
                 struct sk_buff_head *rxq)
{
    int packets_dropped = 0;
    struct sk_buff *skb;

    while ((skb = __skb_dequeue(rxq)) != NULL) {
        struct page *page = NETFRONT_SKB_CB(skb)->page;
        void *vaddr = page_address(page);
        unsigned offset = NETFRONT_SKB_CB(skb)->offset;

        /* copy the packet header into the skb linear area at skb->data */
        memcpy(skb->data, vaddr + offset,
               skb_headlen(skb));

        /*
        If page differs from frags[0].page it can simply be freed; otherwise
        the rest of this page lives at frags[0].page + offset: anything
        beyond RX_COPY_THRESHOLD is not placed in the skb linear area but is
        kept in an skb_frag_t instead.
        */
        if (page != skb_shinfo(skb)->frags[0].page)
            __free_page(page);

        /* Ethernet work: Delayed to here as it peeks the header. */
        skb->protocol = eth_type_trans(skb, dev);

        if (skb->ip_summed == CHECKSUM_PARTIAL) {
            if (skb_checksum_setup(skb)) {  /* drop the skb if its IP checksum setup fails */
                kfree_skb(skb);
                packets_dropped++;
                dev->stats.rx_errors++;
                continue;
            }
        }

        dev->stats.rx_packets++;
        dev->stats.rx_bytes += skb->len;

        /* Pass it up. */
        netif_receive_skb(skb);
    }

    return packets_dropped;
}

好了言归正传,开始分析xennet_poll函数,xennet_poll函数原型请参考struct napi_struct结构体: int (*poll)(struct napi_struct *, int)

/* xennet_poll: the driver's NAPI poll callback; drains rx responses from the
 * ring (up to 'budget' packets), reassembles and delivers them, then refills
 * the rx ring. Returns the amount of work done. */
static int xennet_poll(struct napi_struct *napi, int budget)
{
    struct netfront_info *np = container_of(napi, struct netfront_info, napi);
    struct net_device *dev = np->netdev;
    struct sk_buff *skb;
    struct netfront_rx_info rinfo;
    struct xen_netif_rx_response *rx = &rinfo.rx;
    struct xen_netif_extra_info *extras = rinfo.extras;
    RING_IDX i, rp;
    int work_done;
    struct sk_buff_head rxq;
    struct sk_buff_head errq;
    struct sk_buff_head tmpq;
    unsigned long flags;
    unsigned int len;
    int err;

    spin_lock(&np->rx_lock);

    skb_queue_head_init(&rxq);
    skb_queue_head_init(&errq);
    skb_queue_head_init(&tmpq);

    rp = np->rx.sring->rsp_prod;
    rmb(); /* Ensure we see queued responses up to 'rp'. */

    i = np->rx.rsp_cons;
    work_done = 0;

    /* Everything from rsp_cons up to rsp_prod is a response from the
     * backend, i.e. an skb the backend received on our behalf. Keep
     * consuming as long as budget remains */
    while ((i != rp) && (work_done < budget)) {
        /* these two lines build a netfront_rx_info covering the skb head
         * response plus its (zeroed for now) extra_info part */
        memcpy(rx, RING_GET_RESPONSE(&np->rx, i), sizeof(*rx));
        memset(extras, 0, sizeof(rinfo.extras));

        /* xennet_get_responses gathers all frag parts of the same skb,
         * based on rinfo, onto the tmpq list; after this one complete skb's
         * worth of data can be considered received */
        err = xennet_get_responses(np, &rinfo, rp, &tmpq);

        if (unlikely(err)) {  /* on error, flush the whole just-collected skb into the error queue */
err:
            while ((skb = __skb_dequeue(&tmpq)))
                __skb_queue_tail(&errq, skb);
            dev->stats.rx_errors++;
            i = np->rx.rsp_cons;
            continue;
        }

        skb = __skb_dequeue(&tmpq);

        if (extras[XEN_NETIF_EXTRA_TYPE_GSO - 1].type) {
            struct xen_netif_extra_info *gso;
            gso = &extras[XEN_NETIF_EXTRA_TYPE_GSO - 1];

            /* the skb is GSO but applying the GSO metadata failed: take the
             * error path and flush this complete skb into the error queue */
            if (unlikely(xennet_set_skb_gso(skb, gso))) {
                __skb_queue_head(&tmpq, skb);
                np->rx.rsp_cons += skb_queue_len(&tmpq);
                goto err;
            }
        }

        NETFRONT_SKB_CB(skb)->page = skb_shinfo(skb)->frags[0].page;
        NETFRONT_SKB_CB(skb)->offset = rx->offset;

        len = rx->status;
        if (len > RX_COPY_THRESHOLD)
            len = RX_COPY_THRESHOLD;
        skb_put(skb, len);

        /* if rx->status exceeds RX_COPY_THRESHOLD, the skb head holds at
         * most RX_COPY_THRESHOLD bytes and the remainder stays in frags[0] */
        if (rx->status > len) {
            skb_shinfo(skb)->frags[0].page_offset =
                rx->offset + len;
            skb_shinfo(skb)->frags[0].size = rx->status - len;
            skb->data_len = rx->status - len;
        } else {
            skb_shinfo(skb)->frags[0].page = NULL;
            skb_shinfo(skb)->nr_frags = 0;
        }

        /* xennet_fill_frags moves every skb still in tmpq into the frags array */
        i = xennet_fill_frags(np, skb, &tmpq);

        /*
         * Truesize approximates the size of true data plus
         * any supervisor overheads. Adding hypervisor
         * overheads has been shown to significantly reduce
         * achievable bandwidth with the default receive
         * buffer size. It is therefore not wise to account
         * for it here.
         *
         * After alloc_skb(RX_COPY_THRESHOLD), truesize is set
         * to RX_COPY_THRESHOLD + the supervisor
         * overheads. Here, we add the size of the data pulled
         * in xennet_fill_frags().
         *
         * We also adjust for any unused space in the main
         * data area by subtracting (RX_COPY_THRESHOLD -
         * len). This is especially important with drivers
         * which split incoming packets into header and data,
         * using only 66 bytes of the main data area (see the
         * e1000 driver for example.)  On such systems,
         * without this last adjustment, our achievable
         * receive throughput using the standard receive
         * buffer size was cut by 25%(!!!).
         */
        skb->truesize += skb->data_len - (RX_COPY_THRESHOLD - len);
        skb->len += skb->data_len;

        if (rx->flags & NETRXF_csum_blank)
            skb->ip_summed = CHECKSUM_PARTIAL;
        else if (rx->flags & NETRXF_data_validated)
            skb->ip_summed = CHECKSUM_UNNECESSARY;

        /* a fully and successfully received skb goes onto the rxq queue */
        __skb_queue_tail(&rxq, skb);

        np->rx.rsp_cons = ++i;
        work_done++;
    }

    __skb_queue_purge(&errq);

    work_done -= handle_incoming_queue(dev, &rxq);

    /* If we get a callback with very few responses, reduce fill target. */
    /* NB. Note exponential increase, linear decrease. */
    if (((np->rx.req_prod_pvt - np->rx.sring->rsp_prod) >
         ((3*np->rx_target) / 4)) &&
        (--np->rx_target < np->rx_min_target))
        np->rx_target = np->rx_min_target;

    /* skbs were consumed while receiving: allocate replacements into
     * np->rx_skbs / np->grant_rx_ref, advance np->rx.req_prod_pvt and post
     * new xen_netif_rx_request slots to the backend */
    xennet_alloc_rx_buffers(dev);

    if (work_done < budget) {
        int more_to_do = 0;

        local_irq_save(flags);

        RING_FINAL_CHECK_FOR_RESPONSES(&np->rx, more_to_do);
        /* nothing left to receive: complete NAPI polling and wait for the
         * next irq to re-trigger it -- this is standard NAPI behaviour */
        if (!more_to_do)
            __napi_complete(napi);

        local_irq_restore(flags);
    }

    spin_unlock(&np->rx_lock);

    return work_done;
}

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐