控制Goroutine并发量的解决方案

摘要:
前言Go语言虽然开并发Goroutine特别简单,但是实际中如果不控制并发的数量会导致资源的浪费以及同时占用大量服务资源导致服务性能下降!创建完goroutinepool后,通过pool.Submit方法向pool中提交任务。如果pool中尚有空闲的goroutineworker,则pool.Submit立即返回;否则根据pool的配置,pool.Submit立即返回错误或等待有空闲goroutineworker成功接收任务后返回。

前言

Go语言虽然开并发Goroutine特别简单,但是实际中如果不控制并发的数量会导致资源的浪费以及同时占用大量服务资源(http连接、数据库连接、文件句柄等)导致服务性能下降!

笔者之前总结过一篇在业务代码中控制并发数量的文章:Go控制协裎并发数量的用法及实际中的一个案例

ants库实现链接池的效果控制并发量

今天介绍另外一个控制并发数量的第三方库:ants

简而言之,ants库通过实现“Goroutine链接池”来限制Goroutine的数量:通过NewPool函数创建一个goroutine pool实现具体效果。

创建完 goroutine pool 后,通过pool.Submit方法向 pool 中提交任务。
如果 pool 中尚有空闲的 goroutine worker,则pool.Submit立即返回;否则根据 pool 的配置,pool.Submit立即返回错误或等待有空闲 goroutine worker 成功接收任务后返回。

使用案例

使用之前记当然是 go get一下:

go get github.com/panjf2000/ants

基本使用

最基本的使用场景是:提交任务,等待任务完成并获取结果。

控制Goroutine并发量的解决方案第1张控制Goroutine并发量的解决方案第2张
package test1

import (
    "fmt"
    "github.com/panjf2000/ants/v2"
    "sync"
    "testing")

func sum(a, b int) int{
    return a +b
}

func wrapSum(i int, ch chan int, wg *sync.WaitGroup) func() {
    returnfunc() {
        defer wg.Done()
        ch <-sum(i, i)
    }
}

func TestT1(t *testing.T) {

    varwg sync.WaitGroup
    ch := make(chan int, 10)
    //ants.Release 相当于调用 defaultPool.Release,停止 defaultPool 中所有的 goroutine worker.
defer ants.Release()
    for i := 0; i < 10; i++{
        wg.Add(1)
        //ants.Submit 相当于调用 defaultPool.Submit,而 defaultPool 是在 package 初始化时 ants 库创建的
        if err := ants.Submit(wrapSum(i, ch, &wg)); err !=nil {
            return}
    }
    wg.Wait()
    close(ch)
    for v :=range ch {
        fmt.Println(v)
    }

}
g1_test.go

需要注意以下几点:

1、这里使用的是ants包默认的链接池(ants.Submit方法),打开源码可以看到链接池的容量大小为: math.MaxInt32*(2147483647),所以实际中推荐大家自己控制链接池的容量大小。

2、Submit 方法只接受 func() 类型的参数,如果提交的任务有参数,需要自己 wrap。

3、ants 没有提供返回值机制,任务的执行结果需要自己进行处理,例子中用了一个带 buffer 的 channel。需要注意的是,当 pool 中有多个任务时,任务的返回值不是根据任务的提交顺序进行排序的,任务的返回顺序取决于调用时机,可以认为是随机的。

4、ants 没有提供等待所有任务完成的机制,例子中用了 sync.WaitGroup 实现了等待所有任务完成的机制,否则 main goroutine 可能会在任务执行结束前退出。

配置pool为nonblocking状态的情况

以下示例将 pool 配置为 nonblocking。在这种情况下,当 pool 中没有 可用的 goroutine worker 时,Submit 会直接返回错误ants.ErrPoolOverload,而不会等待提交任务成功才返回。

另外这里可以配置链接池的大小(ants.NewPool方法):

控制Goroutine并发量的解决方案第3张控制Goroutine并发量的解决方案第4张
package test1

import (
    "fmt"
    "github.com/panjf2000/ants/v2"
    "testing")

func hangForever() {
    ch := make(chan int)
    ch <- 10}

func TestT2(t *testing.T) {
    pool, err := ants.NewPool(10, ants.WithNonblocking(true))
    if err !=nil {
        return}
    defer pool.Release()
    for i := 0; i < 10; i++{
        if err := pool.Submit(hangForever); err !=nil {
            return}
    }
    if err := pool.Submit(func() { fmt.Println("hello") }); err !=nil {
        fmt.Printf("err=ErrPoolOverload:%t
", err ==ants.ErrPoolOverload)
    }
}
g2_test.go

关于超时任务的处理 ***

在实际中我们往往会希望在摸一个Goroutine执行任务超时或者其他一些情况下退出而不是一直占用着资源!

但是由于线程才是操作系统可调度的最小的单位,Goroutine是代码级别的并发,由于GMP模型的限制,我们并不能确定开启的子Goroutine什么时候执行,Go中也没有像epoll那样的“轮询机制”——专门开一个协程去轮询其他的子Goroutine管理它们,所以想要真正的去实现子Goroutine的超时退出需要程序员们在业务代码中做相应的逻辑处理。
我这里使用context去简单处理超时的Goroutine:

控制Goroutine并发量的解决方案第5张控制Goroutine并发量的解决方案第6张
package test1

import (
    "context"
    "fmt"
    "github.com/panjf2000/ants/v2"
    "testing"
    "time")

func expensiveTask2(ctx context.Context, a, b int) (int, error) {
    select{
    //simulate an expensive task
    case <-time.After(10 *time.Second):
        return a +b, nil
    case <-ctx.Done():
        return 0, ctx.Err()
    }
}

func wrap2(ctx context.Context) func() {
    returnfunc() {
        sum, err := expensiveTask2(ctx, 10, 20)
        if err !=nil {
            fmt.Printf("error is %v
", err)
        } else{
            fmt.Printf("sum is %d
", sum)
        }
    }
}

func TestT31(t *testing.T) {
    pool, err := ants.NewPool(1)
    if err !=nil {
        return}
    ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
    defer cancel()
    if err := pool.Submit(wrap2(ctx)); err !=nil {
        return}
    //wait other goroutines.
    for i := 0; i < 20; i++{
        time.Sleep(time.Second)
        fmt.Printf("main waits for %d seconds
", i+1)
    }
}
g3_test.go

以上的例子中,在提交任务时,向任务传递了一个 3 秒钟超时的 context。
在任务函数的逻辑中,通过Done()方法等待停止信号(超时或被 main goroutine 主动 cancel),从而使任务函数在一定的时机结束,避免一直执行下去。

需要注意的是,expensiveTask函数中用了select来等待Done()的返回,在业务逻辑的哪个时机等待Done,需要开程序员自己去设计!

免责声明:文章转载自《控制Goroutine并发量的解决方案》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇debian9下安装toturnLinux(三)—— 项目部署环境搭建下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

在Tomcat服务器中启动SpringBoot项目原理(简化版)

总的来说,tomcat方式启动WAR包项目, tomcat会查询context上下文中实现ServletContainerInitializer接口的类,然后调用类的onStartup(Set<Class<?>> c, ServletContext ctx)方法 Spring的SpringServletContainerInitia...

.net微信公众号开发——快速入门【转载】

最近在学习微信公众号开发,将学习的成果做成了一个类库,方便重复使用。 现在微信公众号多如牛毛,开发微信的高手可以直接无视这个系列的文章了。 使用该类库的流程及寥寥数行代码得到的结果如下。 本文的源代码主要在:http://git.oschina.net/xrwang2/xrwang.weixin.PublicAccount/blob/master/xr...

如何在不同的语言/平台中获取Android ID

如何在不同的语言/平台中获取Android ID# 最近开发工作中需要使用到AndroidID,在Unity和native code中也需要使用,java获取很方便,Unity中也不难,最难的是在native code中获取。 获取android ID需要有一个上下文实例,也就是Context实例,看下面的java获取方式: 在java中获取## Andr...

《Spring源码深度解析》一

Spring整体架构 1.1 Spring整体架构 1.1.1 Core Container: 模块:Core、Beans、Context和Expression Language Core:框架的基础部分, 提供IOC 和依赖注入特性。也包含核心工具类。基础概念是BeanFactory, 它提供对工厂模式的经典实现来消除对程序性单例模式的需要, 并真正地...

使用 ASP.NET 一般处理程序或 WebService 返回 JSON

今天, 将为大家说明如何在 ASP.NET 中使用一般处理程序或者 WebService 向 javascript 返回 JSON. 本文更新: 2011-12-9: 增加 -:data 的说明. 由于精力有限, 不能在多个博客中保证文章的同步, 可在如下地址查看最新内容, 请谅解: http://code.google.com/p/zsharedcode...

第86章、系统服务之TELEPHONY_SERVICE(从零开始学Android)

TelephonyManager类主要提供了一系列用于访问与手机通讯相关的状态和信息的get方法。其中包括手机SIM的状态和信息、电信网络的状态及手机用户的信息。在应用程序中可以使用这些get方法获取相关数据。     TelephonyManager类的对象可以通过Context.getSystemService(Context.TELEPHONY_S...