2013年12月25日星期三

Kprobes源码分析----kprobe的处理

    在探测点注册kprobe后,会在执行到探测点的指令时触发断点异常(trap 3)。kprobes在断点异常的通知链die_chain上注册了自己的处理函数,这个函数就是kprobe_exception_notify()。kprobe_exception_notify()不仅会接收到断点异常的通知,还会接收到调试异常(trap 1)和保护异常的通知。这篇文章主要围绕这个函数的处理来展开。

    1.断点异常处理

   断点是int 3指令触发的,系统中这个异常是由int3()函数来处理,这个函数在entry_32.S中定义。断点异常的处理函数int3()是在trap_init()中注册,注册的时候使用的门类型是中断门,这意味着在处理断点异常时是要关闭中断的,这和普通的异常处理是不同的。在关中断的情况下,CPU仍然可以接收到NMI和CPU引发的异常,包括断点异常。
    int3()中主要是调用do_int3()在处理。do_int3()中在由内核处理前,会通知注册在die_chian通知链上的模块,如果这个异常由内核其他模块处理,内核就不再处理了,源码如下所示:
dotraplinkage void __kprobes do_int3(struct pt_regs *regs, long error_code)
{
#ifdef CONFIG_KPROBES
    if (notify_die(DIE_INT3, "int3", regs, error_code, 3, SIGTRAP)
            == NOTIFY_STOP)
        return;
#else
    if (notify_die(DIE_TRAP, "int3", regs, error_code, 3, SIGTRAP)
            == NOTIFY_STOP)
        return;
#endif

    preempt_conditional_sti(regs);
    do_trap(3, SIGTRAP, "int3", regs, error_code, NULL);
    preempt_conditional_cli(regs);
}
    如果断点异常是由kprobe引发的,kprobe_exception_notify()会返回NOTIFY_STOP,不再由内核处理。通过前面的分析,我们可以看出kprobe_exception_notify()是在关中断的情况下调用的,所以这个函数的处理要尽可能地短,不能发生调度,也不能出现会导致睡眠的操作(例如获取互斥锁或信号量)。
    断点异常发生时,通知链上的通知类型为DIE_INT3,这种类型在kprobe_exception_notify()中会调用kprobe_handler()来处理,如下所示:
int __kprobes kprobe_exceptions_notify(struct notifier_block *self,
                       unsigned long val, void *data)
{
......
    switch (val) {
    case DIE_INT3:
        if (kprobe_handler(args->regs))
            ret = NOTIFY_STOP;
        break;
    case DIE_DEBUG:
......
    return ret;
}
    kprobe_handler()中会处理普通的kprobe处理流程,即调用pre_handler接口,然后开始单步执行指令。由于在调用kprobe的handler的时候,handler中有可能会触发断点异常(虽然是关闭了中断,但是异常还是会处理的),所以kprobe_handler()也要处理kprobe重入的问题。重入的问题,这里我们只关注普通的kprobe处理流程,代码如下所示:
static int __kprobes kprobe_handler(struct pt_regs *regs)
{
    kprobe_opcode_t *addr;
    struct kprobe *p;
    struct kprobe_ctlblk *kcb;

    addr = (kprobe_opcode_t *)(regs->ip - sizeof(kprobe_opcode_t));
    if (*addr != BREAKPOINT_INSTRUCTION) {
        /*
         * The breakpoint instruction was removed right
         * after we hit it.  Another cpu has removed
         * either a probepoint or a debugger breakpoint
         * at this address.  In either case, no further
         * handling of this interrupt is appropriate.
         * Back up over the (now missing) int3 and run
         * the original instruction.
         */

        regs->ip = (unsigned long)addr;
        return 1;
    }

    /*
     * We don't want to be preempted for the entire
     * duration of kprobe processing. We conditionally
     * re-enable preemption at the end of this function,
     * and also in reenter_kprobe() and setup_singlestep().
     */

    preempt_disable();

    kcb = get_kprobe_ctlblk();
    p = get_kprobe(addr);

    if (p) {
        if (kprobe_running()) {
                    .......
        } else {
            set_current_kprobe(p, regs, kcb);
            kcb->kprobe_status = KPROBE_HIT_ACTIVE;

            /*
             * If we have no pre-handler or it returned 0, we
             * continue with normal processing.  If we have a
             * pre-handler and it returned non-zero, it prepped
             * for calling the break_handler below on re-entry
             * for jprobe processing, so get out doing nothing
             * more here.
             */

            if (!p->pre_handler || !p->pre_handler(p, regs))
                setup_singlestep(p, regs, kcb);
            return 1;
        }
    } else if (kprobe_running()) {
        ......
    } /* else: not a kprobe fault; let the kernel handle it */

    preempt_enable_no_resched();
    return 0;
}
    执行完断点指令后,指令指针IP会指向下一个指令的位置,所以这里要使用regs->ip减去断点指令的长度,得到断点指令的地址,存在局部变量addr中。如果addr处的指令不是断点指令,kprobes就不会再处理了,将指令指针指向探测点的位置,然后返回。断点异常处理后,CPU会从addr处的指令开始执行。这种情况可能是断点指令在发生断点异常后被另一个CPU移除,或者是探测点被禁止。
    如果断点指令没有被移除,则kprobes会继续处理。kprobes会首先调用get_kprobe_ctlblk()获取kprobe控制块,获取的变量是一个per-cpu变量,这个变量中会存储kprobe处理的状态,还有可能保存处理kprobe时的寄存器信息等。
    接着会调用get_kprobe()获取addr的位置注册的kprobe。如果kprobe不存在,并且没有kprobe正在处理,则表示addr的位置没有注册kprobe,并且也不是jprobes的处理(kprobe不存在,但是有kprobe正在处理)。
    如果addr处有一个对应的kprobe,则会调用set_current_kprobe()将addr处的kprobe设置到per-cpu变量current_kprobe中,并且把TF(单步执行)和IF(开关中断)标志保存到kprobe控制块中。将kprobe变量保存到current_kprobe中,这样就可以通过这个变量来判断是否当前CPU上正在处理kprobe。
    在kprobe的处理过程中要记录当前kprobe处理的状态,在调用pre_handler之前,状态会设置为KPROBE_HIT_ACTIVE。
    如果在注册kprobe时指定了pre_handler接口,则会调用用户指定的接口。如果pre_handler接口返回1,则不会进行单步执行的过程,post_handler接口也不会被调用(依赖于单步执行过程),直接结束kprobe的处理。

    2.单步执行

    单步执行由setup_singlestep()函数来启动,它做的主要工作就是调用prepare_singlestep()来为单步执行做准备,然后把kprobe的处理状态变为KPROBE_HIT_SS,如下所示:
static void __kprobes setup_singlestep(struct kprobe *p, struct pt_regs *regs,
                       struct kprobe_ctlblk *kcb)
{
.......
    prepare_singlestep(p, regs);
    kcb->kprobe_status = KPROBE_HIT_SS;
}
    prepare_singlesetp()中的处理也很简单,主要是将标志寄存器的TF标志位置1,然后就是设置单步执行指令的地址。如果原始的指令就是断点指令,会将指令指针指向探测点的位置;如果不是,则从保存的指令开始。如果原始的指令真的是断点指令,则会发生kprobe重入,重入时的处理后面再讲,这里先跳过。

    3.调试异常处理

    TF标志置1时,CPU每执行完一条指令就产生调试异常(trap 1)。调试异常由debug()函数处理,该函数在entry_32.S中定义,主要是调用do_debug()函数来完成的。do_debug()中会通知注册在die_chian通知链上模块,通知的类型为DIE_DEBUG。Kprobes注册的处理函数仍然是kprobe_exception_notify(),对应的处理如下所示:
int __kprobes kprobe_exceptions_notify(struct notifier_block *self,
                       unsigned long val, void *data)
{
        ......
    switch (val) {
        .......
    case DIE_DEBUG:
        if (post_kprobe_handler(args->regs))
            ret = NOTIFY_STOP;
        break;
    .......
    return ret;
}
    post_kprobe_handler()中主要是恢复原来的处理流程,并且会调用注册时指定的post_handler接口,源码如下所示:
static int __kprobes post_kprobe_handler(struct pt_regs *regs)
{
        .......
    resume_execution(cur, regs, kcb);
    regs->flags |= kcb->kprobe_saved_flags;

    if ((kcb->kprobe_status != KPROBE_REENTER) && cur->post_handler) {
        kcb->kprobe_status = KPROBE_HIT_SSDONE;
        cur->post_handler(cur, regs, 0);
    }
        .......
    reset_current_kprobe();
        .......

    return 1;
}
    为了在异常处理后从探测点之后开始执行,在这里需要调用resume_execution()来将指令指针IP指向探测点之后的位置,在resume_execution()中还会清除TF标志。在kprobe处理完成后,要恢复之前保存的寄存器标志位,这里主要是TF和IF标志。
    如果kprobe控制块的状态是KPROBE_REENTER,即发生了kprobe重入,则不会调用指定的post_handler接口。
    在调用完指定的post_handler接口后,kprobe的处理就完成了。最后调用reset_current_kprobe()将current_kprobe(per-cpu变量)置为NULL,表示kprobe的处理完成了。

    4.kprobe重入

    kprobe重入是指在kprobe的处理过程中又触发了断点异常,这种情况有可能是用户指定的pre_handler或post_handler接口,或者探测点处的指令本身就是断点指令。
    如果第二次发生断点异常的位置没有注册kprobe,kprobe_handler()中会调用正在处理的kprobe的break_handler接口,源码如下所示:
static int __kprobes kprobe_handler(struct pt_regs *regs)
{
    ......
    p = get_kprobe(addr);

    if (p) {
        .......
    } else if (kprobe_running()) {
        p = __get_cpu_var(current_kprobe);
        if (p->break_handler && p->break_handler(p, regs)) {
            setup_singlestep(p, regs, kcb);
            return 1;
        }
    } /* else: not a kprobe fault; let the kernel handle it */
        .......
    return 0;
}
    如果指定了break_handler接口并且返回1,则会开始单步执行保存的指令,顺着普通的kprobe流程继续处理。如果在处理kprobe的过程中又发生了断点异常,这也算是一种“异常”(jprobes就利用了这点,或者是专门为jprobes准备的),这种情况需要由break_handler接口来处理。如果处理成功,即返回1,则表示可以继续处理。
    如果第二次发生断点异常的位置也注册了kprobe,kprobe_handler()中会调用reenter_kprobe()来处理,源码如下所示:
static int __kprobes reenter_kprobe(struct kprobe *p, struct pt_regs *regs,
                    struct kprobe_ctlblk *kcb)
{
    switch (kcb->kprobe_status) {
    case KPROBE_HIT_SSDONE:
#ifdef CONFIG_X86_64
        /* TODO: Provide re-entrancy from post_kprobes_handler() and
         * avoid exception stack corruption while single-stepping on
         * the instruction of the new probe.
         */

        arch_disarm_kprobe(p);
        regs->ip = (unsigned long)p->addr;
        reset_current_kprobe();
        preempt_enable_no_resched();
        break;
#endif
    case KPROBE_HIT_ACTIVE:
        save_previous_kprobe(kcb);
        set_current_kprobe(p, regs, kcb);
        kprobes_inc_nmissed_count(p);
        prepare_singlestep(p, regs);
        kcb->kprobe_status = KPROBE_REENTER;
        break;
    case KPROBE_HIT_SS:
        if (p == kprobe_running()) {
            regs->flags &= ~X86_EFLAGS_TF;
            regs->flags |= kcb->kprobe_saved_flags;
            return 0;
        } else {
            /* A probe has been hit in the codepath leading up
             * to, or just after, single-stepping of a probed
             * instruction. This entire codepath should strictly
             * reside in .kprobes.text section. Raise a warning
             * to highlight this peculiar case.
             */

        }
    default:
        /* impossible cases */
        WARN_ON(1);
        return 0;
    }

    return 1;
}
    reenter_kprobe()中会根据当前kprobe处理的状态(kcb->kprobe_status)来做具体的处理。
    如果是KPROBE_HIT_SSDONE状态,说明是在调试异常中调用用户指定的post_handler接口时,第二次触发了断点异常,相当于是断点异常把调试异常给中断了。如果不是X86-64环境,处理和KPROBE_HIT_ACTIVE状态的处理一样。根据代码中的TODO注释,x86-64环境中单步执行新的kprobe指令时会导致异常栈崩溃,具体的原因现在不清楚,如果有知道的,麻烦告知一下。如果是在x86-64下,会调用arch_disarm_kprobe()将触发二次断点异常的kprobe设置的断点指令恢复成原始的指令,相当于是把当前处理的kprobe给禁止掉。然后将指令指针指向触发二次异常的探测点位置(此时已经没有kprobe设置的断点指令),并且调用reset_current_kprobe()将current_kprobe设置为NULL,这样前一个kprobe的处理相当于也结束了。注意,这里的处理完成后,后面还会回到调试异常的处理中,也就是返回到post_kprobe_handler()函数中。
    如果是KPROBE_HIT_ACTIVE状态,是在调用pre_handler接口时触发的断点异常。这种情况下会调用save_previous_kprobe()将前一个kprobe的信息保存到kprobe控制块kprobe_ctlblk(per-cpu变量)中,接着会调用set_current_kprobe()将当前处理的kprobe设置到current_kprobe中,最后开始单步执行新的kprobe处的指令。由于发生了kprobe重入,所以要将kprobe控制块的状态设置为KPROBE_REENTER。此时要开始新的kprobe的处理,把前一个kprobe的处理给挂起了,后面的处理流程和普通的kprobe处理流程相同。在当前的kprobe处理完成后,会继续前一个kprobe的处理,但是由于发生了重入,所以前一个kprobe的post_handler接口就不会调用了。
    如果是KPROBE_HIT_SS,是在单步执行保存的探测点指令时触发的断点异常。如果触发断点异常的指令就是探测点处的指令,此时处理的kprobe和kprobe_running()返回的kprobe是同一个,这样情况下会取消单步执行,恢复保存的寄存器标志。此时会返回0,表示由内核来处理这种情况,因为这个指令不是krobe设置上去的。如果在探测点之后的指令中,这种函数是不应该被探测的,应该放在.kprobes.text section中。此时kprobes会产生一个警告,并且返回0,由内核来处理这种情况。

Kprobes源码分析----kprobe的注册

    kprobes是一个动态地收集调试和性能信息的工具,使用它几乎可以跟踪任何函数或被执行的指令。它的机制也很简单,就是将被探测的位置的指令替换为断点指令(不考虑jmp优化),断点指令被执行后会通过notifier_call_chain机制来通知kprobes,kprobes会首先调用用户指定的pre_handler接口。执行pre_handler接口后会单步执行原始的指令,如果用户也指定了post_handler接口,会在调用post_handler接口后结束处理。
    基本的处理过程如下图所示:

    如果你没接触过kprobes,上面提到的pre_handler接口和post_handler接口可能让你疑惑,所以首先我们来介绍下和这些接口相关的kprobe结构。

    1.kprobe结构

    当你要探测一个函数时,需要首先分配一个kprobe实例,然后设置要探测的函数名称或地址,以及自己的处理函数。kprobe结构描述了探测点的位置以及要做的处理操作,其结构定义如下:
struct kprobe {
    struct hlist_node hlist;
    /* list of kprobes for multi-handler support */
    struct list_head list;
    /*count the number of times this probe was temporarily disarmed */
    unsigned long nmissed;
    /* location of the probe point */
    kprobe_opcode_t *addr;
    /* Allow user to indicate symbol name of the probe point */
    const char *symbol_name;
    /* Offset into the symbol */
    unsigned int offset;
    /* Called before addr is executed. */
    kprobe_pre_handler_t pre_handler;
    /* Called after addr is executed, unless... */
    kprobe_post_handler_t post_handler;
    /*
     * ... called if executing addr causes a fault (eg. page fault).
     * Return 1 if it handled fault, otherwise kernel will see it.
     */

    kprobe_fault_handler_t fault_handler;
    /*
     * ... called if breakpoint trap occurs in probe handler.
     * Return 1 if it handled break, otherwise kernel will see it.
     */

    kprobe_break_handler_t break_handler;
    /* Saved opcode (which has been replaced with breakpoint) */
    kprobe_opcode_t opcode;
    /* copy of the original instruction */
    struct arch_specific_insn ainsn;
    /*
     * Indicates various status flags.
     * Protected by kprobe_mutex after this kprobe is registered.
     */

    u32 flags;
};
    其成员说明如下:
    hlist:所有注册的kprobe都会添加到kprobe_table哈希表中,hlist成员用来链接到某个槽位中。
    list:如果在同一个位置注册了多个kprobe,这些kprobe会形成一个队列,队首是一个特殊的kprobe实例,list成员用来用来链接到这个队列中。当探测点被触发时,队首的kprobe实例中注册的handler会逐个遍历队列中注册的handler。
    nmissed:记录当前的probe没有被处理的次数。
    addr:这个成员有两个作用,一个是用户在注册前指定探测点的基地址(加上偏移得到真实的地址),另一个是在注册后保存探测点的实际地址。在注册前,这个可以不指定,由kprobes来初始化。如果没有指定,必须指定探测的位置的符号信息,例如函数名。
    symbol_name:探测点的符号名称。名称和地址不能同时指定,否则注册时会返回EINVAL错误。
    offset:探测点相对于addr地址的偏移
    pre_handler:这个接口在断点异常触发之后,开始单步执行原始的指令之前被调用
    post_handler:在单步执行原始的指令后会被调用
    fault_handler:如果执行过程中出错,则调用该接口。如果返回1,则表示错误由kprobes处理,否则由内核来处理。
    break_handler:在调用probe的处理函数(比如pre_handler接口)时触发了断点异常会调用该接口,。断点异常是通过中断门来处理的(参见trap_init()),在调用相应的处理函数前会自动关闭中断。关中断的情况下虽然不会接收可屏蔽的中断,但是CPU引发的异常或者NMI还是会接收到,所以有可能会发生断点异常处理嵌套,jprobes的实现就用到了这点。
    opcode:原始指令,在被替换为断点指令(X86下是int 3指令)前保存。
    ainsn:保存了探测点原始指令的拷贝。这里拷贝的指令要比opcode中存储的指令多,拷贝的大小为MAX_INSN_SIZE * sizeof(kprobe_opcode_t)。
    flags:探测点的标志,可取的值为KPROBE_FLAG_GONE和​KPROBE_FLAG_DISABLED。如果设置了KPROBE_FLAG_GONE标志,表示断点指令被移除;如果设置了 KPROBE_FLAG_DISABLED,则表示只注册probe,但是并不启用它,也就是说在断点异常触发时并不会调用该probe的接口。

    2.探测点的注册----register_kprobe()

    2.1 探测点地址的计算
register_kprobe()用来在指定的位置注册探测点。它会首先调用kprobe_addr()计算需要插入探测点的地址,源码如下所示:
int __kprobes register_kprobe(struct kprobe *p)
{
    int ret = 0;
    struct kprobe *old_p;
    struct module *probed_mod;
    kprobe_opcode_t *addr;

    addr = kprobe_addr(p);
    if (!addr)
        return -EINVAL;
    p->addr = addr;
    ......
}
    kprobe_addr()会首先检查用户是否设置探测点的地址,如果指定了地址,并且指定了符号信息(symbol_name成员也设置了),它会直接返回NULL,不会去检查符号信息和指定的地址是否一致。通过这里的代码不难发现,如果同时指定了符号信息和地址,register_kprobe()会返回EINVAL错误。
   如果没有指定探测点的地址,而是指定了符号信息,kprobe_addr()会调用kprobe_lookup_name()在内核符号表中查找符号对应的地址,在找到对应的符号地址后,加上偏移就得到探测点的实际位置。
  如果只指定了探测点的地址,kprobe_addr()会将这个地址直接加上偏移返回。
  kprobe_addr()返回的地址会被设置到kprobe的addr成员中,注册后通过这个成员就可以拿到被探测位置的地址。利用这个特性,你可以通过kprobe来获取内核中某一个函数的运行时地址。假设你在编写一个内核模块,但是你要使用函数内核没有导出,这个特性就很有用了。
    2.2 检查探测点地址
   计算探测点的地址后,接着就是要检查这个地址是否可以被探测,源码如下所示:
    ......
    preempt_disable();
    if (!kernel_text_address((unsigned long) p->addr) ||
        in_kprobes_functions((unsigned long) p->addr)) {
        preempt_enable();
        return -EINVAL;
    }

    /* User can pass only KPROBE_FLAG_DISABLED to register_kprobe */
    p->flags &= KPROBE_FLAG_DISABLED;

    /*
     * Check if are we probing a module.
     */

    probed_mod = __module_text_address((unsigned long) p->addr);
    if (probed_mod) {
        /*
         * We must hold a refcount of the probed module while updating
         * its code to prohibit unexpected unloading.
         */

        if (unlikely(!try_module_get(probed_mod))) {
            preempt_enable();
            return -EINVAL;
        }
        /*
         * If the module freed .init.text, we couldn't insert
         * kprobes in there.
         */

        if (within_module_init((unsigned long)p->addr, probed_mod) &&
            probed_mod->state != MODULE_STATE_COMING) {
            module_put(probed_mod);
            preempt_enable();
            return -EINVAL;
        }
    }
    preempt_enable();
    ......
    kprobes只能用作内核函数的探测,所以在注册前必须检查探测点的地址是否是在内核地址空间中。探测点的地址要么是在内核映像中(_stext和_etext之间,如果是在系统启动阶段,要在_sinittext和_einittext之间),要么是是在某个内核模块的地址空间中。具体的判断是在kernel_text_address()函数中处理的。
    实现kprobes的内核函数或者一些特殊的函数是不能探测的,所以还要进一步检测探测点的地址是否在这些限制空间内,这是由in_kprobes_functions()函数来处理的。kprobes把所有不能探测的函数都放在.kprobes.text section(使用__kprobes宏)中,这个段的起始地址是__kprobes_text_start和__kprobes_text_end。如果探测的地址在__kprobes_text_start和__kprobes_text_end之间,会返回EINVAL错误。某些函数也是不能探测的,但是已经放在其他section了,这些函数会放在kprobe_blacklist数组中。如果用户探测的函数在这个黑名单中,也会返回EINVAL错误。2.6.32中只有preempt_schedule()函数在kprobe_blacklist数组中,3.12中增加了native_get_debugreg()irq_entries_start()common_interrupt()mcount()
    如果探测点的地址是在一个内核模块中,需要增加对该模块的引用,以免模块提前卸载。但是如果模块已经开始卸载,此时也是不能注册探测点的,同样会返回EINVAL错误。模块中.init.text section占用的内存,在模块加载后,如果不再使用是可以被释放掉的。如果探测点在模块的.init.text section中,而该section已经被释放,此时是不能注册探测点的,同样会返回EINVAL错误。
    前面说了一大堆会返回EINVAL错误的情况,是时候要总结一下了。在以下情况中,会注册失败,返回EINVAL错误:
      1)    函数被__kprobes宏修饰的函数,这些函数会被放在.kprobes.text section中。这种类型大多数是实现kprobes的代码
      2)    kprobe_blacklist数组中列出的函数。2.6.32中只有preempt_schedule()函数,最新的3.12内核中又增加了native_get_debugreg()、irq_entries_start()、    
             common_interrupt()、mcount()。
      3)    要探测的地址位于一个模块的.init.text section,但是这个section已经被释放。
2.3 注册kprobe
    经过前面的处理后,可以开始执行真正的注册操作了,源码如下所示:
    p->nmissed = 0;
    INIT_LIST_HEAD(&p->list);
    mutex_lock(&kprobe_mutex);
    old_p = get_kprobe(p->addr);
    if (old_p) {
        ret = register_aggr_kprobe(old_p, p);
        goto out;
    }

    mutex_lock(&text_mutex);
    ret = arch_prepare_kprobe(p);
    if (ret)
        goto out_unlock_text;

    INIT_HLIST_NODE(&p->hlist);
    hlist_add_head_rcu(&p->hlist,
               &kprobe_table[hash_ptr(p->addr, KPROBE_HASH_BITS)]);

    if (!kprobes_all_disarmed && !kprobe_disabled(p))
        arch_arm_kprobe(p);

out_unlock_text:
    mutex_unlock(&text_mutex);
out:
    mutex_unlock(&kprobe_mutex);
    在初始化kprobe的相关成员后,首先要获取kprobe_mutex互斥锁,这个互斥锁用来保护kprobe_table哈希表,系统中所有已注册的kprobe实例都保存在这个哈希表中。
    kprobes允许在同一个探测点注册多个kprobe,如果调用get_kprobe()能找到一个kprobe实例,说明已经在当前的探测点注册了一个kprobe,这种情况下会调用register_aggr_kprobe()来处理。如果已注册的kprobe只有一个,register_aggr_kprobe()会分配一个新的kprobe实例,将已注册的kprobe中的内容拷贝到新分配的kprobe中,然后把已注册的kprobe的hanlder设置为kprobes提供的特殊的hanlder,这些handler在异常触发时会逐个遍历用户注册的kprobe中的handler。接着会将新分配的kprobe(不是当前调用register_kprobe()注册的那个)链接到前面已经注册的kprobe,形成一个队列。之后如果还有新的kprobe注册到当前的探测点,会直接添加到队列的末尾。注意,前面描述的register_aggr_kprobe()过程中还没有操作当前要注册的kprobe实例,在前面的操作都做完之后,就直接把新注册的kprobe实例添加到队尾,然后返回。这个过程中结构的变化如下图所示:

    如果在当前探测点没有注册过kprobe,则调用arch_prepare_kprobe()将被探测位置的指令保存到kprobe结构的ainsn成员中,并且被探测位置的第一条指令保存到opcode成员中。
    接着就可以将krobe实例添加到kprobe_table哈希表中,这个操作必须在将探测点的指令替换成断点指令前完成,否则有可能在注册完成前断点异常已经发生了。
    经过前面的处理,现在就剩下最后一步了,将探测点的指令替换为断点指令。这是通过调用text_poke()来完成,它可以将替换指定位置的指令。不过在调用text_poke()替换指令前,必须要获取text_mutex互斥锁。text_poke()真是太好用了,可以做很多事情.......
  至此,整个注册过程就完成了。

Linux CFS调度系统----周期性调度器

  周期性调度器由scheduler_tick()函数实现,在每个时钟中断中都会调用该函数来更新一些统计量,并且会激活当前进程所属调度类的周期性处理接口,代码流程如下所示:

  具体来说,scheduler_tick()做了以下工作:
    1)更新就绪队列的实际时钟时间,不是虚拟时钟时间。
    2)更新就绪队列权重数组cpu_load中的权重值
    3)调用当前CPU上正在执行的进程所属调度类的task_tick接口,更新调度类相关的统计信息,并检查是否需要重新调度
    4)如果是多处理器系统,检查当前CPU是否处于IDLE状态,并调用trigger_load_balance()来检查是否需要对CPU之间的负载进行均衡,如果需要,则触发SCHEDULE_SOFTIRQ软中断来迁移进程。
  下面来看一看内核中是如何来完成这些操作的。
  1.就绪队列时钟的更新
    就绪队列的时钟由update_rq_clock()函数来更新,如果在编译内核的时候启用了CONFIG_HAVE_UNSTABLE_SCHED_CLOCK选项(CentOS的内核是默认开启的),sched_clock_stable变量的值为1,这种情况下会调用sched_clock()函数来获取当前的CPU时间。sched_clock()函数中会调用rdstc指令来读取CPU的周期数,然后调用__cycles_2_ns()将周期数转换为纳秒。rdstc指令在多处理器下会有问题,关于这个问题的描述和解决办法,参见《多核时代不宜再用 x86 的 RDTSC 指令测试指令周期和时间》,较新的内核中已使用同步算法来fix这个问题。
  2.权重数组的更新
    就绪队列中的cpu_load数组用来跟踪此前的CPU负荷状态,在sched_init()中初始化为0,在CPU间迁移进程时会用到这个数组中记录的权重。cpu_load数组由update_cpu_load()函数更新,在每个tick周期里都会调用该函数,代码如下所示:

static void update_cpu_load(struct rq *this_rq)
{
    unsigned long this_load = this_rq->load.weight;
    int i, scale;

    this_rq->nr_load_updates++;

    /* Update our load: */
    for (i = 0, scale = 1; i < CPU_LOAD_IDX_MAX; i++, scale += scale) {
        unsigned long old_load, new_load;

        /* scale is effectively 1 << i now, and >> i divides by scale */

        old_load = this_rq->cpu_load[i];
        new_load = this_load;
        /*
         * Round up the averaging division if load is increasing. This
         * prevents us from getting stuck on 9 if the load is 10, for
         * example.
         */

        if (new_load > old_load)
            new_load += scale-1;
        this_rq->cpu_load[i] = (old_load*(scale-1+ new_load) >> i;
    }

    if (time_after_eq(jiffies, this_rq->calc_load_update)) {
        this_rq->calc_load_update += LOAD_FREQ;
        calc_load_account_active(this_rq);
    }
}
  在向就绪队列添加调度实体时,都会将调度实体的权重值添加到就绪队列的当前负荷的统计成员load中。在这里,this_load保存了当前就绪队列中的权重负荷。在更新cpu_load数组前要累加nr_load_updates成员,该成员记录了更新cpu_load数组的次数,只在输出/proc/sched_debug文件时使用,用于调试调度系统。
  在for循环中,根据老的权重值和当前的权重值来进行更新。为了便于理解,我们可以进行下面的转换:

    this_rq->cpu_load[i] = (old_load * (scale - 1+ new_load) >> i
                         = (old_load * (2^- 1+ new_load) / (2^i)
                         = old_load * (1 - 1 / (2^i)) + new_load / (2^i)
                         = old_load + (new_load - old_load) / (2^i)
  通过转换,我们可以清晰地看到cpu_load数组中的元素是如何更新的。如果i等于0,则2^i的值为1,所以this_rq->cpu_load[0]保存的就是更新时当前就绪队列的权重值。如果当前队列的权重值是增加的,会将new_load(保存的就是当前的权重值)加上(scale-1),向上取整。
  update_cpu_load()中除了更新cpu_load数组的内容后,还会检查是否要更新系统平均负载的统计信息,这些信息每隔5秒钟才更新一次,主要是统计在系统中处于active状态的进程的个数,包括进程状态是TASK_RUNNING和TASK_UNINTERRUPTIBLE的进程。系统的平均负载可以通过top或w命令查看。
  3.CFS的周期性处理
    CFS调度系统的调度类实例由全局变量fair_sched_class表示,设置的周期性处理接口是task_tick_fair()。在2.6.24中引入了组调度的概念,所以在task_tick_fair()中通过for_each_sched_entity宏来遍历处理当前进程所在的调度层级。这里为了简化,我们假设当前进程的parent为NULL,即当前进程处在就绪队列中的红黑树中。对当前进程的周期性处理主要由entity_tick()函数来完成,主要代码流程如下所示:


static void
entity_tick(struct cfs_rq *cfs_rq, struct sched_entity *curr, int queued)
{
    /*
     * Update run-time statistics of the 'current'.
     */

    update_curr(cfs_rq);
......
    if (cfs_rq->nr_running > 1 || !sched_feat(WAKEUP_PREEMPT))
        check_preempt_tick(cfs_rq, curr);
}
  update_curr()根据现在的实际时钟时间和进程的权重计算本次运行的虚拟时钟时间,并更新进程和CFS就绪队列相关的统计信息。update_curr()使用CPU就绪队列的实际时钟时间减去当前进程的开始运行时间(由sched_entity结构的exec_start成员描述),得到当前进程实际的运行时间,然后调用__update_curr()来将实际的运行时间转换为虚拟时钟时间,并且加到当前进程总的运行的虚拟时钟时间(由sched_entity的sum_exec_runtime成员描述)。实际时钟时间和虚拟时钟时间的转换公式为:

  在update_curr()中更新当前进程的虚拟运行时间后,需要重新计算CFS就绪队列中最小的虚拟运行时间。假设cfs_rq(结构类型为cfs_rq)为当前CPU就绪队列中的CFS就绪队列,最小的虚拟运行时间在CFS就绪队列当前的最小运行时间(即cfs_rq->min_vruntime)、正在执行的进程的虚拟运行时间(即cfs_rq->curr->vruntime,更新后的值)和CFS就绪队列中最左边节点(管理调度实体的红黑树中最左边的节点)的虚拟运行时间(cfs_rq->leftmost->vruntime)这三个值之间选择一个最小值。如果选出来的值大于当前CFS就绪队列的最小虚拟运行时间,则使用选出来的值来作为新的最小虚拟运行时间,并设置到cfs_rq-> min_vruntime上,否则维持原来的值不变。通过这样的策略,可以保证CFS就绪队列中的最小虚拟运行时间总是单调递增的,防止时钟倒流。由于最小虚拟运行时间总是单调递增的,所以就绪队列中最左边节点的运行时间有可能小于cfs_rq->min_vruntime。
  完成更新操作后,检查CFS就绪队列中可运行进程的数目是否大于1,如果大于1,则调用check_preempt_tick()检查是否要重新调度正在执行的进程,检查主要分以下几个步骤:
    1)调用sched_slice()计算当前进程在调度延迟内期望的运行时间。如果系统中可运行进程的数量小于sched_nr_latency(其值为sysctl_sched_latency/sysctl_sched_min_granularity),调度延迟由系统参数sysctl_sched_latency的值确定;如果可运行进程的数量大于sched_nr_latency,调度延迟的值为(sysctl_sched_latency * (nr_running / sched_nr_latency)),其中nr_running为CFS就绪队列中可运行进程的数量。而当前进程在调度延迟中分得的时间(实际时钟时间)根据下面的公式来计算(period为调度延迟,weight为当前进程的权重,cfs_rq->load.weight为CFS就绪队列的权重):

    2)如果当前进程本次已经执行的时间(实际时钟时间)超过期望的运行时间,说明当前进程运行的时间已经足够了,这种情况下要重新调度当前进程,并调用clear_buddies()确保当前进程不在CFS就绪队列中的next或last中,避免当前进程在下次选择执行进程时又被优先调度到。
    3)如果当前进程本次已经执行的时间小于进程的最小运行时间(保存在sysctl_sched_min_granularity中),也不能重新调度,避免进程切换的太过频繁。
    4)如果就绪队列中可运行进程的数量超过1,比较当前进程和就绪队列中最左边进程的运行时间来确定是否要重新调度。如果当前进程的虚拟运行时间减去就绪队列中最左边进程的虚拟运行时间的差值大于当前进程的期望运行时间,则重新调度当前进程。
  4.负载均衡
  多处理器系统中,内核必须要保证不同CPU间的负载要均衡,避免某些CPU的负载已经很高了,而某些CPU却很空闲,充分利用CPU资源。但是,迁移进程会导致CPU高速缓存失效,严重影响性能,所以在创建进程时,内核已经开始在CPU间负载均衡。
  在SMP系统上,周期性调度器函数scheduler_tick()完成前面的任务后,会调用trigger_load_balance()来检查是否要发起CPU间的负载均衡。这里我们不考虑启用动态时钟(即设置了CONFIG_NO_HZ选项)下的处理。如果当前的时间已经超过就绪队列中保存的下次均衡的时间,并且当前CPU在某个调度域内,则触发SCHED_SOFTIRQ软中断,适当的时候内核会调用run_rebalance_domains()函数在CPU间进行负载均衡。  调度域是一个CPU的集合,是负载均衡的单位,进程的迁移是在调度域内各CPU间进行,所以只有在当前CPU属于某个调度域的时候才发起进程迁移。通过调度域可以将邻近或共享高速缓存的CPU群集起来,优先选择在这些CPU之间迁移进程,这样可以降低迁移进程导致的性能损失。在普通的SMP系统上,所有的处理器都包含在一个调度域中。

2013年12月18日星期三

Linux CFS调度系统----进程优先级和权重

Linux CFS调度系统----进程优先级和权重     调度器的一般原理是,根据所能分配的计算能力,向系统中的每个进程提供最大的公平性。调度器分配的资源就是CPU的时间,尽量保证每个进程都获得相同的CPU时间。Linux的CFS调度系统不同于O(1)调度器,不需要时间片概念,至少不需要传统的时间片。CFS调度系统只考虑进程的等待时间,即进程在就绪队列中已经等待了多长时间。但是并非系统上的所有进程都同样重要,调度器也要保证一些重要的进程优先执行,或者要获得更多的CPU时间。对于应用层来说,进程的重要性就是通过优先级来标识的,而CFS调度器在计算进程的虚拟运行时间或者调度延迟时都是使用的权重,下面我们来看一下这两者是如何计算和转换的。
1. 进程优先级
    普通进程的优先级由task_struct结构中的prio、static_prio和normal_prio三个成员来描述。static_prio是静态优先级,默认值是在进程创建时从父进程继承过来的,可以使用nice()、sched_setscheduler()或者setpriority()修改。normal_prio是普通优先级,是根据进程的静态优先级和调度策略计算出来的优先级。prio是动态优先级,调度器会根据该成员表示的优先级来给进程分配CPU时间。由于在某些情况下会暂时提高进程的优先级,因此需要3个成员来表示进程的优先级,在提高进程优先级运行的持续时间中,普通和静态优先级的值是不变的。这三个成员的值越低,优先级越高。
    实时进程的优先级是由task_struct结构中的rt_priority成员来描述的,该成员保存的值不会代替上面的三个成员。该成员的值越大,优先级越高,实时进程的优先级计算方法和普通进程不同,后面我们会看到。
2. 进程优先级的计算
    进程创建时,在copy_process()中调用的dup_task_struct()会将父进程的所有内容都拷贝到子进程的task_struct结构实例中,所以初始时父子进程中所有描述优先级的成员的值都是相同的。在初始化完成后,会调用sched_fork()和调度系统交互,将子进程添加到调度系统中。在sched_fork()中会调整子进程的优先级,相关代码如下所示:
void sched_fork(struct task_struct *p, int clone_flags)
{
......
    /*
     * Revert to default priority/policy on fork if requested.
     */

    if (unlikely(p->sched_reset_on_fork)) {
        if (p->policy == SCHED_FIFO || p->policy == SCHED_RR) {
            p->policy = SCHED_NORMAL;
            p->normal_prio = p->static_prio;
        }

        if (PRIO_TO_NICE(p->static_prio) < 0) {
            p->static_prio = NICE_TO_PRIO(0);
            p->normal_prio = p->static_prio;
            set_load_weight(p);
        }

        /*
         * We don't need the reset flag anymore after the fork. It has
         * fulfilled its duty:
         */

        p->sched_reset_on_fork = 0;
    }

    /*
     * Make sure we do not leak PI boosting priority to the child.
     */

    p->prio = current->normal_prio;
......
}
    如果父进程通过sched_setscheduler()系统调用指定了SCHED_RESET_ON_FORK策略,父进程的sched_reset_on_fork成员的值是1,由于子进程的内容是从父进程拷贝而来的,子进程的sched_reset_on_fork的值也是1,所以这里使用的子进程中的sched_reset_on_fork来判断。同理,这里使用p获取的成员的值在没有修改前,都是和父进程一致的。
    如果sched_reset_on_fork成员设置,会重新设置子进程的优先级。如果父进程是实时进程,则会将子进程的调度策略重置为SCHED_NORMAL,即普通进程所属的调度策略。普通优先级normal_prio是根据调度策略和静态优先级static_prio计算出来的,如果父进程是实时进程,其normal_prio是根据实时策略计算的,而此处修改了子进程p的调度策略,所以要重新设置子进程的普通优先级normal_prio,普通进程的normal_prio和static_prio的值是一样的,参见effective_prio()。如果父进程的静态优先级高于DEFAULT_PRIO(值为120,对应的nice值为0),会将子进程的静态优先级重置为DEFAULT_PRIO。普通优先级和权重都是根据静态优先级计算的,所以如果修改了静态优先级,也重新计算普通优先级和权重。SCHED_RESET_ON_FORK策略只作用于主动设置的进程,不会传递到子进程,所以这里会将子进程的sched_reset_on_fork初始化为0。之所以引入这样的策略,主要是出于安全考虑,避免fork炸弹,关于SCHED_RESET_ON_FORK策略的更多信息,参见这里
    进程创建后,普通进程可以通过setpriority()和nice()系统调用来调整进程的优先级(不能用于实时进程),在内核中主要是由set_user_nice()来处理的,代码如下所示:
void set_user_nice(struct task_struct *p, long nice)
{
......
    if (TASK_NICE(p) == nice || nice < -20 || nice > 19)
        return;
......
    /*
     * The RT priorities are set via sched_setscheduler(), but we still
     * allow the 'normal' nice value to be set - but as expected
     * it wont have any effect on scheduling until the task is
     * SCHED_FIFO/SCHED_RR:
     */

    if (task_has_rt_policy(p)) {
        p->static_prio = NICE_TO_PRIO(nice);
        goto out_unlock;
    }
......
    p->static_prio = NICE_TO_PRIO(nice);
    set_load_weight(p);
    old_prio = p->prio;
    p->prio = effective_prio(p);
......
out_unlock:
    task_rq_unlock(rq, &flags);
}
    用户空间设置的时候使用的是nice值,nice值的范围是[-20,19],而内核使用的数值范围是[0,139],用来表示内部优先级。[0,99]的范围专供实时进程使用,nice值[-20,19]映射到[100,139]的范围,用于普通进程,实时进程的优先级总是比普通进程更高。这两种范围都是值越低,优先级越高。
    TASK_NICE宏将进程p的静态优先级转换为nice值,如果用户设置的nice值和当前进程的nice值相同,则直接返回。如果设置的nice不是在[-20,19]之间的返回,说明设置的值有误,直接返回。
    setpriority()和nice()系统调用不能用于实时进程,如果当前设置的进程是实时进程,会将指定的nice值对应的优先级设置到static_prio成员上,但是不会产生任何效果。如果是普通进程,设置的nice值会通过NICE_TO_PRIO宏转换为对应的内核优先级,然后设置到静态优先级static_prio。动态优先级和普通优先级都是在effective_prio()函数中计算的,动态优先级是effective_prio()的返回值,普通优先级是由effective_prio()中调用的normal_prio()返回的。如果是普通进程,并且没有将优先级提高到实时优先级,普通进程的普通优先级static_prio和动态优先级prio的值都是和静态优先级static_prio的值是一样的。
    实时进程的优先级调整要使用sched_setscheduler()系统调用来调整,该系统调用还可以设置进程的优先级。如果要将进程的调度策略设置为SCHED_NORMAL、SCHED_BATCH或者SCHED_IDLE,param参数中的sched_priority成员必须设置为0,否则内核会范围EINVAL错误,具体的描述参见man sched_setscheduler()。内核中是在__sched_setscheduler()函数中使用了一个非常巧妙的判断来做这个检查的,如下所示:
static int __sched_setscheduler(struct task_struct *p, int policy,
                struct sched_param *param, bool user)
{
......
    if (rt_policy(policy) != (param->sched_priority != 0))
        return -EINVAL;

    __setscheduler(rq, p, policy, param->sched_priority);
......
}
    真正的操作是在__setscheduler()中完成的,代码如下所示:
/* Actually do priority change: must hold rq lock. */
static void
__setscheduler(struct rq *rq, struct task_struct *p, int policy, int prio)
{
    BUG_ON(p->se.on_rq);

    p->policy = policy;
    switch (p->policy) {
    case SCHED_NORMAL:
    case SCHED_BATCH:
    case SCHED_IDLE:
        p->sched_class = &fair_sched_class;
        break;
    case SCHED_FIFO:
    case SCHED_RR:
        p->sched_class = &rt_sched_class;
        break;
    }

    p->rt_priority = prio;
    p->normal_prio = normal_prio(p);
    /* we are holding p->pi_lock already */
    p->prio = rt_mutex_getprio(p);
    set_load_weight(p);
}
    这里的操作也很简单,首先根据指定的调度策略来指定进程所属的调度类,然后将设置的优先级设置到实时优先级rt_priority成员上,并重新计算普通优先级和动态优先级。但是这里并没有设置static_prio,因为在将普通进程修改为实时进程时,并不会修改该成员。
3. 权重的计算
    我们前面看到每次在调整进程的优先级时都会调用set_load_weight()来计算进程的权重,因为这个函数是根据静态优先级或者实时优先级和调度策略来计算的,所以优先级的调整必然伴随着权重的计算。计算过程非常简单,如下所示:
static void set_load_weight(struct task_struct *p)
{
    if (task_has_rt_policy(p)) {
        p->se.load.weight = prio_to_weight[0] * 2;
        p->se.load.inv_weight = prio_to_wmult[0] >> 1;
        return;
    }

    /*
     * SCHED_IDLE tasks get minimal weight:
     */

    if (p->policy == SCHED_IDLE) {
        p->se.load.weight = WEIGHT_IDLEPRIO;
        p->se.load.inv_weight = WMULT_IDLEPRIO;
        return;
    }

    p->se.load.weight = prio_to_weight[p->static_prio - MAX_RT_PRIO];
    p->se.load.inv_weight = prio_to_wmult[p->static_prio - MAX_RT_PRIO];
}
    Linux中调度系统直接操作的对象是调度实体,由sched_entity结构描述,计算出来的权重值分别保存在sched_entity结构load成员中的weight和inv_weight,但是这两个成员是独立的,并不是要合成出一个权重值。这两个成员的值分别来自于prio_to_weight和prio_to_wmult两个数组,关于这两个数组的信息在代码注释和《深入Linux内核架构》中都有描述。prio_to_weight数组是根据根据nice值定义的,一般的概念是进程每降低一个nice值,则多获得10%的CPU时间,每升高一个nice值,则放弃10%的CPU时间。为执行该策略,内核通过这个数组将优先级转换为权重值。而prio_to_wult数组和prio_to_weight数组中相同索引的值的关系是prio_to_wult[i]=2^32 / prio_to_weight[i],之所以引入这个数组是避免在计算虚拟运行时间时执行除法。内核中使用delta_exec * (NICE_0_LOAD / weight)的公式来将实际时钟时间(delta_exec)转换为虚拟运行时间,通过prio_to_wult就可以将这个公式转换为(delta_exec * NICE_0_LOAD) * inv_weight >> 32,将除法很巧妙地转换为乘法和移位操作。

2013年12月16日星期一

Linux TCP协议栈中的预分配缓存

Linux TCP协议栈中的预分配缓存     TCP协议栈中的预分配缓存的大小由sock结构中的sk_forward_alloc成员描述,在创建套接字时该成员被初始化为0(参见sk_clone())。虽然这个成员在内核文档和《Linux内核源码剖析---TCP/IP实现中》都描述为预分配缓存的长度,但是在代码中并没有看到使用这个成员来预先分配一段内存,更多地是通过这个成员来控制TCP协议栈使用的内存。其实sk_forward_alloc更应该被描述为一个限额,就是限制当前套接字可以使用的内存数量。下面我们详细来看一下sk_forward_alloc在协议栈中具体的处理。
1、预分配缓存长度的计算
    在接收到数据包后,不管是放到接收队列还是乱序队列,都会调用tcp_try_rmem_schedule()来确认缓存是否有效,即是否允许缓存当前接收到的数据包。如果当前数据包的truesize(数据加sk_buff相关结构)小于sk_forward_alloc,则直接返回成功,表示可以缓存。truesize小于sk_forward_alloc表示已分配的限额还没有使用完成,所以就无需进一步确认。如果超过限额,则会调用__sk_mem_schedule()来重新计算k_forward_alloc,并进行全面的确认。
    在发送数据时,分配用于发送的SKB或者向sk_buff中拷贝数据都会调用sk_wmem_schedule()来确认发送缓存是否可用,即是否允许将当前的数据添加到发送队列中。和sk_rmem_schedule()类似,如果数据包的truesize小于sk_forward_alloc,则直接返回成功,否则会调用__sk_mem_schedule()来重新计算sk_forward_alloc,并进行全面的确认。
    不管是发送和接收数据包,都要通过sk_forward_alloc来查看是否可以将数据添加到缓冲区。sk_forward_alloc是在__sk_mem_schedule()中计算的,相关代码如下所示:
int __sk_mem_schedule(struct sock *sk, int size, int kind)
{
    struct proto *prot = sk->sk_prot;
    int amt = sk_mem_pages(size);
    int allocated;

    sk->sk_forward_alloc += amt * SK_MEM_QUANTUM;
    allocated = atomic_add_return(amt, prot->memory_allocated);
......
}
    上面的代码中,sk_mem_pages根据待确认的缓存长度size,向上取整获得所占用内存的页面数,所以amt必定大于等于1。SK_MEM_QUANTUM的值即为PAGE_SIZE,所以sk_forward_alloc的值在没有分配配额时为PAGE_SIZE的整数倍。如果是TCP协议,prot->memory_allocated指向全局变量tcp_memory_allocated,保存的是TCP协议栈所有预分配缓存对应的页面数(分配和释放配额时,sk_forward_alloc会变化),注意,并不是说TCP协议栈所有套接字的缓冲区占用的内存数就是tcp_memory_allocated对应的内存总和,它只是一个标识,协议栈并没有真的预先分配出了这么多内存供套接字使用。这里计算完成后,如果缓存确认失败,会取消这里的操作,缓存的确认后面再讲。
2、预分配缓存的分配和释放
    注意,这里的分配和释放并不是操作内存,而是操作预分配缓存的长度。预分配缓存的分配和释放分配是由sk_mem_charge()和sk_mem_uncharge()实现,这两个函数做的工作主要是在sk_forward_alloc上加上或减去当前skb的truesize或者拷贝数据的长度。
    接收数据时,如果数据包可以缓存,则会调用skb_set_owner_r()设置当前SKB所属的传输控制块,该函数会更新接收队列中缓存数据的长度,并且调用sk_mem_charge()来分配配额。前面已经调用过tcp_try_rmem_schedule()来确认缓存是否有效,所以如果执行到skb_set_owner_r(),分配配额肯定是可以成功的。在skb_set_owner_r()中还会设置SKB包的destructor接口为sock_rfree()函数,在销毁SKB包时会调用该接口。sock_rfree()中会调用sk_mem_uncharge()释放当前SKB占用的配额。
    发送数据时,配额的分配是在将创建的SKB添加到发送队列或者向SKB中拷贝数据的时候,这是直接调用sk_mem_charge()来完成的,并不是通过skb_set_owner_w()。skb_set_owner_r()操作的SKB通常是从接收队列中的SKB克隆出来的,在发送到下层协议栈后就会释放,这些SKB占用的内存并没有算到TCP协议栈的预分配缓存中。发送队列中的SKB占用的配额是在接收到对端的确认、发送超时调用sk_mem_uncharge()释放占用的配额。
    除了单个SKB数据包占用的预分配缓存的释放,在断开连接、释放传输控制块或者关闭套接字等情况下会调用sk_mem_reclaim()来释放为套接字的预分配缓存。sk_mem_reclaim()中只有在sk_forward_alloc大于SK_MEM_QUANTUM(即PAGE_SIZE)时才调用__sk_mem_reclain()进行真正的缓存回收,因为__sk_mem_reclain()中更新sk_forward_alloc和prot->memory_allocated是按照SK_MEM_QUANTUM进行的,如果sk_forward_alloc小于SK_MEM_QUANTUM,这两个成员并不会更新,代码如下所示:
void __sk_mem_reclaim(struct sock *sk)
{
    struct proto *prot = sk->sk_prot;

    atomic_sub(sk->sk_forward_alloc >> SK_MEM_QUANTUM_SHIFT,
           prot->memory_allocated);
    sk->sk_forward_alloc &= SK_MEM_QUANTUM - 1;
.......
}
    我们前面讲到过,sk_forward_alloc在计算时总是SK_MEM_QUANTUM的整数倍,如果套接字占用的预分配缓存都释放的话,sk_forward_alloc的值应该为0。如果在释放传输控制块的时候,sk_forward_alloc的值不为0,说明内核中某处可能有内存泄漏或者没有正确地更新预分配缓存的长度。
3、预分配缓存的确认
    如果预分配缓存已经用完,则调用__sk_mem_schedule()增加预分配缓存,该函数会首先计算根据新增加的数据长度计算出要新分配的页面数,然后更新到sk_forward_alloc和prot->memory_allocated。如果缓存确认失败,则撤销更新操作。
    缓存确认是否成功和sysctl_tcp_mem、sysctl_tcp_rmem、sysctl_tcp_wmem三个系统参数有关,满足下列情况之一,缓存确认会成功:
    1)所有预分配缓存(更新后的值,下同)低于低水平线,即sysctl_tcp_mem[0]的值
    2)所有预分配缓存高于低水平线,低于硬性限制线,确认的是接收缓存,并且接收队列中的数据总长度小于接收缓冲区的长度上限(即sysctl_tcp_rmem[0])
    3)所有预分配缓存高于低水平线,低于硬性限制线,确认的是发送缓存,如果套接字类型是SOCK_STREAM(tcp或者sctp等),发送队列中数据总长度小于发送缓冲区的长度上限(即sysctl_tcp_wmem[0])
    4)所有预分配缓存高于低水平线,低于硬性限制线,确认的是发送缓存,如果套接字类型不是SOCK_STREAM,为发送而分配的所有SKB数据区的总长度小于发送缓冲区的长度上限(即sysctl_tcp_wmem[0])
    5)不满足以上条件,没有进入警告状态,或者当前套接字发送队列中所有段数据总长度、接收队列中所有段数据总长度、预分配缓存长度(即sk_forward_alloc)三者之和乘以当前系统中套接字数量得到的值小于硬性限制线
    6)所有预分配缓存高于硬性限制线,确认的是发送缓存,并且套接字类型是SOCK_STREAM,会调用sk_stream_moderate_sndbuf()调整套接字的发送缓冲区。只有用户没有通过套接字选项设置发送缓冲区大小的情况下才会调整,调整为发送队列占用内存的一半,但是要大于等于2048。如果发送队列占用的内存大小和待确认长度之和大于调整后的发送缓冲区,则确认成功。在这种情况下,把size再添加到发送队列中,这样发送队列占用的内存容量就会超过发送缓冲区,在分配sk_buff前调用tcp_memory_free()就会返回失败,这样就阻止内存的占用。