Linux_RCU机制详解.doc

上传人:sk****8 文档编号:3551914 上传时间:2019-06-04 格式:DOC 页数:16 大小:58.50KB
下载 相关 举报
Linux_RCU机制详解.doc_第1页
第1页 / 共16页
Linux_RCU机制详解.doc_第2页
第2页 / 共16页
Linux_RCU机制详解.doc_第3页
第3页 / 共16页
Linux_RCU机制详解.doc_第4页
第4页 / 共16页
Linux_RCU机制详解.doc_第5页
第5页 / 共16页
点击查看更多>>
资源描述

1、一:前言RCU 机制出现的比较早,只是在 linux kernel 中一直到 2.5 版本的时候才被采用.关于 RCU 机制,这里就不做过多的介绍了,网上有很多有关 RCU 介绍和使用的文档.请自行查阅.本文主要是从 linux kernel 源代码的角度.来分析 RCU 的实现.在讨论 RCU 的实现之前.有必要重申以下几点:1:RCU 使用在读者多而写者少的情况.RCU 和读写锁相似.但 RCU 的读者占锁没有任何的系统开销.写者与写写者之间必须要保持同步,且写者必须要等它之前的读者全部都退出之后才能释放之前的资源.2:RCU 保护的是指针.这一点尤其重要.因为指针赋值是一条单指令 .也就

2、是说是一个原子操作.因它更改指针指向没必要考虑它的同步.只需要考虑 cache 的影响.3:读者是可以嵌套的.也就是说 rcu_read_lock()可以嵌套调用.4:读者在持有 rcu_read_lock()的时候,不能发生进程上下文切换.否则,因为写者需要要等待读者完成,写者进程也会一直被阻塞.以下的代码是基于 linux kernel 2.6.26二:使用 RCU 的实例Linux kernel 中自己附带有详细的文档来介绍 RCU,这些文档位于 linux-2.6.26.3/Documentation/RCU. 这些文档值得多花点时间去仔细研读一下 .下面以 whatisRCU.txt

3、 中的例子作为今天分析的起点:struct foo int a;char b;long c;DEFINE_SPINLOCK(foo_mutex);struct foo *gbl_foo;void foo_update_a(int new_a)struct foo *new_fp;struct foo *old_fp;new_fp = kmalloc(sizeof(*new_fp), GFP_KERNEL);spin_lock(old_fp = gbl_foo;*new_fp = *old_fp;new_fp-a = new_a;rcu_assign_pointer(gbl_foo, new_f

4、p);spin_unlock(synchronize_rcu();kfree(old_fp);int foo_get_a(void)int retval;rcu_read_lock();retval = rcu_dereference(gbl_foo)-a;rcu_read_unlock();return retval;如上代码所示,RCU 被用来保护全局指针 struct foo *gbl_foo. foo_get_a()用来从 RCU 保护的结构中取得 gbl_foo 的值.而 foo_update_a()用来更新被 RCU 保护的 gbl_foo 的值.另外,我们思考一下,为什么要在 f

5、oo_update_a()中使用自旋锁 foo_mutex 呢?假设中间没有使用自旋锁.那 foo_update_a()的代码如下:void foo_update_a(int new_a)struct foo *new_fp;struct foo *old_fp;new_fp = kmalloc(sizeof(*new_fp), GFP_KERNEL);old_fp = gbl_foo;1:- *new_fp = *old_fp;new_fp-a = new_a;rcu_assign_pointer(gbl_foo, new_fp);synchronize_rcu();kfree(old_fp

6、);假设 A 进程在上图-标识处被 B 进程抢点.B 进程也执行了 goo_ipdate_a().等 B 执行完后,再切换回 A 进程.此时,A 进程所持的 old_fd 实际上已经被 B 进程给释放掉了.此后 A 进程对old_fd 的操作都是非法的.另外,我们在上面也看到了几个有关 RCU 的核心 API.它们为别是:rcu_read_lock()rcu_read_unlock()synchronize_rcu()rcu_assign_pointer()rcu_dereference()其中,rcu_read_lock()和 rcu_read_unlock()用来保持一个读者的 RCU 临

7、界区.在该临界区内不允许发生上下文切换.rcu_dereference():读者调用它来获得一个被 RCU 保护的指针.Rcu_assign_pointer():写者使用该函数来为被 RCU 保护的指针分配一个新的值.这样是为了安全从写者到读者更改其值.这个函数会返回一个新值三:RCU API 实现分析Rcu_read_lock()和 rcu_read_unlock()的实现如下:#define rcu_read_lock() _rcu_read_lock()#define rcu_read_unlock() _rcu_read_unlock()#define _rcu_read_lock()

8、 do preempt_disable(); _acquire(RCU); rcu_read_acquire(); while (0)#define _rcu_read_unlock() do rcu_read_release(); _release(RCU); preempt_enable(); while (0)其中_acquire(),rcu_read_read_acquire(),rcu_read_release(),rcu_read_release()都是一些选择编译函数,可以忽略不可看.因此可以得知.rcu_read_lock(),rcu_read_unlock()只是禁止和启用抢

9、占.因为在读者临界区,不允许发生上下文切换.rcu_dereference()和 rcu_assign_pointer()的实现如下:#define rcu_dereference(p) ( typeof(p) _p1 = ACCESS_ONCE(p); smp_read_barrier_depends(); (_p1); )#define rcu_assign_pointer(p, v) ( if (!_builtin_constant_p(v) | (v) != NULL) smp_wmb(); (p) = (v); )它们的实现也很简单.因为它们本身都是原子操作.因为只是为了 cache

10、 一致性,插上了内存屏障.可以让其它的读者/写者可以看到保护指针的最新值 .synchronize_rcu()在 RCU 中是一个最核心的函数 ,它用来等待之前的读者全部退出.我们后面的大部份分析也是围绕着它而进行.实现如下:void synchronize_rcu(void)struct rcu_synchronize rcu;init_completion(/* Will wake me after RCU finished */call_rcu(/* Wait for it */wait_for_completion(我们可以看到,它初始化了一个本地变量,它的类型为 struct rcu

11、_synchronize.调用 call_rcu()之后.一直等待条件变量 petion 的满足.在这里看到了 RCU 的另一个核心 API,它就是 call_run().它的定义如下:void call_rcu(struct rcu_head *head,void (*func)(struct rcu_head *rcu)它用来等待之前的读者操作完成之后,就会调用函数 func.我们也可以看到,在 synchronize_rcu()中,读者操作完了要调用的函数就是 wakeme_after_rcu().另外,call_rcu()用在不可睡眠的条件中,如果中断环境,禁止抢占环境等.而 sync

12、hronize_rcu()用在可睡眠的环境下.先跟踪看一下 wakeme_after_rcu():static void wakeme_after_rcu(struct rcu_head *head)struct rcu_synchronize *rcu;rcu = container_of(head, struct rcu_synchronize, head);complete(我们可以看到,该函数将条件变量置真,然后唤醒了在条件变量上等待的进程.看下 call_rcu():void call_rcu(struct rcu_head *head,void (*func)(struct rcu

13、_head *rcu)unsigned long flags;struct rcu_data *rdp;head-func = func;head-next = NULL;local_irq_save(flags);rdp = *rdp-nxttail = head;rdp-nxttail = if (unlikely(+rdp-qlen qhimark) rdp-blimit = INT_MAX;force_quiescent_state(rdp, local_irq_restore(flags);该函数也很简单,就是将 head 加在了 per_cpu 变量 rcu_data 的 tail

14、 链表上.Rcu_data 定义如下:DEFINE_PER_CPU(struct rcu_data, rcu_data) = 0L ;由此,我们可以得知,每一个 CPU 都有一个 rcu_data.每个调用 call_rcu()/synchronize_rcu()进程所代表的 head 都会挂到 rcu_data 的 tail 链表上.那究竟怎么去判断当前的写者已经操作完了呢?我们在之前看到,不是读者在调用rcu_read_lock()的时候要禁止抢占么?因此,我们只需要判断如有的 CPU 都进过了一次上下文切换,就说明所有读者已经退出了.引用( (http:/ grace period,而

15、CPU 发生了上下文切换称为经历一个quiescent state, grace period 就是所有 CPU 都经历一次 quiescent state 所需要的等待的时间。垃圾收集器就是在 grace period 之后调用写者注册的回调函数来完成真正的数据修改或数据释放操作的”要彻底弄清楚这个问题,我们得从 RCU 的初始化说起.四:从 RCU 的初始化说起RCU 的初始化位于 start_kernel()rcu_init().代码如下:void _init rcu_init(void)_rcu_init();void _init _rcu_init(void)rcu_cpu_noti

16、fy(/* Register notifier for non-boot CPUs */register_cpu_notifier(Reqister_cpu_notifier()是关于通知链表的操作,可以忽略不看.跟进 rcu_cpu_notify():static int _cpuinit rcu_cpu_notify(struct notifier_block *self,unsigned long action, void *hcpu)long cpu = (long)hcpu;switch (action) case CPU_UP_PREPARE:case CPU_UP_PREPARE

17、_FROZEN:rcu_online_cpu(cpu);break;case CPU_DEAD:case CPU_DEAD_FROZEN:rcu_offline_cpu(cpu);break;default:break;return NOTIFY_OK;注意到,在_rcu_init()中是以 CPU_UP_PREPARE 为参数调用此函数,对应流程转入rcu_online_cpu 中:static void _cpuinit rcu_online_cpu(int cpu)struct rcu_data *rdp = struct rcu_data *bh_rdp = rcu_init_perc

18、pu_data(cpu, rcu_init_percpu_data(cpu, open_softirq(RCU_SOFTIRQ, rcu_process_callbacks, NULL);我们从这里又看到了另一个 per_cpu 变量,rcu_bh_data.有关 bh 的部份之后再来分析.在这里略过这些部份.Rcu_init_percpu_data()如下:static void rcu_init_percpu_data(int cpu, struct rcu_ctrlblk *rcp,struct rcu_data *rdp)memset(rdp, 0, sizeof(*rdp);rdp-

19、curtail = rdp-nxttail = rdp-donetail = rdp-quiescbatch = rcp-completed;rdp-qs_pending = 0;rdp-cpu = cpu;rdp-blimit = blimit;调用这个函数的第二个参数是一个全局变量 rcu_ctlblk.定义如下:static struct rcu_ctrlblk rcu_ctrlblk = .cur = -300,.completed = -300,.lock = _SPIN_LOCK_UNLOCKED(static struct rcu_ctrlblk rcu_bh_ctrlblk =

20、 .cur = -300,.completed = -300,.lock = _SPIN_LOCK_UNLOCKED(在 rcu_init_percpu_data 中,初始化了三个链表 ,分别是 taillist,curlist 和 donelist.另外, 将 rdp-quiescbatch 赋值为 rcp-completed.这个是一个很重要的操作.Rdp- quiescbatch 表示 rcu_data 已经完成的 grace period 序号(在代码中也被称为了 batch),rcp-completed 表示全部变量 rcu_ctrlblk 计数已经完成的 grace period

21、序号.将 rdp-quiescbatch = rcp-completed;,表示不需要等待 grace period.回到 rcu_online_cpu()中:open_softirq(RCU_SOFTIRQ, rcu_process_callbacks, NULL);初始化了 RCU_SOFTIRQ 类型的软中断 .但这个软中断什么时候被打开 ,还需要之后来分析.之后,每个 CPU 的初始化都会经过 start_kernel()-rcu_init().相应的,也为每个 CPU 初始化了RCU 的相关结构.五:等待 RCU 读者操作完成之前,我们看完了 RCU 的初始化,现在可以来看一下 RC

22、U 如何来判断当前的 RCU 读者已经退出了.在每一次进程切换的时候,都会调用 rcu_qsctr_inc().如下代码片段如示:asmlinkage void _sched schedule(void) rcu_qsctr_inc(cpu);.Rcu_qsctr_inc()代码如下:static inline void rcu_qsctr_inc(int cpu)struct rcu_data *rdp = rdp-passed_quiesc = 1;该函数将对应 CPU 上的 rcu_data 的 passed_quiesc 成员设为了 1.或许你已经发现了,这个过程就标识该 CPU 经过

23、了一次 quiescent state.没错:-)另外,在时钟中断中,会进行以下操作:void update_process_times(int user_tick) if (rcu_pending(cpu)rcu_check_callbacks(cpu, user_tick); 在每一次时钟中断,都会检查是否有需要更新的 RCU 需要处理,如果有,就会为其调用rcu_check_callbacks().Rcu_pending()的代码如下:int rcu_pending(int cpu)return _rcu_pending(同上面一样,忽略 bh 的部份.static int _rcu_p

24、ending(struct rcu_ctrlblk *rcp, struct rcu_data *rdp)/* This cpu has pending rcu entries and the grace period* for them has completed.*/if (rdp-curlist /* This cpu has no pending entries, but there are new entries */if (!rdp-curlist /* This cpu has finished callbacks to invoke */if (rdp-donelist)ret

25、urn 1;/* The rcu core waits for a quiescent state from the cpu */if (rdp-quiescbatch != rcp-cur | rdp-qs_pending)return 1;/* nothing to do */return 0;上面有四种情况会返回 1,分别对应:1:该 CPU 上有等待处理的回调函数,且已经经过了一个 batch(grace period).rdp-datch 表示rdp 在等待的 batch 序号2:上一个等待已经处理完了,又有了新注册的回调函数 .3:等待已经完成,但尚末调用该次等待的回调函数 .4:

26、在等待 quiescent state.关于 rcp 和 rdp 结构中成员的含义 ,我们等用到的时候再来分析.如果 rcu_pending 返回 1,就会进入到 rcu_check_callbacks().代码如下:void rcu_check_callbacks(int cpu, int user)if (user |(idle_cpu(cpu) rcu_bh_qsctr_inc(cpu); else if (!in_softirq()rcu_bh_qsctr_inc(cpu);raise_rcu_softirq();如果已经 CPU 中运行的进程是用户空间进程或者是 CPU 空闲且不处于

27、中断环境,那么,它也已经进过了一次切换.注意,RCU 只能在内核空间使用.最后调用 raise_rcu_softirq()打开了软中断处理 .相应的,也就调用 RCU 的软中断处理函数.结合上面分析的初始化流程,软中断的处理函数为 rcu_process_callbacks().代码如下:static void rcu_process_callbacks(struct softirq_action *unused)_rcu_process_callbacks(_rcu_process_callbacks(在阅读_rcu_process_callbacks()之前,先来了解一下 rdp 中几个链

28、表的含义:每次新注册的回调函数,都会链入到 rdp-taillist.当前等待 grace period 完成的函数都会链入到 rdp-curlist 上.到等待的 grace period 已经到来,就会将 curlist 上的链表移到 donelist 上.当一个 grace period 过了之后,就会将 taillist 上的数据移到 rdp-curlist 上.之后加册的回调函数又会将其加到 rdp-taillist 上._rcu_process_callbacks()代码分段分析如下:static void _rcu_process_callbacks(struct rcu_ctr

29、lblk *rcp,struct rcu_data *rdp)if (rdp-curlist rdp-donetail = rdp-curtail;rdp-curlist = NULL;rdp-curtail = 如果有需要处理的回调函数,且已经经过了一次 grace period.就将 curlist 上的数据移到donetlist 上.其中,crp-completed 表示已经完成的 grace period.rdp-batch 表示该 CPU 正在等待的 grace period 序号 .if (rdp-nxtlist rdp-curlist = rdp-nxtlist;rdp-curt

30、ail = rdp-nxttail;rdp-nxtlist = NULL;rdp-nxttail = local_irq_enable();/* start the next batch of callbacks*/* determine batch number */rdp-batch = rcp-cur + 1;/* see the comment and corresponding wmb() in* the rcu_start_batch()*/smp_rmb();if (!rcp-next_pending) /* and start it/schedule start if its

31、a new batch */spin_lock(rcp-next_pending = 1;rcu_start_batch(rcp);spin_unlock(如果上一个等待的回调函数处理完了,而且又有了新注册的回调函数.就将 taillist 上的数据移动到 curlist 上. 并开启新的 grace period 等待.注意里面几个变量的赋值: rdp-batch = rcp-cur + 1 表示该 CPU 等待的 grace period 置为当前已发生 grace period 序号的下一个.每次启动一个新的 grace period 等待之后,就会将 rcp-next_pending.在启动的过程中,也就是rcu_start_batch()的过程中,会将 rcp-next_pending 置为 1.设置这个变量主要是防止多个写者竞争的情况/更新相关信息rcu_check_quiescent_state(rcp, rdp);/处理等待完成的回调函数if (rdp-donelist)rcu_do_batch(rdp);

展开阅读全文
相关资源
相关搜索

当前位置:首页 > 教育教学资料库 > 精品笔记

Copyright © 2018-2021 Wenke99.com All rights reserved

工信部备案号浙ICP备20026746号-2  

公安局备案号:浙公网安备33038302330469号

本站为C2C交文档易平台,即用户上传的文档直接卖给下载用户,本站只是网络服务中间平台,所有原创文档下载所得归上传人所有,若您发现上传作品侵犯了您的权利,请立刻联系网站客服并提供证据,平台将在3个工作日内予以改正。