Introduce of librte_ikcp

Overview

KCP^1是一个纯算法实现的ARQ协议,而librte_ikcp^2是我实现的一个基于KCPDPDK接口风格的封装库。

开发这个封装库的动机源于产品需求。我们的平台是基于DPDK实现的用户态转发面,是不包含协议栈的。一个需求需要确保数据跨节点传输的可靠性,原始实现中使用了UDP进行数据传输,但无法确保其可靠性。

为了解决这个问题,研究了下基于UDP的可靠传输实现,发现大多数的实现都是基于Linux协议栈,在socket层上进行的封装,如UDT^3,这类实现在我们的应用场景不是很适用。后面发现了KCP,它自身实现了ARQ,对报文/消息的输入输出,提供了回调接口,其本身只关注自身状态维护,不关心数据收发的具体实现,可以是UDP socket,也可以是基于DPDK收发包接口的更上一层封装。很显然,这样的实现方式,利于在DPDK应用中实现UDP的可靠传输。

How to use KCP

KCP的使用分为初始化和具体的使用部分。

KCP initialization

对于KCP而言,首先需要调用ikcp_create创建ikcp实例:

1
2
3
4
// create a new kcp control object, 'conv' must equal in two endpoint
// from the same connection. 'user' will be passed to the output callback
// output callback can be setup like this: 'kcp->output = my_udp_output'
ikcpcb* ikcp_create(IUINT32 conv, void *user);

其中,conv是会话的唯一标识符,对同一个会话,通信的双方需要使用相同的conv进行通信;user是传递给发送回调的参数。

然后再设置底层发送函数:

1
2
3
// set output callback, which will be invoked by kcp
void ikcp_setoutput(ikcpcb *kcp, int (*output)(const char *buf, int len,
ikcpcb *kcp, void *user));

每一个ikcp实例对应了一个底层发送函数,在KCP的状态机维护中,在需要时,会调用底层的发送函数发出。

KCP FSM update

上述初始化完成后,即可周期性调用ikcp_update来维护KCP自身状态机:

1
2
3
4
// update state (call it repeatedly, every 10ms-100ms), or you can ask 
// ikcp_check when to call it again (without ikcp_input/_send calling).
// 'current' - current timestamp in millisec.
void ikcp_update(ikcpcb *kcp, IUINT32 current);

ikcp_update会根据KCP自身信息,进行收发队列的维护,实现序号确认、丢失重传以及流控等功能。

KCP TX

KCP给上层应用提供了发送接口ikcp_send

1
2
// user/upper level send, returns below zero for error
int ikcp_send(ikcpcb *kcp, const char *buffer, int len);

ikcp_send的实际作用是将上层应用提供的数据存入KCP内部的发送队列,并不会直接调用上述设置好的底层的发送函数。在KCP自身状态机维护的过程中,在合适的时机KCP会从发送队列发送数据。

KCP RX

KCP提供了底层的输入接口ikcp_input

1
2
// when you received a low level packet (eg. UDP packet), call it
int ikcp_input(ikcpcb *kcp, const char *data, long size);

当接收到KCP报文时,调用ikcp_input会解析其协议字段,如果是数据报文,则会将解析完成后的数据存入KCP内部的接收队列。

对于上层应用而言,需要调用ickp_recvKCP内部的接收队列获取数据:

1
2
// user/upper level recv: returns size, returns below zero for EAGAIN
int ikcp_recv(ikcpcb *kcp, char *buffer, int len);

librte_ikcp

下面将根据KCP的使用方式,说明librte_ikcp的设计流程。

Initialization and data structure

首先看KCP的初始化流程,最终返回了ikcp实例,所以librte_ikcp的封装也需要保存ikcp实例。除此之外,要基于DPDK的接口实现数据收发功能,需要指定发包接口和队列号,同时还需要IP地址,以及下一跳的MAC地址。

综合以上需求,rte_ikcp定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* The RTE ikcp config structure.
*/
struct rte_ikcp_config {
uint16_t port_id; /**< Port id. */
uint16_t txq_id; /**< TX queue id of the port, used when RTE_IKCP_MQ is disabled. */
union {
uint32_t ipv4;
uint8_t ipv6[16];
} src_ip, dst_ip; /**< Addresses to send and receive. */
uint16_t src_port; /**< UDP port when sending. */
uint16_t dst_port; /**< UDP port when receiving. */
struct rte_ether_addr self_mac; /**< MAC address of the port. */
struct rte_ether_addr next_hop_mac; /**< MAC address of the next hop. */
struct rte_mempool *mempool; /**< The mempool to allocate mbufs. */
uint32_t flags; /**< Config flags. */
};

/**
* The RTE ikcp structure.
*/
struct rte_ikcp {
ikcpcb kcp; /**< Internal kcp object. */
struct rte_ikcp_config config; /**< Internal config. */
struct rte_ring *rx_queue, *tx_queue; /**< RX/TX queues when RTE_IKCP_MQ is enabled. */
};

其中除了上面提到的必须的配置外,rte_ikcp->rx_queuerte_ikcp->tx_queue是用于多队列模式的收发队列,在多队列章节会详细说明;rte_ikcp->config->mempool是用于底层发送函数申请mbuf用的mempoolrte_ikcp->config.flagsrte_ikcp相关的配置flag,取值如下:

1
2
3
4
5
6
7
#define RTE_IKCP_USED               (1 << 0) /**< The ikcp structure is used or not. */
#define RTE_IKCP_L3_TYPE_IPV4 (1 << 1) /**< The kcp is using IPv4. */
#define RTE_IKCP_L3_TYPE_IPV6 (1 << 2) /**< The kcp is using IPv6. */
#define RTE_IKCP_ENABLE_PRIVATE_LOG (1 << 3) /**< Enable kcp internal log. */
#define RTE_IKCP_MQ (1 << 4) /**< Send/recv using multi queues. */

#define RTE_IKCP_L3_TYPE_MASK (RTE_IKCP_L3_TYPE_IPV4 | RTE_IKCP_L3_TYPE_IPV6)

对于各个flag:

  • RTE_IKCP_USED:所有的rte_ikcp实例都存储在一个全局的静态数组内,RTE_IKCP_USED用于描述当前数组成员是否已经被使用;
  • RTE_IKCP_L3_TYPE_IPV4:说明当前rte_ikcp实例中的IP地址是IPv4;
  • RTE_IKCP_L3_TYPE_IPV6:说明当前rte_ikcp实例中的IP地址是IPv6;
  • RTE_IKCP_ENABLE_PRIVATE_LOG:启用KCP的内部日志输出;
  • RTE_IKCP_MQ:当前rte_ikcp实例为多队列模式。

rte_ikcp的实例初始化通过调用rte_ikcp_create进行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* Create an ikcp object.
*
* @param config
* Config structure of the ikcp object.
* @param conv
* Conversation id between kcp endpoints and it must be unique for a specified conversation.
* @param user
* User specified pointer and will be used in kcp output.
* @return
* - The pointer of the ikcp object.
* - NULL on failure.
*/
struct rte_ikcp *rte_ikcp_create(struct rte_ikcp_config config, uint32_t conv, void *user);

rte_ikcp_create中已经为ikcp实例设置好了底层发送函数,所以无需额外调用ikcp_set_output

而底层发送函数的实现也很简单,从rte_ikcp->config->mempool中申请mbuf,再依据rte_ikcp->config中IP、MAC相关字段,设置mbuf的头部信息,最后发出。

FSM update

KCP的有限状态机进行维护需要调用ikcp_update,需要传入时钟参数,单位为ms,在rte_ikcp的封装中,用tsc寄存器的值完成了时钟计算,上层应用直接调用rte_ikcp_update即可,无需传入当前时钟:

1
2
3
4
void rte_ikcp_update(struct rte_ikcp *ikcp)
{
ikcp_update(&ikcp->kcp, (uint32_t)((double)rte_rdtsc() / rte_get_tsc_hz() * 1000));
}

TX

发送流程没有任何改动,封装了一层接口rte_ikcp_send

1
2
3
4
int32_t rte_ikcp_send(struct rte_ikcp *ikcp, const char* data, int32_t len)
{
return ikcp_send(&ikcp->kcp, data, len);
}

实际上还是将数据先存入ikcp的内部发送队列,由ikcp的有限状态机维护调用底层发送函数发出。

RX

接收流程中,首先调用底层接收函数rte_ikcp_input完成原始数据的输入:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* Enqueue the data of the mbuf to the kcp internal rx queue.
*
* The mbuf is checked whether belongs to the rte_ikcp or not and freed
* after enqueue sucessfully.
*
* @param ikcp
* The pointer of an ikcp object.
* @param mbuf
* The mbuf contains data to be input to the kcp.
* @return
* - 0 on sucess.
* - Less than 0 on failure.
*/
int32_t rte_ikcp_input(struct rte_ikcp *ikcp, struct rte_mbuf *mbuf);

其中会用mbuf信息和rte_ikcp->config中的信息进行比较,丢弃不属于当前rte_ikcp的报文,最后调用ikcp_input将数据传入ikcp,最后将释放mbuf

而上层应用获取数据的方式和ikcp一致,从ikcp内部队列获取,只做了一层封装rte_ikcp_recv

1
2
3
4
int32_t rte_ikcp_recv(struct rte_ikcp *ikcp, char* data, int32_t len)
{
return ikcp_recv(&ikcp->kcp, data, len);
}

Multi-queue support

ikcp提供的接口本身不是线程安全的,而对DPDK应用而言,通常又会对性能有一定要求,所以librte_ikcp实现了多队列支持,要使用多队列功能,只需要在调用rte_ikcp_create时,为参数rte_ikcp_config->flags置位RTE_IKCP_MQ即可。

首先要确定的一点,由于ikcp接口不是线程安全的,所以对ikcp的有限状态机维护,还是只能由一个线程去完成,能够实现多队列支持的,只有数据收发过程。

简单来说,单队列收发模式下,收包时外层直接调用rte_ikcp_input,发包时将报文从rte_ikcp->config中指定的接口和队列发出;多队列收发模式下,发送和接收都由队列实现:

1
2
3
4
5
6
7
8
/**
* The RTE ikcp structure.
*/
struct rte_ikcp {
ikcpcb kcp; /**< Internal kcp object. */
struct rte_ikcp_config config; /**< Internal config. */
struct rte_ring *rx_queue, *tx_queue; /**< RX/TX queues when RTE_IKCP_MQ is enabled. */
};

收包时,每个收包队列将收取到的报文入队rte_ikcp->rx_queuerte_ikcp_input_bulk再调用rte_ikcp_input完成队列元素的逐个入队:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
int32_t rte_ikcp_input_bulk(struct rte_ikcp *ikcp) {
int nb_rx, i, count;
struct rte_mbuf *mbufs[IKCP_INPUT_BURST_SIZE];

if (!(ikcp->config.flags & RTE_IKCP_MQ)) {
RTE_LOG(DEBUG, IKCP, "Multi queue is not enabled.\n");
return -1;
}

count = RTE_MIN(rte_ring_count(ikcp->rx_queue), IKCP_INPUT_BURST_SIZE);
if (!count) {
return 0;
}

nb_rx = rte_ring_sc_dequeue_bulk(ikcp->rx_queue, (void **)mbufs, count, NULL);
if (!nb_rx) {
return 0;
}

for (i = 0; i < nb_rx; ++i) {
rte_ikcp_input(ikcp, mbufs[i]);
}

return 0;
}

在多队列收包时,上层调用除了要完成原始报文入队外,还应周期性调用rte_ikcp_input_bulk完成真正的报文入队。

发包时,librte_ikcp会将需要发出的报文入队rte_ikcp->tx_queue

1
2
3
4
5
6
7
8
9
10
11
12
13
static inline int32_t ikcp_output(const char *buf, int32_t len, ikcpcb *kcp, void *user)
{
.......
if (ikcp->config.flags & RTE_IKCP_MQ) {
nb_tx = (int32_t)rte_ring_sp_enqueue(ikcp->tx_queue, head) ? 0 : 1;
}
else {
nb_tx = (int32_t)rte_eth_tx_burst(port_id, txq_id, &head, 1);
rte_pktmbuf_free(head);
}

return nb_tx;
}

在多队列发包时,上层调用也需要周期性地轮询rte_ikcp->tx_queue,将可用报文发出。

Other KCP APIs

rte_ikcp对一些KCP提供的API也进行了封装:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
void rte_ikcp_flush(struct rte_ikcp *ikcp)
{
ikcp_flush(&ikcp->kcp);
}

void rte_ikcp_nodelay(struct rte_ikcp *ikcp, int32_t nodelay,
int32_t interval, int32_t resend, int32_t nc)
{
RTE_SET_USED(ikcp_nodelay(&ikcp->kcp, nodelay, interval, resend, nc));
}

int32_t rte_ikcp_peeksize(struct rte_ikcp *ikcp)
{
return ikcp_peeksize(&ikcp->kcp);
}

int32_t rte_ikcp_setmtu(struct rte_ikcp *ikcp, int32_t mtu)
{
return ikcp_setmtu(&ikcp->kcp, mtu);
}

int32_t rte_ikcp_wndsize(struct rte_ikcp *ikcp, int32_t sndwnd, int32_t rcvwnd)
{
return ikcp_wndsize(&ikcp->kcp, sndwnd, rcvwnd);
}

int32_t rte_ikcp_waitsnd(struct rte_ikcp *ikcp)
{
return ikcp_waitsnd(&ikcp->kcp);
}

Example

除了librte_ikcp之外,还提供了一个示例程序dpdk-ikcp,用于简单展示librte_ikcp功能。

其包含两种可用的KCP后端,分别为socket后端和DPDK后端:

1
2
3
4
5
[root@localhost examples]# ./dpdk-ikcp -h
Usage: ./dpdk-ikcp [options]
options:
--socket-kcp: Use kcp socket backend
--dpdk-kcp: Use kcp pmd backend

其中,socket后端使用UDP socket实现KCP的数据收发,而DPDK后端使用librte_ikcp实现。

使用不同的后端类型,对应了不同的参数选项:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# dpdk backend.
[root@localhost examples]# ./dpdk-ikcp --dpdk-kcp -- -h
./dpdk-ikcp --dpdk-kcp [EAL options] -- [--debug] [-4|-6] --device PCI --src SRC --dst DST --next-hop-mac M:M:M:M:M:M
-4: Use IPv4 (default)
-6: Use IPv6
-v: Display verbose send/recv details
--device: Assign the PCI address of device
--src: Assign the source IP
--dst: Assign the destination IP
--next-hop-mac: Assign the MAC address of the next hop
--multi-queue: Use multi queues to transmit
--debug: Enable kcp private log

# socket backend.
[root@localhost examples]# ./dpdk-ikcp --socket-kcp -h
./dpdk-ikcp --socket-kcp [-4|-6] --dst DST
-4: IP address is IPv4 (default)
-6: IP address is IPv6
-v: Display verbose send/recv details
--src: Assign the source IP
--dst: Assign the destination IP

不论哪种后端,都支持sendstopexit三种命令,使用该示例程序可以方便地测试librte_ikcp和常用的基于UDP socket的KCP的通信功能:

1
2
3
4
5
6
[root@localhost dpdk-kcp]# ./dpdk-ikcp --socket-kcp --dst 1.1.1.1 --src 1.1.1.2 -v
cli > timestamp: 1550194806 recv 0 pkts in last 1000 milliseconds.
timestamp: 1550195807 recv 0 pkts in last 1000 milliseconds.
timestamp: 1550196808 recv 0 pkts in last 1000 milliseconds.

cli > support commands: send stop exit
  • Copyright: Copyright is owned by the author. For commercial reprints, please contact the author for authorization. For non-commercial reprints, please indicate the source.
  • Copyrights © 2021-2023 Martzki
  • Visitors: | Views:

请我喝杯咖啡吧~

支付宝
微信