传统的并发形式:多线程共享内存,这也是Java、C#或者C++等语言中的多线程开发的常规方法,其实golang语言也支持这种传统模式,另外一种是Go语言特有的,也是Go语言推荐的:CSP(communicating sequential processes)并发模型。不同于传统的多线程通过共享内存来通信,CSP讲究的是“以通信的方式来共享内存”。
“不要以共享内存的方式来通信,相反,要通过通信来共享内存。”
go语言使用MPG模式来实现CSP
在传统的并发中起很多线程只会加大CPU和内存的开销,太多的线程会大量的消耗计算机硬件资源,造成并发量的瓶颈。
M指的是Machine,一个M直接关联了一个内核线程。
P指的是”processor”,代表了M所需的上下文环境,也是处理用户级代码逻辑的处理器。
G指的是Goroutine,其实本质上也是一种轻量级的线程。
M关联了一个内核线程,通过调度器P(上下文)的调度,可以连接1个或者多个G,相当于把一个内核线程切分成了了N个用户线程,M和P是一对一关系(但是实际调度中关系多变),通过P调度N个G(P和G是一对多关系),实现内核线程和G的多对多关系(M:N),通过这个方式,一个内核线程就可以起N个Goroutine,同样硬件配置的机器可用的用户线程就成几何级增长,并发性大幅提高。
https://i6448038.github.io/2017/12/04/golang-concurrency-principle/
MPG模式运行状态1
1)当前程序有三个M,如果三个M都在一个cpu运行,就是并发,如果在不同的cpu运行就是并行
2)M1,M2,M3正在执行一个G,M1的协程队列有三个,M2的协程队列有三个,M3的协程队列有两个
3)从上图可以看到:Go的协程是轻量级的线程,是逻辑态的,Go可以容易的起上万个协程
4)其他程序c/java的多线程,往往是内核态的,比较重量级,几千个线程就有可能耗光cpu资源
MPG模式运行状态2
1)分成两个部分来看
2)原来的情况是M1主线程正在执行G1协程,另外有三个协程在等待
3)如果G1协程阻塞,比如读取文件或者数据库
4)这时就会创建M2主线程(也可能是从已有的线程池中取出M2)并且将等待的三个协程挂到M2下执行,M1主线程下的G1仍然执行文件的读写
5)这样的MPG调度模式,既可以让G1执行,也不会让队列的其他协程一直阻塞
https://www.cnblogs.com/huangliang-hb/p/12559565.html
https://www.cnblogs.com/nima/p/11751393.html
https://studygolang.com/articles/1045
gopark函数
gopark函数在协程的实现上扮演着非常重要的角色,用于协程的切换,协程切换的原因一般有以下几种情况:
系统调用;
channel读写条件不满足;
抢占式调度时间片结束;
gopark函数做的主要事情分为两点:
解除当前goroutine的m的绑定关系,将当前goroutine状态机切换为等待状态;
调用一次schedule()函数,在局部调度器P发起一轮新的调度。
下面我们来研究一下gopark函数是怎么实现协程切换的。
先看看源码:
func gopark(unlockf func(g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {
if reason != waitReasonSleep {
checkTimeouts() // timeouts may expire while two goroutines keep the scheduler busy
}
mp := acquirem()
gp := mp.curg
status := readgstatus(gp)
if status != _Grunning && status != _Gscanrunning {
throw(“gopark: bad g status”)
}
mp.waitlock = lock
mp.waitunlockf = *(unsafe.Pointer)(unsafe.Pointer(&unlockf))
gp.waitreason = reason
mp.waittraceev = traceEv
mp.waittraceskip = traceskip
releasem(mp)
// can’t do anything that might move the G between Ms here.
mcall(park_m)
}
源码里面最重要的一行就是调用 mcall(park_m) 函数,park_m是一个函数指针。mcall在golang需要进行协程切换时被调用,做的主要工作是:
切换当前线程的堆栈从g的堆栈切换到g0的堆栈;
并在g0的堆栈上执行新的函数fn(g);
保存当前协程的信息( PC/SP存储到g->sched),当后续对当前协程调用goready函数时候能够恢复现场;
mcall函数执行原理
mcall的函数原型是:
func mcall(fn func(*g))
1
这里函数fn的参数g指的是在调用mcall之前正在运行的协程。
我们前面说到,mcall的主要作用是协程切换,它将当前正在执行的协程状态保存起来,然后在m->g0的堆栈上调用新的函数。 在新的函数内会将之前运行的协程放弃,然后调用一次schedule()来挑选新的协程运行。(也就是在fn函数里面会调用一次schedule()函数进行一次scheduler的重新调度,让m去运行其余的goroutine)
mcall函数是通过汇编实现的,在asm_amd64.s里面有64位机的实现,源码如下:
// func mcall(fn func(*g))
// Switch to m->g0’s stack, call fn(g).
// Fn must never return. It should gogo(&g->sched)
// to keep running g.
TEXT runtime·mcall(SB), NOSPLIT, $0-8
//DI中存储参数fn
MOVQ fn+0(FP), DI
get_tls(CX)
// 获取当前正在运行的协程g信息
// 将其状态保存在g.sched变量
MOVQ g(CX), AX // save state in g->sched
MOVQ 0(SP), BX // caller's PC
MOVQ BX, (g_sched+gobuf_pc)(AX)
LEAQ fn+0(FP), BX // caller's SP
MOVQ BX, (g_sched+gobuf_sp)(AX)
MOVQ AX, (g_sched+gobuf_g)(AX)
MOVQ BP, (g_sched+gobuf_bp)(AX)
// switch to m->g0 & its stack, call fn
MOVQ g(CX), BX
MOVQ g_m(BX), BX
MOVQ m_g0(BX), SI
CMPQ SI, AX // if g == m->g0 call badmcall
JNE 3(PC)
MOVQ $runtime·badmcall(SB), AX
JMP AX
MOVQ SI, g(CX) // g = m->g0
// 切换到m->g0堆栈
MOVQ (g_sched+gobuf_sp)(SI), SP // sp = m->g0->sched.sp
// 参数AX为之前运行的协程g
PUSHQ AX
MOVQ DI, DX
MOVQ 0(DI), DI
// 在m->g0堆栈上执行函数fn
CALL DI
POPQ AX
MOVQ $runtime·badmcall2(SB), AX
JMP AX
RET
上面的汇编代码我也不是很懂,但是能够大致能够推断出主要做的事情:
保存当前goroutine的状态(PC/SP)到g->sched中,方便下次调度;
切换到m->g0的栈;
然后g0的堆栈上调用fn;
回到gopark函数里面,我们知道mcall会切换到m->g0的栈,然后执行park_m函数,下面看一下park_m函数源码:
func park_m(gp *g) {
// g0
g := getg()
if trace.enabled {
traceGoPark(_g_.m.waittraceev, _g_.m.waittraceskip)
}
//线程安全更新gp的状态,置为_Gwaiting
casgstatus(gp, _Grunning, _Gwaiting)
// 移除gp与m的绑定关系
dropg()
if _g_.m.waitunlockf != nil {
fn := *(*func(*g, unsafe.Pointer) bool)(unsafe.Pointer(&_g_.m.waitunlockf))
ok := fn(gp, _g_.m.waitlock)
_g_.m.waitunlockf = nil
_g_.m.waitlock = nil
if !ok {
if trace.enabled {
traceGoUnpark(gp, 2)
}
casgstatus(gp, _Gwaiting, _Grunnable)
execute(gp, true) // Schedule it back, never returns.
}
}
// 重新做一次调度
schedule() }
park_m函数主要做的几件事情就是:
线程安全更新goroutine的状态,置为_Gwaiting 等待状态;
解除goroutine与OS thread的绑定关系;
调用schedule()函数,调度器会重新调度选择一个goroutine去运行;
schedule函数里面主要调用路径就是:
schedule()–>execute()–>gogo()
1
gogo函数的作用正好相反,用来从gobuf中恢复出协程执行状态并跳转到上一次指令处继续执行。因此,其代码也相对比较容易理解,当然,其实现也是通过汇编代码实现的。
goready函数
goready函数相比gopark函数来说简单一些,主要功能就是唤醒某一个goroutine,该协程转换到runnable的状态,并将其放入P的local queue,等待调度。
func goready(gp *g, traceskip int) {
// 切换到g0的栈
systemstack(func() {
ready(gp, traceskip, true)
})
}
该函数主要就是切换到g0的栈空间然后执行ready函数。
下面我们看看ready函数源码(删除非主流程代码):
// Mark gp ready to run.
func ready(gp *g, traceskip int, next bool) {
status := readgstatus(gp)
// Mark runnable.
_g_ := getg()//g0
_g_.m.locks++ // disable preemption because it can be holding p in a local var
if status&^_Gscan != _Gwaiting {
dumpgstatus(gp)
throw("bad g->status in ready")
}
//设置gp状态为runnable,然后加入到P的可运行local queue;
casgstatus(gp, _Gwaiting, _Grunnable)
runqput(_g_.m.p.ptr(), gp, next)
if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 {
wakep()
}
_g_.m.locks--
if _g_.m.locks == 0 && _g_.preempt { // restore the preemption request in Case we've cleared it in newstack
_g_.stackguard0 = stackPreempt
} }
代码的核心流程最主要工作就是将gp(goroutine)的状态机切换到runnnable,然后加入到P的局部调度器的local queue,等待P进行调度。
所以这里有一点需要我们注意到的是,对一个协程调用goready函数,这个协程不是可以马上就执行的,而是要等待调度器的调度执行。
https://blog.csdn.net/u010853261/article/details/85887948