跳转到内容
彼岸论坛
欢迎抵达彼岸 彼岸花开 此处谁在 -彼岸论坛

[Go 编程语言] 各位大佬,请教一个 golang 多线程的阻塞问题


小天管理

已推荐帖子

编程小白,在写一个多线程目录文件遍历的时候,出现了阻塞问题,求教各位大佬~

通过增大 var taskChan = make(chan string, 1000),chan 缓冲区为 100 万的时候程序不会阻塞

但是我通过打印日志发现 taskChan 占用很小,只有十几,而且存在通道写入失败的情况

taskChan 的缓冲区为 1000 时,阻塞的日志如下:

[DEBUG]:增加目录,增加 wg, [1802], taskChan = [0]
[ERROR]:写入通道失败...
[DEBUG]:增加目录,增加 wg, [1842], taskChan = [0]
[ERROR]:写入通道失败...
[DEBUG]:增加目录,增加 wg, [1843], taskChan = [0]
[ERROR]:写入通道失败...
[DEBUG]:增加目录,增加 wg, [1844], taskChan = [0]
[DEBUG]:增加目录,增加 wg, [1801], taskChan = [5]
[DEBUG]:任务完成,减小 wg, [1798], taskChan = [1]
[DEBUG]:任务完成,减小 wg, [1797], taskChan = [17]
[DEBUG]:任务完成,减小 wg, [1816], taskChan = [4]
[DEBUG]:增加目录,增加 wg, [1803], taskChan = [1]
[DEBUG]:任务完成,减小 wg, [1843], taskChan = [1]
[DEBUG]:任务完成,减小 wg, [1797], taskChan = [21]
[DEBUG]:任务完成,减小 wg, [1841], taskChan = [24]
[DEBUG]:增加目录,增加 wg, [1841], taskChan = [0]
[DEBUG]:增加目录,增加 wg, [1841], taskChan = [0]
[DEBUG]:增加目录,增加 wg, [1841], taskChan = [0]
[DEBUG]:增加目录,增加 wg, [1841], taskChan = [0]
[DEBUG]:任务完成,减小 wg, [1840], taskChan = [6]
[DEBUG]:任务完成,减小 wg, [1798], taskChan = [0]
[DEBUG]:任务完成,减小 wg, [1840], taskChan = [13]
[ERROR]:写入通道失败...
[DEBUG]:增加目录,增加 wg, [1840], taskChan = [0]
[DEBUG]:增加目录,增加 wg, [1841], taskChan = [0]
[DEBUG]:增加目录,增加 wg, [1842], taskChan = [0]
[DEBUG]:增加目录,增加 wg, [1842], taskChan = [2]
[DEBUG]:增加目录,增加 wg, [1842], taskChan = [0]
[DEBUG]:增加目录,增加 wg, [1843], taskChan = [0]
[ERROR]:写入通道失败...
[DEBUG]:增加目录,增加 wg, [1844], taskChan = [0]
[ERROR]:写入通道失败...
[DEBUG]:增加目录,增加 wg, [1845], taskChan = [0]
[ERROR]:写入通道失败...
[DEBUG]:增加目录,增加 wg, [1846], taskChan = [0]
[ERROR]:写入通道失败...
[DEBUG]:增加目录,增加 wg, [1847], taskChan = [0]
[ERROR]:写入通道失败...
[DEBUG]:增加目录,增加 wg, [1848], taskChan = [0]
[ERROR]:写入通道失败...
[DEBUG]:增加目录,增加 wg, [1849], taskChan = [0]

如果把 taskChan 的缓冲区为 100 万的时候,程序可以正常退出,日志如下:

[DEBUG]:增加目录,增加 wg, [2], taskChan = [0]
[DEBUG]:增加目录,增加 wg, [3], taskChan = [0]
[DEBUG]:增加目录,增加 wg, [4], taskChan = [0]
[DEBUG]:任务完成,减小 wg, [3], taskChan = [0]
[DEBUG]:任务完成,减小 wg, [3], taskChan = [0]
[DEBUG]:任务完成,减小 wg, [3], taskChan = [0]
[DEBUG]:任务完成,减小 wg, [1], taskChan = [0]
[DEBUG]:增加目录,增加 wg, [4], taskChan = [0]
[DEBUG]:增加目录,增加 wg, [2], taskChan = [0]
[DEBUG]:任务完成,减小 wg, [1], taskChan = [0]
[DEBUG]:任务完成,减小 wg, [2], taskChan = [0]
[DEBUG]:任务完成,减小 wg, [2], taskChan = [0]
[DEBUG]:任务完成,减小 wg, [1], taskChan = [0]
[DEBUG]:任务完成,减小 wg, [1], taskChan = [0]
[DEBUG]:任务完成,减小 wg, [0], taskChan = [0]
[INFO]:目录扫描完毕
[DEBUG]:func GetAllFilePath end
[DEBUG]:func StartScan end
[DEBUG]:func btnStartScanOnclick end

代码如下:

package core

import (
	"DopliGo/logs"
	"github.com/panjf2000/ants/v2"
	"os"
	"path/filepath"
	"sync"
	"sync/atomic"
)

func GetAllFilePath(rootPath string) {
	//logs.IsLogDebug = false
	logs.Debug("func GetAllFilePath start")
	// 创建任务通道和结果通道
	var taskChan = make(chan string, 1000000)
	var resultChan = make(chan string, 1000000)
	var wg sync.WaitGroup
	var counter int64 = 0

	// 创建生产者 goroutine 池
	producerPool, _ := ants.NewPoolWithFunc(16, func(i interface{}) {
		produceTasks(i.(string), taskChan, resultChan, &counter, &wg)
	})

	logs.Debug("cap:%d", producerPool.Cap())
	defer producerPool.Release()

	taskChan <- rootPath
	wg.Add(1) // 这里增加计数器
	atomic.AddInt64(&counter, 1)
	logs.Debug("任务开始,增加 wg, [%d], taskChan = [%d]", atomic.LoadInt64(&counter), len(resultChan))

	// 启动生产者
	go func() {
		//defer logs.Debug("生产者退出")
		for task := range taskChan {
			err := producerPool.Invoke(task)
			if err != nil {
				logs.Error("failed to producerPool Invoke, err: %s", err)
				return
			}
		}
	}()

	// 启动结果处理 goroutine
	go func() {
		//defer logs.Debug("消费者退出")
		for result := range resultChan {
			_ = result
		}
	}()

	// 等待所有任务完成
	wg.Wait()
	close(resultChan)
	close(taskChan)
	logs.Info("目录扫描完毕")
	logs.Debug("func GetAllFilePath end")
}

func produceTasks(rootPath string, taskChan chan string, resultChan chan string, counter *int64, wg *sync.WaitGroup) {
	defer wg.Done() // 确保每次 produceTasks 完成时,调用 Done
	//	logs.Debug("func produceTasks start")

	entries, err := os.ReadDir(rootPath)
	if err != nil {
		logs.Error("failed to read dir: %s , err: %s", rootPath, err)
		return
	}

	for _, entry := range entries {
		path := filepath.Join(rootPath, entry.Name())
		if entry.IsDir() {
			wg.Add(1)
			atomic.AddInt64(counter, 1)
			select {
			case taskChan <- path:
				// 发送成功
			default:
				// 发送失败,通道已满
				logs.Error("写入通道失败...")
			}
			logs.Debug("增加目录,增加 wg, [%d], taskChan = [%d]", atomic.LoadInt64(counter), len(resultChan))
		} else {
			resultChan <- path
		}
	}

	atomic.AddInt64(counter, -1)
	logs.Debug("任务完成,减小 wg, [%d], taskChan = [%d]", atomic.LoadInt64(counter), len(resultChan))
	//logs.Debug("func produceTasks end")
}

意见的链接
分享到其他网站

加入讨论

您现在可以发表并稍后注册. 如果您是会员,请现在登录来参与讨论.

游客
回复主题...

×   粘贴为富文本.   粘贴为纯文本来代替

  只允许使用75个表情符号.

×   您的链接已自动嵌入.   显示为链接来代替

×   您之前的内容已恢复.   清除编辑器

×   您无法直接粘贴图片.要从网址上传或插入图片.

  • 游客注册

    游客注册

  • 会员

    没有会员可显示

  • 最新的状态更新

    没有最新的状态更新
  • 最近查看

    • 没有会员查看此页面.
×
×
  • 创建新的...