DPDK  20.05.0-rc0
rte_rcu_qsbr.h
/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright (c) 2018 Arm Limited
 */

#ifndef _RTE_RCU_QSBR_H_
#define _RTE_RCU_QSBR_H_

#ifdef __cplusplus
extern "C" {
#endif

#include <stdbool.h>
#include <stdio.h>
#include <stdint.h>
#include <inttypes.h>
#include <errno.h>
#include <rte_common.h>
#include <rte_memory.h>
#include <rte_lcore.h>
#include <rte_debug.h>
#include <rte_atomic.h>

extern int rte_rcu_log_type;

#if RTE_LOG_DP_LEVEL >= RTE_LOG_DEBUG
#define __RTE_RCU_DP_LOG(level, fmt, args...) \
	rte_log(RTE_LOG_ ## level, rte_rcu_log_type, \
		"%s(): " fmt "\n", __func__, ## args)
#else
#define __RTE_RCU_DP_LOG(level, fmt, args...)
#endif

#if defined(RTE_LIBRTE_RCU_DEBUG)
#define __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, level, fmt, args...) do {\
	if (v->qsbr_cnt[thread_id].lock_cnt) \
		rte_log(RTE_LOG_ ## level, rte_rcu_log_type, \
			"%s(): " fmt "\n", __func__, ## args); \
} while (0)
#else
#define __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, level, fmt, args...)
#endif

/* Registered thread IDs are stored in a bitmap held in an array of
 * 64-bit elements. A given thread id is converted to an index into
 * this array and a bit position within that array element.
 */
#define __RTE_QSBR_THRID_ARRAY_ELM_SIZE (sizeof(uint64_t) * 8)
#define __RTE_QSBR_THRID_ARRAY_SIZE(max_threads) \
	RTE_ALIGN(RTE_ALIGN_MUL_CEIL(max_threads, \
		__RTE_QSBR_THRID_ARRAY_ELM_SIZE) >> 3, RTE_CACHE_LINE_SIZE)
#define __RTE_QSBR_THRID_ARRAY_ELM(v, i) ((uint64_t *) \
	((struct rte_rcu_qsbr_cnt *)(v + 1) + v->max_threads) + i)
#define __RTE_QSBR_THRID_INDEX_SHIFT 6
#define __RTE_QSBR_THRID_MASK 0x3f
#define RTE_QSBR_THRID_INVALID 0xffffffff
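
/* Illustrative sketch (not part of the API): with the shift and mask above,
 * a thread_id maps to its bitmap slot as follows. The names 'elem', 'bit'
 * and 'reg' are used here only for illustration.
 *
 *	uint32_t elem = thread_id >> __RTE_QSBR_THRID_INDEX_SHIFT;
 *	uint64_t bit = 1UL << (thread_id & __RTE_QSBR_THRID_MASK);
 *	uint64_t *reg = __RTE_QSBR_THRID_ARRAY_ELM(v, elem);
 *	bool registered = (*reg & bit) != 0;
 */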

/* Worker thread counter */
struct rte_rcu_qsbr_cnt {
	uint64_t cnt;
	/**< Quiescent state counter. Value 0 indicates the thread is offline.
	 *   A 64b counter is used to avoid handling counter overflow.
	 */
	uint32_t lock_cnt;
	/**< Lock counter. Used when RTE_LIBRTE_RCU_DEBUG is enabled. */
} __rte_cache_aligned;
#define __RTE_QSBR_CNT_THR_OFFLINE 0
#define __RTE_QSBR_CNT_INIT 1
#define __RTE_QSBR_CNT_MAX ((uint64_t)~0)

/* RTE Quiescent State variable structure.
 * This structure has two elements that vary in size based on the
 * 'max_threads' parameter.
 * 1) Quiescent state counter array
 * 2) Registered thread ID array
 */
struct rte_rcu_qsbr {
	uint64_t token __rte_cache_aligned;
	/**< Counter to allow for multiple concurrent quiescent state queries */
	uint64_t acked_token;
	/**< Least token acknowledged by all readers in the last call to
	 *   rte_rcu_qsbr_check.
	 */

	uint32_t num_elems __rte_cache_aligned;
	/**< Number of elements in the thread ID bitmap array */
	uint32_t num_threads;
	/**< Number of threads currently using this QS variable */
	uint32_t max_threads;
	/**< Maximum number of threads using this QS variable */

	struct rte_rcu_qsbr_cnt qsbr_cnt[0] __rte_cache_aligned;
	/**< Quiescent state counter array of 'max_threads' elements.
	 *   The registered thread ID bitmap array follows this array in memory.
	 */
} __rte_cache_aligned;

__rte_experimental
size_t
rte_rcu_qsbr_get_memsize(uint32_t max_threads);

__rte_experimental
int
rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads);
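
/* Usage sketch (illustrative, not part of this header): size and initialize
 * a QS variable for up to 'max_threads' reader threads. rte_zmalloc() comes
 * from rte_malloc.h, which this header does not include, and error handling
 * is reduced to a panic for brevity.
 *
 *	size_t sz = rte_rcu_qsbr_get_memsize(max_threads);
 *	struct rte_rcu_qsbr *v = rte_zmalloc(NULL, sz, RTE_CACHE_LINE_SIZE);
 *	if (v == NULL || rte_rcu_qsbr_init(v, max_threads) != 0)
 *		rte_panic("cannot create RCU QS variable\n");
 */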

__rte_experimental
int
rte_rcu_qsbr_thread_register(struct rte_rcu_qsbr *v, unsigned int thread_id);

__rte_experimental
int
rte_rcu_qsbr_thread_unregister(struct rte_rcu_qsbr *v, unsigned int thread_id);
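
/* Reader thread lifecycle sketch (illustrative): each reader registers once
 * with a unique thread_id before reporting quiescent states, and unregisters
 * when it no longer uses the protected data structures.
 *
 *	rte_rcu_qsbr_thread_register(v, thread_id);
 *	rte_rcu_qsbr_thread_online(v, thread_id);
 *	... access shared data and report quiescent states ...
 *	rte_rcu_qsbr_thread_offline(v, thread_id);
 *	rte_rcu_qsbr_thread_unregister(v, thread_id);
 */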

__rte_experimental
static __rte_always_inline void
rte_rcu_qsbr_thread_online(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
	uint64_t t;

	RTE_ASSERT(v != NULL && thread_id < v->max_threads);

	__RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",
				v->qsbr_cnt[thread_id].lock_cnt);

	/* Copy the current value of token.
	 * The fence at the end of the function will ensure that
	 * the following store does not move down past the load of any
	 * shared data structure.
	 */
	t = __atomic_load_n(&v->token, __ATOMIC_RELAXED);

	/* __atomic_store_n(cnt, __ATOMIC_RELAXED) is used to ensure
	 * 'cnt' (64b) is accessed atomically.
	 */
	__atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
		t, __ATOMIC_RELAXED);

	/* The subsequent load of the data structure should not
	 * move above the store. Hence a store-load barrier
	 * is required.
	 * If the load of the data structure moves above the store,
	 * the writer might not see that the reader is online, even though
	 * the reader is referencing the shared data structure.
	 */
#ifdef RTE_ARCH_X86_64
	/* rte_smp_mb() for x86 is lighter */
	rte_smp_mb();
#else
	__atomic_thread_fence(__ATOMIC_SEQ_CST);
#endif
}

__rte_experimental
static __rte_always_inline void
rte_rcu_qsbr_thread_offline(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
	RTE_ASSERT(v != NULL && thread_id < v->max_threads);

	__RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",
				v->qsbr_cnt[thread_id].lock_cnt);

	/* The reader can go offline only after the load of the
	 * data structure is completed, i.e. any load of the
	 * data structure cannot move after this store.
	 */

	__atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
		__RTE_QSBR_CNT_THR_OFFLINE, __ATOMIC_RELEASE);
}
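
/* Usage sketch (illustrative): a reader that accesses the shared data
 * structure only occasionally can stay offline most of the time and
 * bracket each access with online/offline instead of reporting quiescent
 * states continuously.
 *
 *	rte_rcu_qsbr_thread_online(v, thread_id);
 *	... read the shared data structure ...
 *	rte_rcu_qsbr_thread_offline(v, thread_id);
 */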

__rte_experimental
static __rte_always_inline void
rte_rcu_qsbr_lock(__rte_unused struct rte_rcu_qsbr *v,
			__rte_unused unsigned int thread_id)
{
	RTE_ASSERT(v != NULL && thread_id < v->max_threads);

#if defined(RTE_LIBRTE_RCU_DEBUG)
	/* Increment the lock counter */
	__atomic_fetch_add(&v->qsbr_cnt[thread_id].lock_cnt,
				1, __ATOMIC_ACQUIRE);
#endif
}

__rte_experimental
static __rte_always_inline void
rte_rcu_qsbr_unlock(__rte_unused struct rte_rcu_qsbr *v,
			__rte_unused unsigned int thread_id)
{
	RTE_ASSERT(v != NULL && thread_id < v->max_threads);

#if defined(RTE_LIBRTE_RCU_DEBUG)
	/* Decrement the lock counter */
	__atomic_fetch_sub(&v->qsbr_cnt[thread_id].lock_cnt,
				1, __ATOMIC_RELEASE);

	__RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, WARNING,
				"Lock counter %u. Nested locks?\n",
				v->qsbr_cnt[thread_id].lock_cnt);
#endif
}
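
/* Usage sketch (illustrative): rte_rcu_qsbr_lock/unlock are no-ops unless
 * RTE_LIBRTE_RCU_DEBUG is defined; in debug builds they help catch a
 * quiescent state being reported from inside a reader critical section.
 *
 *	rte_rcu_qsbr_lock(v, thread_id);
 *	... read the shared data structure ...
 *	rte_rcu_qsbr_unlock(v, thread_id);
 *	rte_rcu_qsbr_quiescent(v, thread_id);	// OK: outside the lock
 */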

__rte_experimental
static __rte_always_inline uint64_t
rte_rcu_qsbr_start(struct rte_rcu_qsbr *v)
{
	uint64_t t;

	RTE_ASSERT(v != NULL);

	/* Release the changes to the shared data structure.
	 * This store release will ensure that changes to any data
	 * structure are visible to the workers before the token
	 * update is visible.
	 */
	t = __atomic_add_fetch(&v->token, 1, __ATOMIC_RELEASE);

	return t;
}

__rte_experimental
static __rte_always_inline void
rte_rcu_qsbr_quiescent(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
	uint64_t t;

	RTE_ASSERT(v != NULL && thread_id < v->max_threads);

	__RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",
				v->qsbr_cnt[thread_id].lock_cnt);

	/* Acquire the changes to the shared data structure released
	 * by rte_rcu_qsbr_start.
	 * Later loads of the shared data structure should not move
	 * above this load. Hence, use load-acquire.
	 */
	t = __atomic_load_n(&v->token, __ATOMIC_ACQUIRE);

	/* Check if there are updates available from the writer.
	 * Inform the writer that updates are visible to this reader.
	 * Prior loads of the shared data structure should not move
	 * beyond this store. Hence use store-release.
	 */
	if (t != __atomic_load_n(&v->qsbr_cnt[thread_id].cnt, __ATOMIC_RELAXED))
		__atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
			t, __ATOMIC_RELEASE);

	__RTE_RCU_DP_LOG(DEBUG, "%s: update: token = %"PRIu64", Thread ID = %d",
		__func__, t, thread_id);
}
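
/* Usage sketch (illustrative): a polling reader typically reports a
 * quiescent state once per iteration of its processing loop, after it is
 * done referencing the shared data structure. 'quit' is illustrative only.
 *
 *	while (!quit) {
 *		... look up and use entries of the shared data structure ...
 *		rte_rcu_qsbr_quiescent(v, thread_id);
 *	}
 */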

/* Check the quiescent state counter for registered threads only, assuming
 * that not all threads have registered.
 */
static __rte_always_inline int
__rte_rcu_qsbr_check_selective(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
{
	uint32_t i, j, id;
	uint64_t bmap;
	uint64_t c;
	uint64_t *reg_thread_id;
	uint64_t acked_token = __RTE_QSBR_CNT_MAX;

	for (i = 0, reg_thread_id = __RTE_QSBR_THRID_ARRAY_ELM(v, 0);
		i < v->num_elems;
		i++, reg_thread_id++) {
		/* Load the current registered thread bit map before
		 * loading the reader thread quiescent state counters.
		 */
		bmap = __atomic_load_n(reg_thread_id, __ATOMIC_ACQUIRE);
		id = i << __RTE_QSBR_THRID_INDEX_SHIFT;

		while (bmap) {
			j = __builtin_ctzl(bmap);
			__RTE_RCU_DP_LOG(DEBUG,
				"%s: check: token = %"PRIu64", wait = %d, Bit Map = 0x%"PRIx64", Thread ID = %d",
				__func__, t, wait, bmap, id + j);
			c = __atomic_load_n(
					&v->qsbr_cnt[id + j].cnt,
					__ATOMIC_ACQUIRE);
			__RTE_RCU_DP_LOG(DEBUG,
				"%s: status: token = %"PRIu64", wait = %d, Thread QS cnt = %"PRIu64", Thread ID = %d",
				__func__, t, wait, c, id + j);

			/* Counter is not checked for wrap-around condition
			 * as it is a 64b counter.
			 */
			if (unlikely(c !=
				__RTE_QSBR_CNT_THR_OFFLINE && c < t)) {
				/* This thread is not in quiescent state */
				if (!wait)
					return 0;

				rte_pause();
				/* This thread might have unregistered.
				 * Re-read the bitmap.
				 */
				bmap = __atomic_load_n(reg_thread_id,
						__ATOMIC_ACQUIRE);

				continue;
			}

			/* This thread is in quiescent state. Use the counter
			 * to find the least acknowledged token among all the
			 * readers.
			 */
			if (c != __RTE_QSBR_CNT_THR_OFFLINE && acked_token > c)
				acked_token = c;

			bmap &= ~(1UL << j);
		}
	}

	/* All readers are checked, update least acknowledged token.
	 * There might be multiple writers trying to update this. There is
	 * no need to update this very accurately using compare-and-swap.
	 */
	if (acked_token != __RTE_QSBR_CNT_MAX)
		__atomic_store_n(&v->acked_token, acked_token,
			__ATOMIC_RELAXED);

	return 1;
}

/* Check the quiescent state counter for all threads, assuming that
 * all the threads have registered.
 */
static __rte_always_inline int
__rte_rcu_qsbr_check_all(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
{
	uint32_t i;
	struct rte_rcu_qsbr_cnt *cnt;
	uint64_t c;
	uint64_t acked_token = __RTE_QSBR_CNT_MAX;

	for (i = 0, cnt = v->qsbr_cnt; i < v->max_threads; i++, cnt++) {
		__RTE_RCU_DP_LOG(DEBUG,
			"%s: check: token = %"PRIu64", wait = %d, Thread ID = %d",
			__func__, t, wait, i);
		while (1) {
			c = __atomic_load_n(&cnt->cnt, __ATOMIC_ACQUIRE);
			__RTE_RCU_DP_LOG(DEBUG,
				"%s: status: token = %"PRIu64", wait = %d, Thread QS cnt = %"PRIu64", Thread ID = %d",
				__func__, t, wait, c, i);

			/* Counter is not checked for wrap-around condition
			 * as it is a 64b counter.
			 */
			if (likely(c == __RTE_QSBR_CNT_THR_OFFLINE || c >= t))
				break;

			/* This thread is not in quiescent state */
			if (!wait)
				return 0;

			rte_pause();
		}

		/* This thread is in quiescent state. Use the counter to find
		 * the least acknowledged token among all the readers.
		 */
		if (likely(c != __RTE_QSBR_CNT_THR_OFFLINE && acked_token > c))
			acked_token = c;
	}

	/* All readers are checked, update least acknowledged token.
	 * There might be multiple writers trying to update this. There is
	 * no need to update this very accurately using compare-and-swap.
	 */
	if (acked_token != __RTE_QSBR_CNT_MAX)
		__atomic_store_n(&v->acked_token, acked_token,
			__ATOMIC_RELAXED);

	return 1;
}

__rte_experimental
static __rte_always_inline int
rte_rcu_qsbr_check(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
{
	RTE_ASSERT(v != NULL);

	/* Check if all the readers have already acknowledged this token */
	if (likely(t <= v->acked_token))
		return 1;

	if (likely(v->num_threads == v->max_threads))
		return __rte_rcu_qsbr_check_all(v, t, wait);
	else
		return __rte_rcu_qsbr_check_selective(v, t, wait);
}
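
/* Writer-side sketch (illustrative): after removing an element from the
 * shared data structure, grace-period detection is a start/check pair.
 * 'deleted_entry' and the free routine are placeholders.
 *
 *	... remove 'deleted_entry' from the shared data structure ...
 *	uint64_t token = rte_rcu_qsbr_start(v);
 *	// Do other work, or poll with wait == false, before blocking here.
 *	rte_rcu_qsbr_check(v, token, true);
 *	free(deleted_entry);	// safe: no reader can still hold a reference
 */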

__rte_experimental
void
rte_rcu_qsbr_synchronize(struct rte_rcu_qsbr *v, unsigned int thread_id);
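
/* Usage sketch (illustrative): rte_rcu_qsbr_synchronize() blocks until the
 * registered readers have gone through a quiescent state. A writer that is
 * not itself a registered reader passes RTE_QSBR_THRID_INVALID.
 *
 *	... remove 'deleted_entry' from the shared data structure ...
 *	rte_rcu_qsbr_synchronize(v, RTE_QSBR_THRID_INVALID);
 *	free(deleted_entry);	// placeholder free routine
 */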

__rte_experimental
int
rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v);

#ifdef __cplusplus
}
#endif

#endif /* _RTE_RCU_QSBR_H_ */