diff --git a/net/xdp/xsk_queue.h b/net/xdp/xsk_queue.h
index 610c0bdc0c2b..88b9ae24658d 100644
--- a/net/xdp/xsk_queue.h
+++ b/net/xdp/xsk_queue.h
@@ -43,6 +43,48 @@ struct xsk_queue {
 	u64 invalid_descs;
 };
 
+/* The structure of the shared state of the rings is the same as the
+ * ring buffer in kernel/events/ring_buffer.c. For the Rx and completion
+ * rings, the kernel is the producer and user space is the consumer. For
+ * the Tx and fill rings, the kernel is the consumer and user space is
+ * the producer.
+ *
+ * producer                         consumer
+ *
+ * if (LOAD ->consumer) {           LOAD ->producer
+ *                    (A)           smp_rmb()       (C)
+ *    STORE $data                   LOAD $data
+ *    smp_wmb()       (B)           smp_mb()        (D)
+ *    STORE ->producer              STORE ->consumer
+ * }
+ *
+ * (A) pairs with (D), and (B) pairs with (C).
+ *
+ * Starting with (B), it ensures the data is written before the
+ * producer pointer. If this barrier were missing, the consumer
+ * could observe the producer pointer being set and thus load the data
+ * before the producer has written the new data. The consumer would in
+ * that case load the old data.
+ *
+ * (C) protects the consumer from speculatively loading the data before
+ * the producer pointer has actually been read. Without this barrier,
+ * some architectures could load old data, since speculative loads are
+ * not discarded: the CPU does not know there is a dependency between
+ * ->producer and the data.
+ *
+ * (A) is a control dependency that separates the load of ->consumer
+ * from the stores of $data. If ->consumer indicates there is no room
+ * in the buffer, $data is not stored, so no explicit barrier is needed.
+ *
+ * (D) ensures the data is loaded before the store of the consumer
+ * pointer becomes visible. Without this memory barrier, the producer
+ * could observe the consumer pointer being set and overwrite the data
+ * with a new value before the consumer got the chance to read the old
+ * value. The consumer would thus miss reading the old entry and very
+ * likely read the new entry twice, once right now and again after
+ * circling through the ring.
+ */
+
 /* Common functions operating for both RXTX and umem queues */
 
 static inline u64 xskq_nb_invalid_descs(struct xsk_queue *q)
@@ -106,6 +148,7 @@ static inline u64 *xskq_validate_addr(struct xsk_queue *q, u64 *addr)
 static inline u64 *xskq_peek_addr(struct xsk_queue *q, u64 *addr)
 {
 	if (q->cons_tail == q->cons_head) {
+		smp_mb(); /* D, matches A */
 		WRITE_ONCE(q->ring->consumer, q->cons_tail);
 		q->cons_head = q->cons_tail + xskq_nb_avail(q, RX_BATCH_SIZE);
 
@@ -128,10 +171,11 @@ static inline int xskq_produce_addr(struct xsk_queue *q, u64 addr)
 	if (xskq_nb_free(q, q->prod_tail, 1) == 0)
 		return -ENOSPC;
 
+	/* A, matches D */
 	ring->desc[q->prod_tail++ & q->ring_mask] = addr;
 
 	/* Order producer and data */
-	smp_wmb();
+	smp_wmb(); /* B, matches C */
 
 	WRITE_ONCE(q->ring->producer, q->prod_tail);
 	return 0;
@@ -144,6 +188,7 @@ static inline int xskq_produce_addr_lazy(struct xsk_queue *q, u64 addr)
 	if (xskq_nb_free(q, q->prod_head, LAZY_UPDATE_THRESHOLD) == 0)
 		return -ENOSPC;
 
+	/* A, matches D */
 	ring->desc[q->prod_head++ & q->ring_mask] = addr;
 	return 0;
 }
@@ -152,7 +197,7 @@ static inline void xskq_produce_flush_addr_n(struct xsk_queue *q,
 					     u32 nb_entries)
 {
 	/* Order producer and data */
-	smp_wmb();
+	smp_wmb(); /* B, matches C */
 
 	q->prod_tail += nb_entries;
 	WRITE_ONCE(q->ring->producer, q->prod_tail);
@@ -163,6 +208,7 @@ static inline int xskq_reserve_addr(struct xsk_queue *q)
 	if (xskq_nb_free(q, q->prod_head, 1) == 0)
 		return -ENOSPC;
 
+	/* A, matches D */
 	q->prod_head++;
 	return 0;
 }
@@ -204,11 +250,12 @@ static inline struct xdp_desc *xskq_peek_desc(struct xsk_queue *q,
 					      struct xdp_desc *desc)
 {
 	if (q->cons_tail == q->cons_head) {
+		smp_mb(); /* D, matches A */
 		WRITE_ONCE(q->ring->consumer, q->cons_tail);
 		q->cons_head = q->cons_tail + xskq_nb_avail(q, RX_BATCH_SIZE);
 
 		/* Order consumer and data */
-		smp_rmb();
+		smp_rmb(); /* C, matches B */
 	}
 
 	return xskq_validate_desc(q, desc);
@@ -228,6 +275,7 @@ static inline int xskq_produce_batch_desc(struct xsk_queue *q,
 	if (xskq_nb_free(q, q->prod_head, 1) == 0)
 		return -ENOSPC;
 
+	/* A, matches D */
 	idx = (q->prod_head++) & q->ring_mask;
 	ring->desc[idx].addr = addr;
 	ring->desc[idx].len = len;
@@ -238,7 +286,7 @@ static inline int xskq_produce_batch_desc(struct xsk_queue *q,
 static inline void xskq_produce_flush_desc(struct xsk_queue *q)
 {
 	/* Order producer and data */
-	smp_wmb();
+	smp_wmb(); /* B, matches C */
 
 	q->prod_tail = q->prod_head,
 	WRITE_ONCE(q->ring->producer, q->prod_tail);
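
The comment block above describes the protocol in terms of the kernel's smp_*() barriers. As a stand-alone illustration, the sketch below implements the same single-producer/single-consumer scheme in user-space C with the (A)/(B)/(C)/(D) points marked. It is illustrative only: the names my_ring, my_prod_put, my_cons_get and MY_RING_SIZE are invented for this sketch, and GCC/Clang __atomic builtins stand in for smp_wmb()/smp_rmb()/smp_mb().

/* Minimal SPSC ring mirroring the (A)/(B)/(C)/(D) barrier placement
 * documented in net/xdp/xsk_queue.h. Not kernel or libbpf code.
 */
#include <stdbool.h>
#include <stdint.h>

#define MY_RING_SIZE 128 /* must be a power of two */

struct my_ring {
        uint32_t producer;
        uint32_t consumer;
        uint64_t data[MY_RING_SIZE];
};

static bool my_prod_put(struct my_ring *r, uint64_t val)
{
        uint32_t cons = __atomic_load_n(&r->consumer, __ATOMIC_RELAXED);

        /* (A): control dependency; do not store if the ring is full. */
        if (r->producer - cons == MY_RING_SIZE)
                return false;

        r->data[r->producer & (MY_RING_SIZE - 1)] = val;

        /* (B): order the data store before the producer pointer store. */
        __atomic_thread_fence(__ATOMIC_RELEASE);
        __atomic_store_n(&r->producer, r->producer + 1, __ATOMIC_RELAXED);
        return true;
}

static bool my_cons_get(struct my_ring *r, uint64_t *val)
{
        uint32_t prod = __atomic_load_n(&r->producer, __ATOMIC_RELAXED);

        if (prod == r->consumer)
                return false; /* ring is empty */

        /* (C): do not read the data before the producer pointer. */
        __atomic_thread_fence(__ATOMIC_ACQUIRE);
        *val = r->data[r->consumer & (MY_RING_SIZE - 1)];

        /* (D): full barrier so the data load completes before the
         * consumer pointer store becomes visible to the producer.
         */
        __atomic_thread_fence(__ATOMIC_SEQ_CST);
        __atomic_store_n(&r->consumer, r->consumer + 1, __ATOMIC_RELAXED);
        return true;
}
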
diff --git a/tools/lib/bpf/libbpf_util.h b/tools/lib/bpf/libbpf_util.h
index 81ecda0cb9c9..172b707e007b 100644
--- a/tools/lib/bpf/libbpf_util.h
+++ b/tools/lib/bpf/libbpf_util.h
@@ -23,6 +23,41 @@ do {                             \
 #define pr_info(fmt, ...)       __pr(LIBBPF_INFO, fmt, ##__VA_ARGS__)
 #define pr_debug(fmt, ...)      __pr(LIBBPF_DEBUG, fmt, ##__VA_ARGS__)
 
+/* Use these barrier functions instead of smp_[rw]mb() in code that is
+ * included from a libbpf header file. That way the barriers are
+ * compiled into the application that uses libbpf.
+ */
+#if defined(__i386__) || defined(__x86_64__)
+# define libbpf_smp_rmb() asm volatile("" : : : "memory")
+# define libbpf_smp_wmb() asm volatile("" : : : "memory")
+# if defined(__x86_64__)
+#  define libbpf_smp_mb() \
+	asm volatile("lock; addl $0,-4(%%rsp)" : : : "memory", "cc")
+# else
+#  define libbpf_smp_mb() \
+	asm volatile("lock; addl $0,-4(%%esp)" : : : "memory", "cc")
+# endif
+/* Prevents stores from being observed before older loads. */
+# define libbpf_smp_rwmb() asm volatile("" : : : "memory")
+#elif defined(__aarch64__)
+# define libbpf_smp_rmb() asm volatile("dmb ishld" : : : "memory")
+# define libbpf_smp_wmb() asm volatile("dmb ishst" : : : "memory")
+# define libbpf_smp_mb() asm volatile("dmb ish" : : : "memory")
+# define libbpf_smp_rwmb() libbpf_smp_mb()
+#elif defined(__arm__)
+/* These are only valid for armv7 and above */
+# define libbpf_smp_rmb() asm volatile("dmb ish" : : : "memory")
+# define libbpf_smp_wmb() asm volatile("dmb ishst" : : : "memory")
+# define libbpf_smp_mb() asm volatile("dmb ish" : : : "memory")
+# define libbpf_smp_rwmb() libbpf_smp_mb()
+#else
+# warning Architecture missing native barrier functions in libbpf_util.h.
+# define libbpf_smp_rmb() __sync_synchronize()
+# define libbpf_smp_wmb() __sync_synchronize()
+# define libbpf_smp_mb() __sync_synchronize()
+# define libbpf_smp_rwmb() __sync_synchronize()
+#endif
+
 #ifdef __cplusplus
 } /* extern "C" */
 #endif
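
These macros are meant to be consumed through libbpf's own headers (xsk.h below), but a hypothetical stand-alone use shows the intended pattern: a store/store barrier on the writer side paired with a load/load barrier on the reader side. The payload/ready variables and the writer()/reader() functions are invented for this sketch, and it assumes the file is compiled within the libbpf source tree so that "libbpf.h" and "libbpf_util.h" are on the include path.

/* Hypothetical message-passing handoff using the libbpf_smp_*()
 * barriers; only the barrier macros come from libbpf_util.h.
 */
#include "libbpf.h"
#include "libbpf_util.h"

static volatile unsigned long payload;
static volatile int ready;

static void writer(unsigned long val)
{
        payload = val;
        /* Order the payload store before the flag store, like the
         * producer pointer update in the XDP socket rings.
         */
        libbpf_smp_wmb();
        ready = 1;
}

static int reader(unsigned long *val)
{
        if (!ready)
                return 0;
        /* Do not speculatively read the payload before the flag. */
        libbpf_smp_rmb();
        *val = payload;
        return 1;
}
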
diff --git a/tools/lib/bpf/xsk.h b/tools/lib/bpf/xsk.h
index a497f00e2962..82ea71a0f3ec 100644
--- a/tools/lib/bpf/xsk.h
+++ b/tools/lib/bpf/xsk.h
@@ -16,6 +16,7 @@
 #include <linux/if_xdp.h>
 
 #include "libbpf.h"
+#include "libbpf_util.h"
 
 #ifdef __cplusplus
 extern "C" {
@@ -36,6 +37,10 @@ struct name { \
 DEFINE_XSK_RING(xsk_ring_prod);
 DEFINE_XSK_RING(xsk_ring_cons);
 
+/* For a detailed explanation of the memory barriers associated with the
+ * rings, please take a look at net/xdp/xsk_queue.h.
+ */
+
 struct xsk_umem;
 struct xsk_socket;
 
@@ -105,7 +110,7 @@ static inline __u32 xsk_cons_nb_avail(struct xsk_ring_cons *r, __u32 nb)
 static inline size_t xsk_ring_prod__reserve(struct xsk_ring_prod *prod,
                                             size_t nb, __u32 *idx)
 {
-	if (unlikely(xsk_prod_nb_free(prod, nb) < nb))
+	if (xsk_prod_nb_free(prod, nb) < nb)
 		return 0;
 
 	*idx = prod->cached_prod;
@@ -116,10 +121,10 @@ static inline size_t xsk_ring_prod__reserve(struct xsk_ring_prod *prod,
 
 static inline void xsk_ring_prod__submit(struct xsk_ring_prod *prod, size_t nb)
 {
-	/* Make sure everything has been written to the ring before signalling
-	 * this to the kernel.
+	/* Make sure everything has been written to the ring before indicating
+	 * this to the kernel by writing the producer pointer.
 	 */
-	smp_wmb();
+	libbpf_smp_wmb();
 
 	*prod->producer += nb;
 }
@@ -129,11 +134,11 @@ static inline size_t xsk_ring_cons__peek(struct xsk_ring_cons *cons,
 {
 	size_t entries = xsk_cons_nb_avail(cons, nb);
 
-	if (likely(entries > 0)) {
+	if (entries > 0) {
 		/* Make sure we do not speculatively read the data before
 		 * we have received the packet buffers from the ring.
 		 */
-		smp_rmb();
+		libbpf_smp_rmb();
 
 		*idx = cons->cached_cons;
 		cons->cached_cons += entries;
@@ -144,6 +149,11 @@ static inline size_t xsk_ring_cons__peek(struct xsk_ring_cons *cons,
 
 static inline void xsk_ring_cons__release(struct xsk_ring_cons *cons, size_t nb)
 {
+	/* Make sure the data has been read before indicating we are done
+	 * with the entries by updating the consumer pointer.
+	 */
+	libbpf_smp_rwmb();
+
 	*cons->consumer += nb;
 }
 
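
From the application's point of view, the barriers above are hidden inside the ring-access helpers. The sketch below shows where they take effect when draining an Rx ring and refilling a fill ring; rx_and_refill() and the batch size of 16 are invented for this sketch, the rings and umem_area are assumed to have been set up elsewhere (e.g. with xsk_socket__create()), and error handling is omitted.

/* Illustrative only: drain up to 16 Rx descriptors and recycle their
 * addresses into the fill ring.
 */
#include "xsk.h"

static void rx_and_refill(struct xsk_ring_cons *rx,
                          struct xsk_ring_prod *fill,
                          void *umem_area)
{
        __u32 idx_rx = 0, idx_fill = 0;
        size_t i, rcvd;

        /* peek issues libbpf_smp_rmb() (C) before the descriptors are read */
        rcvd = xsk_ring_cons__peek(rx, 16, &idx_rx);
        if (!rcvd)
                return;

        if (xsk_ring_prod__reserve(fill, rcvd, &idx_fill) != rcvd)
                return; /* fill ring full; a real application would retry */

        for (i = 0; i < rcvd; i++) {
                const struct xdp_desc *desc;

                desc = xsk_ring_cons__rx_desc(rx, idx_rx + i);
                /* ... process xsk_umem__get_data(umem_area, desc->addr) ... */
                *xsk_ring_prod__fill_addr(fill, idx_fill + i) = desc->addr;
        }

        /* submit issues libbpf_smp_wmb() (B) before the producer pointer
         * is updated; release issues libbpf_smp_rwmb() (D) before the
         * consumer pointer is updated.
         */
        xsk_ring_prod__submit(fill, rcvd);
        xsk_ring_cons__release(rx, rcvd);
}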