99re热这里只有精品视频,7777色鬼xxxx欧美色妇,国产成人精品一区二三区在线观看,内射爽无广熟女亚洲,精品人妻av一区二区三区

6.2. 阻塞 I/O

2018-02-24 15:49 更新

6.2.?阻塞 I/O

回顧第 3 章, 我們看到如何實現(xiàn) read 和 write 方法. 在此, 但是, 我們跳過了一個重要的問題:一個驅(qū)動當它無法立刻滿足請求應當如何響應? 一個對 read 的調(diào)用可能當沒有數(shù)據(jù)時到來, 而以后會期待更多的數(shù)據(jù). 或者一個進程可能試圖寫, 但是你的設備沒有準備好接受數(shù)據(jù), 因為你的輸出緩沖滿了. 調(diào)用進程往往不關心這種問題; 程序員只希望調(diào)用 read 或 write 并且使調(diào)用返回, 在必要的工作已完成后. 這樣, 在這樣的情形中, 你的驅(qū)動應當(缺省地)阻塞進程, 使它進入睡眠直到請求可繼續(xù).

本節(jié)展示如何使一個進程睡眠并且之后再次喚醒它. 如常, 但是, 我們必須首先解釋幾個概念.

6.2.1.?睡眠的介紹

對于一個進程"睡眠"意味著什么? 當一個進程被置為睡眠, 它被標識為處于一個特殊的狀態(tài)并且從調(diào)度器的運行隊列中去除. 直到發(fā)生某些事情改變了那個狀態(tài), 這個進程將不被在任何 CPU 上調(diào)度, 并且, 因此, 將不會運行. 一個睡著的進程已被擱置到系統(tǒng)的一邊, 等待以后發(fā)生事件.

對于一個 Linux 驅(qū)動使一個進程睡眠是一個容易做的事情. 但是, 有幾個規(guī)則必須記住以安全的方式編碼睡眠.

這些規(guī)則的第一個是: 當你運行在原子上下文時不能睡眠. 我們在第 5 章介紹過原子操作; 一個原子上下文只是一個狀態(tài), 這里多個步驟必須在沒有任何類型的并發(fā)存取的情況下進行. 這意味著, 對于睡眠, 是你的驅(qū)動在持有一個自旋鎖, seqlock, 或者 RCU 鎖時不能睡眠. 如果你已關閉中斷你也不能睡眠. 在持有一個旗標時睡眠是合法的, 但是你應當仔細查看這樣做的任何代碼. 如果代碼在持有一個旗標時睡眠, 任何其他的等待這個旗標的線程也睡眠. 因此發(fā)生在持有旗標時的任何睡眠應當短暫, 并且你應當說服自己, 由于持有這個旗標, 你不能阻塞這個將最終喚醒你的進程.

另一件要記住的事情是, 當你醒來, 你從不知道你的進程離開 CPU 多長時間或者同時已經(jīng)發(fā)生了什么改變. 你也常常不知道是否另一個進程已經(jīng)睡眠等待同一個事件; 那個進程可能在你之前醒來并且獲取了你在等待的資源. 結(jié)果是你不能關于你醒后的系統(tǒng)狀態(tài)做任何的假設, 并且你必須檢查來確保你在等待的條件是, 確實, 真的.

一個另外的相關的點, 當然, 是你的進程不能睡眠除非確信其他人, 在某處的, 將喚醒它. 做喚醒工作的代碼必須也能夠找到你的進程來做它的工作. 確保一個喚醒發(fā)生, 是深入考慮你的代碼和對于每次睡眠, 確切知道什么系列的事件將結(jié)束那次睡眠. 使你的進程可能被找到, 真正地, 通過一個稱為等待隊列的數(shù)據(jù)結(jié)構(gòu)實現(xiàn)的. 一個等待隊列就是它聽起來的樣子:一個進程列表, 都等待一個特定的事件.

在 Linux 中, 一個等待隊列由一個"等待隊列頭"來管理, 一個 wait_queue_head_t 類型的結(jié)構(gòu), 定義在<linux/wait.h>中. 一個等待隊列頭可被定義和初始化, 使用:


DECLARE_WAIT_QUEUE_HEAD(name); 

或者動態(tài)地, 如下:


wait_queue_head_t my_queue;
init_waitqueue_head(&my_queue);

我們將很快返回到等待隊列結(jié)構(gòu), 但是我們知道了足夠多的來首先看看睡眠和喚醒.

6.2.2.?簡單睡眠

當一個進程睡眠, 它這樣做以期望某些條件在以后會成真. 如我們之前注意到的, 任何睡眠的進程必須在它再次醒來時檢查來確保它在等待的條件真正為真. Linux 內(nèi)核中睡眠的最簡單方式是一個宏定義, 稱為 wait_event(有幾個變體); 它結(jié)合了處理睡眠的細節(jié)和進程在等待的條件的檢查. wait_event 的形式是:


wait_event(queue, condition)
wait_event_interruptible(queue, condition)
wait_event_timeout(queue, condition, timeout)
wait_event_interruptible_timeout(queue, condition, timeout)

在所有上面的形式中, queue 是要用的等待隊列頭. 注意它是"通過值"傳遞的. 條件是一個被這個宏在睡眠前后所求值的任意的布爾表達式; 直到條件求值為真值, 進程繼續(xù)睡眠. 注意條件可能被任意次地求值, 因此它不應當有任何邊界效應.

如果你使用 wait_event, 你的進程被置為不可中斷地睡眠, 如同我們之前已經(jīng)提到的, 它常常不是你所要的. 首選的選擇是 wait_event_interruptible, 它可能被信號中斷. 這個版本返回一個你應當檢查的整數(shù)值; 一個非零值意味著你的睡眠被某些信號打斷, 并且你的驅(qū)動可能應當返回 -ERESTARTSYS. 最后的版本(wait_event_timeout 和 wait_event_interruptible_timeout)等待一段有限的時間; 在這個時間期間(以嘀噠數(shù)表達的, 我們將在第 7 章討論)超時后, 這個宏返回一個 0 值而不管條件是如何求值的.

圖片的另一半, 當然, 是喚醒. 一些其他的執(zhí)行線程(一個不同的進程, 或者一個中斷處理, 也許)必須為你進行喚醒, 因為你的進程, 當然, 是在睡眠. 基本的喚醒睡眠進程的函數(shù)稱為 wake_up. 它有幾個形式(但是我們現(xiàn)在只看其中 2 個):


void wake_up(wait_queue_head_t *queue);
void wake_up_interruptible(wait_queue_head_t *queue);

wake_up 喚醒所有的在給定隊列上等待的進程(盡管這個情形比那個要復雜一些, 如同我們之后將見到的). 其他的形式(wake_up_interruptible)限制它自己到處理一個可中斷的睡眠. 通常, 這 2 個是不用區(qū)分的(如果你使用可中斷的睡眠); 實際上, 慣例是使用 wake_up 如果你在使用 wait_event , wake_up_interruptible 如果你在使用 wait_event_interruptible.

我們現(xiàn)在知道足夠多來看一個簡單的睡眠和喚醒的例子. 在這個例子代碼中, 你可找到一個稱為 sleepy 的模塊. 它實現(xiàn)一個有簡單行為的設備:任何試圖從這個設備讀取的進程都被置為睡眠. 無論何時一個進程寫這個設備, 所有的睡眠進程被喚醒. 這個行為由下面的 read 和 write 方法實現(xiàn):


static DECLARE_WAIT_QUEUE_HEAD(wq);
static int flag = 0;

ssize_t sleepy_read (struct file *filp, char __user *buf, size_t count, loff_t *pos)
{
        printk(KERN_DEBUG "process %i (%s) going to sleep\n",
               current->pid, current->comm);
        wait_event_interruptible(wq, flag != 0);
        flag = 0;
        printk(KERN_DEBUG "awoken %i (%s)\n", current->pid, current->comm);
        return 0; /* EOF */
}
ssize_t sleepy_write (struct file *filp, const char __user *buf, size_t count, loff_t *pos)
{
        printk(KERN_DEBUG "process %i (%s) awakening the readers...\n",
               current->pid, current->comm);
        flag = 1;
        wake_up_interruptible(&wq);
        return count; /* succeed, to avoid retrial */

}

注意這個例子里 flag 變量的使用. 因為 wait_event_interruptible 檢查一個必須變?yōu)檎娴臈l件, 我們使用 flag 來創(chuàng)建那個條件.

有趣的是考慮當 sleepy_write 被調(diào)用時如果有 2 個進程在等待會發(fā)生什么. 因為 sleepy_read 重置 flag 為 0 一旦它醒來, 你可能認為醒來的第 2 個進程會立刻回到睡眠. 在一個單處理器系統(tǒng), 這幾乎一直是發(fā)生的事情. 但是重要的是要理解為什么你不能依賴這個行為. wake_up_interruptible 調(diào)用將使 2 個睡眠進程醒來. 完全可能它們都注意到 flag 是非零, 在另一個有機會重置它之前. 對于這個小模塊, 這個競爭條件是不重要的. 在一個真實的驅(qū)動中, 這種競爭可能導致少見的難于查找的崩潰. 如果正確的操作要求只能有一個進程看到這個非零值, 它將必須以原子的方式被測試. 我們將見到一個真正的驅(qū)動如何處理這樣的情況. 但首先我們必須開始另一個主題.

6.2.3.?阻塞和非阻塞操作

在我們看全功能的 read 和 write 方法的實現(xiàn)之前, 我們觸及的最后一點是決定何時使進程睡眠. 有時實現(xiàn)正確的 unix 語義要求一個操作不阻塞, 即便它不能完全地進行下去.

有時還有調(diào)用進程通知你他不想阻塞, 不管它的 I/O 是否繼續(xù). 明確的非阻塞 I/O 由 filp->f_flags 中的 O_NONBLOCK 標志來指示. 這個標志定義于 <linux/fcntl.h>, 被 <linux/fs.h>自動包含. 這個標志得名自"打開-非阻塞", 因為它可在打開時指定(并且起初只能在那里指定). 如果你瀏覽源碼, 你會發(fā)現(xiàn)一些對一個 O_NDELAY 標志的引用; 這是一個替代 O_NONBLOCK 的名子, 為兼容 System V 代碼而被接受的. 這個標志缺省地被清除, 因為一個等待數(shù)據(jù)的進程的正常行為僅僅是睡眠. 在一個阻塞操作的情況下, 這是缺省地, 下列的行為應當實現(xiàn)來符合標準語法:

  • 如果一個進程調(diào)用 read 但是沒有數(shù)據(jù)可用(尚未), 這個進程必須阻塞. 這個進程在有數(shù)據(jù)達到時被立刻喚醒, 并且那個數(shù)據(jù)被返回給調(diào)用者, 即便小于在給方法的 count 參數(shù)中請求的數(shù)量.

  • 如果一個進程調(diào)用 write 并且在緩沖中沒有空間, 這個進程必須阻塞, 并且它必須在一個與用作 read 的不同的等待隊列中. 當一些數(shù)據(jù)被寫入硬件設備, 并且在輸出緩沖中的空間變空閑, 這個進程被喚醒并且寫調(diào)用成功, 盡管數(shù)據(jù)可能只被部分寫入如果在緩沖只沒有空間給被請求的 count 字節(jié).

這 2 句都假定有輸入和輸出緩沖; 實際上, 幾乎每個設備驅(qū)動都有. 要求有輸入緩沖是為了避免丟失到達的數(shù)據(jù), 當無人在讀時. 相反, 數(shù)據(jù)在寫時不能丟失, 因為如果系統(tǒng)調(diào)用不能接收數(shù)據(jù)字節(jié), 它們保留在用戶空間緩沖. 即便如此, 輸出緩沖幾乎一直有用, 對于從硬件擠出更多的性能.

在驅(qū)動中實現(xiàn)輸出緩沖所獲得的性能來自減少了上下文切換和用戶級/內(nèi)核級切換的次數(shù). 沒有一個輸出緩沖(假定一個慢速設備), 每次系統(tǒng)調(diào)用接收這樣一個或幾個字符, 并且當一個進程在 write 中睡眠, 另一個進程運行(那是一次上下文切換). 當?shù)谝粋€進程被喚醒, 它恢復(另一次上下文切換), 寫返回(內(nèi)核/用戶轉(zhuǎn)換), 并且這個進程重新發(fā)出系統(tǒng)調(diào)用來寫入更多的數(shù)據(jù)(用戶/內(nèi)核轉(zhuǎn)換); 這個調(diào)用阻塞并且循環(huán)繼續(xù). 增加一個輸出緩沖可允許驅(qū)動在每個寫調(diào)用中接收大的數(shù)據(jù)塊, 性能上有相應的提高. 如果這個緩沖足夠大, 寫調(diào)用在第一次嘗試就成功 -- 被緩沖的數(shù)據(jù)之后將被推到設備 -- 不必控制需要返回用戶空間來第二次或者第三次寫調(diào)用. 選擇一個合適的值給輸出緩沖顯然是設備特定的.

我們不使用一個輸入緩沖在 scull中, 因為數(shù)據(jù)當發(fā)出 read 時已經(jīng)可用. 類似的, 不用輸出緩沖, 因為數(shù)據(jù)被簡單地拷貝到和設備關聯(lián)的內(nèi)存區(qū). 本質(zhì)上, 這個設備是一個緩沖, 因此額外緩沖的實現(xiàn)可能是多余的. 我們將在第 10 章見到緩沖的使用.

如果指定 O_NONBLOCK, read 和 write 的行為是不同的. 在這個情況下, 這個調(diào)用簡單地返回 -EAGAIN(("try it agin")如果一個進程當沒有數(shù)據(jù)可用時調(diào)用 read , 或者如果當緩沖中沒有空間時它調(diào)用 write .

如你可能期望的, 非阻塞操作立刻返回, 允許這個應用程序輪詢數(shù)據(jù). 應用程序當使用 stdio 函數(shù)處理非阻塞文件中, 必須小心, 因為它們?nèi)菀赘沐e一個的非阻塞返回為 EOF. 它們始終必須檢查 errno.

自然地, O_NONBLOCK 也在 open 方法中有意義. 這個發(fā)生在當這個調(diào)用真正阻塞長時間時; 例如, 當打開(為讀存取)一個 沒有寫者的(尚無)FIFO, 或者存取一個磁盤文件使用一個懸掛鎖. 常常地, 打開一個設備或者成功或者失敗, 沒有必要等待外部的事件. 有時, 但是, 打開這個設備需要一個長的初始化, 并且你可能選擇在你的 open 方法中支持 O_NONBLOCK , 通過立刻返回 -EAGAIN,如果這個標志被設置. 在開始這個設備的初始化進程之后. 這個驅(qū)動可能還實現(xiàn)一個阻塞 open 來支持存取策略, 通過類似于文件鎖的方式. 我們將見到這樣一個實現(xiàn)在"阻塞 open 作為對 EBUSY 的替代"一節(jié), 在本章后面.

一些驅(qū)動可能還實現(xiàn)特別的語義給 O_NONBLOCK; 例如, 一個磁帶設備的 open 常常阻塞直到插入一個磁帶. 如果這個磁帶驅(qū)動器使用 O_NONBLOCK 打開, 這個 open 立刻成功, 不管是否介質(zhì)在或不在.

只有 read, write, 和 open 文件操作受到非阻塞標志影響.

6.2.4.?一個阻塞 I/O 的例子

最后, 我們看一個實現(xiàn)了阻塞 I/O 的真實驅(qū)動方法的例子. 這個例子來自 scullpipe 驅(qū)動; 它是 scull 的一個特殊形式, 實現(xiàn)了一個象管道的設備.

在驅(qū)動中, 一個阻塞在讀調(diào)用上的進程被喚醒, 當數(shù)據(jù)到達時; 常常地硬件發(fā)出一個中斷來指示這樣一個事件, 并且驅(qū)動喚醒等待的進程作為處理這個中斷的一部分. scullpipe 驅(qū)動不同, 以至于它可運行而不需要任何特殊的硬件或者一個中斷處理. 我們選擇來使用另一個進程來產(chǎn)生數(shù)據(jù)并喚醒讀進程; 類似地, 讀進程被用來喚醒正在等待緩沖空間可用的寫者進程.

這個設備驅(qū)動使用一個設備結(jié)構(gòu), 它包含 2 個等待隊列和一個緩沖. 緩沖大小是以常用的方法可配置的(在編譯時間, 加載時間, 或者運行時間).


struct scull_pipe
{
        wait_queue_head_t inq, outq; /* read and write queues */
        char *buffer, *end; /* begin of buf, end of buf */
        int buffersize; /* used in pointer arithmetic */
        char *rp, *wp; /* where to read, where to write */
        int nreaders, nwriters; /* number of openings for r/w */
        struct fasync_struct *async_queue; /* asynchronous readers */
        struct semaphore sem;  /* mutual exclusion semaphore */
        struct cdev cdev;  /* Char device structure */
};

讀實現(xiàn)既管理阻塞也管理非阻塞輸入, 看來如此:


static ssize_t scull_p_read (struct file *filp, char __user *buf, size_t count, loff_t *f_pos)
{
        struct scull_pipe *dev = filp->private_data;
        if (down_interruptible(&dev->sem))
                return -ERESTARTSYS;

        while (dev->rp == dev->wp)
        { /* nothing to read */
                up(&dev->sem); /* release the lock */
                if (filp->f_flags & O_NONBLOCK)

                        return -EAGAIN;
                PDEBUG("\"%s\" reading: going to sleep\n", current->comm);
                if (wait_event_interruptible(dev->inq, (dev->rp != dev->wp)))
                        return -ERESTARTSYS; /* signal: tell the fs layer to handle it */ /* otherwise loop, but first reacquire the lock */
                if (down_interruptible(&dev->sem))
                        return -ERESTARTSYS;
        }
        /* ok, data is there, return something */

        if (dev->wp > dev->rp)
                count = min(count, (size_t)(dev->wp - dev->rp));
        else /* the write pointer has wrapped, return data up to dev->end */
                count = min(count, (size_t)(dev->end - dev->rp));
        if (copy_to_user(buf, dev->rp, count))
        {
                up (&dev->sem);
                return -EFAULT;
        }
        dev->rp += count;
        if (dev->rp == dev->end)

                dev->rp = dev->buffer; /* wrapped */
        up (&dev->sem);

        /* finally, awake any writers and return */
        wake_up_interruptible(&dev->outq);
        PDEBUG("\"%s\" did read %li bytes\n",current->comm, (long)count);
        return count;
}

如同你可見的, 我們在代碼中留有一些 PDEBUG 語句. 當你編譯這個驅(qū)動, 你可使能消息機制來易于跟隨不同進程間的交互.

讓我們仔細看看 scull_p_read 如何處理對數(shù)據(jù)的等待. 這個 while 循環(huán)在持有設備旗標下測試這個緩沖. 如果有數(shù)據(jù)在那里, 我們知道我們可立刻返回給用戶, 不必睡眠, 因此整個循環(huán)被跳過. 相反, 如果這個緩沖是空的, 我們必須睡眠. 但是在我們可做這個之前, 我們必須丟掉設備旗標; 如果我們要持有它而睡眠, 就不會有寫者有機會喚醒我們. 一旦這個確保被丟掉, 我們做一個快速檢查來看是否用戶已請求非阻塞 I/O, 并且如果是這樣就返回. 否則, 是時間調(diào)用 wait_event_interruptible.

一旦我們過了這個調(diào)用, 某些東東已經(jīng)喚醒了我們, 但是我們不知道是什么. 一個可能是進程接收到了一個信號. 包含 wait_event_interruptible 調(diào)用的這個 if 語句檢查這種情況. 這個語句保證了正確的和被期望的對信號的反應, 它可能負責喚醒這個進程(因為我們處于一個可中斷的睡眠). 如果一個信號已經(jīng)到達并且它沒有被這個進程阻塞, 正確的做法是讓內(nèi)核的上層處理這個事件. 到此, 這個驅(qū)動返回 -ERESTARTSYS 到調(diào)用者; 這個值被虛擬文件系統(tǒng)(VFS)在內(nèi)部使用, 它或者重啟系統(tǒng)調(diào)用或者返回 -EINTR 給用戶空間. 我們使用相同類型的檢查來處理信號, 給每個讀和寫實現(xiàn).

但是, 即便沒有一個信號, 我們還是不確切知道有數(shù)據(jù)在那里為獲取. 其他人也可能已經(jīng)在等待數(shù)據(jù), 并且它們可能贏得競爭并且首先得到數(shù)據(jù). 因此我們必須再次獲取設備旗標; 只有這時我們才可以測試讀緩沖(在 while 循環(huán)中)并且真正知道我們可以返回緩沖中的數(shù)據(jù)給用戶. 全部這個代碼的最終結(jié)果是, 當我們從 while 循環(huán)中退出時, 我們知道旗標被獲得并且緩沖中有數(shù)據(jù)我們可以用.

僅僅為了完整, 我們要注意, scull_p_read 可以在另一個地方睡眠, 在我們獲得設備旗標之后: 對 copy_to_user 的調(diào)用. 如果 scull 當在內(nèi)核和用戶空間之間拷貝數(shù)據(jù)時睡眠, 它在持有設備旗標中睡眠. 在這種情況下持有旗標是合理的因為它不能死鎖系統(tǒng)(我們知道內(nèi)核將進行拷貝到用戶空間并且在不加鎖進程中的同一個旗標下喚醒我們), 并且因為重要的是設備內(nèi)存數(shù)組在驅(qū)動睡眠時不改變.

6.2.5.?高級睡眠

許多驅(qū)動能夠滿足它們的睡眠要求, 使用至今我們已涉及到的函數(shù). 但是, 有時需要深入理解 Linux 等待隊列機制如何工作. 復雜的加鎖或者性能需要可強制一個驅(qū)動來使用低層函數(shù)來影響一個睡眠. 在本節(jié), 我們在低層看而理解在一個進程睡眠時發(fā)生了什么.

6.2.5.1.?一個進程如何睡眠

如果我們深入 <linux/wait.h>, 你見到在 wait_queue_head_t 類型后面的數(shù)據(jù)結(jié)構(gòu)是非常簡單的; 它包含一個自旋鎖和一個鏈表. 這個鏈表是一個等待隊列入口, 它被聲明做 wait_queue_t. 這個結(jié)構(gòu)包含關于睡眠進程的信息和它想怎樣被喚醒.

使一個進程睡眠的第一步常常是分配和初始化一個 wait_queue_t 結(jié)構(gòu), 隨后將其添加到正確的等待隊列. 當所有東西都就位了, 負責喚醒工作的人就可以找到正確的進程.

下一步是設置進程的狀態(tài)來標志它為睡眠. 在 <linux/sched.h> 中定義有幾個任務狀態(tài). TASK_RUNNING 意思是進程能夠運行, 盡管不必在任何特定的時刻在處理器上運行. 有 2 個狀態(tài)指示一個進程是在睡眠: TASK_INTERRUPTIBLE 和 TASK_UNTINTERRUPTIBLE; 當然, 它們對應 2 類的睡眠. 其他的狀態(tài)正常地和驅(qū)動編寫者無關.

在 2.6 內(nèi)核, 對于驅(qū)動代碼通常不需要直接操作進程狀態(tài). 但是, 如果你需要這樣做, 使用的代碼是:


void set_current_state(int new_state); 

在老的代碼中, 你常常見到如此的東西:


current->state = TASK_INTERRUPTIBLE; 

但是象這樣直接改變 current 是不鼓勵的; 當數(shù)據(jù)結(jié)構(gòu)改變時這樣的代碼會輕易地失效. 但是, 上面的代碼確實展示了自己改變一個進程的當前狀態(tài)不能使其睡眠. 通過改變 current 狀態(tài), 你已改變了調(diào)度器對待進程的方式, 但是你還未讓出處理器.

放棄處理器是最后一步, 但是要首先做一件事: 你必須先檢查你在睡眠的條件. 做這個檢查失敗會引入一個競爭條件; 如果在你忙于上面的這個過程并且有其他的線程剛剛試圖喚醒你, 如果這個條件變?yōu)檎鏁l(fā)生什么? 你可能錯過喚醒并且睡眠超過你預想的時間. 因此, 在睡眠的代碼下面, 典型地你會見到下面的代碼:


if (!condition)
    schedule();

通過在設置了進程狀態(tài)后檢查我們的條件, 我們涵蓋了所有的可能的事件進展. 如果我們在等待的條件已經(jīng)在設置進程狀態(tài)之前到來, 我們在這個檢查中注意到并且不真正地睡眠. 如果之后發(fā)生了喚醒, 進程被置為可運行的不管是否我們已真正進入睡眠.

調(diào)用 schedule , 當然, 是引用調(diào)度器和讓出 CPU 的方式. 無論何時你調(diào)用這個函數(shù), 你是在告訴內(nèi)核來考慮應當運行哪個進程并且轉(zhuǎn)換控制到那個進程, 如果必要. 因此你從不知道在 schedule 返回到你的代碼會是多長時間.

在 if 測試和可能的調(diào)用 schedule (并從其返回)之后, 有些清理工作要做. 因為這個代碼不再想睡眠, 它必須保證任務狀態(tài)被重置為 TASK_RUNNING. 如果代碼只是從 schedule 返回, 這一步是不必要的; 那個函數(shù)不會返回直到進程處于可運行態(tài). 如果由于不再需要睡眠而對 schedule 的調(diào)用被跳過, 進程狀態(tài)將不正確. 還有必要從等待隊列中去除這個進程, 否則它可能被多次喚醒.

6.2.5.2.?手動睡眠

在 Linux 內(nèi)核的之前的版本, 正式的睡眠要求程序員手動處理所有上面的步驟. 它是一個繁瑣的過程, 包含相當多的易出錯的樣板式的代碼. 程序員如果愿意還是可能用那種方式手動睡眠; <linux/sched.h> 包含了所有需要的定義, 以及圍繞例子的內(nèi)核源碼. 但是, 有一個更容易的方式.

第一步是創(chuàng)建和初始化一個等待隊列. 這常常由這個宏定義完成:


DEFINE_WAIT(my_wait); 

其中, name 是等待隊列入口項的名子. 你可用 2 步來做:


wait_queue_t my_wait;
init_wait(&my_wait);

但是常常更容易的做法是放一個 DEFINE_WAIT 行在循環(huán)的頂部, 來實現(xiàn)你的睡眠.

下一步是添加你的等待隊列入口到隊列, 并且設置進程狀態(tài). 2 個任務都由這個函數(shù)處理:


void prepare_to_wait(wait_queue_head_t *queue, wait_queue_t *wait, int state); 

這里, queue 和 wait 分別地是等待隊列頭和進程入口. state 是進程的新狀態(tài); 它應當或者是 TASK_INTERRUPTIBLE(給可中斷的睡眠, 這常常是你所要的)或者 TASK_UNINTERRUPTIBLE(給不可中斷睡眠).

在調(diào)用 prepare_to_wait 之后, 進程可調(diào)用 schedule -- 在它已檢查確認它仍然需要等待之后. 一旦 schedule 返回, 就到了清理時間. 這個任務, 也, 被一個特殊的函數(shù)處理:


void finish_wait(wait_queue_head_t *queue, wait_queue_t *wait); 

之后, 你的代碼可測試它的狀態(tài)并且看是否它需要再次等待.

我們早該需要一個例子了. 之前我們看了 給 scullpipe 的 read 方法, 它使用 wait_event. 同一個驅(qū)動中的 write 方法使用 prepare_to_wait 和 finish_wait 來實現(xiàn)它的等待. 正常地, 你不會在一個驅(qū)動中象這樣混用各種方法, 但是我們這樣作是為了能夠展示 2 種處理睡眠的方式.

為完整起見, 首先, 我們看 write 方法本身:


/* How much space is free? */
static int spacefree(struct scull_pipe *dev)
{

        if (dev->rp == dev->wp)
                return dev->buffersize - 1;
        return ((dev->rp + dev->buffersize - dev->wp) % dev->buffersize) - 1;
}

static ssize_t scull_p_write(struct file *filp, const char __user *buf, size_t count,
                             loff_t *f_pos)
{

        struct scull_pipe *dev = filp->private_data;
        int result;
        if (down_interruptible(&dev->sem))
                return -ERESTARTSYS;

        /* Make sure there's space to write */
        result = scull_getwritespace(dev, filp);
        if (result)
                return result; /* scull_getwritespace called up(&dev->sem) */
        /* ok, space is there, accept something */
        count = min(count, (size_t)spacefree(dev));
        if (dev->wp >= dev->rp)
                count = min(count, (size_t)(dev->end - dev->wp)); /* to end-of-buf */
        else /* the write pointer has wrapped, fill up to rp-1 */
                count = min(count, (size_t)(dev->rp - dev->wp - 1));
        PDEBUG("Going to accept %li bytes to %p from %p\n", (long)count, dev->wp, buf);
        if (copy_from_user(dev->wp, buf, count))
        {
                up (&dev->sem);
                return -EFAULT;
        }
        dev->wp += count;
        if (dev->wp == dev->end)
                dev->wp = dev->buffer; /* wrapped */
        up(&dev->sem);

        /* finally, awake any reader */
        wake_up_interruptible(&dev->inq); /* blocked in read() and select() */

        /* and signal asynchronous readers, explained late in chapter 5 */
        if (dev->async_queue)
                kill_fasync(&dev->async_queue, SIGIO, POLL_IN);
        PDEBUG("\"%s\" did write %li bytes\n",current->comm, (long)count);
        return count;
}

這個代碼看來和 read 方法類似, 除了我們已經(jīng)將睡眠代碼放到了一個單獨的函數(shù), 稱為 scull_getwritespace. 它的工作是確保在緩沖中有空間給新的數(shù)據(jù), 睡眠直到有空間可用. 一旦空間在, scull_p_write 可簡單地拷貝用戶的數(shù)據(jù)到那里, 調(diào)整指針, 并且喚醒可能已經(jīng)在等待讀取數(shù)據(jù)的進程.

處理實際的睡眠的代碼是:


/* Wait for space for writing; caller must hold device semaphore. On
 * error the semaphore will be released before returning. */
static int scull_getwritespace(struct scull_pipe *dev, struct file *filp)
{

        while (spacefree(dev) == 0)
        { /* full */
                DEFINE_WAIT(wait);

                up(&dev->sem);
                if (filp->f_flags & O_NONBLOCK)
                        return -EAGAIN;

                PDEBUG("\"%s\" writing: going to sleep\n",current->comm);
                prepare_to_wait(&dev->outq, &wait, TASK_INTERRUPTIBLE);
                if (spacefree(dev) == 0)
                        schedule();
                finish_wait(&dev->outq, &wait);
                if (signal_pending(current))

                        return -ERESTARTSYS; /* signal: tell the fs layer to handle it */
                if (down_interruptible(&dev->sem))
                        return -ERESTARTSYS;
        }
        return 0;

}

再次注意 while 循環(huán). 如果有空間可用而不必睡眠, 這個函數(shù)簡單地返回. 否則, 它必須丟掉設備旗標并且等待. 這個代碼使用 DEFINE_WAIT 來設置一個等待隊列入口并且 prepare_to_wait 來準備好實際的睡眠. 接著是對緩沖的必要的檢查; 我們必須處理的情況是在我們已經(jīng)進入 while 循環(huán)后以及在我們將自己放入等待隊列之前 (并且丟棄了旗標), 緩沖中有空間可用了. 沒有這個檢查, 如果讀進程能夠在那時完全清空緩沖, 我們可能錯過我們能得到的唯一的喚醒并且永遠睡眠. 在說服我們自己必須睡眠之后, 我們調(diào)用 schedule.

值得再看看這個情況: 當睡眠發(fā)生在 if 語句測試和調(diào)用 schedule 之間, 會發(fā)生什么? 在這個情況里, 都好. 這個喚醒重置了進程狀態(tài)為 TASK_RUNNING 并且 schedule 返回 -- 盡管不必馬上. 只要這個測試發(fā)生在進程放置自己到等待隊列和改變它的狀態(tài)之后, 事情都會順利.

為了結(jié)束, 我們調(diào)用 finish_wait. 對 signal_pending 的調(diào)用告訴我們是否我們被一個信號喚醒; 如果是, 我們需要返回到用戶并且使它們稍后再試. 否則, 我們請求旗標, 并且再次照常測試空閑空間.

6.2.5.3.?互斥等待

我們已經(jīng)見到當一個進程調(diào)用 wake_up 在等待隊列上, 所有的在這個隊列上等待的進程被置為可運行的. 在許多情況下, 這是正確的做法. 但是, 在別的情況下, 可能提前知道只有一個被喚醒的進程將成功獲得需要的資源, 并且其余的將簡單地再次睡眠. 每個這樣的進程, 但是, 必須獲得處理器, 競爭資源(和任何的管理用的鎖), 并且明確地回到睡眠. 如果在等待隊列中的進程數(shù)目大, 這個"驚群"行為可能嚴重降低系統(tǒng)的性能.

為應對實際世界中的驚群問題, 內(nèi)核開發(fā)者增加了一個"互斥等待"選項到內(nèi)核中. 一個互斥等待的行為非常象一個正常的睡眠, 有 2 個重要的不同:

  • 當一個等待隊列入口有 WQ_FLAG_EXCLUSEVE 標志置位, 它被添加到等待隊列的尾部. 沒有這個標志的入口項, 相反, 添加到開始.

  • 當 wake_up 被在一個等待隊列上調(diào)用, 它在喚醒第一個有 WQ_FLAG_EXCLUSIVE 標志的進程后停止.

最后的結(jié)果是進行互斥等待的進程被一次喚醒一個, 以順序的方式, 并且沒有引起驚群問題. 但內(nèi)核仍然每次喚醒所有的非互斥等待者.

在驅(qū)動中采用互斥等待是要考慮的, 如果滿足 2 個條件: 你希望對資源的有效競爭, 并且喚醒一個進程就足夠來完全消耗資源當資源可用時. 互斥等待對 Apacheweb 服務器工作地很好, 例如; 當一個新連接進入, 確實地系統(tǒng)中的一個 Apache 進程應當被喚醒來處理它. 我們在 scullpipe 驅(qū)動中不使用互斥等待, 但是; 很少見到競爭數(shù)據(jù)的讀者(或者競爭緩沖空間的寫者), 并且我們無法知道一個讀者, 一旦被喚醒, 將消耗所有的可用數(shù)據(jù).

使一個進程進入可中斷的等待, 是調(diào)用 prepare_to_wait_exclusive 的簡單事情:


void prepare_to_wait_exclusive(wait_queue_head_t *queue, wait_queue_t *wait, int state); 

這個調(diào)用, 當用來代替 prepare_to_wait, 設置"互斥"標志在等待隊列入口項并且添加這個進程到等待隊列的尾部. 注意沒有辦法使用 wait_event 和它的變體來進行互斥等待.

6.2.5.4.?喚醒的細節(jié)

我們已展現(xiàn)的喚醒進程的樣子比內(nèi)核中真正發(fā)生的要簡單. 當進程被喚醒時產(chǎn)生的真正動作是被位于等待隊列入口項的一個函數(shù)控制的. 缺省的喚醒函數(shù)[22]設置進程為可運行的狀態(tài), 并且可能地進行一個上下文切換到有更高優(yōu)先級進程. 設備驅(qū)動應當從不需要提供一個不同的喚醒函數(shù); 如果你例外, 關于如何做的信息見 <linux/wait.h>

我們尚未看到所有的 wake_up 變體. 大部分驅(qū)動編寫者從不需要其他的, 但是, 為完整起見, 這里是整個集合:

wake_up(wait_queue_head_t queue);wake_up_interruptible(wait_queue_head_t queue);
wake_up 喚醒隊列中的每個不是在互斥等待中的進程, 并且就只一個互斥等待者, 如果它存在. wake_up_interruptible 同樣, 除了它跳過處于不可中斷睡眠的進程. 這些函數(shù), 在返回之前, 使一個或多個進程被喚醒來被調(diào)度(盡管如果它們被從一個原子上下文調(diào)用, 這就不會發(fā)生).

wake_up_nr(wait_queue_head_t queue, int nr);wake_up_interruptible_nr(wait_queue_head_t queue, int nr);
這些函數(shù)類似 wake_up, 除了它們能夠喚醒多達 nr 個互斥等待者, 而不只是一個. 注意傳遞 0 被解釋為請求所有的互斥等待者都被喚醒, 而不是一個沒有.

wake_up_all(wait_queue_head_t queue);wake_up_interruptible_all(wait_queue_head_t queue);
這種 wake_up 喚醒所有的進程, 不管它們是否進行互斥等待(盡管可中斷的類型仍然跳過在做不可中斷等待的進程)

wake_up_interruptible_sync(wait_queue_head_t *queue);
正常地, 一個被喚醒的進程可能搶占當前進程, 并且在 wake_up 返回之前被調(diào)度到處理器. 換句話說, 調(diào)用 wake_up 可能不是原子的. 如果調(diào)用 wake_up 的進程運行在原子上下文(它可能持有一個自旋鎖, 例如, 或者是一個中斷處理), 這個重調(diào)度不會發(fā)生. 正常地, 那個保護是足夠的. 但是, 如果你需要明確要求不要被調(diào)度出處理器在那時, 你可以使用 wake_up_interruptible 的"同步"變體. 這個函數(shù)最常用在當調(diào)用者要無論如何重新調(diào)度, 并且它會更有效的來首先簡單地完成剩下的任何小的工作.

如果上面的全部內(nèi)容在第一次閱讀時沒有完全清楚, 不必擔心. 很少請求會需要調(diào)用 wake_up_interruptible 之外的.

6.2.5.5.?以前的歷史: sleep_on

如果你花些時間深入內(nèi)核源碼, 你可能遇到我們到目前忽略討論的 2 個函數(shù):


void sleep_on(wait_queue_head_t *queue);
void interruptible_sleep_on(wait_queue_head_t *queue);

如你可能期望的, 這些函數(shù)無條件地使當前進程睡眠在給定隊列尚. 這些函數(shù)強烈不推薦, 但是, 并且你應當從不使用它們. 如果你想想它則問題是明顯的: sleep_on 沒提供方法來避免競爭條件. 常常有一個窗口在當你的代碼決定它必須睡眠時和當 sleep_on 真正影響到睡眠時. 在那個窗口期間到達的喚醒被錯過. 因此, 調(diào)用 sleep_on 的代碼從不是完全安全的.

當前計劃對 sleep_on 和 它的變體的調(diào)用(有多個我們尚未展示的超時的類型)在不太遠的將來從內(nèi)核中去掉.

6.2.6.?測試 scullpipe 驅(qū)動

我們已經(jīng)見到了 scullpipe 驅(qū)動如何實現(xiàn)阻塞 I/O. 如果你想試一試, 這個驅(qū)動的源碼可在剩下的本書例子中找到. 阻塞 I/O 的動作可通過打開 2 個窗口見到. 第一個可運行一個命令諸如 cat /dev/scullpipe. 如果你接著, 在另一個窗口拷貝文件到 /dev/scullpipe, 你可見到文件的內(nèi)容出現(xiàn)在第一個窗口.

測試非阻塞的動作是技巧性的, 因為可用于 shell 的傳統(tǒng)的程序不做非阻塞操作. misc-progs 源碼目錄包含下面簡單的程序, 稱為 nbtest, 來測試非阻塞操作. 所有它做的是拷貝它的輸入到它的輸出, 使用非阻塞 I/O 和在重試間延時. 延時時間在命令行被傳遞被缺省是 1 秒.


int main(int argc, char **argv)
{

        int delay = 1, n, m = 0;
        if (argc > 1)
                delay=atoi(argv[1]);
        fcntl(0, F_SETFL, fcntl(0,F_GETFL) | O_NONBLOCK); /* stdin */
        fcntl(1, F_SETFL, fcntl(1,F_GETFL) | O_NONBLOCK); /* stdout */

        while (1) {
                n = read(0, buffer, 4096);
                if (n >= 0)
                        m = write(1, buffer, n);
                if ((n < 0 || m < 0) && (errno != EAGAIN))
                        break;
                sleep(delay);
        }
        perror(n < 0 ? "stdin" : "stdout");
        exit(1);
}

如果你在一個進程跟蹤工具, 如 strace 下運行這個程序, 你可見到每個操作的成功或者失敗, 依賴是否當進行操作時有數(shù)據(jù)可用.

[22] 它有一個想象的名子 default_wake_function.

以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號