Skip to content

Go笔记(二):面向对象、错误处理、协程、并发

面向对象编程

go 是面向对象的语言吗?
官方回答:Yes and no 即是也不是

结构的封装

定义一个 struct

go
type Employee struct {  
Id string  
Name string  
Age int  
}
type Employee struct {  
Id string  
Name string  
Age int  
}

struct 的初始化

go
e := Employee{"0", "Bob", 20}  
  
e1 := Employee{Name: "Mike", Age: 30}  
  
e11 := Employee{}  
  
e2 := new(Employee) //返回指针 相当于 &Employee{}e2.Id = "22"  
e2.Age = 22  
e2.Name = "xiaoming"
e := Employee{"0", "Bob", 20}  
  
e1 := Employee{Name: "Mike", Age: 30}  
  
e11 := Employee{}  
  
e2 := new(Employee) //返回指针 相当于 &Employee{}e2.Id = "22"  
e2.Age = 22  
e2.Name = "xiaoming"

行为的封装

行为和方法的定义,支持两种方法,如下:

go
// 1. 实例对应方法被调用时,实例的成员会进行值复制  
func (e Employee) String() string {  
    return fmt.Sprintf("id:%s name:%s age:%d", e.Id, e.Name, e.Age)  
}  
  
// 2. 通常为了避免内存拷贝我们使用第二种定义方式  
func (e *Employee) String() string {  
    // 通过指针访问,直接.就行  
    return fmt.Sprintf("id:%s name:%s age:%d", e.Id, e.Name, e.Age)     
}
// 1. 实例对应方法被调用时,实例的成员会进行值复制  
func (e Employee) String() string {  
    return fmt.Sprintf("id:%s name:%s age:%d", e.Id, e.Name, e.Age)  
}  
  
// 2. 通常为了避免内存拷贝我们使用第二种定义方式  
func (e *Employee) String() string {  
    // 通过指针访问,直接.就行  
    return fmt.Sprintf("id:%s name:%s age:%d", e.Id, e.Name, e.Age)     
}

尝试:

go
func (e *Employee) String() string {  
	fmt.Printf("address is %x\n", unsafe.Pointer(&e.Name))  
	// 通过指针访问,直接.就行  
	return fmt.Sprintf("id:%s name:%s age:%d", e.Id, e.Name, e.Age)  
}  
  
func TestStructOperation(t *testing.T) {  
	e := Employee{"1", "bob", 20}  
	fmt.Printf("address1 is %x \n", unsafe.Pointer(&e.Name))  
	t.Log(e.String())  
}  
//打印结果, 可以看到地址是相同的  
// address1 is c0000723a0  
// address is c0000723a0
func (e *Employee) String() string {  
	fmt.Printf("address is %x\n", unsafe.Pointer(&e.Name))  
	// 通过指针访问,直接.就行  
	return fmt.Sprintf("id:%s name:%s age:%d", e.Id, e.Name, e.Age)  
}  
  
func TestStructOperation(t *testing.T) {  
	e := Employee{"1", "bob", 20}  
	fmt.Printf("address1 is %x \n", unsafe.Pointer(&e.Name))  
	t.Log(e.String())  
}  
//打印结果, 可以看到地址是相同的  
// address1 is c0000723a0  
// address is c0000723a0

另外一种方式,打印出的地址则是不同的。即结构被复制,会有额外的开销。

接口

接口特性:

  1. 接口是非侵入性的,实现不依赖接口定义
  2. 所以接口的定义可以包含在接口使用者包内
go
type Programmer interface {  
WriteHelloWorld() string  
}  
type GoProgrammer struct {  
}  
// "duck type" 方法签名看起来是一样的。  
func (p *GoProgrammer) WriteHelloWorld() string {  
return "hello world"  
}  
  
func TestInterface(t *testing.T) {  
var p Programmer  
p = new(GoProgrammer)  
t.Log(p.WriteHelloWorld())  
}
type Programmer interface {  
WriteHelloWorld() string  
}  
type GoProgrammer struct {  
}  
// "duck type" 方法签名看起来是一样的。  
func (p *GoProgrammer) WriteHelloWorld() string {  
return "hello world"  
}  
  
func TestInterface(t *testing.T) {  
var p Programmer  
p = new(GoProgrammer)  
t.Log(p.WriteHelloWorld())  
}

go 中实现继承和 override

go 中是没有继承的,不过可以实现类似的效果。可以结合匿名嵌套类型实现。

go
type Pet struct {  
}  
  
func (p *Pet) Speak() {  
	fmt.Print("...")  
}  
  
func (p *Pet) SpeakTo(host string) {  
	p.Speak()  
	fmt.Println(" ", host)  
}  
  
// 匿名嵌套类型  
/// 可以实现类似继承的效果,但是有所不同  
type Dog1 struct {  
	Pet  
}
type Pet struct {  
}  
  
func (p *Pet) Speak() {  
	fmt.Print("...")  
}  
  
func (p *Pet) SpeakTo(host string) {  
	p.Speak()  
	fmt.Println(" ", host)  
}  
  
// 匿名嵌套类型  
/// 可以实现类似继承的效果,但是有所不同  
type Dog1 struct {  
	Pet  
}

根据需要求进行方法的重写:

go
func (p *Dog1) Speak() {  
	fmt.Print("dog1 ...")  
}  
  
func (p *Dog1) SpeakTo(host string) {  
	p.Speak()  
	fmt.Println("dog1 ", host)  
}  
  
func TestDog(t *testing.T) {  
	dog := new(Dog1)  
	dog.SpeakTo("hhh")  
	dog.Speak()  
}
func (p *Dog1) Speak() {  
	fmt.Print("dog1 ...")  
}  
  
func (p *Dog1) SpeakTo(host string) {  
	p.Speak()  
	fmt.Println("dog1 ", host)  
}  
  
func TestDog(t *testing.T) {  
	dog := new(Dog1)  
	dog.SpeakTo("hhh")  
	dog.Speak()  
}

多态

多态就是同一个接口,使用不同的实例而执行不同操作,如 interface 打印; 彩色相机和黑白相机执行效果不同

go
type MyProgrammer interface {  
sayHello() string  
}  
type Golang struct {  
}  
type Java struct {  
}  
type Ruby struct {  
}  
  
func (p *Golang) sayHello() string {  
	return "go white hello world"  
}  
  
func (p *Java) sayHello() string {  
	return "java white hello world"  
}  
  
func callSayHello(p MyProgrammer) {  
	fmt.Println(p.sayHello())  
}  
  
func TestDuotai(t *testing.T) {  
	goProg := new(Golang)  
	javaProg := &Java{}  
	// 需要操作指针  
	callSayHello(goProg)  
	callSayHello(javaProg)  
	//callSayHello(new(Ruby)) 因为没有实现方法,这里编译报错  
}
type MyProgrammer interface {  
sayHello() string  
}  
type Golang struct {  
}  
type Java struct {  
}  
type Ruby struct {  
}  
  
func (p *Golang) sayHello() string {  
	return "go white hello world"  
}  
  
func (p *Java) sayHello() string {  
	return "java white hello world"  
}  
  
func callSayHello(p MyProgrammer) {  
	fmt.Println(p.sayHello())  
}  
  
func TestDuotai(t *testing.T) {  
	goProg := new(Golang)  
	javaProg := &Java{}  
	// 需要操作指针  
	callSayHello(goProg)  
	callSayHello(javaProg)  
	//callSayHello(new(Ruby)) 因为没有实现方法,这里编译报错  
}

空接口和断言

  1. 空接口可以表示任何类型 类似 void* 或 java 的 Object
  2. 通过断言将空接口转换为定制类型: v,ok = p.(int)
go
func DoSomething(p interface{}) {  
  
/**  
if i, ok := p.(int); ok {  
fmt.Println("int", i)  
return  
}  
if i, ok := p.(string); ok {  
fmt.Println("string", i)  
return  
}  
fmt.Println("Unknown Type")  
*/  
  
//可以将上述代码进行简化  
	switch v := p.(type) {  
		case int:  
			fmt.Println("int", v)  
		case string:  
			fmt.Println("string", v)  
		default:  
			fmt.Println("unknown")  
	}  
}  
  
func TestConvert(t *testing.T) {  
	DoSomething(10)  
	DoSomething("aa")  
	DoSomething(true)  
//int 10  
//string aa  
//unknown  
}
func DoSomething(p interface{}) {  
  
/**  
if i, ok := p.(int); ok {  
fmt.Println("int", i)  
return  
}  
if i, ok := p.(string); ok {  
fmt.Println("string", i)  
return  
}  
fmt.Println("Unknown Type")  
*/  
  
//可以将上述代码进行简化  
	switch v := p.(type) {  
		case int:  
			fmt.Println("int", v)  
		case string:  
			fmt.Println("string", v)  
		default:  
			fmt.Println("unknown")  
	}  
}  
  
func TestConvert(t *testing.T) {  
	DoSomething(10)  
	DoSomething("aa")  
	DoSomething(true)  
//int 10  
//string aa  
//unknown  
}

最佳实践

Go 接口最佳实践:
1). 倾向于使用小的接口定义,很多接口只包含一个方法
2). 较大的接口定义,可以由多个小接口组合而成

go
type ReadWrite interface {  
	Reader  
	Write  
}
type ReadWrite interface {  
	Reader  
	Write  
}

3). 只依赖必要功能的最小接口. 方便复用

go
func StoreData(reader Reader) error{...}
func StoreData(reader Reader) error{...}

错误处理

Go 错误机制

  1. 没有异常机制
  2. error 类型实现了 error 接口
    3.可以通过 errors.New 来快速创建错误实例
go
type error interface {  
Error() string  
}  
  
func TestName(t *testing.T) {  
errors.New("xx")  
}
type error interface {  
Error() string  
}  
  
func TestName(t *testing.T) {  
errors.New("xx")  
}

因为方法可以返回多个参数,可以使用如下方式,类似 swift 的 Result 库:

go
var LessThanTwoError = errors.New("n should not less than 2")  
var LargerThan100Error = errors.New("n should not large than 100")  
  
func GetFibonacci(n int) ([]int, error) {  
//及早失败,避免嵌套  
if n < 2 {  
return nil, LessThanTwoError  
}  
if n > 100 {  
return nil, LargerThan100Error  
}  
fibList := []int{1, 1}  
for i := 2; i < n; i++ {  
fibList = append(fibList, fibList[i-2]+fibList[i-1])  
}  
return fibList, nil  
}
var LessThanTwoError = errors.New("n should not less than 2")  
var LargerThan100Error = errors.New("n should not large than 100")  
  
func GetFibonacci(n int) ([]int, error) {  
//及早失败,避免嵌套  
if n < 2 {  
return nil, LessThanTwoError  
}  
if n > 100 {  
return nil, LargerThan100Error  
}  
fibList := []int{1, 1}  
for i := 2; i < n; i++ {  
fibList = append(fibList, fibList[i-2]+fibList[i-1])  
}  
return fibList, nil  
}

及早失败,避免嵌套,增加代码的可读性:

go
func GetFibonacci2(str string) {  
var (  
i int  
err error  
list []int  
)  
//及早失败,避免嵌套  
if i, err = strconv.Atoi(str); err != nil {  
fmt.Println("error", err)  
return  
}  
if list, err = GetFibonacci(i); err != nil {  
fmt.Println("err", err)  
return  
}  
fmt.Println(list)  
}  
  
// 调用示例  
func TestFab(t *testing.T) {  
if v, err := GetFibonacci(1000); err != nil {  
t.Log(err)  
if err == LessThanTwoError {  
t.Log("--2")  
}  
if err == LargerThan100Error {  
t.Log("--100")  
}  
} else {  
t.Log(v)  
}  
GetFibonacci2("1001")  
}
func GetFibonacci2(str string) {  
var (  
i int  
err error  
list []int  
)  
//及早失败,避免嵌套  
if i, err = strconv.Atoi(str); err != nil {  
fmt.Println("error", err)  
return  
}  
if list, err = GetFibonacci(i); err != nil {  
fmt.Println("err", err)  
return  
}  
fmt.Println(list)  
}  
  
// 调用示例  
func TestFab(t *testing.T) {  
if v, err := GetFibonacci(1000); err != nil {  
t.Log(err)  
if err == LessThanTwoError {  
t.Log("--2")  
}  
if err == LargerThan100Error {  
t.Log("--100")  
}  
} else {  
t.Log(v)  
}  
GetFibonacci2("1001")  
}

panic

panic

  1. panic 用于不可以恢复的错误
  2. panic 退出前会执行 defer 指定的内容

os.Exit

  1. 不会调用 defer 指定函数
  2. 不会打印栈的信息
go
func TestPanicVxExit(t *testing.T) {  
defer func() {  
// panic会执行defer的代码,而且打印调用栈的信息  
// os.Exit不会  
fmt.Println("finally")  
}()  
fmt.Println("start")  
panic(errors.New("Something error"))  
//os.Exit(-1)  
}
func TestPanicVxExit(t *testing.T) {  
defer func() {  
// panic会执行defer的代码,而且打印调用栈的信息  
// os.Exit不会  
fmt.Println("finally")  
}()  
fmt.Println("start")  
panic(errors.New("Something error"))  
//os.Exit(-1)  
}

recover

recover 可以 java 的类比 try{}catch{},recover 用于错误的恢复:

go
defer func() {  
if error := recover(); err != nil {  
//恢复错误  
}  
}()
defer func() {  
if error := recover(); err != nil {  
//恢复错误  
}  
}()

注意: 不要滥用

  1. 滥用会形成僵尸服务进程,导致 health check 失效
  2. "let it crash!"往往是我们恢复不确定性错误的好方法
go
func TestPanicVxExit2(t *testing.T) {  
defer func() {  
if err := recover(); err != nil {  
// 程序没有打印调用栈,正常退出  
fmt.Println("final:recover", err) // final:recover Something error  
}  
}()  
fmt.Println("start")  
panic(errors.New("Something error"))  
}
func TestPanicVxExit2(t *testing.T) {  
defer func() {  
if err := recover(); err != nil {  
// 程序没有打印调用栈,正常退出  
fmt.Println("final:recover", err) // final:recover Something error  
}  
}()  
fmt.Println("start")  
panic(errors.New("Something error"))  
}

package

  1. 基本复用模块单元:以首字母大写来约定可以被包外代码访问
  2. 代码的 package 可以和所在的目录不一致
  3. 同一目录的 go 代码的 package 要保持一致

package

  1. 通过 go get 来获取远程依赖 go get -u 强制从网路更新远程依赖
  2. 注意代码在 github 上的组织形式,以适应 go get。 直接从代码路径开始,不要有 src

go path 的问题:

  1. 可以在 ide 中设置 GOPATH 的,但是可能会导致在 ide 外调试时不一致、 某些 ide 可能会有些问题
  2. 作者建议使用.bash_profile(macos)中设置

(笔者用的 goland,直接从 GOPATH 中设置 PROJECT GOPATH 为当前的项目目录。
Global GOPATH 设置为默认安装目录/User/用户名/go)

go
import (  
"lession/series"  
// 可以起一个别名  
cm "src/github.com/easierway/concurrent_map"  
"testing"  
)  
  
func TestPackage(t *testing.T) {  
t.Log(series.GetFibonacciSeries(10))  
//t.Log(series.getFibonacciSeries(10))  
}
import (  
"lession/series"  
// 可以起一个别名  
cm "src/github.com/easierway/concurrent_map"  
"testing"  
)  
  
func TestPackage(t *testing.T) {  
t.Log(series.GetFibonacciSeries(10))  
//t.Log(series.getFibonacciSeries(10))  
}

init 方法

  1. main 被执行前,所有依赖的 package 的 init 方法都会被执行
  2. 不同包的 init 函数按照包导入的依赖关系决定执行顺序
  3. 每个包可以有多个 init 函数
  4. 每个源文件可有多个 init 函数

使用远程的 package

使用远程的 package 如: https://github.com/easierway/concurrent_map.git

go get -u github.com/easierway/concurrent_map (发现目录多了 pkg 、src 目录)

如果自己的代码提交到 github 并且适应 go get:

  • 直接以代码路径开始,不要有 src

问题

Go 存在的依赖问题

  1. 同一个环境下,不同项目使用同一包的不同版本 (因为 go get 下来会放到同一个 go 目录)
  2. 无法管理对包的特定版本的依赖

为了解决这些问题
随着 Go 1.5 release 版本发布,vendor 目录被添加到除了 GoPATH 和 GOROOT 之外的依赖目录查找的解决方案。
在 Go 1.6 之前,需要手动设置环境变量。

查找依赖包路径的优先级如下:

  1. 当前包下的 vendor 目录
  2. 想上级目录查找,知道找到 src 下的 vendor 目录
  3. 在 GOPATH 下面查找依赖包
  4. 在 GOROOT 目录下查找

常用的依赖管理工具

  1. godep
  2. glide brew install glide
  3. dep

glide 说明

  1. 安装 brew install glide
  2. 进入目录,执行 glide init
  3. glide install

groutine 协程

Thead vs Groutine

1). 创建时默认的 stack(栈)的大小

  • JDK5 以后默认 Java Thread stack 默认是 1M
  • Groutine 的 Stack 初始化大小为 2k,创建起来也更快

2). 和 系统线程 KSE (Kernel Space Entity)的对应关系

  • java thread 是 1:1
  • Groutine 是 M:N 多对多

如果 thread 和 KSE 是 1:1 关系的话。KSE 是 CPU 直接调度,效率很高,但是如果线程之间发生切换,会牵扯到内核对象的切换,开销很大。
如果多个协程都在和一个内核对应,那么久减少了开销,go 就是这么做的。

P 为 go 语言的协程处理器。
P 都在系统线程里,每个 P 都挂着一些携程队列 G。有的是正在运行状态的,如 G0.

如果有一个 G 非常占用时间,那么队列的 G 会不会被延迟很久?
不会。Go 有守护线程,进行计数,计算每个 P 完成 G 的数量,如果某一个 P 一段时间完成的数量没有变化。
就会往携程的任务栈里插入特别的标记,当 G 运行的时候遇到非内联函数(?)就会读到这个标记,把自己中断,放到队尾。

?另外一个机制,当某个 G 被系统中断了,io 需要等待时(?),P 将自己移到另一个可使用的线程中,继续执行他的队列的 G.

go
func TestGroutine(t *testing.T) {  
for i := 0; i < 10; i++ {  
// Go的方法调用传递时都是值传递,i被复制了一份,地址是不同的。因此可以执行  
go func(i int) {  
fmt.Println(i)  
}(i)  
}  
time.Sleep(time.Millisecond * 50)  
// 可以看到打印是乱序的 (调用程序一定要用function(){}(参数))  
}
func TestGroutine(t *testing.T) {  
for i := 0; i < 10; i++ {  
// Go的方法调用传递时都是值传递,i被复制了一份,地址是不同的。因此可以执行  
go func(i int) {  
fmt.Println(i)  
}(i)  
}  
time.Sleep(time.Millisecond * 50)  
// 可以看到打印是乱序的 (调用程序一定要用function(){}(参数))  
}

共享内存并发机制(锁)

类比其他语言:使用锁进行并发控制。

java
Lock lock = ...  
lock.lock();  
try {  
//process (thread-safe)  
}catch(e){  
}  
finally{  
lock.unlock()  
}
Lock lock = ...  
lock.lock();  
try {  
//process (thread-safe)  
}catch(e){  
}  
finally{  
lock.unlock()  
}

go:

package sync
Mutex: 可 lock 和 unlock
RWLock: 对读锁和写锁进行分开。 当共享的内存被读锁锁住时,另外一个读锁去锁它时候,不是互斥锁。当写锁遇到它的时候才是互斥的。 比完全互斥锁的情况效率高一些。

代码示例

非线程安全

go
func TestCounter(t *testing.T) {  
counter := 0  
for i := 0; i < 5000; i++ {  
go func() {  
counter++  
}()  
}  
time.Sleep(1 * time.Second)  
t.Log(counter) // 线程不安全 结果不是5000  
}
func TestCounter(t *testing.T) {  
counter := 0  
for i := 0; i < 5000; i++ {  
go func() {  
counter++  
}()  
}  
time.Sleep(1 * time.Second)  
t.Log(counter) // 线程不安全 结果不是5000  
}

线程安全

go
func TestCounterThreadSafe(t *testing.T) {  
var mut sync.Mutex  
counter := 0  
for i := 0; i < 5000; i++ {  
go func() {  
defer func() {  
mut.Unlock()  
}()  
mut.Lock()  
counter++  
}()  
}  
time.Sleep(1 * time.Second)  
t.Log(counter) // 线程安全 结果5000  
}
func TestCounterThreadSafe(t *testing.T) {  
var mut sync.Mutex  
counter := 0  
for i := 0; i < 5000; i++ {  
go func() {  
defer func() {  
mut.Unlock()  
}()  
mut.Lock()  
counter++  
}()  
}  
time.Sleep(1 * time.Second)  
t.Log(counter) // 线程安全 结果5000  
}

wait 等待线程完成在往下执行。只有当 wait 的完成完后才往下执行

go
func TestWaitGroup(t *testing.T) {  
// 等待线程完成在往下执行。  
// 只有当wait的完成完后才往下执行  
/**  
var wg sync.WaitGroup  
for i := 0; i < 5000; i++ {  
wg.Add(1)  
go func() {  
defer func() {  
wg.Done()  
}()  
//do some thing  
}()  
}  
wg.Wait()  
*/  
var wg sync.WaitGroup  
var mut sync.Mutex  
counter := 0  
for i := 0; i < 5000; i++ {  
//增加等待的量  
wg.Add(1)  
go func() {  
defer func() {  
mut.Unlock()  
}()  
mut.Lock()  
counter++  
//告知任务结束  
wg.Done()  
}()  
}  
//等待都执行完成  
wg.Wait()  
t.Log(counter) // 线程安全 结果5000  
}
func TestWaitGroup(t *testing.T) {  
// 等待线程完成在往下执行。  
// 只有当wait的完成完后才往下执行  
/**  
var wg sync.WaitGroup  
for i := 0; i < 5000; i++ {  
wg.Add(1)  
go func() {  
defer func() {  
wg.Done()  
}()  
//do some thing  
}()  
}  
wg.Wait()  
*/  
var wg sync.WaitGroup  
var mut sync.Mutex  
counter := 0  
for i := 0; i < 5000; i++ {  
//增加等待的量  
wg.Add(1)  
go func() {  
defer func() {  
mut.Unlock()  
}()  
mut.Lock()  
counter++  
//告知任务结束  
wg.Done()  
}()  
}  
//等待都执行完成  
wg.Wait()  
t.Log(counter) // 线程安全 结果5000  
}

CSP 并发机制(channel)

CPS (communicating sequential process)并发机制

依赖通道来完成两个实体间的通信。

20200130158038447911577.png
20200130158038448661074.png

和 Actor 的直接通信不同,gsp 模式则是通过 channel 进行通信的,更松散一些。
Go 中的 channel 是有容量限制并且独立于处理 Groutine ,而如 erlang,actor 模式中的 mailbox 容量是无限的,接收进行总是被动的处理消息。

channel 分为两种模式,如下图:
20200130158038496929185.png

方式 1. 通信的两端必须同时在 chanel 上,一方不在的话就会进行等待阻塞。
方式 2. bufferChannel 容量有限制

普通串行

go
func service() string {  
time.Sleep(time.Millisecond * 50)  
return "Done"  
}  
  
func otherTask() {  
fmt.Println("---1s working on something else ")  
time.Sleep(time.Millisecond * 1000)  
fmt.Println("---1s Task is done.")  
}  
  
func TestService(t *testing.T) {  
fmt.Println(service())  
otherTask()  
/*  
=== RUN TestService  
Done  
---1s working on something else  
---1s Task is done.  
--- PASS: TestService (1.06s)  
PASS  
可以发现他们是串行的  
*/  
}
func service() string {  
time.Sleep(time.Millisecond * 50)  
return "Done"  
}  
  
func otherTask() {  
fmt.Println("---1s working on something else ")  
time.Sleep(time.Millisecond * 1000)  
fmt.Println("---1s Task is done.")  
}  
  
func TestService(t *testing.T) {  
fmt.Println(service())  
otherTask()  
/*  
=== RUN TestService  
Done  
---1s working on something else  
---1s Task is done.  
--- PASS: TestService (1.06s)  
PASS  
可以发现他们是串行的  
*/  
}

不带 buffer

go
/*  
实现类似java futureService的运行方式  
*/  
func AsyncService() chan string {  
// 没有使用buffer  
retCh := make(chan string)  
// 使用buffer的情况  
//retCh := make(chan string, 1)  
// 调用时启动另外一个协程,不阻塞当前协程  
go func() {  
ret := service() // "Done" 50ms  
fmt.Println("return result.")  
//运行完将结果放到channel  
retCh <- ret  
//是否计算完成并将结果返回给了channel,是否就退出,并做下一步的处理?  
//否。 没有使用buffer. 实际上会被阻塞  
fmt.Println("service exited.")  
}()  
// 返回channel,在channel上等待  
return retCh  
}
/*  
实现类似java futureService的运行方式  
*/  
func AsyncService() chan string {  
// 没有使用buffer  
retCh := make(chan string)  
// 使用buffer的情况  
//retCh := make(chan string, 1)  
// 调用时启动另外一个协程,不阻塞当前协程  
go func() {  
ret := service() // "Done" 50ms  
fmt.Println("return result.")  
//运行完将结果放到channel  
retCh <- ret  
//是否计算完成并将结果返回给了channel,是否就退出,并做下一步的处理?  
//否。 没有使用buffer. 实际上会被阻塞  
fmt.Println("service exited.")  
}()  
// 返回channel,在channel上等待  
return retCh  
}

调用:

go
func TestAsyncService(t *testing.T) {  
retCh := AsyncService()  
otherTask() // 1 s  
fmt.Println(<-retCh) //从channel取数据  
/*  
=== RUN TestAsyncService  
---1s working on something else  
return result.  
---1s Task is done.  
Done  
service exited.  
--- PASS: TestAsyncService (1.00s)  
PASS  
*/  
}
func TestAsyncService(t *testing.T) {  
retCh := AsyncService()  
otherTask() // 1 s  
fmt.Println(<-retCh) //从channel取数据  
/*  
=== RUN TestAsyncService  
---1s working on something else  
return result.  
---1s Task is done.  
Done  
service exited.  
--- PASS: TestAsyncService (1.00s)  
PASS  
*/  
}

带 buffer

go
func AsyncBufferService() chan string {  
// 使用buffer的情况  
retCh := make(chan string, 1)  
// 调用时启动另外一个协程,不阻塞当前协程  
go func() {  
ret := service() // "Done" 50ms  
fmt.Println("return result.")  
//运行完将结果放到channel  
retCh <- ret  
//是否计算完成并将结果返回给了channel,是否就退出,并做下一步的处理?  
//使用了buffer,buffer内会直接执行  
fmt.Println("service exited.")  
}()  
// 返回channel,在channel上等待  
return retCh  
}
func AsyncBufferService() chan string {  
// 使用buffer的情况  
retCh := make(chan string, 1)  
// 调用时启动另外一个协程,不阻塞当前协程  
go func() {  
ret := service() // "Done" 50ms  
fmt.Println("return result.")  
//运行完将结果放到channel  
retCh <- ret  
//是否计算完成并将结果返回给了channel,是否就退出,并做下一步的处理?  
//使用了buffer,buffer内会直接执行  
fmt.Println("service exited.")  
}()  
// 返回channel,在channel上等待  
return retCh  
}

调用示例:

go
func TestAsyncService(t *testing.T) {  
retChBuffer := AsyncBufferService()  
otherTask()  
fmt.Println(<-retChBuffer)  
  
/*  
=== RUN TestAsyncService  
---1s working on something else  
return result.  
service exited.  
---1s Task is done.  
Done  
--- PASS: TestAsyncService (1.00s)  
*/  
}
func TestAsyncService(t *testing.T) {  
retChBuffer := AsyncBufferService()  
otherTask()  
fmt.Println(<-retChBuffer)  
  
/*  
=== RUN TestAsyncService  
---1s working on something else  
return result.  
service exited.  
---1s Task is done.  
Done  
--- PASS: TestAsyncService (1.00s)  
*/  
}

多路选择和超时

20200130158038986977778.png

多渠道选择:
任何一个不阻塞,就会执行 case 语句,和 case 的写的顺序无关。
如果都没准备好,直接default

超时的控制:

time.After(...)
time.After(...)

示例代码:

go
func AsyncService() chan string {  
// 没有使用buffer  
retCh := make(chan string)  
// 使用buffer的情况  
//retCh := make(chan string, 1)  
// 调用时启动另外一个协程,不阻塞当前协程  
go func() {  
ret := service() // "Done" 50ms  
fmt.Println("return result.")  
//运行完将结果放到channel  
retCh <- ret  
//是否计算完成并将结果返回给了channel,是否就退出,并做下一步的处理?  
//否。 没有使用buffer. 实际上会被阻塞  
fmt.Println("service exited.")  
}()  
// 返回channel,在channel上等待  
return retCh  
}  
  
func TestSelect(t *testing.T) {  
ret := <-AsyncService() // 50ms  
t.Log(ret)  
}  
  
func TestTimeoutSelect(t *testing.T) {  
select {  
case ret := <-AsyncService(): // 50ms  
t.Log(ret)  
case <-time.After(time.Millisecond * 30):  
t.Error("time out") // 因为是超过30ms 执行此处  
}  
}
func AsyncService() chan string {  
// 没有使用buffer  
retCh := make(chan string)  
// 使用buffer的情况  
//retCh := make(chan string, 1)  
// 调用时启动另外一个协程,不阻塞当前协程  
go func() {  
ret := service() // "Done" 50ms  
fmt.Println("return result.")  
//运行完将结果放到channel  
retCh <- ret  
//是否计算完成并将结果返回给了channel,是否就退出,并做下一步的处理?  
//否。 没有使用buffer. 实际上会被阻塞  
fmt.Println("service exited.")  
}()  
// 返回channel,在channel上等待  
return retCh  
}  
  
func TestSelect(t *testing.T) {  
ret := <-AsyncService() // 50ms  
t.Log(ret)  
}  
  
func TestTimeoutSelect(t *testing.T) {  
select {  
case ret := <-AsyncService(): // 50ms  
t.Log(ret)  
case <-time.After(time.Millisecond * 30):  
t.Error("time out") // 因为是超过30ms 执行此处  
}  
}

Context 与任务取消

任务可能是关联的,任务又包括子任务。 1.9 以后引入了 context

  • 根 context: 通过 context.Background()创建
  • 子 context:context.WithCancel(parentContext)
  • ctx,cancel "= context.WithCancel(context.Background())
  • 当前 Context 被取消时,基于它的子 context 都会取消
  • 接收取消通知 <-ctx.Done()
go
func isCanceled(ctx context.Context) bool {  
select {  
case <-ctx.Done():  
return true  
default:  
return false  
}  
}  
  
func TestContextCancel(t *testing.T) {  
ctx, cancel := context.WithCancel(context.Background())  
for i := 0; i < 5; i++ {  
go func(i int, ctx context.Context) {  
for {  
if isCanceled(ctx) {  
break  
}  
time.Sleep(time.Millisecond * 5)  
}  
// 上面的死循环被打破,才会打印  
fmt.Println(i, "Canceled")  
}(i, ctx)  
}  
cancel()  
time.Sleep(time.Second * 1)  
}  
  
=== RUN TestContextCancel  
3 Canceled  
4 Canceled  
1 Canceled  
0 Canceled  
2 Canceled  
--- PASS: TestContextCancel (1.00s)
func isCanceled(ctx context.Context) bool {  
select {  
case <-ctx.Done():  
return true  
default:  
return false  
}  
}  
  
func TestContextCancel(t *testing.T) {  
ctx, cancel := context.WithCancel(context.Background())  
for i := 0; i < 5; i++ {  
go func(i int, ctx context.Context) {  
for {  
if isCanceled(ctx) {  
break  
}  
time.Sleep(time.Millisecond * 5)  
}  
// 上面的死循环被打破,才会打印  
fmt.Println(i, "Canceled")  
}(i, ctx)  
}  
cancel()  
time.Sleep(time.Second * 1)  
}  
  
=== RUN TestContextCancel  
3 Canceled  
4 Canceled  
1 Canceled  
0 Canceled  
2 Canceled  
--- PASS: TestContextCancel (1.00s)

典型的并发任务举例

仅执行一次 (单例模式:懒汉式,线程安全)

使用 sync.Once 的 Do 方法

go
var once sync.Once  
var obj *SingletonObj  
  
func GetSingletonObj() * SingletonObj {  
once.Do(func(){  
obj = &SingletonObj{}  
})  
return obj  
}
var once sync.Once  
var obj *SingletonObj  
  
func GetSingletonObj() * SingletonObj {  
once.Do(func(){  
obj = &SingletonObj{}  
})  
return obj  
}

示例

go
type Singleton struct {  
}  
  
var singleInstance *Singleton  
var once sync.Once  
  
func GetSingletonObj() *Singleton {  
once.Do(func() {  
fmt.Println("Create obj")  
singleInstance = new(Singleton)  
})  
return singleInstance  
}  
  
func TestOnce(t *testing.T) {  
var xx = GetSingletonObj()  
var xx1 = GetSingletonObj()  
t.Log(xx)  
t.Log(xx1)  
  
fmt.Printf("---%x\n", unsafe.Pointer(xx))  
fmt.Printf("---%x\n", unsafe.Pointer(xx1))  
}
type Singleton struct {  
}  
  
var singleInstance *Singleton  
var once sync.Once  
  
func GetSingletonObj() *Singleton {  
once.Do(func() {  
fmt.Println("Create obj")  
singleInstance = new(Singleton)  
})  
return singleInstance  
}  
  
func TestOnce(t *testing.T) {  
var xx = GetSingletonObj()  
var xx1 = GetSingletonObj()  
t.Log(xx)  
t.Log(xx1)  
  
fmt.Printf("---%x\n", unsafe.Pointer(xx))  
fmt.Printf("---%x\n", unsafe.Pointer(xx1))  
}

仅需任意任务完成

仅需任意任务完成,使用 channel 。需要使用 buffer 否则会阻塞协程。

go
func runTask(id int) string {  
time.Sleep(10 * time.Millisecond)  
return fmt.Sprintf("the result is from %d", id)  
}  
  
func FirstResponse() string {  
numOfRunner := 10  
ch := make(chan string, numOfRunner)  
for i := 0; i < numOfRunner; i++ {  
go func(i int) {  
ret := runTask(i)  
ch <- ret  
}(i)  
}  
// 一旦有数据,不阻塞,直接执行。 随后会阻塞,等待消息被取走  
// buffer解耦  
return <-ch  
}  
  
func TestFirstResponse(t *testing.T) {  
t.Log("Before:", runtime.NumGoroutine()) // 2  
t.Log(FirstResponse())  
time.Sleep(time.Second * 2)  
t.Log("After:", runtime.NumGoroutine()) // no buffer 11. buffer 2  
}
func runTask(id int) string {  
time.Sleep(10 * time.Millisecond)  
return fmt.Sprintf("the result is from %d", id)  
}  
  
func FirstResponse() string {  
numOfRunner := 10  
ch := make(chan string, numOfRunner)  
for i := 0; i < numOfRunner; i++ {  
go func(i int) {  
ret := runTask(i)  
ch <- ret  
}(i)  
}  
// 一旦有数据,不阻塞,直接执行。 随后会阻塞,等待消息被取走  
// buffer解耦  
return <-ch  
}  
  
func TestFirstResponse(t *testing.T) {  
t.Log("Before:", runtime.NumGoroutine()) // 2  
t.Log(FirstResponse())  
time.Sleep(time.Second * 2)  
t.Log("After:", runtime.NumGoroutine()) // no buffer 11. buffer 2  
}

所有的任务完成

go
func myTask(id int) string {  
time.Sleep(10 * time.Millisecond)  
return fmt.Sprintf("the result is from %d", id)  
}  
  
func AllResponse() string {  
numOfRunner := 10  
ch := make(chan string, numOfRunner)  
for i := 0; i < numOfRunner; i++ {  
go func(i int) {  
ret := myTask(i)  
ch <- ret  
}(i)  
}  
finalRet := ""  
for j := 0; j < numOfRunner; j++ {  
finalRet += <-ch + "\n"  
}  
return finalRet  
}  
  
func TestAllResponse(t *testing.T) {  
t.Log("Before:", runtime.NumGoroutine()) // 2  
t.Log("xxx:", AllResponse(), "xxx\n")  
time.Sleep(time.Second * 2)  
t.Log("After:", runtime.NumGoroutine()) // no buffer 11. buffer 2  
}
func myTask(id int) string {  
time.Sleep(10 * time.Millisecond)  
return fmt.Sprintf("the result is from %d", id)  
}  
  
func AllResponse() string {  
numOfRunner := 10  
ch := make(chan string, numOfRunner)  
for i := 0; i < numOfRunner; i++ {  
go func(i int) {  
ret := myTask(i)  
ch <- ret  
}(i)  
}  
finalRet := ""  
for j := 0; j < numOfRunner; j++ {  
finalRet += <-ch + "\n"  
}  
return finalRet  
}  
  
func TestAllResponse(t *testing.T) {  
t.Log("Before:", runtime.NumGoroutine()) // 2  
t.Log("xxx:", AllResponse(), "xxx\n")  
time.Sleep(time.Second * 2)  
t.Log("After:", runtime.NumGoroutine()) // no buffer 11. buffer 2  
}

对象池

经常遇见,例如数据库连接,网络连接。池化,避免重复创建。

GO: 使用 buffered channel 实现对象池

go
type ReusableObj struct {  
}  
  
type ObjPool struct {  
bufChan chan *ReusableObj  
}  
  
func NewObjPool(numObj int) *ObjPool {  
objPool := ObjPool{}  
objPool.bufChan = make(chan *ReusableObj, numObj)  
for i := 0; i < 10; i++ {  
objPool.bufChan <- &ReusableObj{}  
}  
return &objPool  
}  
  
func (p *ObjPool) GetObj(timeout time.Duration) (*ReusableObj, error) {  
select {  
case ret := <-p.bufChan:  
return ret, nil  
case <-time.After(timeout): // 超时控制  
return nil, errors.New("time out")  
}  
}  
  
func (p *ObjPool) ReleaseObj(obj *ReusableObj) error {  
select {  
// size已经满了的情况是放不进去的  
case p.bufChan <- obj:  
return nil  
default:  
return errors.New("overflow")  
}  
}  
  
func TestPool(t *testing.T) {  
pool := NewObjPool(10)  
for i := 0; i < 11; i++ {  
if v, err := pool.GetObj(time.Second * 1); err != nil {  
fmt.Println(err)  
} else {  
fmt.Printf("v:%T \n", v)  
if err := pool.ReleaseObj(v); err != nil {  
t.Error(err)  
}  
}  
}  
fmt.Printf("Done")  
}
type ReusableObj struct {  
}  
  
type ObjPool struct {  
bufChan chan *ReusableObj  
}  
  
func NewObjPool(numObj int) *ObjPool {  
objPool := ObjPool{}  
objPool.bufChan = make(chan *ReusableObj, numObj)  
for i := 0; i < 10; i++ {  
objPool.bufChan <- &ReusableObj{}  
}  
return &objPool  
}  
  
func (p *ObjPool) GetObj(timeout time.Duration) (*ReusableObj, error) {  
select {  
case ret := <-p.bufChan:  
return ret, nil  
case <-time.After(timeout): // 超时控制  
return nil, errors.New("time out")  
}  
}  
  
func (p *ObjPool) ReleaseObj(obj *ReusableObj) error {  
select {  
// size已经满了的情况是放不进去的  
case p.bufChan <- obj:  
return nil  
default:  
return errors.New("overflow")  
}  
}  
  
func TestPool(t *testing.T) {  
pool := NewObjPool(10)  
for i := 0; i < 11; i++ {  
if v, err := pool.GetObj(time.Second * 1); err != nil {  
fmt.Println(err)  
} else {  
fmt.Printf("v:%T \n", v)  
if err := pool.ReleaseObj(v); err != nil {  
t.Error(err)  
}  
}  
}  
fmt.Printf("Done")  
}

sync.pool 对象缓存

  • 尝试从私有对象获取
  • 私有对象不存在,尝试从当前的 Processor 的共享池获取
  • 如果当前 Processor 共享池也是空的,那么尝试从 Processor 的共享池获取
  • 如果所有的子池都是空的,最后就用用户指定的 New 函数产生一个新的对象返回

20200131158045064229745.png

对象的放回

  • 如果私有对象不存在则保存为私有对象
  • 如果私有对象存在,放入当前 Processor 子池的共享池中

声明周期

  • GC 会清除 sync.pool 缓存的对象
  • 对象的缓存有效期为下一次 GC 之前

总结

  • 适用于通过复用,降低复杂对象的创建和 GC
  • 协程安全,会有锁的开销
  • 声明周期受 GC 影响,不适合于做连接池等需要自己管理声明周期的资源的池化
go
func TestSyncPool(t *testing.T) {  
pool := &sync.Pool{  
New: func() interface{} {  
fmt.Println("create a new project.")  
return 100  
},  
}  
v := pool.Get().(int)  
fmt.Println(v)  
pool.Put(3)  
//runtime.GC() //GC会清除sync.pool中的缓存对象 调用后v1 = 100 (重新创建)  
v1, _ := pool.Get().(int)  
fmt.Println(v1) // 3  
}
func TestSyncPool(t *testing.T) {  
pool := &sync.Pool{  
New: func() interface{} {  
fmt.Println("create a new project.")  
return 100  
},  
}  
v := pool.Get().(int)  
fmt.Println(v)  
pool.Put(3)  
//runtime.GC() //GC会清除sync.pool中的缓存对象 调用后v1 = 100 (重新创建)  
v1, _ := pool.Get().(int)  
fmt.Println(v1) // 3  
}