Go笔记(二):面向对象、错误处理、协程、并发
面向对象编程
go 是面向对象的语言吗?
官方回答:Yes and no 即是也不是
结构的封装
定义一个 struct
type Employee struct {
Id string
Name string
Age int
}
type Employee struct {
Id string
Name string
Age int
}
struct 的初始化
e := Employee{"0", "Bob", 20}
e1 := Employee{Name: "Mike", Age: 30}
e11 := Employee{}
e2 := new(Employee) //返回指针 相当于 &Employee{}e2.Id = "22"
e2.Age = 22
e2.Name = "xiaoming"
e := Employee{"0", "Bob", 20}
e1 := Employee{Name: "Mike", Age: 30}
e11 := Employee{}
e2 := new(Employee) //返回指针 相当于 &Employee{}e2.Id = "22"
e2.Age = 22
e2.Name = "xiaoming"
行为的封装
行为和方法的定义,支持两种方法,如下:
// 1. 实例对应方法被调用时,实例的成员会进行值复制
func (e Employee) String() string {
return fmt.Sprintf("id:%s name:%s age:%d", e.Id, e.Name, e.Age)
}
// 2. 通常为了避免内存拷贝我们使用第二种定义方式
func (e *Employee) String() string {
// 通过指针访问,直接.就行
return fmt.Sprintf("id:%s name:%s age:%d", e.Id, e.Name, e.Age)
}
// 1. 实例对应方法被调用时,实例的成员会进行值复制
func (e Employee) String() string {
return fmt.Sprintf("id:%s name:%s age:%d", e.Id, e.Name, e.Age)
}
// 2. 通常为了避免内存拷贝我们使用第二种定义方式
func (e *Employee) String() string {
// 通过指针访问,直接.就行
return fmt.Sprintf("id:%s name:%s age:%d", e.Id, e.Name, e.Age)
}
尝试:
func (e *Employee) String() string {
fmt.Printf("address is %x\n", unsafe.Pointer(&e.Name))
// 通过指针访问,直接.就行
return fmt.Sprintf("id:%s name:%s age:%d", e.Id, e.Name, e.Age)
}
func TestStructOperation(t *testing.T) {
e := Employee{"1", "bob", 20}
fmt.Printf("address1 is %x \n", unsafe.Pointer(&e.Name))
t.Log(e.String())
}
//打印结果, 可以看到地址是相同的
// address1 is c0000723a0
// address is c0000723a0
func (e *Employee) String() string {
fmt.Printf("address is %x\n", unsafe.Pointer(&e.Name))
// 通过指针访问,直接.就行
return fmt.Sprintf("id:%s name:%s age:%d", e.Id, e.Name, e.Age)
}
func TestStructOperation(t *testing.T) {
e := Employee{"1", "bob", 20}
fmt.Printf("address1 is %x \n", unsafe.Pointer(&e.Name))
t.Log(e.String())
}
//打印结果, 可以看到地址是相同的
// address1 is c0000723a0
// address is c0000723a0
另外一种方式,打印出的地址则是不同的。即结构被复制,会有额外的开销。
接口
接口特性:
- 接口是非侵入性的,实现不依赖接口定义
- 所以接口的定义可以包含在接口使用者包内
type Programmer interface {
WriteHelloWorld() string
}
type GoProgrammer struct {
}
// "duck type" 方法签名看起来是一样的。
func (p *GoProgrammer) WriteHelloWorld() string {
return "hello world"
}
func TestInterface(t *testing.T) {
var p Programmer
p = new(GoProgrammer)
t.Log(p.WriteHelloWorld())
}
type Programmer interface {
WriteHelloWorld() string
}
type GoProgrammer struct {
}
// "duck type" 方法签名看起来是一样的。
func (p *GoProgrammer) WriteHelloWorld() string {
return "hello world"
}
func TestInterface(t *testing.T) {
var p Programmer
p = new(GoProgrammer)
t.Log(p.WriteHelloWorld())
}
go 中实现继承和 override
go 中是没有继承的,不过可以实现类似的效果。可以结合匿名嵌套类型实现。
type Pet struct {
}
func (p *Pet) Speak() {
fmt.Print("...")
}
func (p *Pet) SpeakTo(host string) {
p.Speak()
fmt.Println(" ", host)
}
// 匿名嵌套类型
/// 可以实现类似继承的效果,但是有所不同
type Dog1 struct {
Pet
}
type Pet struct {
}
func (p *Pet) Speak() {
fmt.Print("...")
}
func (p *Pet) SpeakTo(host string) {
p.Speak()
fmt.Println(" ", host)
}
// 匿名嵌套类型
/// 可以实现类似继承的效果,但是有所不同
type Dog1 struct {
Pet
}
根据需要求进行方法的重写:
func (p *Dog1) Speak() {
fmt.Print("dog1 ...")
}
func (p *Dog1) SpeakTo(host string) {
p.Speak()
fmt.Println("dog1 ", host)
}
func TestDog(t *testing.T) {
dog := new(Dog1)
dog.SpeakTo("hhh")
dog.Speak()
}
func (p *Dog1) Speak() {
fmt.Print("dog1 ...")
}
func (p *Dog1) SpeakTo(host string) {
p.Speak()
fmt.Println("dog1 ", host)
}
func TestDog(t *testing.T) {
dog := new(Dog1)
dog.SpeakTo("hhh")
dog.Speak()
}
多态
多态就是同一个接口,使用不同的实例而执行不同操作,如 interface 打印; 彩色相机和黑白相机执行效果不同
type MyProgrammer interface {
sayHello() string
}
type Golang struct {
}
type Java struct {
}
type Ruby struct {
}
func (p *Golang) sayHello() string {
return "go white hello world"
}
func (p *Java) sayHello() string {
return "java white hello world"
}
func callSayHello(p MyProgrammer) {
fmt.Println(p.sayHello())
}
func TestDuotai(t *testing.T) {
goProg := new(Golang)
javaProg := &Java{}
// 需要操作指针
callSayHello(goProg)
callSayHello(javaProg)
//callSayHello(new(Ruby)) 因为没有实现方法,这里编译报错
}
type MyProgrammer interface {
sayHello() string
}
type Golang struct {
}
type Java struct {
}
type Ruby struct {
}
func (p *Golang) sayHello() string {
return "go white hello world"
}
func (p *Java) sayHello() string {
return "java white hello world"
}
func callSayHello(p MyProgrammer) {
fmt.Println(p.sayHello())
}
func TestDuotai(t *testing.T) {
goProg := new(Golang)
javaProg := &Java{}
// 需要操作指针
callSayHello(goProg)
callSayHello(javaProg)
//callSayHello(new(Ruby)) 因为没有实现方法,这里编译报错
}
空接口和断言
- 空接口可以表示任何类型 类似 void* 或 java 的 Object
- 通过断言将空接口转换为定制类型: v,ok = p.(int)
func DoSomething(p interface{}) {
/**
if i, ok := p.(int); ok {
fmt.Println("int", i)
return
}
if i, ok := p.(string); ok {
fmt.Println("string", i)
return
}
fmt.Println("Unknown Type")
*/
//可以将上述代码进行简化
switch v := p.(type) {
case int:
fmt.Println("int", v)
case string:
fmt.Println("string", v)
default:
fmt.Println("unknown")
}
}
func TestConvert(t *testing.T) {
DoSomething(10)
DoSomething("aa")
DoSomething(true)
//int 10
//string aa
//unknown
}
func DoSomething(p interface{}) {
/**
if i, ok := p.(int); ok {
fmt.Println("int", i)
return
}
if i, ok := p.(string); ok {
fmt.Println("string", i)
return
}
fmt.Println("Unknown Type")
*/
//可以将上述代码进行简化
switch v := p.(type) {
case int:
fmt.Println("int", v)
case string:
fmt.Println("string", v)
default:
fmt.Println("unknown")
}
}
func TestConvert(t *testing.T) {
DoSomething(10)
DoSomething("aa")
DoSomething(true)
//int 10
//string aa
//unknown
}
最佳实践
Go 接口最佳实践:
1). 倾向于使用小的接口定义,很多接口只包含一个方法
2). 较大的接口定义,可以由多个小接口组合而成
type ReadWrite interface {
Reader
Write
}
type ReadWrite interface {
Reader
Write
}
3). 只依赖必要功能的最小接口. 方便复用
func StoreData(reader Reader) error{...}
func StoreData(reader Reader) error{...}
错误处理
Go 错误机制
- 没有异常机制
- error 类型实现了 error 接口
3.可以通过 errors.New 来快速创建错误实例
type error interface {
Error() string
}
func TestName(t *testing.T) {
errors.New("xx")
}
type error interface {
Error() string
}
func TestName(t *testing.T) {
errors.New("xx")
}
因为方法可以返回多个参数,可以使用如下方式,类似 swift 的 Result 库:
var LessThanTwoError = errors.New("n should not less than 2")
var LargerThan100Error = errors.New("n should not large than 100")
func GetFibonacci(n int) ([]int, error) {
//及早失败,避免嵌套
if n < 2 {
return nil, LessThanTwoError
}
if n > 100 {
return nil, LargerThan100Error
}
fibList := []int{1, 1}
for i := 2; i < n; i++ {
fibList = append(fibList, fibList[i-2]+fibList[i-1])
}
return fibList, nil
}
var LessThanTwoError = errors.New("n should not less than 2")
var LargerThan100Error = errors.New("n should not large than 100")
func GetFibonacci(n int) ([]int, error) {
//及早失败,避免嵌套
if n < 2 {
return nil, LessThanTwoError
}
if n > 100 {
return nil, LargerThan100Error
}
fibList := []int{1, 1}
for i := 2; i < n; i++ {
fibList = append(fibList, fibList[i-2]+fibList[i-1])
}
return fibList, nil
}
及早失败,避免嵌套,增加代码的可读性:
func GetFibonacci2(str string) {
var (
i int
err error
list []int
)
//及早失败,避免嵌套
if i, err = strconv.Atoi(str); err != nil {
fmt.Println("error", err)
return
}
if list, err = GetFibonacci(i); err != nil {
fmt.Println("err", err)
return
}
fmt.Println(list)
}
// 调用示例
func TestFab(t *testing.T) {
if v, err := GetFibonacci(1000); err != nil {
t.Log(err)
if err == LessThanTwoError {
t.Log("--2")
}
if err == LargerThan100Error {
t.Log("--100")
}
} else {
t.Log(v)
}
GetFibonacci2("1001")
}
func GetFibonacci2(str string) {
var (
i int
err error
list []int
)
//及早失败,避免嵌套
if i, err = strconv.Atoi(str); err != nil {
fmt.Println("error", err)
return
}
if list, err = GetFibonacci(i); err != nil {
fmt.Println("err", err)
return
}
fmt.Println(list)
}
// 调用示例
func TestFab(t *testing.T) {
if v, err := GetFibonacci(1000); err != nil {
t.Log(err)
if err == LessThanTwoError {
t.Log("--2")
}
if err == LargerThan100Error {
t.Log("--100")
}
} else {
t.Log(v)
}
GetFibonacci2("1001")
}
panic
panic
- panic 用于不可以恢复的错误
- panic 退出前会执行 defer 指定的内容
os.Exit
- 不会调用 defer 指定函数
- 不会打印栈的信息
func TestPanicVxExit(t *testing.T) {
defer func() {
// panic会执行defer的代码,而且打印调用栈的信息
// os.Exit不会
fmt.Println("finally")
}()
fmt.Println("start")
panic(errors.New("Something error"))
//os.Exit(-1)
}
func TestPanicVxExit(t *testing.T) {
defer func() {
// panic会执行defer的代码,而且打印调用栈的信息
// os.Exit不会
fmt.Println("finally")
}()
fmt.Println("start")
panic(errors.New("Something error"))
//os.Exit(-1)
}
recover
recover 可以 java 的类比 try{}catch{},recover 用于错误的恢复:
defer func() {
if error := recover(); err != nil {
//恢复错误
}
}()
defer func() {
if error := recover(); err != nil {
//恢复错误
}
}()
注意: 不要滥用
- 滥用会形成僵尸服务进程,导致 health check 失效
- "let it crash!"往往是我们恢复不确定性错误的好方法
func TestPanicVxExit2(t *testing.T) {
defer func() {
if err := recover(); err != nil {
// 程序没有打印调用栈,正常退出
fmt.Println("final:recover", err) // final:recover Something error
}
}()
fmt.Println("start")
panic(errors.New("Something error"))
}
func TestPanicVxExit2(t *testing.T) {
defer func() {
if err := recover(); err != nil {
// 程序没有打印调用栈,正常退出
fmt.Println("final:recover", err) // final:recover Something error
}
}()
fmt.Println("start")
panic(errors.New("Something error"))
}
package
- 基本复用模块单元:以首字母大写来约定可以被包外代码访问
- 代码的 package 可以和所在的目录不一致
- 同一目录的 go 代码的 package 要保持一致
package
- 通过 go get 来获取远程依赖 go get -u 强制从网路更新远程依赖
- 注意代码在 github 上的组织形式,以适应 go get。 直接从代码路径开始,不要有 src
go path 的问题:
- 可以在 ide 中设置 GOPATH 的,但是可能会导致在 ide 外调试时不一致、 某些 ide 可能会有些问题
- 作者建议使用.bash_profile(macos)中设置
(笔者用的 goland,直接从 GOPATH 中设置 PROJECT GOPATH 为当前的项目目录。
Global GOPATH 设置为默认安装目录/User/用户名/go)
import (
"lession/series"
// 可以起一个别名
cm "src/github.com/easierway/concurrent_map"
"testing"
)
func TestPackage(t *testing.T) {
t.Log(series.GetFibonacciSeries(10))
//t.Log(series.getFibonacciSeries(10))
}
import (
"lession/series"
// 可以起一个别名
cm "src/github.com/easierway/concurrent_map"
"testing"
)
func TestPackage(t *testing.T) {
t.Log(series.GetFibonacciSeries(10))
//t.Log(series.getFibonacciSeries(10))
}
init 方法
- main 被执行前,所有依赖的 package 的 init 方法都会被执行
- 不同包的 init 函数按照包导入的依赖关系决定执行顺序
- 每个包可以有多个 init 函数
- 每个源文件可有多个 init 函数
使用远程的 package
使用远程的 package 如: https://github.com/easierway/concurrent_map.git
go get -u github.com/easierway/concurrent_map
(发现目录多了 pkg 、src 目录)
如果自己的代码提交到 github 并且适应 go get:
- 直接以代码路径开始,不要有 src
问题
Go 存在的依赖问题
- 同一个环境下,不同项目使用同一包的不同版本 (因为 go get 下来会放到同一个 go 目录)
- 无法管理对包的特定版本的依赖
为了解决这些问题
随着 Go 1.5 release 版本发布,vendor 目录被添加到除了 GoPATH 和 GOROOT 之外的依赖目录查找的解决方案。
在 Go 1.6 之前,需要手动设置环境变量。
查找依赖包路径的优先级如下:
- 当前包下的 vendor 目录
- 想上级目录查找,知道找到 src 下的 vendor 目录
- 在 GOPATH 下面查找依赖包
- 在 GOROOT 目录下查找
常用的依赖管理工具
- godep
- glide
brew install glide
- dep
glide 说明
- 安装 brew install glide
- 进入目录,执行 glide init
- glide install
groutine 协程
Thead vs Groutine
1). 创建时默认的 stack(栈)的大小
- JDK5 以后默认 Java Thread stack 默认是 1M
- Groutine 的 Stack 初始化大小为 2k,创建起来也更快
2). 和 系统线程 KSE (Kernel Space Entity)的对应关系
- java thread 是 1:1
- Groutine 是 M:N 多对多
如果 thread 和 KSE 是 1:1 关系的话。KSE 是 CPU 直接调度,效率很高,但是如果线程之间发生切换,会牵扯到内核对象的切换,开销很大。
如果多个协程都在和一个内核对应,那么久减少了开销,go 就是这么做的。
P 为 go 语言的协程处理器。
P 都在系统线程里,每个 P 都挂着一些携程队列 G。有的是正在运行状态的,如 G0.
如果有一个 G 非常占用时间,那么队列的 G 会不会被延迟很久?
不会。Go 有守护线程,进行计数,计算每个 P 完成 G 的数量,如果某一个 P 一段时间完成的数量没有变化。
就会往携程的任务栈里插入特别的标记,当 G 运行的时候遇到非内联函数(?)就会读到这个标记,把自己中断,放到队尾。
?另外一个机制,当某个 G 被系统中断了,io 需要等待时(?),P 将自己移到另一个可使用的线程中,继续执行他的队列的 G.
func TestGroutine(t *testing.T) {
for i := 0; i < 10; i++ {
// Go的方法调用传递时都是值传递,i被复制了一份,地址是不同的。因此可以执行
go func(i int) {
fmt.Println(i)
}(i)
}
time.Sleep(time.Millisecond * 50)
// 可以看到打印是乱序的 (调用程序一定要用function(){}(参数))
}
func TestGroutine(t *testing.T) {
for i := 0; i < 10; i++ {
// Go的方法调用传递时都是值传递,i被复制了一份,地址是不同的。因此可以执行
go func(i int) {
fmt.Println(i)
}(i)
}
time.Sleep(time.Millisecond * 50)
// 可以看到打印是乱序的 (调用程序一定要用function(){}(参数))
}
共享内存并发机制(锁)
类比其他语言:使用锁进行并发控制。
Lock lock = ...
lock.lock();
try {
//process (thread-safe)
}catch(e){
}
finally{
lock.unlock()
}
Lock lock = ...
lock.lock();
try {
//process (thread-safe)
}catch(e){
}
finally{
lock.unlock()
}
go:
package sync
Mutex
: 可 lock 和 unlockRWLock
: 对读锁和写锁进行分开。 当共享的内存被读锁锁住时,另外一个读锁去锁它时候,不是互斥锁。当写锁遇到它的时候才是互斥的。 比完全互斥锁的情况效率高一些。
代码示例
非线程安全
func TestCounter(t *testing.T) {
counter := 0
for i := 0; i < 5000; i++ {
go func() {
counter++
}()
}
time.Sleep(1 * time.Second)
t.Log(counter) // 线程不安全 结果不是5000
}
func TestCounter(t *testing.T) {
counter := 0
for i := 0; i < 5000; i++ {
go func() {
counter++
}()
}
time.Sleep(1 * time.Second)
t.Log(counter) // 线程不安全 结果不是5000
}
线程安全
func TestCounterThreadSafe(t *testing.T) {
var mut sync.Mutex
counter := 0
for i := 0; i < 5000; i++ {
go func() {
defer func() {
mut.Unlock()
}()
mut.Lock()
counter++
}()
}
time.Sleep(1 * time.Second)
t.Log(counter) // 线程安全 结果5000
}
func TestCounterThreadSafe(t *testing.T) {
var mut sync.Mutex
counter := 0
for i := 0; i < 5000; i++ {
go func() {
defer func() {
mut.Unlock()
}()
mut.Lock()
counter++
}()
}
time.Sleep(1 * time.Second)
t.Log(counter) // 线程安全 结果5000
}
wait 等待线程完成在往下执行。只有当 wait 的完成完后才往下执行
func TestWaitGroup(t *testing.T) {
// 等待线程完成在往下执行。
// 只有当wait的完成完后才往下执行
/**
var wg sync.WaitGroup
for i := 0; i < 5000; i++ {
wg.Add(1)
go func() {
defer func() {
wg.Done()
}()
//do some thing
}()
}
wg.Wait()
*/
var wg sync.WaitGroup
var mut sync.Mutex
counter := 0
for i := 0; i < 5000; i++ {
//增加等待的量
wg.Add(1)
go func() {
defer func() {
mut.Unlock()
}()
mut.Lock()
counter++
//告知任务结束
wg.Done()
}()
}
//等待都执行完成
wg.Wait()
t.Log(counter) // 线程安全 结果5000
}
func TestWaitGroup(t *testing.T) {
// 等待线程完成在往下执行。
// 只有当wait的完成完后才往下执行
/**
var wg sync.WaitGroup
for i := 0; i < 5000; i++ {
wg.Add(1)
go func() {
defer func() {
wg.Done()
}()
//do some thing
}()
}
wg.Wait()
*/
var wg sync.WaitGroup
var mut sync.Mutex
counter := 0
for i := 0; i < 5000; i++ {
//增加等待的量
wg.Add(1)
go func() {
defer func() {
mut.Unlock()
}()
mut.Lock()
counter++
//告知任务结束
wg.Done()
}()
}
//等待都执行完成
wg.Wait()
t.Log(counter) // 线程安全 结果5000
}
CSP 并发机制(channel)
CPS (communicating sequential process)并发机制
依赖通道来完成两个实体间的通信。
和 Actor 的直接通信不同,gsp 模式则是通过 channel 进行通信的,更松散一些。
Go 中的 channel 是有容量限制并且独立于处理 Groutine ,而如 erlang,actor 模式中的 mailbox 容量是无限的,接收进行总是被动的处理消息。
channel 分为两种模式,如下图:
方式 1. 通信的两端必须同时在 chanel 上,一方不在的话就会进行等待阻塞。
方式 2. bufferChannel 容量有限制
普通串行
func service() string {
time.Sleep(time.Millisecond * 50)
return "Done"
}
func otherTask() {
fmt.Println("---1s working on something else ")
time.Sleep(time.Millisecond * 1000)
fmt.Println("---1s Task is done.")
}
func TestService(t *testing.T) {
fmt.Println(service())
otherTask()
/*
=== RUN TestService
Done
---1s working on something else
---1s Task is done.
--- PASS: TestService (1.06s)
PASS
可以发现他们是串行的
*/
}
func service() string {
time.Sleep(time.Millisecond * 50)
return "Done"
}
func otherTask() {
fmt.Println("---1s working on something else ")
time.Sleep(time.Millisecond * 1000)
fmt.Println("---1s Task is done.")
}
func TestService(t *testing.T) {
fmt.Println(service())
otherTask()
/*
=== RUN TestService
Done
---1s working on something else
---1s Task is done.
--- PASS: TestService (1.06s)
PASS
可以发现他们是串行的
*/
}
不带 buffer
/*
实现类似java futureService的运行方式
*/
func AsyncService() chan string {
// 没有使用buffer
retCh := make(chan string)
// 使用buffer的情况
//retCh := make(chan string, 1)
// 调用时启动另外一个协程,不阻塞当前协程
go func() {
ret := service() // "Done" 50ms
fmt.Println("return result.")
//运行完将结果放到channel
retCh <- ret
//是否计算完成并将结果返回给了channel,是否就退出,并做下一步的处理?
//否。 没有使用buffer. 实际上会被阻塞
fmt.Println("service exited.")
}()
// 返回channel,在channel上等待
return retCh
}
/*
实现类似java futureService的运行方式
*/
func AsyncService() chan string {
// 没有使用buffer
retCh := make(chan string)
// 使用buffer的情况
//retCh := make(chan string, 1)
// 调用时启动另外一个协程,不阻塞当前协程
go func() {
ret := service() // "Done" 50ms
fmt.Println("return result.")
//运行完将结果放到channel
retCh <- ret
//是否计算完成并将结果返回给了channel,是否就退出,并做下一步的处理?
//否。 没有使用buffer. 实际上会被阻塞
fmt.Println("service exited.")
}()
// 返回channel,在channel上等待
return retCh
}
调用:
func TestAsyncService(t *testing.T) {
retCh := AsyncService()
otherTask() // 1 s
fmt.Println(<-retCh) //从channel取数据
/*
=== RUN TestAsyncService
---1s working on something else
return result.
---1s Task is done.
Done
service exited.
--- PASS: TestAsyncService (1.00s)
PASS
*/
}
func TestAsyncService(t *testing.T) {
retCh := AsyncService()
otherTask() // 1 s
fmt.Println(<-retCh) //从channel取数据
/*
=== RUN TestAsyncService
---1s working on something else
return result.
---1s Task is done.
Done
service exited.
--- PASS: TestAsyncService (1.00s)
PASS
*/
}
带 buffer
func AsyncBufferService() chan string {
// 使用buffer的情况
retCh := make(chan string, 1)
// 调用时启动另外一个协程,不阻塞当前协程
go func() {
ret := service() // "Done" 50ms
fmt.Println("return result.")
//运行完将结果放到channel
retCh <- ret
//是否计算完成并将结果返回给了channel,是否就退出,并做下一步的处理?
//使用了buffer,buffer内会直接执行
fmt.Println("service exited.")
}()
// 返回channel,在channel上等待
return retCh
}
func AsyncBufferService() chan string {
// 使用buffer的情况
retCh := make(chan string, 1)
// 调用时启动另外一个协程,不阻塞当前协程
go func() {
ret := service() // "Done" 50ms
fmt.Println("return result.")
//运行完将结果放到channel
retCh <- ret
//是否计算完成并将结果返回给了channel,是否就退出,并做下一步的处理?
//使用了buffer,buffer内会直接执行
fmt.Println("service exited.")
}()
// 返回channel,在channel上等待
return retCh
}
调用示例:
func TestAsyncService(t *testing.T) {
retChBuffer := AsyncBufferService()
otherTask()
fmt.Println(<-retChBuffer)
/*
=== RUN TestAsyncService
---1s working on something else
return result.
service exited.
---1s Task is done.
Done
--- PASS: TestAsyncService (1.00s)
*/
}
func TestAsyncService(t *testing.T) {
retChBuffer := AsyncBufferService()
otherTask()
fmt.Println(<-retChBuffer)
/*
=== RUN TestAsyncService
---1s working on something else
return result.
service exited.
---1s Task is done.
Done
--- PASS: TestAsyncService (1.00s)
*/
}
多路选择和超时
多渠道选择:
任何一个不阻塞,就会执行 case 语句,和 case 的写的顺序无关。
如果都没准备好,直接default
超时的控制:
time.After(...)
time.After(...)
示例代码:
func AsyncService() chan string {
// 没有使用buffer
retCh := make(chan string)
// 使用buffer的情况
//retCh := make(chan string, 1)
// 调用时启动另外一个协程,不阻塞当前协程
go func() {
ret := service() // "Done" 50ms
fmt.Println("return result.")
//运行完将结果放到channel
retCh <- ret
//是否计算完成并将结果返回给了channel,是否就退出,并做下一步的处理?
//否。 没有使用buffer. 实际上会被阻塞
fmt.Println("service exited.")
}()
// 返回channel,在channel上等待
return retCh
}
func TestSelect(t *testing.T) {
ret := <-AsyncService() // 50ms
t.Log(ret)
}
func TestTimeoutSelect(t *testing.T) {
select {
case ret := <-AsyncService(): // 50ms
t.Log(ret)
case <-time.After(time.Millisecond * 30):
t.Error("time out") // 因为是超过30ms 执行此处
}
}
func AsyncService() chan string {
// 没有使用buffer
retCh := make(chan string)
// 使用buffer的情况
//retCh := make(chan string, 1)
// 调用时启动另外一个协程,不阻塞当前协程
go func() {
ret := service() // "Done" 50ms
fmt.Println("return result.")
//运行完将结果放到channel
retCh <- ret
//是否计算完成并将结果返回给了channel,是否就退出,并做下一步的处理?
//否。 没有使用buffer. 实际上会被阻塞
fmt.Println("service exited.")
}()
// 返回channel,在channel上等待
return retCh
}
func TestSelect(t *testing.T) {
ret := <-AsyncService() // 50ms
t.Log(ret)
}
func TestTimeoutSelect(t *testing.T) {
select {
case ret := <-AsyncService(): // 50ms
t.Log(ret)
case <-time.After(time.Millisecond * 30):
t.Error("time out") // 因为是超过30ms 执行此处
}
}
Context 与任务取消
任务可能是关联的,任务又包括子任务。 1.9 以后引入了 context
- 根 context: 通过 context.Background()创建
- 子 context:context.WithCancel(parentContext)
- ctx,cancel "= context.WithCancel(context.Background())
- 当前 Context 被取消时,基于它的子 context 都会取消
- 接收取消通知 <-ctx.Done()
func isCanceled(ctx context.Context) bool {
select {
case <-ctx.Done():
return true
default:
return false
}
}
func TestContextCancel(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
for i := 0; i < 5; i++ {
go func(i int, ctx context.Context) {
for {
if isCanceled(ctx) {
break
}
time.Sleep(time.Millisecond * 5)
}
// 上面的死循环被打破,才会打印
fmt.Println(i, "Canceled")
}(i, ctx)
}
cancel()
time.Sleep(time.Second * 1)
}
=== RUN TestContextCancel
3 Canceled
4 Canceled
1 Canceled
0 Canceled
2 Canceled
--- PASS: TestContextCancel (1.00s)
func isCanceled(ctx context.Context) bool {
select {
case <-ctx.Done():
return true
default:
return false
}
}
func TestContextCancel(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
for i := 0; i < 5; i++ {
go func(i int, ctx context.Context) {
for {
if isCanceled(ctx) {
break
}
time.Sleep(time.Millisecond * 5)
}
// 上面的死循环被打破,才会打印
fmt.Println(i, "Canceled")
}(i, ctx)
}
cancel()
time.Sleep(time.Second * 1)
}
=== RUN TestContextCancel
3 Canceled
4 Canceled
1 Canceled
0 Canceled
2 Canceled
--- PASS: TestContextCancel (1.00s)
典型的并发任务举例
仅执行一次 (单例模式:懒汉式,线程安全)
使用 sync.Once 的 Do 方法
var once sync.Once
var obj *SingletonObj
func GetSingletonObj() * SingletonObj {
once.Do(func(){
obj = &SingletonObj{}
})
return obj
}
var once sync.Once
var obj *SingletonObj
func GetSingletonObj() * SingletonObj {
once.Do(func(){
obj = &SingletonObj{}
})
return obj
}
示例
type Singleton struct {
}
var singleInstance *Singleton
var once sync.Once
func GetSingletonObj() *Singleton {
once.Do(func() {
fmt.Println("Create obj")
singleInstance = new(Singleton)
})
return singleInstance
}
func TestOnce(t *testing.T) {
var xx = GetSingletonObj()
var xx1 = GetSingletonObj()
t.Log(xx)
t.Log(xx1)
fmt.Printf("---%x\n", unsafe.Pointer(xx))
fmt.Printf("---%x\n", unsafe.Pointer(xx1))
}
type Singleton struct {
}
var singleInstance *Singleton
var once sync.Once
func GetSingletonObj() *Singleton {
once.Do(func() {
fmt.Println("Create obj")
singleInstance = new(Singleton)
})
return singleInstance
}
func TestOnce(t *testing.T) {
var xx = GetSingletonObj()
var xx1 = GetSingletonObj()
t.Log(xx)
t.Log(xx1)
fmt.Printf("---%x\n", unsafe.Pointer(xx))
fmt.Printf("---%x\n", unsafe.Pointer(xx1))
}
仅需任意任务完成
仅需任意任务完成,使用 channel 。需要使用 buffer 否则会阻塞协程。
func runTask(id int) string {
time.Sleep(10 * time.Millisecond)
return fmt.Sprintf("the result is from %d", id)
}
func FirstResponse() string {
numOfRunner := 10
ch := make(chan string, numOfRunner)
for i := 0; i < numOfRunner; i++ {
go func(i int) {
ret := runTask(i)
ch <- ret
}(i)
}
// 一旦有数据,不阻塞,直接执行。 随后会阻塞,等待消息被取走
// buffer解耦
return <-ch
}
func TestFirstResponse(t *testing.T) {
t.Log("Before:", runtime.NumGoroutine()) // 2
t.Log(FirstResponse())
time.Sleep(time.Second * 2)
t.Log("After:", runtime.NumGoroutine()) // no buffer 11. buffer 2
}
func runTask(id int) string {
time.Sleep(10 * time.Millisecond)
return fmt.Sprintf("the result is from %d", id)
}
func FirstResponse() string {
numOfRunner := 10
ch := make(chan string, numOfRunner)
for i := 0; i < numOfRunner; i++ {
go func(i int) {
ret := runTask(i)
ch <- ret
}(i)
}
// 一旦有数据,不阻塞,直接执行。 随后会阻塞,等待消息被取走
// buffer解耦
return <-ch
}
func TestFirstResponse(t *testing.T) {
t.Log("Before:", runtime.NumGoroutine()) // 2
t.Log(FirstResponse())
time.Sleep(time.Second * 2)
t.Log("After:", runtime.NumGoroutine()) // no buffer 11. buffer 2
}
所有的任务完成
func myTask(id int) string {
time.Sleep(10 * time.Millisecond)
return fmt.Sprintf("the result is from %d", id)
}
func AllResponse() string {
numOfRunner := 10
ch := make(chan string, numOfRunner)
for i := 0; i < numOfRunner; i++ {
go func(i int) {
ret := myTask(i)
ch <- ret
}(i)
}
finalRet := ""
for j := 0; j < numOfRunner; j++ {
finalRet += <-ch + "\n"
}
return finalRet
}
func TestAllResponse(t *testing.T) {
t.Log("Before:", runtime.NumGoroutine()) // 2
t.Log("xxx:", AllResponse(), "xxx\n")
time.Sleep(time.Second * 2)
t.Log("After:", runtime.NumGoroutine()) // no buffer 11. buffer 2
}
func myTask(id int) string {
time.Sleep(10 * time.Millisecond)
return fmt.Sprintf("the result is from %d", id)
}
func AllResponse() string {
numOfRunner := 10
ch := make(chan string, numOfRunner)
for i := 0; i < numOfRunner; i++ {
go func(i int) {
ret := myTask(i)
ch <- ret
}(i)
}
finalRet := ""
for j := 0; j < numOfRunner; j++ {
finalRet += <-ch + "\n"
}
return finalRet
}
func TestAllResponse(t *testing.T) {
t.Log("Before:", runtime.NumGoroutine()) // 2
t.Log("xxx:", AllResponse(), "xxx\n")
time.Sleep(time.Second * 2)
t.Log("After:", runtime.NumGoroutine()) // no buffer 11. buffer 2
}
对象池
经常遇见,例如数据库连接,网络连接。池化,避免重复创建。
GO: 使用 buffered channel 实现对象池
type ReusableObj struct {
}
type ObjPool struct {
bufChan chan *ReusableObj
}
func NewObjPool(numObj int) *ObjPool {
objPool := ObjPool{}
objPool.bufChan = make(chan *ReusableObj, numObj)
for i := 0; i < 10; i++ {
objPool.bufChan <- &ReusableObj{}
}
return &objPool
}
func (p *ObjPool) GetObj(timeout time.Duration) (*ReusableObj, error) {
select {
case ret := <-p.bufChan:
return ret, nil
case <-time.After(timeout): // 超时控制
return nil, errors.New("time out")
}
}
func (p *ObjPool) ReleaseObj(obj *ReusableObj) error {
select {
// size已经满了的情况是放不进去的
case p.bufChan <- obj:
return nil
default:
return errors.New("overflow")
}
}
func TestPool(t *testing.T) {
pool := NewObjPool(10)
for i := 0; i < 11; i++ {
if v, err := pool.GetObj(time.Second * 1); err != nil {
fmt.Println(err)
} else {
fmt.Printf("v:%T \n", v)
if err := pool.ReleaseObj(v); err != nil {
t.Error(err)
}
}
}
fmt.Printf("Done")
}
type ReusableObj struct {
}
type ObjPool struct {
bufChan chan *ReusableObj
}
func NewObjPool(numObj int) *ObjPool {
objPool := ObjPool{}
objPool.bufChan = make(chan *ReusableObj, numObj)
for i := 0; i < 10; i++ {
objPool.bufChan <- &ReusableObj{}
}
return &objPool
}
func (p *ObjPool) GetObj(timeout time.Duration) (*ReusableObj, error) {
select {
case ret := <-p.bufChan:
return ret, nil
case <-time.After(timeout): // 超时控制
return nil, errors.New("time out")
}
}
func (p *ObjPool) ReleaseObj(obj *ReusableObj) error {
select {
// size已经满了的情况是放不进去的
case p.bufChan <- obj:
return nil
default:
return errors.New("overflow")
}
}
func TestPool(t *testing.T) {
pool := NewObjPool(10)
for i := 0; i < 11; i++ {
if v, err := pool.GetObj(time.Second * 1); err != nil {
fmt.Println(err)
} else {
fmt.Printf("v:%T \n", v)
if err := pool.ReleaseObj(v); err != nil {
t.Error(err)
}
}
}
fmt.Printf("Done")
}
sync.pool 对象缓存
- 尝试从私有对象获取
- 私有对象不存在,尝试从当前的 Processor 的共享池获取
- 如果当前 Processor 共享池也是空的,那么尝试从 Processor 的共享池获取
- 如果所有的子池都是空的,最后就用用户指定的 New 函数产生一个新的对象返回
对象的放回
- 如果私有对象不存在则保存为私有对象
- 如果私有对象存在,放入当前 Processor 子池的共享池中
声明周期
- GC 会清除 sync.pool 缓存的对象
- 对象的缓存有效期为下一次 GC 之前
总结
- 适用于通过复用,降低复杂对象的创建和 GC
- 协程安全,会有锁的开销
- 声明周期受 GC 影响,不适合于做连接池等需要自己管理声明周期的资源的池化
func TestSyncPool(t *testing.T) {
pool := &sync.Pool{
New: func() interface{} {
fmt.Println("create a new project.")
return 100
},
}
v := pool.Get().(int)
fmt.Println(v)
pool.Put(3)
//runtime.GC() //GC会清除sync.pool中的缓存对象 调用后v1 = 100 (重新创建)
v1, _ := pool.Get().(int)
fmt.Println(v1) // 3
}
func TestSyncPool(t *testing.T) {
pool := &sync.Pool{
New: func() interface{} {
fmt.Println("create a new project.")
return 100
},
}
v := pool.Get().(int)
fmt.Println(v)
pool.Put(3)
//runtime.GC() //GC会清除sync.pool中的缓存对象 调用后v1 = 100 (重新创建)
v1, _ := pool.Get().(int)
fmt.Println(v1) // 3
}