小天管理 发表于 2024年8月22日 发表于 2024年8月22日 编程小白,在写一个多线程目录文件遍历的时候,出现了阻塞问题,求教各位大佬~ 通过增大 var taskChan = make(chan string, 1000),chan 缓冲区为 100 万的时候程序不会阻塞 但是我通过打印日志发现 taskChan 占用很小,只有十几,而且存在通道写入失败的情况 taskChan 的缓冲区为 1000 时,阻塞的日志如下: [DEBUG]:增加目录,增加 wg, [1802], taskChan = [0] [ERROR]:写入通道失败... [DEBUG]:增加目录,增加 wg, [1842], taskChan = [0] [ERROR]:写入通道失败... [DEBUG]:增加目录,增加 wg, [1843], taskChan = [0] [ERROR]:写入通道失败... [DEBUG]:增加目录,增加 wg, [1844], taskChan = [0] [DEBUG]:增加目录,增加 wg, [1801], taskChan = [5] [DEBUG]:任务完成,减小 wg, [1798], taskChan = [1] [DEBUG]:任务完成,减小 wg, [1797], taskChan = [17] [DEBUG]:任务完成,减小 wg, [1816], taskChan = [4] [DEBUG]:增加目录,增加 wg, [1803], taskChan = [1] [DEBUG]:任务完成,减小 wg, [1843], taskChan = [1] [DEBUG]:任务完成,减小 wg, [1797], taskChan = [21] [DEBUG]:任务完成,减小 wg, [1841], taskChan = [24] [DEBUG]:增加目录,增加 wg, [1841], taskChan = [0] [DEBUG]:增加目录,增加 wg, [1841], taskChan = [0] [DEBUG]:增加目录,增加 wg, [1841], taskChan = [0] [DEBUG]:增加目录,增加 wg, [1841], taskChan = [0] [DEBUG]:任务完成,减小 wg, [1840], taskChan = [6] [DEBUG]:任务完成,减小 wg, [1798], taskChan = [0] [DEBUG]:任务完成,减小 wg, [1840], taskChan = [13] [ERROR]:写入通道失败... [DEBUG]:增加目录,增加 wg, [1840], taskChan = [0] [DEBUG]:增加目录,增加 wg, [1841], taskChan = [0] [DEBUG]:增加目录,增加 wg, [1842], taskChan = [0] [DEBUG]:增加目录,增加 wg, [1842], taskChan = [2] [DEBUG]:增加目录,增加 wg, [1842], taskChan = [0] [DEBUG]:增加目录,增加 wg, [1843], taskChan = [0] [ERROR]:写入通道失败... [DEBUG]:增加目录,增加 wg, [1844], taskChan = [0] [ERROR]:写入通道失败... [DEBUG]:增加目录,增加 wg, [1845], taskChan = [0] [ERROR]:写入通道失败... [DEBUG]:增加目录,增加 wg, [1846], taskChan = [0] [ERROR]:写入通道失败... [DEBUG]:增加目录,增加 wg, [1847], taskChan = [0] [ERROR]:写入通道失败... [DEBUG]:增加目录,增加 wg, [1848], taskChan = [0] [ERROR]:写入通道失败... [DEBUG]:增加目录,增加 wg, [1849], taskChan = [0] 如果把 taskChan 的缓冲区为 100 万的时候,程序可以正常退出,日志如下: [DEBUG]:增加目录,增加 wg, [2], taskChan = [0] [DEBUG]:增加目录,增加 wg, [3], taskChan = [0] [DEBUG]:增加目录,增加 wg, [4], taskChan = [0] [DEBUG]:任务完成,减小 wg, [3], taskChan = [0] [DEBUG]:任务完成,减小 wg, [3], taskChan = [0] [DEBUG]:任务完成,减小 wg, [3], taskChan = [0] [DEBUG]:任务完成,减小 wg, [1], taskChan = [0] [DEBUG]:增加目录,增加 wg, [4], taskChan = [0] [DEBUG]:增加目录,增加 wg, [2], taskChan = [0] [DEBUG]:任务完成,减小 wg, [1], taskChan = [0] [DEBUG]:任务完成,减小 wg, [2], taskChan = [0] [DEBUG]:任务完成,减小 wg, [2], taskChan = [0] [DEBUG]:任务完成,减小 wg, [1], taskChan = [0] [DEBUG]:任务完成,减小 wg, [1], taskChan = [0] [DEBUG]:任务完成,减小 wg, [0], taskChan = [0] [INFO]:目录扫描完毕 [DEBUG]:func GetAllFilePath end [DEBUG]:func StartScan end [DEBUG]:func btnStartScanOnclick end 代码如下: package core import ( "DopliGo/logs" "github.com/panjf2000/ants/v2" "os" "path/filepath" "sync" "sync/atomic" ) func GetAllFilePath(rootPath string) { //logs.IsLogDebug = false logs.Debug("func GetAllFilePath start") // 创建任务通道和结果通道 var taskChan = make(chan string, 1000000) var resultChan = make(chan string, 1000000) var wg sync.WaitGroup var counter int64 = 0 // 创建生产者 goroutine 池 producerPool, _ := ants.NewPoolWithFunc(16, func(i interface{}) { produceTasks(i.(string), taskChan, resultChan, &counter, &wg) }) logs.Debug("cap:%d", producerPool.Cap()) defer producerPool.Release() taskChan <- rootPath wg.Add(1) // 这里增加计数器 atomic.AddInt64(&counter, 1) logs.Debug("任务开始,增加 wg, [%d], taskChan = [%d]", atomic.LoadInt64(&counter), len(resultChan)) // 启动生产者 go func() { //defer logs.Debug("生产者退出") for task := range taskChan { err := producerPool.Invoke(task) if err != nil { logs.Error("failed to producerPool Invoke, err: %s", err) return } } }() // 启动结果处理 goroutine go func() { //defer logs.Debug("消费者退出") for result := range resultChan { _ = result } }() // 等待所有任务完成 wg.Wait() close(resultChan) close(taskChan) logs.Info("目录扫描完毕") logs.Debug("func GetAllFilePath end") } func produceTasks(rootPath string, taskChan chan string, resultChan chan string, counter *int64, wg *sync.WaitGroup) { defer wg.Done() // 确保每次 produceTasks 完成时,调用 Done // logs.Debug("func produceTasks start") entries, err := os.ReadDir(rootPath) if err != nil { logs.Error("failed to read dir: %s , err: %s", rootPath, err) return } for _, entry := range entries { path := filepath.Join(rootPath, entry.Name()) if entry.IsDir() { wg.Add(1) atomic.AddInt64(counter, 1) select { case taskChan <- path: // 发送成功 default: // 发送失败,通道已满 logs.Error("写入通道失败...") } logs.Debug("增加目录,增加 wg, [%d], taskChan = [%d]", atomic.LoadInt64(counter), len(resultChan)) } else { resultChan <- path } } atomic.AddInt64(counter, -1) logs.Debug("任务完成,减小 wg, [%d], taskChan = [%d]", atomic.LoadInt64(counter), len(resultChan)) //logs.Debug("func produceTasks end") }
已推荐帖子