DPDK  20.05.0-rc0
rte_ring_elem.h
/* SPDX-License-Identifier: BSD-3-Clause
 *
 * Copyright (c) 2019 Arm Limited
 * Copyright (c) 2010-2017 Intel Corporation
 * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
 * All rights reserved.
 * Derived from FreeBSD's bufring.h
 * Used as BSD-3 Licensed with permission from Kip Macy.
 */

#ifndef _RTE_RING_ELEM_H_
#define _RTE_RING_ELEM_H_

#ifdef __cplusplus
extern "C" {
#endif

#include <stdio.h>
#include <stdint.h>
#include <string.h>
#include <sys/queue.h>
#include <errno.h>
#include <rte_common.h>
#include <rte_config.h>
#include <rte_memory.h>
#include <rte_lcore.h>
#include <rte_atomic.h>
#include <rte_branch_prediction.h>
#include <rte_memzone.h>
#include <rte_pause.h>

#include "rte_ring.h"
__rte_experimental
ssize_t rte_ring_get_memsize_elem(unsigned int esize, unsigned int count);

__rte_experimental
struct rte_ring *rte_ring_create_elem(const char *name, unsigned int esize,
		unsigned int count, int socket_id, unsigned int flags);

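/*
 * Editor's note: a minimal usage sketch, not part of the original header.
 * It shows sizing and creating a ring of 16-byte elements; the ring name,
 * element struct and count below are hypothetical. esize must be a multiple
 * of 4 and count a power of two (unless RING_F_EXACT_SZ is used).
 *
 *	struct my_elem { uint64_t lo, hi; };	// 16 B element
 *
 *	ssize_t sz = rte_ring_get_memsize_elem(sizeof(struct my_elem), 1024);
 *	if (sz < 0)
 *		return -1;	// invalid esize/count
 *
 *	struct rte_ring *r = rte_ring_create_elem("my_elem_ring",
 *			sizeof(struct my_elem), 1024, rte_socket_id(),
 *			RING_F_SP_ENQ | RING_F_SC_DEQ);
 *	if (r == NULL)
 *		return -1;
 */
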
static __rte_always_inline void
__rte_ring_enqueue_elems_32(struct rte_ring *r, const uint32_t size,
		uint32_t idx, const void *obj_table, uint32_t n)
{
	unsigned int i;
	uint32_t *ring = (uint32_t *)&r[1];
	const uint32_t *obj = (const uint32_t *)obj_table;
	if (likely(idx + n < size)) {
		for (i = 0; i < (n & ~0x7); i += 8, idx += 8) {
			ring[idx] = obj[i];
			ring[idx + 1] = obj[i + 1];
			ring[idx + 2] = obj[i + 2];
			ring[idx + 3] = obj[i + 3];
			ring[idx + 4] = obj[i + 4];
			ring[idx + 5] = obj[i + 5];
			ring[idx + 6] = obj[i + 6];
			ring[idx + 7] = obj[i + 7];
		}
		switch (n & 0x7) {
		case 7:
			ring[idx++] = obj[i++]; /* fallthrough */
		case 6:
			ring[idx++] = obj[i++]; /* fallthrough */
		case 5:
			ring[idx++] = obj[i++]; /* fallthrough */
		case 4:
			ring[idx++] = obj[i++]; /* fallthrough */
		case 3:
			ring[idx++] = obj[i++]; /* fallthrough */
		case 2:
			ring[idx++] = obj[i++]; /* fallthrough */
		case 1:
			ring[idx++] = obj[i++]; /* fallthrough */
		}
	} else {
		for (i = 0; idx < size; i++, idx++)
			ring[idx] = obj[i];
		/* Start at the beginning */
		for (idx = 0; i < n; i++, idx++)
			ring[idx] = obj[i];
	}
}

static __rte_always_inline void
__rte_ring_enqueue_elems_64(struct rte_ring *r, uint32_t prod_head,
		const void *obj_table, uint32_t n)
{
	unsigned int i;
	const uint32_t size = r->size;
	uint32_t idx = prod_head & r->mask;
	uint64_t *ring = (uint64_t *)&r[1];
	const unaligned_uint64_t *obj = (const unaligned_uint64_t *)obj_table;
	if (likely(idx + n < size)) {
		for (i = 0; i < (n & ~0x3); i += 4, idx += 4) {
			ring[idx] = obj[i];
			ring[idx + 1] = obj[i + 1];
			ring[idx + 2] = obj[i + 2];
			ring[idx + 3] = obj[i + 3];
		}
		switch (n & 0x3) {
		case 3:
			ring[idx++] = obj[i++]; /* fallthrough */
		case 2:
			ring[idx++] = obj[i++]; /* fallthrough */
		case 1:
			ring[idx++] = obj[i++];
		}
	} else {
		for (i = 0; idx < size; i++, idx++)
			ring[idx] = obj[i];
		/* Start at the beginning */
		for (idx = 0; i < n; i++, idx++)
			ring[idx] = obj[i];
	}
}

static __rte_always_inline void
__rte_ring_enqueue_elems_128(struct rte_ring *r, uint32_t prod_head,
		const void *obj_table, uint32_t n)
{
	unsigned int i;
	const uint32_t size = r->size;
	uint32_t idx = prod_head & r->mask;
	rte_int128_t *ring = (rte_int128_t *)&r[1];
	const rte_int128_t *obj = (const rte_int128_t *)obj_table;
	if (likely(idx + n < size)) {
		for (i = 0; i < (n & ~0x1); i += 2, idx += 2)
			memcpy((void *)(ring + idx),
				(const void *)(obj + i), 32);
		switch (n & 0x1) {
		case 1:
			memcpy((void *)(ring + idx),
				(const void *)(obj + i), 16);
		}
	} else {
		for (i = 0; idx < size; i++, idx++)
			memcpy((void *)(ring + idx),
				(const void *)(obj + i), 16);
		/* Start at the beginning */
		for (idx = 0; i < n; i++, idx++)
			memcpy((void *)(ring + idx),
				(const void *)(obj + i), 16);
	}
}

/* The actual enqueue of elements on the ring.
 * Placed here since identical code is needed in both
 * single and multi producer enqueue functions.
 */
static __rte_always_inline void
__rte_ring_enqueue_elems(struct rte_ring *r, uint32_t prod_head,
		const void *obj_table, uint32_t esize, uint32_t num)
{
	/* 8B and 16B copies are implemented individually to retain
	 * the current performance.
	 */
	if (esize == 8)
		__rte_ring_enqueue_elems_64(r, prod_head, obj_table, num);
	else if (esize == 16)
		__rte_ring_enqueue_elems_128(r, prod_head, obj_table, num);
	else {
		uint32_t idx, scale, nr_idx, nr_num, nr_size;

		/* Normalize to uint32_t */
		scale = esize / sizeof(uint32_t);
		nr_num = num * scale;
		idx = prod_head & r->mask;
		nr_idx = idx * scale;
		nr_size = r->size * scale;
		__rte_ring_enqueue_elems_32(r, nr_size, nr_idx,
				obj_table, nr_num);
	}
}

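/*
 * Editor's note: an illustrative walk-through of the normalization above,
 * using made-up numbers. For esize = 12 (three 32-bit words per element),
 * a ring of r->size = 1024 slots, num = 10 elements and
 * prod_head & r->mask = 1020:
 *
 *	scale   = 12 / 4   = 3
 *	nr_num  = 10 * 3   = 30   32-bit words to copy
 *	nr_idx  = 1020 * 3 = 3060
 *	nr_size = 1024 * 3 = 3072
 *
 * __rte_ring_enqueue_elems_32() then copies 32-bit words starting at
 * index 3060, wraps at 3072 and continues from index 0.
 */
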
static __rte_always_inline void
__rte_ring_dequeue_elems_32(struct rte_ring *r, const uint32_t size,
		uint32_t idx, void *obj_table, uint32_t n)
{
	unsigned int i;
	uint32_t *ring = (uint32_t *)&r[1];
	uint32_t *obj = (uint32_t *)obj_table;
	if (likely(idx + n < size)) {
		for (i = 0; i < (n & ~0x7); i += 8, idx += 8) {
			obj[i] = ring[idx];
			obj[i + 1] = ring[idx + 1];
			obj[i + 2] = ring[idx + 2];
			obj[i + 3] = ring[idx + 3];
			obj[i + 4] = ring[idx + 4];
			obj[i + 5] = ring[idx + 5];
			obj[i + 6] = ring[idx + 6];
			obj[i + 7] = ring[idx + 7];
		}
		switch (n & 0x7) {
		case 7:
			obj[i++] = ring[idx++]; /* fallthrough */
		case 6:
			obj[i++] = ring[idx++]; /* fallthrough */
		case 5:
			obj[i++] = ring[idx++]; /* fallthrough */
		case 4:
			obj[i++] = ring[idx++]; /* fallthrough */
		case 3:
			obj[i++] = ring[idx++]; /* fallthrough */
		case 2:
			obj[i++] = ring[idx++]; /* fallthrough */
		case 1:
			obj[i++] = ring[idx++]; /* fallthrough */
		}
	} else {
		for (i = 0; idx < size; i++, idx++)
			obj[i] = ring[idx];
		/* Start at the beginning */
		for (idx = 0; i < n; i++, idx++)
			obj[i] = ring[idx];
	}
}

static __rte_always_inline void
__rte_ring_dequeue_elems_64(struct rte_ring *r, uint32_t cons_head,
		void *obj_table, uint32_t n)
{
	unsigned int i;
	const uint32_t size = r->size;
	uint32_t idx = cons_head & r->mask;
	uint64_t *ring = (uint64_t *)&r[1];
	unaligned_uint64_t *obj = (unaligned_uint64_t *)obj_table;
	if (likely(idx + n < size)) {
		for (i = 0; i < (n & ~0x3); i += 4, idx += 4) {
			obj[i] = ring[idx];
			obj[i + 1] = ring[idx + 1];
			obj[i + 2] = ring[idx + 2];
			obj[i + 3] = ring[idx + 3];
		}
		switch (n & 0x3) {
		case 3:
			obj[i++] = ring[idx++]; /* fallthrough */
		case 2:
			obj[i++] = ring[idx++]; /* fallthrough */
		case 1:
			obj[i++] = ring[idx++]; /* fallthrough */
		}
	} else {
		for (i = 0; idx < size; i++, idx++)
			obj[i] = ring[idx];
		/* Start at the beginning */
		for (idx = 0; i < n; i++, idx++)
			obj[i] = ring[idx];
	}
}

static __rte_always_inline void
__rte_ring_dequeue_elems_128(struct rte_ring *r, uint32_t cons_head,
		void *obj_table, uint32_t n)
{
	unsigned int i;
	const uint32_t size = r->size;
	uint32_t idx = cons_head & r->mask;
	rte_int128_t *ring = (rte_int128_t *)&r[1];
	rte_int128_t *obj = (rte_int128_t *)obj_table;
	if (likely(idx + n < size)) {
		for (i = 0; i < (n & ~0x1); i += 2, idx += 2)
			memcpy((void *)(obj + i), (void *)(ring + idx), 32);
		switch (n & 0x1) {
		case 1:
			memcpy((void *)(obj + i), (void *)(ring + idx), 16);
		}
	} else {
		for (i = 0; idx < size; i++, idx++)
			memcpy((void *)(obj + i), (void *)(ring + idx), 16);
		/* Start at the beginning */
		for (idx = 0; i < n; i++, idx++)
			memcpy((void *)(obj + i), (void *)(ring + idx), 16);
	}
}

/* The actual dequeue of elements from the ring.
 * Placed here since identical code is needed in both
 * single and multi consumer dequeue functions.
 */
static __rte_always_inline void
__rte_ring_dequeue_elems(struct rte_ring *r, uint32_t cons_head,
		void *obj_table, uint32_t esize, uint32_t num)
{
	/* 8B and 16B copies are implemented individually to retain
	 * the current performance.
	 */
	if (esize == 8)
		__rte_ring_dequeue_elems_64(r, cons_head, obj_table, num);
	else if (esize == 16)
		__rte_ring_dequeue_elems_128(r, cons_head, obj_table, num);
	else {
		uint32_t idx, scale, nr_idx, nr_num, nr_size;

		/* Normalize to uint32_t */
		scale = esize / sizeof(uint32_t);
		nr_num = num * scale;
		idx = cons_head & r->mask;
		nr_idx = idx * scale;
		nr_size = r->size * scale;
		__rte_ring_dequeue_elems_32(r, nr_size, nr_idx,
				obj_table, nr_num);
	}
}

/* Between two loads, the CPU may reorder memory accesses on weakly
 * ordered models (PowerPC/Arm).
 * Users have two choices:
 * 1. use an rmb() memory barrier, or
 * 2. use one-direction load_acquire/store_release barriers, enabled by
 *    CONFIG_RTE_USE_C11_MEM_MODEL=y.
 * Which performs better depends on performance test results.
 * By default, the common functions are provided by rte_ring_generic.h.
 */
#ifdef RTE_USE_C11_MEM_MODEL
#include "rte_ring_c11_mem.h"
#else
#include "rte_ring_generic.h"
#endif

static __rte_always_inline unsigned int
__rte_ring_do_enqueue_elem(struct rte_ring *r, const void *obj_table,
		unsigned int esize, unsigned int n,
		enum rte_ring_queue_behavior behavior, unsigned int is_sp,
		unsigned int *free_space)
{
	uint32_t prod_head, prod_next;
	uint32_t free_entries;

	n = __rte_ring_move_prod_head(r, is_sp, n, behavior,
			&prod_head, &prod_next, &free_entries);
	if (n == 0)
		goto end;

	__rte_ring_enqueue_elems(r, prod_head, obj_table, esize, n);

	update_tail(&r->prod, prod_head, prod_next, is_sp, 1);
end:
	if (free_space != NULL)
		*free_space = free_entries - n;
	return n;
}

static __rte_always_inline unsigned int
__rte_ring_do_dequeue_elem(struct rte_ring *r, void *obj_table,
		unsigned int esize, unsigned int n,
		enum rte_ring_queue_behavior behavior, unsigned int is_sc,
		unsigned int *available)
{
	uint32_t cons_head, cons_next;
	uint32_t entries;

	n = __rte_ring_move_cons_head(r, (int)is_sc, n, behavior,
			&cons_head, &cons_next, &entries);
	if (n == 0)
		goto end;

	__rte_ring_dequeue_elems(r, cons_head, obj_table, esize, n);

	update_tail(&r->cons, cons_head, cons_next, is_sc, 0);

end:
	if (available != NULL)
		*available = entries - n;
	return n;
}

static __rte_always_inline unsigned int
rte_ring_mp_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table,
		unsigned int esize, unsigned int n, unsigned int *free_space)
{
	return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
			RTE_RING_QUEUE_FIXED, __IS_MP, free_space);
}

static __rte_always_inline unsigned int
rte_ring_sp_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table,
		unsigned int esize, unsigned int n, unsigned int *free_space)
{
	return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
			RTE_RING_QUEUE_FIXED, __IS_SP, free_space);
}

static __rte_always_inline unsigned int
rte_ring_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table,
		unsigned int esize, unsigned int n, unsigned int *free_space)
{
	return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
			RTE_RING_QUEUE_FIXED, r->prod.single, free_space);
}

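/*
 * Editor's note: a hedged usage sketch for bulk enqueue, not part of the
 * original header. The ring "r" and buffer "burst" are hypothetical; the
 * element type must match the esize the ring was created with. Bulk mode
 * enqueues either all n elements or none (return value 0).
 *
 *	struct my_elem burst[32];
 *	unsigned int free_space;
 *
 *	if (rte_ring_enqueue_bulk_elem(r, burst, sizeof(struct my_elem),
 *			32, &free_space) == 0) {
 *		// ring did not have room for all 32 elements
 *	}
 */
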
static __rte_always_inline int
rte_ring_mp_enqueue_elem(struct rte_ring *r, void *obj, unsigned int esize)
{
	return rte_ring_mp_enqueue_bulk_elem(r, obj, esize, 1, NULL) ? 0 :
			-ENOBUFS;
}

static __rte_always_inline int
rte_ring_sp_enqueue_elem(struct rte_ring *r, void *obj, unsigned int esize)
{
	return rte_ring_sp_enqueue_bulk_elem(r, obj, esize, 1, NULL) ? 0 :
			-ENOBUFS;
}

static __rte_always_inline int
rte_ring_enqueue_elem(struct rte_ring *r, void *obj, unsigned int esize)
{
	return rte_ring_enqueue_bulk_elem(r, obj, esize, 1, NULL) ? 0 :
			-ENOBUFS;
}

static __rte_always_inline unsigned int
rte_ring_mc_dequeue_bulk_elem(struct rte_ring *r, void *obj_table,
		unsigned int esize, unsigned int n, unsigned int *available)
{
	return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
			RTE_RING_QUEUE_FIXED, __IS_MC, available);
}

static __rte_always_inline unsigned int
rte_ring_sc_dequeue_bulk_elem(struct rte_ring *r, void *obj_table,
		unsigned int esize, unsigned int n, unsigned int *available)
{
	return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
			RTE_RING_QUEUE_FIXED, __IS_SC, available);
}

static __rte_always_inline unsigned int
rte_ring_dequeue_bulk_elem(struct rte_ring *r, void *obj_table,
		unsigned int esize, unsigned int n, unsigned int *available)
{
	return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
			RTE_RING_QUEUE_FIXED, r->cons.single, available);
}

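/*
 * Editor's note: a hedged usage sketch for bulk dequeue, not part of the
 * original header. "r" and "burst" are hypothetical. Bulk mode dequeues
 * either all n elements or none (return value 0).
 *
 *	struct my_elem burst[32];
 *	unsigned int available;
 *
 *	if (rte_ring_dequeue_bulk_elem(r, burst, sizeof(struct my_elem),
 *			32, &available) == 0) {
 *		// fewer than 32 elements were in the ring; nothing dequeued
 *	}
 */
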
static __rte_always_inline int
rte_ring_mc_dequeue_elem(struct rte_ring *r, void *obj_p,
		unsigned int esize)
{
	return rte_ring_mc_dequeue_bulk_elem(r, obj_p, esize, 1, NULL) ? 0 :
			-ENOENT;
}

static __rte_always_inline int
rte_ring_sc_dequeue_elem(struct rte_ring *r, void *obj_p,
		unsigned int esize)
{
	return rte_ring_sc_dequeue_bulk_elem(r, obj_p, esize, 1, NULL) ? 0 :
			-ENOENT;
}

static __rte_always_inline int
rte_ring_dequeue_elem(struct rte_ring *r, void *obj_p, unsigned int esize)
{
	return rte_ring_dequeue_bulk_elem(r, obj_p, esize, 1, NULL) ? 0 :
			-ENOENT;
}

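/*
 * Editor's note: a hedged single-element round trip, not part of the
 * original header. "r" is a hypothetical ring of struct my_elem. The
 * single-element variants return 0 on success, -ENOBUFS when the ring is
 * full and -ENOENT when it is empty.
 *
 *	struct my_elem in = { .lo = 1, .hi = 2 }, out;
 *
 *	if (rte_ring_enqueue_elem(r, &in, sizeof(in)) != 0)
 *		return -1;	// ring full
 *	if (rte_ring_dequeue_elem(r, &out, sizeof(out)) != 0)
 *		return -1;	// ring empty
 */
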
static __rte_always_inline unsigned
rte_ring_mp_enqueue_burst_elem(struct rte_ring *r, const void *obj_table,
		unsigned int esize, unsigned int n, unsigned int *free_space)
{
	return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
			RTE_RING_QUEUE_VARIABLE, __IS_MP, free_space);
}

static __rte_always_inline unsigned
rte_ring_sp_enqueue_burst_elem(struct rte_ring *r, const void *obj_table,
		unsigned int esize, unsigned int n, unsigned int *free_space)
{
	return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
			RTE_RING_QUEUE_VARIABLE, __IS_SP, free_space);
}

static __rte_always_inline unsigned
rte_ring_enqueue_burst_elem(struct rte_ring *r, const void *obj_table,
		unsigned int esize, unsigned int n, unsigned int *free_space)
{
	return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
			RTE_RING_QUEUE_VARIABLE, r->prod.single, free_space);
}

static __rte_always_inline unsigned
rte_ring_mc_dequeue_burst_elem(struct rte_ring *r, void *obj_table,
		unsigned int esize, unsigned int n, unsigned int *available)
{
	return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
			RTE_RING_QUEUE_VARIABLE, __IS_MC, available);
}

static __rte_always_inline unsigned
rte_ring_sc_dequeue_burst_elem(struct rte_ring *r, void *obj_table,
		unsigned int esize, unsigned int n, unsigned int *available)
{
	return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
			RTE_RING_QUEUE_VARIABLE, __IS_SC, available);
}

static __rte_always_inline unsigned int
rte_ring_dequeue_burst_elem(struct rte_ring *r, void *obj_table,
		unsigned int esize, unsigned int n, unsigned int *available)
{
	return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
			RTE_RING_QUEUE_VARIABLE,
			r->cons.single, available);
}

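/*
 * Editor's note: a hedged sketch of a typical burst consumer loop, not part
 * of the original header. "r" and the processing function are hypothetical.
 * Burst mode dequeues as many elements as are available, up to n, and
 * returns the count actually dequeued.
 *
 *	struct my_elem burst[32];
 *	unsigned int nb;
 *
 *	for (;;) {
 *		nb = rte_ring_dequeue_burst_elem(r, burst,
 *				sizeof(struct my_elem), 32, NULL);
 *		if (nb == 0)
 *			continue;	// ring empty, poll again
 *		process_elems(burst, nb);	// hypothetical handler
 *	}
 */
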
#ifdef __cplusplus
}
#endif

#endif /* _RTE_RING_ELEM_H_ */