|

并发

虽然 Go 程序编译后生成的是本地可执行代码,但是这些可执行代码必须运行在Go 语言的运行时(Runtime )中。Go 运行时类似 Java 和 .NET 语言所用到的虚拟机,主要负责管理包括内存分配、自动垃圾回收、栈处理、协程(Goroutine)、信道(Channel,也称为通道)、切片(slice)、字典(map)和反射(reflect)等。

Go 运行时通过接口函数调用来管理协程(Goroutine)和信道(Channel)等功能。Go 用户代码对操作系统内核 API 的调用会被 Go 运行时拦截并处理。

Go 运行时的重要组成部分是协程调度器(Goroutine Scheduler)。它负责追踪、调度每个协程运行,实际上是从应用程序的进程(Process)所属的线程池(Thread Pool)中分配一个线程执行这个协程。每个协程只有分配到一个操作系统线程才能运行。

CSP 是一种消息传递模型,通过在 goroutine 之间传递数据来传递消息,而不是对数据进行加锁来实现同步访问。

https://lddpicture.oss-cn-beijing.aliyuncs.com/picture/watermark,type_ZHJvaWRzYW5zZmFsbGJhY2s,shadow_50,text_Q1NETiBAd29odTExMDQ=,size_20,color_FFFFFF,t_70,g_se,x_16#pic_center.jpeg

Go 语言的并发执行体称为 goroutine , Go 语言通过 go 关键字来启动一个 goroutinegoroutine 是一种非常轻量级的实现,可在单个进程里执行成千上万的并发任务。

  • Go 程序从 main 包的 main() 函数开始,在程序启动时, Go 程序就会为 main() 函数创建一个默认的 goroutine
  • Go 程序中使用 go 关键字为一个函数创建一个 goroutine 。一个函数可以被创建多个 goroutine ,而一个 goroutine 必定对应一个函数。
  • 调度器不能保证多个 goroutine 执行次序,且进程退出时不会等待它们结束。
go func( 参数列表 ){
    函数体
}( 调用参数列表 )
package main

import (
	"runtime"
	"time"
)

func main() {
	go func() {
		sum := 0
		for i := 0; i <= 10000; i++ {
			sum += i
		}

		println("sum is: ", sum)
		time.Sleep(1 * time.Second)
	}()
	//NumGoroutine 可以返回当前程序的 goroutine 数目
	println("NumGoroutine=", runtime.NumGoroutine())

	// main goroutine 故意“ sleep ” 5 秒, 防止 main 提前退出
	time.Sleep(5 * time.Second)
}
package main

import (
	"runtime"
	"time"
)

func sum() {
	sum := 0
	for i := 0; i <= 10000; i++ {
		sum += i
	}

	println("sum is: ", sum)
	time.Sleep(1 * time.Second)
}

func main() {
	go sum()
	//NumGoroutine 可以返回当前程序的 goroutine 数目
	println("NumGoroutine=", runtime.NumGoroutine())

	// main goroutine 故意“ sleep ” 5 秒, 防止 main 提前退出
	time.Sleep(5 * time.Second)
}
  1. go 的执行是非阻塞的,不会等待
  2. go 后面的函数的返回值会被忽略
  3. 调度器不能保证多个 goroutine 的执行次序
  4. 没有父子 goroutine 的概念,所有的 goroutine 是平等地被调度和执行的
  5. Go 程序在执行时会单独为 main 函数创建一个 goroutine ,遇到其他 go 关键字时再去创建其他的 goroutine
  6. 主函数返回时,所有的 goroutine 都会被直接打断,程序退出;
  7. Go 没有暴露 goroutine id 给用户,所以不能在一个 goroutine 里面显式地操作另一个 goroutine , 不过 runtime 包提供了一些函数访问和设置 goroutine 的相关信息;
  8. runtime.NumGoroutine 返回一个进程的所有 goroutine 数, main()goroutine 也被算在里面。因此实际创建的 goroutine 数量为扣除 main()goroutine 数。
runtime.GOMAXPROCS(逻辑CPU数量)
=0查询当前的 GOMAXPROCS 
=1设置单核心执行
.> 1设置多核并发执行
package main

import (
	"runtime"
)

func main() {

	// 获取当前的 GOMAXPROCS 值
	println("GOMAXPROCS=", runtime.GOMAXPROCS(0))

	// 设置当前的 GOMAXPROCS 值
	runtime.GOMAXPROCS(2)

	// 获取当前的 GOMAXPROCS 值
	println("GOMAXPROCS=", runtime.GOMAXPROCS(0))
}

func Goexit () 是结束当前 goroutine 的运行, Goexit 在结束当前 goroutine 运行之前会调用当前 goroutine 已经注册的 defer

Goexit 并不会产生 panic ,所以该 goroutine defer 里面的 recover 调用都返回 nil 。

调用 runtime.Goexit 将立即终止当前 goroutine 执行,调度器确保所有已注册 defer 延迟调用被执行。

package main

import (
	"runtime"
	"sync"
)

func main() {
	wg := new(sync.WaitGroup)
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer println("A.defer")
		func() {
			defer println("B.defer")
			runtime.Goexit() // 终止当前 goroutine
			println("B")     // 不会执行
		}()
		println("A") // 不会执行
	}()
	wg.Wait()
}

Gosched 是放弃当前调度执行机会,将当前 goroutine 暂停,放回队列等待下次被调度执行。

  1. I/Oselect
  2. channel
  3. 等待锁
  4. 函数调用(有时)
  5. runetime.Gosched()
package main

import (
	"runtime"
	"sync"
)

func main() {
	wg := new(sync.WaitGroup)
	wg.Add(2)

	go func() {
		defer wg.Done()
		for i := 0; i < 5; i++ {
			println("Hello, World!")
		}

	}()
	go func() {
		defer wg.Done()
		for i := 0; i < 5; i++ {
			println(i)
			if i == 2 {
				runtime.Gosched()
			}
		}
	}()

	println("NumGoroutine is ", runtime.GOMAXPROCS(0))
	wg.Wait()
}
package main

import (
	"fmt"
	"runtime"
)

func say(s string) {
	for i := 0; i < 2; i++ {
		runtime.Gosched()
		fmt.Println(s)
	}
}
func main() {
	go say("world")
	say("hello")
}
//runtime.Gosched() 会在不同的 goroutine 之间切换,当 main goroutine 退出时,其它的 goroutine 都会直接退出
//输出: hello world hello
// 这个示例程序展示如何创建goroutine 以及调度器的行为
package main

import (
	"fmt"
	"runtime"
	"sync"
)

// main是所有Go程序的入口
func main() {
	// 分配一个逻辑处理器给调度器使用, 这个函数允许程序更改调度器可以使用的逻辑处理器的数量。
	runtime.GOMAXPROCS(1)

	// wg用来等待程序完成
	// WaitGroup是一个计数信号量,可以用来记录并维护运行的 goroutine。如果WaitGroup的值大于0,Wait方法就会阻塞。
	var wg sync.WaitGroup

	// 计数加2,表示要等待两个goroutine
	wg.Add(2)

	fmt.Println("Start Goroutines")

	// 声明一个匿名函数,并创建一个goroutine
	go func() {
		// 关键字defer会修改函数调用时机,在正在执行的函数返回时才真正调用defer声明的函数。
		// 关键字defer保证,每个 goroutine 一旦完成其工作就调用Done方法。
		// 在函数退出时调用Done来通知main函数工作已经完成
		defer wg.Done()

		// 显示字母表3次
		for count := 0; count < 3; count++ {
			for char := 'a'; char < 'a'+26; char++ {
				fmt.Printf("%c ", char)
			}
		}
	}()

	// 声明一个匿名函数,并创建一个goroutine
	go func() {
		// 在函数退出时调用Done来通知main函数工作已经完成
		defer wg.Done()

		// 显示字母表3次
		for count := 0; count < 3; count++ {
			for char := 'A'; char < 'A'+26; char++ {
				fmt.Printf("%c ", char)
			}
		}
	}()

	fmt.Println("Waiting To Finish")
	// 等待goroutine结束,否则 main 函数将直接继续往下走
	wg.Wait()

	fmt.Println("\nTerminating Program")
}

goroutine 占用时间过长时,调度器会停止当前正运行的 goroutine ,并给其他可运行的 goroutine 运行的机会。

// 这个示例程序展示如何创建goroutine 以及调度器的行为
package main

import (
	"fmt"
	"runtime"
	"sync"
)

var wg sync.WaitGroup

// main是所有Go程序的入口
func main() {
	// 分配一个逻辑处理器给调度器使用, 这个函数允许程序更改调度器可以使用的逻辑处理器的数量。
	runtime.GOMAXPROCS(1)

	// wg用来等待程序完成
	// WaitGroup是一个计数信号量,可以用来记录并维护运行的 goroutine。如果WaitGroup的值大于0,Wait方法就会阻塞。

	// 计数加2,表示要等待两个goroutine
	wg.Add(2)

	fmt.Println("Start Goroutines")

	// 创建两个goroutine
	go printPrime("A")
	go printPrime("B")
	fmt.Println("Waiting To Finish")
	// 等待goroutine结束,否则 main 函数将直接继续往下走
	wg.Wait()

	fmt.Println("\nTerminating Program")
}

func printPrime(prefix string) {
	defer wg.Done()
next:
	for outer := 2; outer < 50000; outer++ {
		for inner := 2; inner < outer; inner++ {
			if outer%inner == 0 {
				continue next
			}
		}
		fmt.Printf("%s:%d\n", prefix, outer)
	}
	fmt.Println("Completed", prefix)
}

Context 是一个接口,它具备手动、定时、超时发出取消信号、传值等功能,主要用于控制多个 goroutine 之间的协作,尤其是取消操作。一旦取消指令下达,那么被 Context 跟踪的这些 goroutine 都会收到取消信号,就可以做清理和退出操作。

  • 传递数据
  • 主动取消
  • 超时取消
type Context interface {
   Deadline() (deadline time.Time, ok bool)
   Done() <-chan struct{}
   Err() error
   Value(key interface{}) interface{}
}
  • Deadline 方法可以获取设置的截止时间,第一个返回值 deadline 是截止时间,到了这个时间点,Context 会自动发起取消请求,第二个返回值 ok 代表是否设置了截止时间。
  • Done 方法返回一个只读的 channel,类型为 struct{}。在 goroutine 中,如果该方法返回的 chan 可以读取,则意味着 Context 已经发起了取消信号。通过 Done 方法收到这个信号后,就可以做清理操作,然后退出 goroutine ,释放资源。
  • Err 方法返回取消的错误原因,即因为什么原因 Context 被取消。
  • Value 方法获取该 Context 上绑定的值,是一个键值对,所以要通过一个 key 才可以获取对应的值。
  • 空 Context:不可取消,没有截止时间,主要用于 Context 树的根节点。

  • 可取消的 Context:用于发出取消信号,当取消的时候,它的子 Context 也会取消。

  • 可定时取消的 Context:多了一个定时的功能。

  • 值 Context:用于存储一个 key-value 键值对。

WithCancel(parent Context) (ctx Context, cancel CancelFunc)生成一个可取消的 Context

WithDeadline(parent Context, d time.Time) (Context, CancelFunc)生成一个可定时取消的 Context参数 d 为定时取消的具体时间

WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)生成一个可超时取消的 Context参数 timeout 用于设置多久后取消

WithValue(parent Context, key, val interface{}) Context生成一个可携带 key-value 键值对的 Context
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

func main() {
	var wg sync.WaitGroup
	wg.Add(3)
	ctx, stop := context.WithCancel(context.Background())
	go func() {
		defer wg.Done()
		watchDog(ctx, "watchdog_1")
	}()

	go func() {
		defer wg.Done()
		watchDog(ctx, "watchdog_2")
	}()

	go func() {
		defer wg.Done()
		watchDog(ctx, "watchdog_3")
	}()

	time.Sleep(5 * time.Second)
	stop() //发停止指令
	wg.Wait()
}

func watchDog(ctx context.Context, name string) {
	//开启for select循环,一直后台监控
	for {
		select {
		case <-ctx.Done():
			fmt.Println(name, "receive stop cmd, will stop")
			return
		default:
			fmt.Println(name, "is running ……")
		}
		time.Sleep(1 * time.Second)
	}
}
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

func main() {
	var wg sync.WaitGroup
	wg.Add(4)
	ctx, stop := context.WithCancel(context.Background())
	go func() {
		defer wg.Done()
		watchDog(ctx, "watchdog_1")
	}()

	go func() {
		defer wg.Done()
		watchDog(ctx, "watchdog_2")
	}()

	go func() {
		defer wg.Done()
		watchDog(ctx, "watchdog_3")
	}()

	valCtx := context.WithValue(ctx, "userId", 2)
	go func() {
		defer wg.Done()
		getUser(valCtx)
	}()

	time.Sleep(5 * time.Second)
	stop() //发停止指令
	wg.Wait()
}

func watchDog(ctx context.Context, name string) {
	//开启for select循环,一直后台监控
	for {
		select {
		case <-ctx.Done():
			fmt.Println(name, "receive stop cmd, will stop")
			return
		default:
			fmt.Println(name, "is running ……")
		}
		time.Sleep(1 * time.Second)
	}
}

func getUser(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			fmt.Println("user exit")
			return
		default:
			userId := ctx.Value("userId")   // 在这里获取value的取值
			fmt.Println("userId is", userId)
			time.Sleep(1 * time.Second)
		}
	}
}
package main

import (
    "fmt"
    "sync"
    "time"

    "golang.org/x/net/context"
)

var (
    wg sync.WaitGroup
)

func startTask(ctx context.Context) error {
    defer wg.Done()

    for i := 0; i < 30; i++ {
        select {
        case <-time.After(2 * time.Second):
            fmt.Printf("in goroutine do task %v\n", i)

        // we received the signal of cancelation in this channel
        case <-ctx.Done():
            fmt.Printf("cancel goroutine task %v\n", i)
            return ctx.Err()
        }
    }
    return nil
}

func main() {
    timeoutCtx, cancel := context.WithTimeout(context.Background(), 4*time.Second)
    defer cancel()

    fmt.Println("startTask")

    wg.Add(1)
    go startTask(timeoutCtx)
    wg.Wait()

    fmt.Println("endTask")
}   
package main

import (
    "fmt"
    "time"

    "golang.org/x/net/context"
)

func startTask(ctx context.Context, task string) {
    for {
        select {
        case <-ctx.Done():
            fmt.Println("stop goroutine startTask")
            return
        default:
            fmt.Println(task, "in goroutine do task")
            time.Sleep(2 * time.Second)
        }
    }
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())

    go startTask(ctx, "start allen.wu task 1")
    go startTask(ctx, "start allen.wu task 2")

    time.Sleep(6 * time.Second)

    fmt.Println("Now, call func cancel to stop all goroutines")
    cancel()

    time.Sleep(5 * time.Second)
}
  • Context 不要放在结构体中,要以参数的方式传递
  • Context 作为函数的参数时,要放在第一位,也就是第一个参数
  • 要使用 context.Background 函数生成根节点的 Context,也就是最顶层的 Context
  • Context 传值要传递必须的值,而且要尽可能地少,不要什么都传
  • Context 多 goroutine 安全,可以在多个 goroutine 中放心使用。
  • 可以把一个 Context 对象传递给任意个数的 Gorotuine,对它执行 取消 操作时,所有 goroutine 都会接收到取消信号
  • Context 一般是作为函数的参数进行传递,并且最优的做法是把 Context 作为第一个参数放到每个关键函数的参数中,并且变量名都建议统一命名,名为 ctx。
  • 一般而言,把 context.Background() 作为第一个 parent Context
  • Context 的 Value 中应该传递必须的核心元数据,不要什么数据都使用 Context 传递。
  • 永远记住,只要传递 Context,就不要把 Context 设置为 nil 来传递

Go 语言在 1.9 版本中提供了一种效率较高的并发安全的 sync.Mapsync.Mapmap 不同,不是以语言原生形态提供,而是在 sync 包下的特殊结构。

  • sync.Map 不能使用 map 的方式进行取值和设置等操作,而是使用 sync.Map 的方法进行调用, Store 表示存储, Load 表示获取, Delete 表示删除。
package main

import (
      "fmt"
      "sync"
)

func main() {
    
	// 声明 scene,类型为 sync.Map,注意,sync.Map 不能使用 make 创建。
    var scene sync.Map

    // 将键值对保存到sync.Map
    // sync.Map 将键和值以 interface{} 类型进行保存。
    scene.Store("greece", 97)
    scene.Store("london", 100)
    scene.Store("egypt", 200)

    // 从sync.Map中根据键取值
    fmt.Println(scene.Load("london"))

    // 根据键删除对应的键值对
    scene.Delete("london")

    // 遍历所有sync.Map中的键值对
    // 遍历需要提供一个匿名函数,参数为 k、v,类型为 interface{},
    // 每次 Range() 在遍历一个元素时,都会调用这个匿名函数把结果返回。
    scene.Range(func(k, v interface{}) bool {

        fmt.Println("iterate:", k, v)
        return true
    })

}
0%