8.8. 示例: 并发的目录遍历

在本小节中,我们会创建一个程序来生成指定目录的硬盘使用情况报告,这个程序和Unix里的du工具比较相似。大多数工作用下面这个walkDir函数来完成,这个函数使用dirents函数来枚举一个目录下的所有入口。

gopl.io/ch8/du1

  1. // walkDir recursively walks the file tree rooted at dir
  2. // and sends the size of each found file on fileSizes.
  3. func walkDir(dir string, fileSizes chan<- int64) {
  4. for _, entry := range dirents(dir) {
  5. if entry.IsDir() {
  6. subdir := filepath.Join(dir, entry.Name())
  7. walkDir(subdir, fileSizes)
  8. } else {
  9. fileSizes <- entry.Size()
  10. }
  11. }
  12. }
  13. // dirents returns the entries of directory dir.
  14. func dirents(dir string) []os.FileInfo {
  15. entries, err := ioutil.ReadDir(dir)
  16. if err != nil {
  17. fmt.Fprintf(os.Stderr, "du1: %v\n", err)
  18. return nil
  19. }
  20. return entries
  21. }

ioutil.ReadDir函数会返回一个os.FileInfo类型的slice,os.FileInfo类型也是os.Stat这个函数的返回值。对每一个子目录而言,walkDir会递归地调用其自身,同时也在递归里获取每一个文件的信息。walkDir函数会向fileSizes这个channel发送一条消息。这条消息包含了文件的字节大小。

下面的主函数,用了两个goroutine。后台的goroutine调用walkDir来遍历命令行给出的每一个路径并最终关闭fileSizes这个channel。主goroutine会对其从channel中接收到的文件大小进行累加,并输出其和。

  1. package main
  2. import (
  3. "flag"
  4. "fmt"
  5. "io/ioutil"
  6. "os"
  7. "path/filepath"
  8. )
  9. func main() {
  10. // Determine the initial directories.
  11. flag.Parse()
  12. roots := flag.Args()
  13. if len(roots) == 0 {
  14. roots = []string{"."}
  15. }
  16. // Traverse the file tree.
  17. fileSizes := make(chan int64)
  18. go func() {
  19. for _, root := range roots {
  20. walkDir(root, fileSizes)
  21. }
  22. close(fileSizes)
  23. }()
  24. // Print the results.
  25. var nfiles, nbytes int64
  26. for size := range fileSizes {
  27. nfiles++
  28. nbytes += size
  29. }
  30. printDiskUsage(nfiles, nbytes)
  31. }
  32. func printDiskUsage(nfiles, nbytes int64) {
  33. fmt.Printf("%d files %.1f GB\n", nfiles, float64(nbytes)/1e9)
  34. }

这个程序会在打印其结果之前卡住很长时间。

  1. $ go build gopl.io/ch8/du1
  2. $ ./du1 $HOME /usr /bin /etc
  3. 213201 files 62.7 GB

如果在运行的时候能够让我们知道处理进度的话想必更好。但是,如果简单地把printDiskUsage函数调用移动到循环里会导致其打印出成百上千的输出。

下面这个du的变种会间歇打印内容,不过只有在调用时提供了-v的flag才会显示程序进度信息。在roots目录上循环的后台goroutine在这里保持不变。主goroutine现在使用了计时器来每500ms生成事件,然后用select语句来等待文件大小的消息来更新总大小数据,或者一个计时器的事件来打印当前的总大小数据。如果-v的flag在运行时没有传入的话,tick这个channel会保持为nil,这样在select里的case也就相当于被禁用了。

gopl.io/ch8/du2

  1. var verbose = flag.Bool("v", false, "show verbose progress messages")
  2. func main() {
  3. // ...start background goroutine...
  4. // Print the results periodically.
  5. var tick <-chan time.Time
  6. if *verbose {
  7. tick = time.Tick(500 * time.Millisecond)
  8. }
  9. var nfiles, nbytes int64
  10. loop:
  11. for {
  12. select {
  13. case size, ok := <-fileSizes:
  14. if !ok {
  15. break loop // fileSizes was closed
  16. }
  17. nfiles++
  18. nbytes += size
  19. case <-tick:
  20. printDiskUsage(nfiles, nbytes)
  21. }
  22. }
  23. printDiskUsage(nfiles, nbytes) // final totals
  24. }

由于我们的程序不再使用range循环,第一个select的case必须显式地判断fileSizes的channel是不是已经被关闭了,这里可以用到channel接收的二值形式。如果channel已经被关闭了的话,程序会直接退出循环。这里的break语句用到了标签break,这样可以同时终结select和for两个循环;如果没有用标签就break的话只会退出内层的select循环,而外层的for循环会使之进入下一轮select循环。

现在程序会悠闲地为我们打印更新流:

  1. $ go build gopl.io/ch8/du2
  2. $ ./du2 -v $HOME /usr /bin /etc
  3. 28608 files 8.3 GB
  4. 54147 files 10.3 GB
  5. 93591 files 15.1 GB
  6. 127169 files 52.9 GB
  7. 175931 files 62.2 GB
  8. 213201 files 62.7 GB

然而这个程序还是会花上很长时间才会结束。完全可以并发调用walkDir,从而发挥磁盘系统的并行性能。下面这个第三个版本的du,会对每一个walkDir的调用创建一个新的goroutine。它使用sync.WaitGroup(§8.5)来对仍旧活跃的walkDir调用进行计数,另一个goroutine会在计数器减为零的时候将fileSizes这个channel关闭。

gopl.io/ch8/du3

  1. func main() {
  2. // ...determine roots...
  3. // Traverse each root of the file tree in parallel.
  4. fileSizes := make(chan int64)
  5. var n sync.WaitGroup
  6. for _, root := range roots {
  7. n.Add(1)
  8. go walkDir(root, &n, fileSizes)
  9. }
  10. go func() {
  11. n.Wait()
  12. close(fileSizes)
  13. }()
  14. // ...select loop...
  15. }
  16. func walkDir(dir string, n *sync.WaitGroup, fileSizes chan<- int64) {
  17. defer n.Done()
  18. for _, entry := range dirents(dir) {
  19. if entry.IsDir() {
  20. n.Add(1)
  21. subdir := filepath.Join(dir, entry.Name())
  22. go walkDir(subdir, n, fileSizes)
  23. } else {
  24. fileSizes <- entry.Size()
  25. }
  26. }
  27. }

由于这个程序在高峰期会创建成百上千的goroutine,我们需要修改dirents函数,用计数信号量来阻止他同时打开太多的文件,就像我们在8.7节中的并发爬虫一样:

  1. // sema is a counting semaphore for limiting concurrency in dirents.
  2. var sema = make(chan struct{}, 20)
  3. // dirents returns the entries of directory dir.
  4. func dirents(dir string) []os.FileInfo {
  5. sema <- struct{}{} // acquire token
  6. defer func() { <-sema }() // release token
  7. // ...

这个版本比之前那个快了好几倍,尽管其具体效率还是和你的运行环境,机器配置相关。

练习 8.9: 编写一个du工具,每隔一段时间将root目录下的目录大小计算并显示出来。