Record of learning AF_XDP

Preface

近年eBPF技术大火,在网络方向上最火的大概就是cilium和XDP了,前者是一个前景非常广阔的CNI,后者是内核里基于eBPF的一个包处理框架。

关于eBPF或XDP,网上有很多介绍文章,这里不会去详细介绍其基础原理,只记录下学习过程中遇到的问题以及对应代码。

AF_XDP是内核基于XDP的一层socket封装,和AF_INETAF_PACKET一样,是一种协议族,可以直接通过socket相关调用去使用。

XDP相关实现的封装层次从低到高为:eBPF->XDP->AF_XDP->libbpf->APP

AF_XDP相关的代码主要分为三部分:

  • 内核:内核代码中的对应实现,对应代码路径为kernel/net/xdp
  • libbpf:eBPF接口的封装库,eBPF编程基本都依赖这个库,一些关键特性,如CO-RE都是基于libbpf实现的,对应代码路径为kernel/tools/lib/bpf/xdp.c
  • 用户态应用代码:使用AF_XDP的用户态应用代码,内核示例为kernel/samples/bpf/xsock_user.c,作为DPDK开发者,我主要关注DPDK代码中的PMD_AF_XDP相关代码,代码路径为dpdk/drivers/net/af_xdp

关于AF_XDP可以参考内核文档AF_XDP技术详解

本文的代码分析基于DPDK 19.11,内核版本4.18以及内核版本对应的libbpf。

PMD_AF_XDP

PMD_AF_XDP也是一种DPDK提供的PMD,主要用途和PMD_AF_PACKET相同,从内核驱动管理的网卡设备获取报文。由于两者都是利用内核socket提供的机制,所以实现上也比较相似,基本就是先创建对应协议族的socket,然后bind到对应的网口上,再通过send/recv等调用去进行报文收发(AF_XDP在收包时不需要调用recv)。

PMD_AF_XDP对应的rte_vdev_driver如下:

1
2
3
4
static struct rte_vdev_driver pmd_af_xdp_drv = {
.probe = rte_pmd_af_xdp_probe,
.remove = rte_pmd_af_xdp_remove,
};

Probe

所有PMD的初始化流程都一样,调用PMD对应的probe函数,这里是rte_pmd_af_xdp_probe

参数解析、合法性检查等通用流程就直接跳过了,看完整个probe函数,发现基本上就是赋值pmd相关的通用成员,dev_opsrx_pkt_bursttx_pkt_burst等。

probe流程中,值得注意的点为收发包队列数据结构初始化完之后,设置了收发包队列互为pair:

1
2
3
4
5
6
for (i = 0; i < queue_cnt; i++) {
internals->tx_queues[i].pair = &internals->rx_queues[i];
internals->rx_queues[i].pair = &internals->tx_queues[i];
internals->rx_queues[i].xsk_queue_idx = start_queue_idx + i;
internals->tx_queues[i].xsk_queue_idx = start_queue_idx + i;
}

除此之外,其他都是通用流程,这里就有一个疑惑:socket在哪创建的?

既然probe流程没有,那就接着看初始化流程,probe完成后,通常就是configurequeue setup,再接着就是start

然而configure流程和start实现都很简单,不涉及socket创建:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
static int
eth_dev_start(struct rte_eth_dev *dev)
{
dev->data->dev_link.link_status = ETH_LINK_UP;

return 0;
}

static int
eth_dev_configure(struct rte_eth_dev *dev)
{
/* rx/tx must be paired */
if (dev->data->nb_rx_queues != dev->data->nb_tx_queues)
return -EINVAL;

return 0;
}

那么唯一可能就是在queue setup中。

RX queue setup

先看收包队列eth_rx_queue_setup,其中实现很简单,检查了MBUF空间和XDP帧大小是否匹配后调用了xsk_configure

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
static int
xsk_configure(struct pmd_internals *internals, struct pkt_rx_queue *rxq,
int ring_size)
{
......
/* Init umem. */
rxq->umem = xdp_umem_configure(internals, rxq);
if (rxq->umem == NULL)
return -ENOMEM;
txq->umem = rxq->umem;
......
/* Create xsk. */
ret = xsk_socket__create(&rxq->xsk, internals->if_name,
rxq->xsk_queue_idx, rxq->umem->umem, &rxq->rx,
&txq->tx, &cfg);
if (ret) {
AF_XDP_LOG(ERR, "Failed to create xsk socket.\n");
goto err;
}
......
}

主要流程包含两个部分,初始化umem、创建xsk(AF_XDP socket)。

UMEM

关于umem官方说明如下:

UMEM is a region of virtual contiguous memory, divided into equal-sized frames. An UMEM is associated to a netdev and a specific queue id of that netdev. It is created and configured (chunk size, headroom, start address and size) by using the XDP_UMEM_REG setsockopt system call. A UMEM is bound to a netdev and queue id, via the bind() system call.

An AF_XDP is socket linked to a single UMEM, but one UMEM can have multiple AF_XDP sockets. To share an UMEM created via one socket A, the next socket B can do this by setting the XDP_SHARED_UMEM flag in struct sockaddr_xdp member sxdp_flags, and passing the file descriptor of A to struct sockaddr_xdp member sxdp_shared_umem_fd.

The UMEM has two single-producer/single-consumer rings that are used to transfer ownership of UMEM frames between the kernel and the user-space application.

AF_XDP可以通过mmap在用户态和内核态之间传递数据,这块共享内存就是umem。在用户态应用中先初始化好umem,再作为xsk_socket__create的参数传入,在内部完成内核态内存和用户态内存的map。

需要注意的一点是,umem初始化流程中,收发包队列的umem指向的是同样的内存,这样的好处是在收发包队列之间转换时不需要拷贝:

RX and TX can share the same UMEM so that a packet does not have to be copied between RX and TX. Moreover, if a packet needs to be kept for a while due to a possible retransmit, the descriptor that points to that packet can be changed to point to another and reused right away. This again avoids copying data.

PMD_AF_XDP的实现中,每个umem对应了一个buf_ring,用于缓存具体的umem和批量操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
static struct
xsk_umem_info *xdp_umem_configure(struct pmd_internals *internals,
struct pkt_rx_queue *rxq)
{
......
snprintf(ring_name, sizeof(ring_name), "af_xdp_ring_%s_%u",
internals->if_name, rxq->xsk_queue_idx);
umem->buf_ring = rte_ring_create(ring_name,
ETH_AF_XDP_NUM_BUFFERS,
rte_socket_id(),
0x0);
if (umem->buf_ring == NULL) {
AF_XDP_LOG(ERR, "Failed to create rte_ring\n");
goto err;
}

for (i = 0; i < ETH_AF_XDP_NUM_BUFFERS; i++)
rte_ring_enqueue(umem->buf_ring,
(void *)(i * ETH_AF_XDP_FRAME_SIZE));
......
}

可以看到buf_ring的元素个数为ETH_AF_XDP_NUM_BUFFERS,和umem的元素个数是相同的,且初始化为umem元素的偏移,这也是umem的保存形式,需要访问时再通过xsk_umem__get_data获取真正的umem地址。

XSK

umem初始化完成后,就接着调用xsk_socket__create创建xsk,所有xsk_socket__开头的代码,均为libbpf的封装,声明在xsk.h。使用libbpf封装的API大大减少了XDP/eBPF编码的复杂度。

从这个函数可以看出,和AF_PACKET只创建一个socket不同,对AF_XDP而言,每一个收包队列初始化流程中(实际是收发包队列共用)都会创建一个xsk,并且这里需要传入queue id,和AF_PACKET不同,这个queue id对应具体的硬件队列id,这里指定queue id应该就是将具体的XDP/eBPF程序attach到对应队列上的意思。

同时作为参数传入的不仅有rxq,也有rxq,说明收发包队列使用的是同一个xsk,并且同时初始化。

TX queue setup

发包队列的初始化非常简单,在上面也说了,收发包队列共用umem和xsk,在收包队列初始化时,已经完成了初始化,所以发包队列的初始化没有任何动作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
static int
eth_tx_queue_setup(struct rte_eth_dev *dev,
uint16_t tx_queue_id,
uint16_t nb_tx_desc __rte_unused,
unsigned int socket_id __rte_unused,
const struct rte_eth_txconf *tx_conf __rte_unused)
{
struct pmd_internals *internals = dev->data->dev_private;
struct pkt_tx_queue *txq;

txq = &internals->tx_queues[tx_queue_id];

dev->data->tx_queues[tx_queue_id] = txq;
return 0;
}

Queue change?

xsk的初始化流程可以看到,每个xsk对应了一个网卡的硬件队列,那么当XDP程序已经attach到对应网卡队列后,再调整网卡队列数,会发生什么呢?

ethtool调整队列数的代码ethtool_set_channels为例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
static noinline_for_stack int ethtool_set_channels(struct net_device *dev,
void __user *useraddr)
{
......
/* Disabling channels, query zero-copy AF_XDP sockets */
from_channel = channels.combined_count +
min(channels.rx_count, channels.tx_count);
to_channel = curr.combined_count + max(curr.rx_count, curr.tx_count);
for (i = from_channel; i < to_channel; i++)
if (xdp_get_umem_from_qid(dev, i))
return -EINVAL;

return dev->ethtool_ops->set_channels(dev, &channels);
}

可以看到针对待调整的队列,调用了xdp_get_umem_from_qid进行检查:

1
2
3
4
5
6
7
8
9
10
11
struct xdp_umem *xdp_get_umem_from_qid(struct net_device *dev,
u16 queue_id)
{
if (queue_id < dev->real_num_rx_queues)
return dev->_rx[queue_id].umem;
if (queue_id < dev->real_num_tx_queues)
return dev->_tx[queue_id].umem;

return NULL;
}
EXPORT_SYMBOL(xdp_get_umem_from_qid);

可以看到上述前置检查中,如果队列上有对应的umem则返回-EINVAL,即如果网卡队列已经attach了XDP程序,则不允许禁用当前队列。

AF_XDP的应用场景和AF_PACKET相似,都是在内核驱动正常使用的前提下,在用户态捕获网卡报文的机制,在概念理解的很多时候会进行类比,其中队列这一块就是一个明显差异了,AF_XDP的队列就是网卡的硬件队列,而AF_PACKET的队列是其自身的抽象软件队列,和网卡硬件队列并不是一一对应的关系。

XDP mode

1
2
3
4
5
6
#define XDP_FLAGS_SKB_MODE		(1U << 1)
#define XDP_FLAGS_DRV_MODE (1U << 2)
#define XDP_FLAGS_HW_MODE (1U << 3)
#define XDP_FLAGS_MODES (XDP_FLAGS_SKB_MODE | \
XDP_FLAGS_DRV_MODE | \
XDP_FLAGS_HW_MODE)

XDP支持三种模式,分别对应了不同位置的eBPF hook点,自底向上分别为:XDP_HWXDP_DRVXDP_SKB

AF_XDP初始化时,通过xsk_socket__create->xsk_setup_xdp_prog->xsk_load_xdp_prog->bpf_set_link_xdp_fd->rtnl_setlink->do_setlink->dev_change_xdp_fd设置XDP mode

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
/**
* dev_change_xdp_fd - set or clear a bpf program for a device rx path
* @dev: device
* @extack: netlink extended ack
* @fd: new program fd or negative value to clear
* @flags: xdp-related flags
*
* Set or clear a bpf program for a device
*/
int dev_change_xdp_fd(struct net_device *dev, struct netlink_ext_ack *extack,
int fd, u32 flags)
{
const struct net_device_ops *ops = dev->netdev_ops;
enum bpf_netdev_command query;
struct bpf_prog *prog = NULL;
bpf_op_t bpf_op, bpf_chk;
bool offload;
int err;

ASSERT_RTNL();

rh_mark_used_feature("eBPF/xdp");

offload = flags & XDP_FLAGS_HW_MODE;
query = offload ? XDP_QUERY_PROG_HW : XDP_QUERY_PROG;

bpf_op = bpf_chk = ops->ndo_bpf;
if (!bpf_op && (flags & (XDP_FLAGS_DRV_MODE | XDP_FLAGS_HW_MODE))) {
NL_SET_ERR_MSG(extack, "underlying driver does not support XDP in native mode");
return -EOPNOTSUPP;
}
if (!bpf_op || (flags & XDP_FLAGS_SKB_MODE))
bpf_op = generic_xdp_install;
if (bpf_op == bpf_chk)
bpf_chk = generic_xdp_install;

if (fd >= 0) {
if (!offload && __dev_xdp_query(dev, bpf_chk, XDP_QUERY_PROG)) {
NL_SET_ERR_MSG(extack, "native and generic XDP can't be active at the same time");
return -EEXIST;
}
if ((flags & XDP_FLAGS_UPDATE_IF_NOEXIST) &&
__dev_xdp_query(dev, bpf_op, query)) {
NL_SET_ERR_MSG(extack, "XDP program already attached");
return -EBUSY;
}

prog = bpf_prog_get_type_dev(fd, BPF_PROG_TYPE_XDP,
bpf_op == ops->ndo_bpf);
if (IS_ERR(prog))
return PTR_ERR(prog);

if (!offload && bpf_prog_is_dev_bound(prog->aux)) {
NL_SET_ERR_MSG(extack, "using device-bound program without HW_MODE flag is not supported");
bpf_prog_put(prog);
return -EINVAL;
}
}

err = dev_xdp_install(dev, bpf_op, extack, flags, prog);
if (err < 0 && prog)
bpf_prog_put(prog);

return err;
}

如果调用xsk_socket__createusr_config没有指定xdp_flags的话,默认为0

可以看到,优先通过dev->netdev_ops->ndo_bpf判断对应的设备是否支持XDP_HWXDP_DRV,对bpf_op赋值后通过dev_xdp_install调用。

所以,对于XDP_SKB模式而言,bpf_op对应的函数为generic_xdp_install

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
static int generic_xdp_install(struct net_device *dev, struct netdev_bpf *xdp)
{
struct bpf_prog *old = rtnl_dereference(dev->xdp_prog);
struct bpf_prog *new = xdp->prog;
int ret = 0;

switch (xdp->command) {
case XDP_SETUP_PROG:
rcu_assign_pointer(dev->xdp_prog, new);
if (old)
bpf_prog_put(old);

if (old && !new) {
static_branch_dec(&generic_xdp_needed_key);
} else if (new && !old) {
static_branch_inc(&generic_xdp_needed_key);
dev_disable_lro(dev);
dev_disable_gro_hw(dev);
}
break;

case XDP_QUERY_PROG:
xdp->prog_id = old ? old->aux->id : 0;
break;

default:
ret = -EINVAL;
break;
}

return ret;
}

可以看到逻辑非常简单,就是设置dev->xdp_prog,调整generic_xdp_needed_key,如果是初始化的话,再关闭设备的lrogro

PMD_AF_XDP而言,只置位了XDP_FLAGS_UPDATE_IF_NOEXIST,所以和默认行为一样,按照优先顺序使用XDP_HWXDP_DRVXDP_SKB

Where is the XDP/eBPF program?

至此,PMD_AF_XDP的初始化流程已经结束,上面也分析过了,configurestart中都没有复杂操作,start完成后,PMD就可以正常工作进行收发包了。那么问题来了,XDP/eBPF程序是在哪里被attach上去的呢?这里从xsk_socket__create看起:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
int xsk_socket__create(struct xsk_socket **xsk_ptr, const char *ifname,
__u32 queue_id, struct xsk_umem *umem,
struct xsk_ring_cons *rx, struct xsk_ring_prod *tx,
const struct xsk_socket_config *usr_config)
{
......
if (!(xsk->config.libbpf_flags & XSK_LIBBPF_FLAGS__INHIBIT_PROG_LOAD)) {
err = xsk_setup_xdp_prog(xsk);
if (err)
goto out_mmap_tx;
}
......
}

static int xsk_setup_xdp_prog(struct xsk_socket *xsk)
{
......
if (!prog_id) {
err = xsk_create_bpf_maps(xsk);
if (err)
return err;

err = xsk_load_xdp_prog(xsk);
if (err) {
xsk_delete_bpf_maps(xsk);
return err;
}
} else {
xsk->prog_fd = bpf_prog_get_fd_by_id(prog_id);
if (xsk->prog_fd < 0)
return -errno;
err = xsk_lookup_bpf_maps(xsk);
if (err) {
close(xsk->prog_fd);
return err;
}
}
.....
}

static int xsk_load_xdp_prog(struct xsk_socket *xsk)
{
static const int log_buf_size = 16 * 1024;
char log_buf[log_buf_size];
int err, prog_fd;

/* This is the C-program:
* SEC("xdp_sock") int xdp_sock_prog(struct xdp_md *ctx)
* {
* int ret, index = ctx->rx_queue_index;
*
* // A set entry here means that the correspnding queue_id
* // has an active AF_XDP socket bound to it.
* ret = bpf_redirect_map(&xsks_map, index, XDP_PASS);
* if (ret > 0)
* return ret;
*
* // Fallback for pre-5.3 kernels, not supporting default
* // action in the flags parameter.
* if (bpf_map_lookup_elem(&xsks_map, &index))
* return bpf_redirect_map(&xsks_map, index, 0);
* return XDP_PASS;
* }
*/
struct bpf_insn prog[] = {
/* r2 = *(u32 *)(r1 + 16) */
BPF_LDX_MEM(BPF_W, BPF_REG_2, BPF_REG_1, 16),
/* *(u32 *)(r10 - 4) = r2 */
BPF_STX_MEM(BPF_W, BPF_REG_10, BPF_REG_2, -4),
/* r1 = xskmap[] */
BPF_LD_MAP_FD(BPF_REG_1, xsk->xsks_map_fd),
/* r3 = XDP_PASS */
BPF_MOV64_IMM(BPF_REG_3, 2),
/* call bpf_redirect_map */
BPF_EMIT_CALL(BPF_FUNC_redirect_map),
/* if w0 != 0 goto pc+13 */
BPF_JMP32_IMM(BPF_JSGT, BPF_REG_0, 0, 13),
/* r2 = r10 */
BPF_MOV64_REG(BPF_REG_2, BPF_REG_10),
/* r2 += -4 */
BPF_ALU64_IMM(BPF_ADD, BPF_REG_2, -4),
/* r1 = xskmap[] */
BPF_LD_MAP_FD(BPF_REG_1, xsk->xsks_map_fd),
/* call bpf_map_lookup_elem */
BPF_EMIT_CALL(BPF_FUNC_map_lookup_elem),
/* r1 = r0 */
BPF_MOV64_REG(BPF_REG_1, BPF_REG_0),
/* r0 = XDP_PASS */
BPF_MOV64_IMM(BPF_REG_0, 2),
/* if r1 == 0 goto pc+5 */
BPF_JMP_IMM(BPF_JEQ, BPF_REG_1, 0, 5),
/* r2 = *(u32 *)(r10 - 4) */
BPF_LDX_MEM(BPF_W, BPF_REG_2, BPF_REG_10, -4),
/* r1 = xskmap[] */
BPF_LD_MAP_FD(BPF_REG_1, xsk->xsks_map_fd),
/* r3 = 0 */
BPF_MOV64_IMM(BPF_REG_3, 0),
/* call bpf_redirect_map */
BPF_EMIT_CALL(BPF_FUNC_redirect_map),
/* The jumps are to this instruction */
BPF_EXIT_INSN(),
};
size_t insns_cnt = sizeof(prog) / sizeof(struct bpf_insn);

prog_fd = bpf_load_program(BPF_PROG_TYPE_XDP, prog, insns_cnt,
"LGPL-2.1 or BSD-2-Clause", 0, log_buf,
log_buf_size);
if (prog_fd < 0) {
pr_warn("BPF log buffer:\n%s", log_buf);
return prog_fd;
}

err = bpf_set_link_xdp_fd(xsk->ifindex, prog_fd, xsk->config.xdp_flags);
if (err) {
close(prog_fd);
return err;
}

xsk->prog_fd = prog_fd;
return 0;
}

可以看到是通过调用链xsk_socket__create->xsk_setup_xdp_prog->xsk_load_xdp_prog->bpf_load_program完成的,其中对应的eBPF程序直接硬编码到了xsk_load_xdp_prog中,当然也是由于AF_XDP的功能比较简单,如果是其他功能的XDP/eBPF程序,则会采用其他方式保存源码。

注意xsk_setup_xdp_prog中根据prog_id是否合法,有不同的处理逻辑:

  • 当前设备上未attach XDP程序:函数将创建BPF_MAP并进行XDP程序的attach
  • 当前设备上已attach XDP程序:函数查找设备对应的XDP程序id以及对应的BPF_MAP,将其赋值给xsk相应成员

由此可见,XDP程序和BPF_MAP是设备级别的,同一个设备只能有一个XDP程序(不支持XDP_ATTACHED_MULTI的场景)和BPF_MAP。这点在net_device的定义中也有体现:

1
2
3
4
5
6
7
8
9
struct net_device {
......
/* RHEL: while xdp_prog is explicitly removed from the kABI
* whitelist, one semantics must be preserved: comparison of
* xdp_prog to NULL denotes whether a XDP program is loaded or not.
*/
RH_KABI_EXCLUDE(struct bpf_prog __rcu *xdp_prog)
......
}

xdp_prog只是一个指针,并非一个数组。

RX & TX

报文收发是PMD的核心功能,接下来看下收发是怎么实现的。之前已经提到了,内核态XDP和用户态应用之间通过umem进行报文收发,那具体是怎么做的呢?

XDP ring

The UMEM consists of a number of equally sized chunks. A descriptor in one of the rings references a frame by referencing its addr. The addr is simply an offset within the entire UMEM region. The user space allocates memory for this UMEM using whatever means it feels is most appropriate (malloc, mmap, huge pages, etc). This memory area is then registered with the kernel using the new setsockopt XDP_UMEM_REG. The UMEM also has two rings: the FILL ring and the COMPLETION ring. The FILL ring is used by the application to send down addr for the kernel to fill in with RX packet data. References to these frames will then appear in the RX ring once each packet has been received. The COMPLETION ring, on the other hand, contains frame addr that the kernel has transmitted completely and can now be used again by user space, for either TX or RX. Thus, the frame addrs appearing in the COMPLETION ring are addrs that were previously transmitted using the TX ring. In summary, the RX and FILL rings are used for the RX path and the TX and COMPLETION rings are used for the TX path.

实际上,除了umem外,报文收发还用到了4种不同的数据结构,分别为:

  • fill ring:每一个umem只有一个,用于向内核提供umem
  • completion ring:每一个umem只有一个,用于从内核获取umem
  • rx ring:用户态应用从rx ring收取报文,每个xsk可以有多个rx ring,应该是对应XDP_SHARED_UMEM 的场景。
  • tx ring:用户态应用向tx ring发送报文,每个xsk可以有多个tx ring,应该是对应XDP_SHARED_UMEM 的场景。

RX procedure

收包流程使用fill ringrx ring,用户态应用首先需要向内核提供umem用于接收报文,所以先作为producer向fill ring填入umem,内核XDP作为consumer从fill ring获取到可用umem后将报文填入,再作为producer将这部分umem添加到rx ring,最后用户态应用作为consumer从rx ring获取接收到的报文。

xsk初始化流程中就已经对初始化好的fill ring填入umem了,这样当XDP/eBPFattach完成后就可以立即用这部分umem收包了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#define ETH_AF_XDP_DFLT_NUM_DESCS	XSK_RING_CONS__DEFAULT_NUM_DESCS

static int
xsk_configure(struct pmd_internals *internals, struct pkt_rx_queue *rxq,
int ring_size)
{
......
int reserve_size = ETH_AF_XDP_DFLT_NUM_DESCS / 2;

......
ret = reserve_fill_queue(rxq->umem, reserve_size, fq_bufs);
if (ret) {
xsk_socket__delete(rxq->xsk);
AF_XDP_LOG(ERR, "Failed to reserve fill queue.\n");
goto err;
}
......
}

注意这里预留的个数实际上是XSK_RING_CONS__DEFAULT_NUM_DESCS / 2,默认情况下ETH_AF_XDP_NUM_BUFFERS = XSK_RING_CONS__DEFAULT_NUM_DESCS * 2 ,即这里初始化了umem总量的四分之一,填入了fill ring

af_xdp_rx_cp为例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
static uint16_t
af_xdp_rx_cp(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
{
......
/* Step 1: Get recv num. */
rcvd = xsk_ring_cons__peek(rx, nb_pkts, &idx_rx);
if (rcvd == 0) {
#if defined(XDP_USE_NEED_WAKEUP)
if (xsk_ring_prod__needs_wakeup(fq))
(void)poll(rxq->fds, 1, 1000);
#endif

goto out;
}

/* Step 2: Reserve some umem to fill ring if necessary. */
if (xsk_prod_nb_free(fq, free_thresh) >= free_thresh)
(void)reserve_fill_queue(umem, ETH_AF_XDP_RX_BATCH_SIZE, NULL);

for (i = 0; i < rcvd; i++) {
const struct xdp_desc *desc;
uint64_t addr;
uint32_t len;
void *pkt;

/* Step 3: Recieve every packets from rx ring. */
desc = xsk_ring_cons__rx_desc(rx, idx_rx++);
addr = desc->addr;
len = desc->len;
pkt = xsk_umem__get_data(rxq->umem->mz->addr, addr);

rte_memcpy(rte_pktmbuf_mtod(mbufs[i], void *), pkt, len);
/* Step 4: Enqueue addr to buf_ring after packet recieved. */
rte_ring_enqueue(umem->buf_ring, (void *)addr);
rte_pktmbuf_pkt_len(mbufs[i]) = len;
rte_pktmbuf_data_len(mbufs[i]) = len;
rx_bytes += len;
bufs[i] = mbufs[i];
}

/* Step 5: Update rx ring consumer index. */
xsk_ring_cons__release(rx, rcvd);
......
}

收包流程中各步骤如下:

  • Step 1:依据rx ring的producer和consumer差值,获取当前可收取的报文数量。
  • Step 2:如果fill ring中剩余umem小于阈值的话,则添加一部分进去。
  • Step 3:从rx ring获取报文对应的umem并拷贝到mbuf完成每个报文的收取。
  • Step 4:每个umem处理完后入队buf_ring供下次收包/发包使用。
  • Step 5:更新rx ring中的consumer。

TX procedure

看完收包流程后,我原以为以为内核XDP要相对地提前初始化一部分umem,然后用户态向对应umem填入报文完成发包,但事实上不是这样的。

发包流程中,使用completion ringtx ring,用户态应用首先将报文填入umem,并作为producer将umem填入tx ring,内核XDP作为consumer从tx ring获取报文并发送。发送完成后,作为producer将发送完成的umem填入completion ring,用户态应用再作为consumer从completion ring中回收已经发送完毕的umem供下一次发包/收包使用。

af_xdp_tx_cp为例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
static uint16_t
af_xdp_tx_cp(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
{
......
/* Step 1: Get free umem in completion ring and enqueue to buf_ring. */
pull_umem_cq(umem, nb_pkts);

nb_pkts = rte_ring_dequeue_bulk(umem->buf_ring, addrs,
nb_pkts, NULL);
if (nb_pkts == 0)
return 0;

/* Step 2: Reserve tx ring. */
if (xsk_ring_prod__reserve(&txq->tx, nb_pkts, &idx_tx) != nb_pkts) {
kick_tx(txq);
rte_ring_enqueue_bulk(umem->buf_ring, addrs, nb_pkts, NULL);
return 0;
}

for (i = 0; i < nb_pkts; i++) {
struct xdp_desc *desc;
void *pkt;

/* Step 3: Send every packet to umem from tx ring. */
desc = xsk_ring_prod__tx_desc(&txq->tx, idx_tx + i);
mbuf = bufs[i];
desc->len = mbuf->pkt_len;

desc->addr = (uint64_t)addrs[i];
pkt = xsk_umem__get_data(umem->mz->addr,
desc->addr);
rte_memcpy(pkt, rte_pktmbuf_mtod(mbuf, void *), desc->len);
tx_bytes += mbuf->pkt_len;
rte_pktmbuf_free(mbuf);
}

/* Step 4: Update tx ring producer index. */
xsk_ring_prod__submit(&txq->tx, nb_pkts);

/* Step 5: Kick XDP. */
kick_tx(txq);
......
}

发包流程中各步骤如下:

  • Step 1:从completion ring中检查是否有已经发包完毕的umem,如果有,则将其入队buf_ring,以供下一次发包/收包使用。
  • Step 2:检查tx ring空闲元素数量是否满足本次发包,如果不足的话,通过kick_tx的方式让内核XDP再发包,以释放一些可用tx ring,同时将上一步出队的buf_ring中的umem再入队。
  • Step 3:将mbuf拷贝到tx ring中对应的umem完成发包。
  • Step 4:更新tx ring的producer。
  • Step 5:调用kick_tx通知内核XDP有报文待发送。之前提到了AF_XDP的发包过程还是有send调用的,原因就在这里,更新了tx ring后通过send通知内核。收包流程没有recv调用,因为收包流程是轮询的,不需要收到来自内核XDP的通知。

这里的kick_tx实际上就是调用xsk对应的sendmsg

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
static void
kick_tx(struct pkt_tx_queue *txq)
{
struct xsk_umem_info *umem = txq->umem;

#if defined(XDP_USE_NEED_WAKEUP)
if (xsk_ring_prod__needs_wakeup(&txq->tx))
#endif
while (send(xsk_socket__fd(txq->pair->xsk), NULL,
0, MSG_DONTWAIT) < 0) {
/* some thing unexpected */
if (errno != EBUSY && errno != EAGAIN && errno != EINTR)
break;

/* pull from completion queue to leave more space */
if (errno == EAGAIN)
pull_umem_cq(umem, ETH_AF_XDP_TX_BATCH_SIZE);
}
#ifndef XDP_UMEM_UNALIGNED_CHUNK_FLAG
pull_umem_cq(umem, ETH_AF_XDP_TX_BATCH_SIZE);
#endif
}

而在xsk的代码中,以xsk_generic_xmit为例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
static int xsk_generic_xmit(struct sock *sk, struct msghdr *m,
size_t total_len)
{
......
skb = sock_alloc_send_skb(sk, len, 1, &err);
if (unlikely(!skb)) {
err = -EAGAIN;
goto out;
}

skb_put(skb, len);
addr = desc.addr;
buffer = xdp_umem_get_data(xs->umem, addr);
err = skb_store_bits(skb, 0, buffer, len);
if (unlikely(err) || xskq_reserve_addr(xs->umem->cq)) {
kfree_skb(skb);
goto out;
}

skb->dev = xs->dev;
skb->priority = sk->sk_priority;
skb->mark = sk->sk_mark;
skb_shinfo(skb)->destructor_arg = (void *)(long)addr;
skb->destructor = xsk_destruct_skb;

err = dev_direct_xmit(skb, xs->queue_id);
......
}

其逻辑就是从tx_ring中获取报文,构造skb,再调用dev_direct_xmit最终通过网卡驱动的发包函数发送出去。

可以看到整个发包流程,是和XDP程序毫无关系的,这也是为什么很多地方会说XDP只工作于收包路径。AF_XDP的发包能力是利用了umem实现的,AF_XDP将收发包功能封装整合,最终向上层提供了一种协议族。

Comparison with virtio ring

ring这块的话和virtio还是有一定的相似性的,相同的地方在于前后端都是各通过一个ring来通知对端本端的处理进度,以及通过相对的ring来获取对端的处理进度,ring中都是存放了真实描述符(umem/desc ring)的索引。不同的地方在于virtio对于收发包队列,各有一个desc ring用来存储descdesc ring对应到XDP就是umem,然而所有的收发包队列使用同一个umem

eBPF map in AF_XDP

eBPF程序可以通过BPF_MAP和用户态程序进行运行时的数据交换。BPF_MAP支持的类型定义在枚举类型bpf_map_type中。对XDP而言,每一个xsk创建时,都会创建一个BPF_MAP_TYPE_XSKMAP类型的BPF_MAP

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
static int xsk_create_bpf_maps(struct xsk_socket *xsk)
{
int max_queues;
int fd;

max_queues = xsk_get_max_queues(xsk);
if (max_queues < 0)
return max_queues;

fd = bpf_create_map_name(BPF_MAP_TYPE_XSKMAP, "xsks_map",
sizeof(int), sizeof(int), max_queues, 0);
if (fd < 0)
return fd;

xsk->xsks_map_fd = fd;

return 0;
}

所有的BPF_MAP名称都固定为xsks_mapkey_sizevalue_size都为sizeof(int),因为这里实际逻辑上的key为queue_id,而value为xsk->fd,其数据大小都是int。该BPF_MAP的意义在于指导XDP程序,在key对应的queue上收到报文后,将报文重定向到value对应的xsk

这里要稍微说一下BPF_MAP的创建,libbpf封装的接口bpf_create_map_name最终使用BPF_MAP_CREATE系统调用来创建对应的BPF_MAP,从上述传参可以看到,并没有设备或xsk的相关信息传入,可以想到,所有的BPF_MAP都属于一个统一的namespace,创建使用都依据最终返回的fd进行,只要能够获取fd,就可以访问。所以BPF_MAP本身不和设备、xsk或特定资源绑定,而是由创建BPF_MAP的调用者保存创建后的fd以便于后续的访问和管理。对AF_XDP而言,保存在了xsk->xsks_map_fd

注意这里所有的xsk对应的BPF_MAP的名字都相同,但是每一个xsk唯一对应一个xsks_map_fdxsk->fdprog_fd,并且都保存在xsk结构体中。这里要和XDP程序attach的流程结合起来看,一个设备对应一个XDP程序和BPF_MAP,但一个XDP程序和BPF_MAP可能对应多个xsk。当对同一个设备的不同队列创建多个xsk时,实际上是通过在BPF_MAP中增加queue_idxsk->fd的映射。

AF_XDP而言,在后续收包流程中执行XDP程序时,从BPF_MAP中查找到对应的xsk,并将报文拷贝到xsk对应的umem中。所以AF_XDPBPF_MAP实现了queue_idxsk->fd的映射,用于指导XDP程序进行重定向,而实现内核态与用户态之间进行数据交互的机制是umem

Does XDP support offload?

内核协议栈实现了很多卸载功能,如GRO/GSO,在分片场景能极大提升性能,那XDP是否支持这些卸载能力呢?

GRO/LRO with XDP?

首先看GRO,也就是接收方向,以hook点最晚的SKB MODE来说,其调用路径为:napi_gro_complete->netif_receive_skb_internal->__netif_receive_skb->__netif_receive_skb_core->do_xdp_generic

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
static int __netif_receive_skb_core(struct sk_buff *skb, bool pfmemalloc)
{
......
if (static_branch_unlikely(&generic_xdp_needed_key)) {
int ret2;

preempt_disable();
ret2 = do_xdp_generic(rcu_dereference(skb->dev->xdp_prog), skb);
preempt_enable();

if (ret2 != XDP_PASS)
return NET_RX_DROP;
skb_reset_mac_len(skb);
}
......
}

从调用路径可以知道,在skb接收的早期,GRO完成之前就已经执行XDP程序了。所以,除非XDP程序不对该报文进行处理,也就是XDP程序返回了XDP_PASS,报文才可能接着进入GRO处理流程。也就是说,XDP处理的报文,是未经GRO处理的,对AF_XDP而言,就意味着不支持GRO

那有没有可能支持LRO呢?毕竟LRO不需要进入协议栈就可以完成。假设支持LRO,则xsk就需要处理可能出现的巨帧。从上面收发包流程可以知道,报文是由umem承载的,而umem最大支持PAGE_SIZE的帧大小:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
static int xdp_umem_reg(struct xdp_umem *umem, struct xdp_umem_reg *mr)
{
......
if (chunk_size < XDP_UMEM_MIN_CHUNK_SIZE || chunk_size > PAGE_SIZE) {
/* Strictly speaking we could support this, if:
* - huge pages, or*
* - using an IOMMU, or
* - making sure the memory area is consecutive
* but for now, we simply say "computer says no".
*/
return -EINVAL;
}
......
}

而普通页大小在X86架构下通常为4K,也就是通常XDP只支持最大4K的帧大小,这个大小不足以用于LRO

关于LRO,也可以参考下virtio_net中的virtnet_xdp_set

1
2
3
4
5
6
7
8
9
10
11
12
13
14
static int virtnet_xdp_set(struct net_device *dev, struct bpf_prog *prog,
struct netlink_ext_ack *extack)
{
......
if (!virtio_has_feature(vi->vdev, VIRTIO_NET_F_CTRL_GUEST_OFFLOADS)
&& (virtio_has_feature(vi->vdev, VIRTIO_NET_F_GUEST_TSO4) ||
virtio_has_feature(vi->vdev, VIRTIO_NET_F_GUEST_TSO6) ||
virtio_has_feature(vi->vdev, VIRTIO_NET_F_GUEST_ECN) ||
virtio_has_feature(vi->vdev, VIRTIO_NET_F_GUEST_UFO))) {
NL_SET_ERR_MSG_MOD(extack, "Can't set XDP while host is implementing LRO, disable LRO first");
return -EOPNOTSUPP;
}
......
}

可以看到,在virtio_net开启了LRO时,也是不支持attach的,原因也是上面提的,目前XDP尚不支持LRO,后续理论上可以支持。

GSO/TSO with XDP?

再看发包方向GSO,在上面发包流程的分析中已经描述了AF_XDP的发包原理:直接将从umem获取到的原始报文,调用dev_direct_xmit直接通过网卡驱动的发包函数发出,所以很明显,XDP也不支持GSO

那是否支持TSO呢?发包流程中调用驱动的发包函数,报文是由skb承载的,而构造skb的流程中,并没有设置skb->gso_size等卸载相关的字段(参考xsk_generic_xmit),所以XDP目前尚不支持TSO,后续理论上可以支持。

libxdp & libbpf

上面已经介绍过PMD_AF_XDP使用了libbpf^1提供的xsk相关接口了,但上述代码基于4.18版本的内核,在这之后libbpf进行了很多改动,其中包括使用一个新的库libxdp^2来维护XDP相关的接口。

XDP应用对不同版本的libbpf可以参考PMD_AF_XDP的处理,根据不同版本引用不同的头文件:

1
2
3
4
5
#ifdef RTE_NET_AF_XDP_LIBXDP
#include <xdp/xsk.h>
#else
#include <bpf/xsk.h>
#endif
  • Copyright: Copyright is owned by the author. For commercial reprints, please contact the author for authorization. For non-commercial reprints, please indicate the source.
  • Copyrights © 2021-2023 Martzki
  • Visitors: | Views:

请我喝杯咖啡吧~

支付宝
微信