并发

虽然 Go 程序编译后生成的是本地可执行代码,但是这些可执行代码必须运行在Go 语言的运行时(Runtime )中。Go 运行时类似 Java 和 .NET 语言所用到的虚拟机,主要负责管理包括内存分配、自动垃圾回收、栈处理、协程(Goroutine)、信道(Channel,也称为通道)、切片(slice)、字典(map)和反射(reflect)等。
Go运行时通过接口函数调用来管理协程(Goroutine)和信道(Channel)等功能。Go用户代码对操作系统内核API的调用会被Go运行时拦截并处理。
Go运行时的重要组成部分是协程调度器(Goroutine Scheduler)。它负责追踪、调度每个协程运行,实际上是从应用程序的进程(Process)所属的线程池(Thread Pool)中分配一个线程执行这个协程。每个协程只有分配到一个操作系统线程才能运行。
CSP是一种消息传递模型,通过在goroutine之间传递数据来传递消息,而不是对数据进行加锁来实现同步访问。

1. goroutine启动
Go语言的并发执行体称为goroutine,Go语言通过go关键字来启动一个goroutine。goroutine是一种非常轻量级的实现,可在单个进程里执行成千上万的并发任务。
Go程序从main包的main()函数开始,在程序启动时,Go程序就会为main()函数创建一个默认的goroutine。
Go程序中使用go关键字为一个函数创建一个goroutine。一个函数可以被创建多个goroutine,而一个goroutine必定对应一个函数。- 调度器不能保证多个
goroutine执行次序,且进程退出时不会等待它们结束。
1. 匿名函数启动
go func( 参数列表 ){
    函数体
}( 调用参数列表 )
package main
import (
	"runtime"
	"time"
)
func main() {
	go func() {
		sum := 0
		for i := 0; i <= 10000; i++ {
			sum += i
		}
		println("sum is: ", sum)
		time.Sleep(1 * time.Second)
	}()
	//NumGoroutine 可以返回当前程序的 goroutine 数目
	println("NumGoroutine=", runtime.NumGoroutine())
	// main goroutine 故意“ sleep ” 5 秒, 防止 main 提前退出
	time.Sleep(5 * time.Second)
}
2. 有名函数启动
package main
import (
	"runtime"
	"time"
)
func sum() {
	sum := 0
	for i := 0; i <= 10000; i++ {
		sum += i
	}
	println("sum is: ", sum)
	time.Sleep(1 * time.Second)
}
func main() {
	go sum()
	//NumGoroutine 可以返回当前程序的 goroutine 数目
	println("NumGoroutine=", runtime.NumGoroutine())
	// main goroutine 故意“ sleep ” 5 秒, 防止 main 提前退出
	time.Sleep(5 * time.Second)
}
2. goroutine特点
- go的执行是非阻塞的,不会等待;
- go后面的函数的返回值会被忽略;
- 调度器不能保证多个 goroutine的执行次序;
- 没有父子 goroutine的概念,所有的goroutine是平等地被调度和执行的;
- Go程序在执行时会单独为- main函数创建一个- goroutine,遇到其他- go关键字时再去创建其他的- goroutine;
- 主函数返回时,所有的 goroutine都会被直接打断,程序退出;
- Go没有暴露- goroutine id给用户,所以不能在一个- goroutine里面显式地操作另一个- goroutine, 不过- runtime包提供了一些函数访问和设置- goroutine的相关信息;
- runtime.NumGoroutine返回一个进程的所有- goroutine数,- main()的- goroutine也被算在里面。因此实际创建的- goroutine数量为扣除- main()的- goroutine数。
3. runtime 包函数
1. func GOMAXPROCS
runtime.GOMAXPROCS(逻辑CPU数量)
=0:查询当前的 GOMAXPROCS 值。
=1:设置单核心执行。
.> 1:设置多核并发执行。
package main
import (
	"runtime"
)
func main() {
	// 获取当前的 GOMAXPROCS 值
	println("GOMAXPROCS=", runtime.GOMAXPROCS(0))
	// 设置当前的 GOMAXPROCS 值
	runtime.GOMAXPROCS(2)
	// 获取当前的 GOMAXPROCS 值
	println("GOMAXPROCS=", runtime.GOMAXPROCS(0))
}
2. func Coexit
func Goexit () 是结束当前 goroutine 的运行, Goexit 在结束当前 goroutine 运行之前会调用当前 goroutine 已经注册的 defer 。
Goexit 并不会产生 panic ,所以该 goroutine defer 里面的 recover 调用都返回 nil 。
调用 runtime.Goexit 将立即终止当前 goroutine 执行,调度器确保所有已注册 defer 延迟调用被执行。
package main
import (
	"runtime"
	"sync"
)
func main() {
	wg := new(sync.WaitGroup)
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer println("A.defer")
		func() {
			defer println("B.defer")
			runtime.Goexit() // 终止当前 goroutine
			println("B")     // 不会执行
		}()
		println("A") // 不会执行
	}()
	wg.Wait()
}
3. func Gosched
Gosched是放弃当前调度执行机会,将当前goroutine暂停,放回队列等待下次被调度执行。
- I/O、- select
- channel
- 等待锁
- 函数调用(有时)
- runetime.Gosched()
package main
import (
	"runtime"
	"sync"
)
func main() {
	wg := new(sync.WaitGroup)
	wg.Add(2)
	go func() {
		defer wg.Done()
		for i := 0; i < 5; i++ {
			println("Hello, World!")
		}
	}()
	go func() {
		defer wg.Done()
		for i := 0; i < 5; i++ {
			println(i)
			if i == 2 {
				runtime.Gosched()
			}
		}
	}()
	println("NumGoroutine is ", runtime.GOMAXPROCS(0))
	wg.Wait()
}
package main
import (
	"fmt"
	"runtime"
)
func say(s string) {
	for i := 0; i < 2; i++ {
		runtime.Gosched()
		fmt.Println(s)
	}
}
func main() {
	go say("world")
	say("hello")
}
//runtime.Gosched() 会在不同的 goroutine 之间切换,当 main goroutine 退出时,其它的 goroutine 都会直接退出
//输出: hello world hello
4. 一个 逻辑处理器处理 goroutine 时间较短
// 这个示例程序展示如何创建goroutine 以及调度器的行为
package main
import (
	"fmt"
	"runtime"
	"sync"
)
// main是所有Go程序的入口
func main() {
	// 分配一个逻辑处理器给调度器使用, 这个函数允许程序更改调度器可以使用的逻辑处理器的数量。
	runtime.GOMAXPROCS(1)
	// wg用来等待程序完成
	// WaitGroup是一个计数信号量,可以用来记录并维护运行的 goroutine。如果WaitGroup的值大于0,Wait方法就会阻塞。
	var wg sync.WaitGroup
	// 计数加2,表示要等待两个goroutine
	wg.Add(2)
	fmt.Println("Start Goroutines")
	// 声明一个匿名函数,并创建一个goroutine
	go func() {
		// 关键字defer会修改函数调用时机,在正在执行的函数返回时才真正调用defer声明的函数。
		// 关键字defer保证,每个 goroutine 一旦完成其工作就调用Done方法。
		// 在函数退出时调用Done来通知main函数工作已经完成
		defer wg.Done()
		// 显示字母表3次
		for count := 0; count < 3; count++ {
			for char := 'a'; char < 'a'+26; char++ {
				fmt.Printf("%c ", char)
			}
		}
	}()
	// 声明一个匿名函数,并创建一个goroutine
	go func() {
		// 在函数退出时调用Done来通知main函数工作已经完成
		defer wg.Done()
		// 显示字母表3次
		for count := 0; count < 3; count++ {
			for char := 'A'; char < 'A'+26; char++ {
				fmt.Printf("%c ", char)
			}
		}
	}()
	fmt.Println("Waiting To Finish")
	// 等待goroutine结束,否则 main 函数将直接继续往下走
	wg.Wait()
	fmt.Println("\nTerminating Program")
}
5. 一个 逻辑处理器处理 goroutine 时间较长
当
goroutine占用时间过长时,调度器会停止当前正运行的goroutine,并给其他可运行的goroutine运行的机会。
// 这个示例程序展示如何创建goroutine 以及调度器的行为
package main
import (
	"fmt"
	"runtime"
	"sync"
)
var wg sync.WaitGroup
// main是所有Go程序的入口
func main() {
	// 分配一个逻辑处理器给调度器使用, 这个函数允许程序更改调度器可以使用的逻辑处理器的数量。
	runtime.GOMAXPROCS(1)
	// wg用来等待程序完成
	// WaitGroup是一个计数信号量,可以用来记录并维护运行的 goroutine。如果WaitGroup的值大于0,Wait方法就会阻塞。
	// 计数加2,表示要等待两个goroutine
	wg.Add(2)
	fmt.Println("Start Goroutines")
	// 创建两个goroutine
	go printPrime("A")
	go printPrime("B")
	fmt.Println("Waiting To Finish")
	// 等待goroutine结束,否则 main 函数将直接继续往下走
	wg.Wait()
	fmt.Println("\nTerminating Program")
}
func printPrime(prefix string) {
	defer wg.Done()
next:
	for outer := 2; outer < 50000; outer++ {
		for inner := 2; inner < outer; inner++ {
			if outer%inner == 0 {
				continue next
			}
		}
		fmt.Printf("%s:%d\n", prefix, outer)
	}
	fmt.Println("Completed", prefix)
}
4. Context
Context是一个接口,它具备手动、定时、超时发出取消信号、传值等功能,主要用于控制多个goroutine之间的协作,尤其是取消操作。一旦取消指令下达,那么被Context跟踪的这些goroutine都会收到取消信号,就可以做清理和退出操作。
- 传递数据
- 主动取消
- 超时取消
1. 结口方法
type Context interface {
   Deadline() (deadline time.Time, ok bool)
   Done() <-chan struct{}
   Err() error
   Value(key interface{}) interface{}
}
- Deadline方法可以获取设置的截止时间,第一个返回值- deadline是截止时间,到了这个时间点,- Context会自动发起取消请求,第二个返回值- ok代表是否设置了截止时间。
- Done方法返回一个只读的- channel,类型为- struct{}。在- goroutine中,如果该方法返回的- chan可以读取,则意味着- Context已经发起了取消信号。通过- Done方法收到这个信号后,就可以做清理操作,然后退出- goroutine,释放资源。
- Err方法返回取消的错误原因,即因为什么原因- Context被取消。
- Value方法获取该- Context上绑定的值,是一个键值对,所以要通过一个- key才可以获取对应的值。
2. context树
- 
空 Context:不可取消,没有截止时间,主要用于 Context 树的根节点。 
- 
可取消的 Context:用于发出取消信号,当取消的时候,它的子 Context 也会取消。 
- 
可定时取消的 Context:多了一个定时的功能。 
- 
值 Context:用于存储一个 key-value 键值对。 
WithCancel(parent Context) (ctx Context, cancel CancelFunc):生成一个可取消的 Context。
WithDeadline(parent Context, d time.Time) (Context, CancelFunc):生成一个可定时取消的 Context,参数 d 为定时取消的具体时间。
WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc):生成一个可超时取消的 Context,参数 timeout 用于设置多久后取消
WithValue(parent Context, key, val interface{}) Context:生成一个可携带 key-value 键值对的 Context。
1. context.WithCancel 取消多个 goroutine
package main
import (
	"context"
	"fmt"
	"sync"
	"time"
)
func main() {
	var wg sync.WaitGroup
	wg.Add(3)
	ctx, stop := context.WithCancel(context.Background())
	go func() {
		defer wg.Done()
		watchDog(ctx, "watchdog_1")
	}()
	go func() {
		defer wg.Done()
		watchDog(ctx, "watchdog_2")
	}()
	go func() {
		defer wg.Done()
		watchDog(ctx, "watchdog_3")
	}()
	time.Sleep(5 * time.Second)
	stop() //发停止指令
	wg.Wait()
}
func watchDog(ctx context.Context, name string) {
	//开启for select循环,一直后台监控
	for {
		select {
		case <-ctx.Done():
			fmt.Println(name, "receive stop cmd, will stop")
			return
		default:
			fmt.Println(name, "is running ……")
		}
		time.Sleep(1 * time.Second)
	}
}
2. context.WithValue 传值
package main
import (
	"context"
	"fmt"
	"sync"
	"time"
)
func main() {
	var wg sync.WaitGroup
	wg.Add(4)
	ctx, stop := context.WithCancel(context.Background())
	go func() {
		defer wg.Done()
		watchDog(ctx, "watchdog_1")
	}()
	go func() {
		defer wg.Done()
		watchDog(ctx, "watchdog_2")
	}()
	go func() {
		defer wg.Done()
		watchDog(ctx, "watchdog_3")
	}()
	valCtx := context.WithValue(ctx, "userId", 2)
	go func() {
		defer wg.Done()
		getUser(valCtx)
	}()
	time.Sleep(5 * time.Second)
	stop() //发停止指令
	wg.Wait()
}
func watchDog(ctx context.Context, name string) {
	//开启for select循环,一直后台监控
	for {
		select {
		case <-ctx.Done():
			fmt.Println(name, "receive stop cmd, will stop")
			return
		default:
			fmt.Println(name, "is running ……")
		}
		time.Sleep(1 * time.Second)
	}
}
func getUser(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			fmt.Println("user exit")
			return
		default:
			userId := ctx.Value("userId")   // 在这里获取value的取值
			fmt.Println("userId is", userId)
			time.Sleep(1 * time.Second)
		}
	}
}
3. context.WithTimeout 超时取消
package main
import (
    "fmt"
    "sync"
    "time"
    "golang.org/x/net/context"
)
var (
    wg sync.WaitGroup
)
func startTask(ctx context.Context) error {
    defer wg.Done()
    for i := 0; i < 30; i++ {
        select {
        case <-time.After(2 * time.Second):
            fmt.Printf("in goroutine do task %v\n", i)
        // we received the signal of cancelation in this channel
        case <-ctx.Done():
            fmt.Printf("cancel goroutine task %v\n", i)
            return ctx.Err()
        }
    }
    return nil
}
func main() {
    timeoutCtx, cancel := context.WithTimeout(context.Background(), 4*time.Second)
    defer cancel()
    fmt.Println("startTask")
    wg.Add(1)
    go startTask(timeoutCtx)
    wg.Wait()
    fmt.Println("endTask")
}   
4. 调用 Context cancel 和 Done 取消任务
package main
import (
    "fmt"
    "time"
    "golang.org/x/net/context"
)
func startTask(ctx context.Context, task string) {
    for {
        select {
        case <-ctx.Done():
            fmt.Println("stop goroutine startTask")
            return
        default:
            fmt.Println(task, "in goroutine do task")
            time.Sleep(2 * time.Second)
        }
    }
}
func main() {
    ctx, cancel := context.WithCancel(context.Background())
    go startTask(ctx, "start allen.wu task 1")
    go startTask(ctx, "start allen.wu task 2")
    time.Sleep(6 * time.Second)
    fmt.Println("Now, call func cancel to stop all goroutines")
    cancel()
    time.Sleep(5 * time.Second)
}
3. 注意事项
- Context 不要放在结构体中,要以参数的方式传递。
- Context 作为函数的参数时,要放在第一位,也就是第一个参数。
- 要使用 context.Background 函数生成根节点的 Context,也就是最顶层的 Context。
- Context 传值要传递必须的值,而且要尽可能地少,不要什么都传。
- Context 多 goroutine 安全,可以在多个 goroutine 中放心使用。
- 可以把一个 Context 对象传递给任意个数的 Gorotuine,对它执行 取消 操作时,所有 goroutine 都会接收到取消信号。
- Context 一般是作为函数的参数进行传递,并且最优的做法是把 Context 作为第一个参数放到每个关键函数的参数中,并且变量名都建议统一命名,名为 ctx。
- 一般而言,把 context.Background() 作为第一个 parent Context
- Context 的 Value 中应该传递必须的核心元数据,不要什么数据都使用 Context 传递。
- 永远记住,只要传递 Context,就不要把 Context 设置为 nil 来传递。
5. sync.map
Go语言在 1.9 版本中提供了一种效率较高的并发安全的sync.Map,sync.Map和map不同,不是以语言原生形态提供,而是在sync包下的特殊结构。
sync.Map不能使用map的方式进行取值和设置等操作,而是使用sync.Map的方法进行调用,Store表示存储,Load表示获取,Delete表示删除。
package main
import (
      "fmt"
      "sync"
)
func main() {
    
	// 声明 scene,类型为 sync.Map,注意,sync.Map 不能使用 make 创建。
    var scene sync.Map
    // 将键值对保存到sync.Map
    // sync.Map 将键和值以 interface{} 类型进行保存。
    scene.Store("greece", 97)
    scene.Store("london", 100)
    scene.Store("egypt", 200)
    // 从sync.Map中根据键取值
    fmt.Println(scene.Load("london"))
    // 根据键删除对应的键值对
    scene.Delete("london")
    // 遍历所有sync.Map中的键值对
    // 遍历需要提供一个匿名函数,参数为 k、v,类型为 interface{},
    // 每次 Range() 在遍历一个元素时,都会调用这个匿名函数把结果返回。
    scene.Range(func(k, v interface{}) bool {
        fmt.Println("iterate:", k, v)
        return true
    })
}